Coverage for src / beautyspot / db.py: 74%

468 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-11 19:14 +0900

1# src/beautyspot/db.py 

2 

3import sqlite3 

4import os 

5import logging 

6import queue 

7import threading 

8import time 

9from collections.abc import Iterator 

10from contextlib import contextmanager 

11import dataclasses 

12from dataclasses import dataclass 

13from datetime import datetime, timezone 

14from pathlib import Path 

15from abc import ABC, abstractmethod 

16from typing import Optional, TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable 

17import weakref 

18from beautyspot.types import TaskRecord 

19 

20 

@runtime_checkable
class TaskDBCore(Protocol):
    """
    Core interface for task metadata storage required during execution.

    This is a structural protocol: any object providing these methods satisfies
    it, and ``@runtime_checkable`` lets ``isinstance`` check for their presence.
    """

    def init_schema(self) -> None:
        """Create or migrate the storage schema."""

    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the record stored under *cache_key*, or None on a miss.

        ``include_expired`` asks the backend to return expired records too.
        """

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Insert or replace the record for *cache_key*."""

    def delete(self, cache_key: str) -> bool:
        """Remove the record for *cache_key*; return True if one was removed."""

48 

49 

@runtime_checkable
class Flushable(Protocol):
    """Protocol for backends that support flushing pending writes."""

    def flush(self, timeout: Optional[float] = None) -> bool:
        """Block until queued writes are processed; return False if that did not happen."""

55 

56 

@runtime_checkable
class Shutdownable(Protocol):
    """Protocol for backends that require graceful shutdown."""

    def shutdown(self, wait: bool = True) -> None:
        """Release the backend's resources; ``wait`` asks to let pending work finish first."""

62 

63 

@runtime_checkable
class Maintenable(Protocol):
    """
    Extended interface for maintenance tasks (GC, CLI, Dashboard).
    """

    def delete_expired(self) -> int:
        """Delete records past their expiration time; return how many were removed."""

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete records older than *older_than* (optionally for one function); return the count."""

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Preview what ``prune()`` with the same arguments would delete."""

    def get_blob_refs(self) -> Optional[set[str]]:
        """Return references to results held in external storage, or None if unsupported."""

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete every record (optionally for one function); return the count."""

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Return cache keys that begin with *prefix*."""

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """Return recent task history as a pandas DataFrame."""

85 

86 

@runtime_checkable
class TaskDBMaintenable(TaskDBCore, Maintenable, Protocol):
    """Combined interface: core task storage plus the maintenance operations."""

90 

91 

92 

93class _ReadConnWrapper: 

94 def __init__(self, conn: sqlite3.Connection): 

95 self.conn = conn 

96 self.lock = threading.RLock() 

97 self._closed = False 

98 self._shutdown_requested = False 

99 

100 def close(self, wait: bool = True): 

101 """ 

102 Args: 

103 wait: True の場合はロック解放を待機。 

104 False (シャットダウン時) の場合は即座に試行し、他が使用中ならスキップする。 

105 """ 

106 # wait=False の場合は blocking=False になり、取得できなければ直ちに False を返す 

107 if not self.lock.acquire(blocking=wait): 107 ↛ 110line 107 didn't jump to line 110 because the condition on line 107 was never true

108 # 誰かがクエリ実行中なので、強制クローズによるクラッシュを防ぐために諦める。 

109 # 代わりにシャットダウン要求フラグを立てて、クエリ完了後に自身でクローズさせる。 

110 if not wait: 

111 self._shutdown_requested = True 

112 return 

113 

114 try: 

115 if not self._closed: 

116 try: 

117 self.conn.close() 

118 except Exception: 

119 pass 

120 self._closed = True 

121 finally: 

122 self.lock.release() 

123 

124 def __del__(self): 

125 self.close() 

126 

127 

logger: logging.Logger = logging.getLogger(__name__)
# Library convention: attach a NullHandler so that output is left entirely to
# the application's logging configuration.
logger.addHandler(logging.NullHandler())

if TYPE_CHECKING:
    # Only needed for the "pd.DataFrame" annotations; at runtime pandas is
    # imported lazily inside get_history().
    import pandas as pd

133 

134 

135def _ensure_utc_isoformat(dt: Optional[datetime]) -> Optional[str]: 

136 """datetime を UTC 保証の ISO 8601 文字列に変換する。None はそのまま返す。""" 

137 if dt is None: 

138 return None 

139 if dt.tzinfo is None: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 dt = dt.replace(tzinfo=timezone.utc) 

141 else: 

142 dt = dt.astimezone(timezone.utc) 

143 return dt.isoformat(" ") 

144 

145 

146@dataclass 

147class _WriteTask: 

148 fn: Callable[[sqlite3.Connection], Any] 

149 event: threading.Event 

150 result: Any = None 

151 error: Exception | BaseException | None = None 

152 state: str = "PENDING" # "PENDING", "RUNNING", "DONE", "CANCELLED" 

153 _state_lock: threading.Lock = dataclasses.field(default_factory=threading.Lock) 

154 

155 def try_cancel(self) -> bool: 

156 """PENDING 状態のタスクをキャンセルする。成功時 True。""" 

157 with self._state_lock: 

158 if self.state == "PENDING": 

159 self.state = "CANCELLED" 

160 return True 

161 return False 

162 

163 def try_start(self) -> bool: 

164 """PENDING → RUNNING に遷移する。CANCELLED なら False を返す。""" 

165 with self._state_lock: 

166 if self.state == "CANCELLED": 

167 return False 

168 self.state = "RUNNING" 

169 return True 

170 

171 def mark_done(self) -> None: 

172 """RUNNING → DONE に遷移する。""" 

173 with self._state_lock: 

174 if self.state != "CANCELLED": 174 ↛ exitline 174 didn't jump to the function exit

175 self.state = "DONE" 

176 

177 

178_STOP = object() 

179 

180 

class TaskDBBase(ABC):
    """
    Abstract base class providing default no-op implementations for maintenance methods.

    Subclasses must implement ``init_schema``, ``get``, ``save`` and ``delete``.
    The maintenance methods return "nothing to do" values (0, an empty list,
    None, an empty DataFrame) and may be overridden selectively.
    Actual backends should implement ``TaskDBCore`` and optionally
    ``Maintenable`` (or the combined ``TaskDBMaintenable``).
    """

    @abstractmethod
    def init_schema(self):
        """Create or migrate the storage schema."""

    @abstractmethod
    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the record stored under *cache_key*, or None on a miss."""

    @abstractmethod
    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ):
        """Insert or replace the record for *cache_key*."""

    @abstractmethod
    def delete(self, cache_key: str) -> bool:
        """Delete the record for *cache_key*; return True if one was removed."""

    # --- Maintenance Methods (Default implementations) ---
    def delete_expired(self) -> int:
        """Delete tasks that have passed their expiration time. Default: no-op, returns 0."""
        return 0

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete tasks older than the specified datetime. Default: no-op, returns 0."""
        return 0

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Retrieve tasks older than the specified datetime (preview for prune). Default: []."""
        return []

    def get_blob_refs(self) -> Optional[set[str]]:
        """Retrieve all 'result_value' entries that point to external storage. Default: None."""
        return None

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete all tasks, optionally filtered by function name. Default: no-op, returns 0."""
        return 0

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Retrieve cache keys that start with the given prefix. Default: []."""
        return []

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Get task history. Returns an empty DataFrame by default.

        Raises:
            ImportError: If pandas is not installed (the original error is chained).
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e
        return pd.DataFrame()

252 

253 

class WriterTaintedError(RuntimeError):
    """
    The SQLite writer thread is hung and could not be interrupted.

    Once raised, further writes are refused until the writer is restarted with
    ``SQLiteTaskDB.reset()`` or the process is restarted.
    """

260 

261 

class SQLiteTaskDB(TaskDBCore, Flushable, Shutdownable, Maintenable):
    """
    Default implementation using SQLite.

    All writes are serialised through one background writer thread that owns
    the only read-write connection (WAL journal mode); each write call blocks
    until its operation has been committed, rolled back, cancelled or
    interrupted. Reads use per-thread connections opened with
    ``PRAGMA query_only = ON`` that are tracked so ``shutdown()`` can close
    them all.
    """

    def __init__(self, db_path: str | Path | None = None, timeout: float = 30.0):
        """
        Open (or create) the database and start the writer thread.

        Args:
            db_path: Path to the SQLite file. When omitted, a file named after
                ``hash(self)`` is used under ``.beautyspot/``.
            timeout: SQLite busy timeout in seconds; also the time a write may
                wait before it is cancelled (not started) or interrupted (running).

        Raises:
            Exception: Whatever the writer thread raised while opening its
                connection, re-raised here so construction fails loudly.
        """
        self.db_path = (
            Path(db_path).resolve() if db_path else Path(f".beautyspot/{hash(self)}.db")
        )
        self._ensure_cache_dir(self.db_path.parent)
        self.timeout = timeout
        self._local = threading.local()
        self._write_queue: queue.Queue[object] = queue.Queue()
        self._shutdown_lock = threading.Lock()
        self._shutdown_requested = False
        self._writer_ready = threading.Event()
        self._writer_error: Exception | None = None
        self._writer_tainted = False
        self._writer_generation = 0
        # The writer thread's connection, kept so other threads can interrupt() it.
        self._writer_conn: sqlite3.Connection | None = None
        self._writer_conn_lock = threading.Lock()
        # Every thread-local read-only connection is tracked here so shutdown()
        # can close them all; open readers would otherwise block WAL checkpoints.
        self._read_wrappers = weakref.WeakSet()
        self._read_conns_lock = threading.Lock()
        self._start_writer_thread()

    def _start_writer_thread(self) -> None:
        """Start a writer thread for the current generation and wait until it is ready."""
        self._writer_thread = threading.Thread(
            target=self._writer_loop,
            args=(self._writer_generation,),
            daemon=True,
            name=f"BeautySpot-SQLiteWriter-{self._writer_generation}",
        )
        self._writer_thread.start()
        self._writer_ready.wait()
        if self._writer_error:
            raise self._writer_error

    def reset(self) -> None:
        """
        Forcefully restart the writer thread. Use this if the writer is tainted or hung.

        The current writer connection is interrupted (best effort), the writer
        generation is bumped so the old thread exits at its next loop check,
        the taint flag is cleared and a fresh writer thread is started.
        Does nothing after ``shutdown()``.
        """
        with self._shutdown_lock:
            if self._shutdown_requested:
                return

            # 1. Try to interrupt the current connection, if it is still alive.
            with self._writer_conn_lock:
                if self._writer_conn:
                    try:
                        self._writer_conn.interrupt()
                    except Exception:
                        pass  # best effort: the connection may already be closed

            # 2. Bump the generation and clear the taint flag.
            self._writer_generation += 1
            self._writer_tainted = False
            self._writer_ready = threading.Event()
            self._writer_error = None

            # 3. Start the new writer thread.
            self._start_writer_thread()
            logger.info(
                f"SQLiteTaskDB writer thread reset to generation {self._writer_generation}"
            )

    @staticmethod
    def _ensure_cache_dir(directory: Path) -> None:
        """
        Create the database's parent directory and place a ``.gitignore`` in it.

        The ``.gitignore`` contains ``*`` so the cache directory is never
        committed. Failure to write it is logged and otherwise ignored.
        """
        directory.mkdir(parents=True, exist_ok=True)
        gitignore_path = directory / ".gitignore"
        if not gitignore_path.exists():
            try:
                gitignore_path.write_text("*\n")
            except OSError as e:
                # Use the module logger (not the root logger) like the rest of this class.
                logger.warning("Failed to create .gitignore in %s: %s", directory, e)

    @contextmanager
    def _read_connect(self) -> Iterator[sqlite3.Connection]:
        """
        Yield this thread's read-only connection, creating it on first use.

        Connections are cached per thread, opened with ``PRAGMA query_only = ON``
        so accidental writes fail at the connection level, and registered in
        ``_read_wrappers`` so ``shutdown()`` can close them.

        The wrapper's lock is held while the caller uses the connection, so a
        concurrent ``shutdown()`` never closes it mid-query; shutdown only flags
        it, and the connection is then closed here once the caller is done,
        whether the caller finished normally or raised.

        Raises:
            RuntimeError: If the database is shutting down or the connection
                was closed underneath us.
            sqlite3.Error: Propagated from the caller's queries; the possibly
                broken connection is discarded and recreated on the next call.
        """
        if self._shutdown_requested:
            raise RuntimeError("SQLiteTaskDB is shutting down.")

        wrapper = getattr(self._local, "read_conn_wrapper", None)
        if wrapper is None or wrapper._closed:
            wrapper = self._open_read_wrapper()

        try:
            with wrapper.lock:
                if wrapper._closed:
                    raise RuntimeError("Database connection was closed")
                try:
                    yield wrapper.conn
                except sqlite3.Error:
                    # The connection may be broken: drop it so the next access
                    # creates a fresh one.
                    self._discard_read_wrapper(wrapper)
                    raise
        finally:
            # shutdown() could not close the connection while we held its lock
            # and only flagged it; close it now on every exit path.
            if wrapper._shutdown_requested and not wrapper._closed:
                self._discard_read_wrapper(wrapper)

    def _open_read_wrapper(self) -> "_ReadConnWrapper":
        """Open a query-only connection for the calling thread and register it."""
        # Re-check: another thread may have called shutdown() (closing every
        # wrapper) after the caller's first check.
        if self._shutdown_requested:
            raise RuntimeError("SQLiteTaskDB is shutting down.")
        conn = sqlite3.connect(
            self.db_path, timeout=self.timeout, check_same_thread=False
        )
        try:
            conn.execute("PRAGMA query_only = ON;")
        except Exception:
            conn.close()
            raise
        wrapper = _ReadConnWrapper(conn)
        with self._read_conns_lock:
            # Check once more under the lock so registration cannot race with
            # shutdown() closing and clearing _read_wrappers.
            if self._shutdown_requested:
                wrapper.close()
                raise RuntimeError("SQLiteTaskDB is shutting down.")
            self._read_wrappers.add(wrapper)
        self._local.read_conn_wrapper = wrapper
        return wrapper

    def _discard_read_wrapper(self, wrapper: "_ReadConnWrapper") -> None:
        """Close *wrapper*, stop tracking it and forget it for the calling thread."""
        wrapper.close()
        with self._read_conns_lock:
            self._read_wrappers.discard(wrapper)
        self._local.read_conn_wrapper = None

    def _writer_loop(self, generation: int) -> None:
        """
        Body of the writer thread for *generation*.

        Opens the read-write connection in WAL mode, publishes it for
        ``interrupt()``, signals readiness, then executes queued write tasks
        one at a time. Exits on ``_STOP`` or once ``reset()`` has moved on to a
        newer generation.
        """
        conn: sqlite3.Connection | None = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=self.timeout)
            conn.execute("PRAGMA journal_mode=WAL;")
            with self._writer_conn_lock:
                if generation == self._writer_generation:
                    self._writer_conn = conn
        except Exception as e:
            # connect() may have succeeded before the PRAGMA failed; don't leak it.
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
            if generation == self._writer_generation:
                self._writer_error = e
                self._writer_ready.set()
            return

        self._writer_ready.set()
        try:
            # Generation check: a thread superseded by reset() exits here.
            while generation == self._writer_generation:
                try:
                    task = self._write_queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                if task is _STOP:
                    self._write_queue.task_done()
                    break
                assert isinstance(task, _WriteTask)
                if not task.try_start():
                    # CANCELLED by a caller that timed out before we started it.
                    task.event.set()
                    self._write_queue.task_done()
                    continue

                self._run_write_task(conn, task)
        finally:
            with self._writer_conn_lock:
                if self._writer_conn is conn:
                    self._writer_conn = None
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass

    def _run_write_task(self, conn: sqlite3.Connection, task: "_WriteTask") -> None:
        """Run one write task in its own transaction and wake up its submitter."""
        try:
            task.result = task.fn(conn)
            conn.commit()
        except BaseException as e:
            # Record the failure *before* rolling back so the caller always sees
            # it, even if the rollback itself fails. This also covers the
            # sqlite3.OperationalError raised after interrupt().
            task.error = e
            try:
                conn.rollback()
            except Exception:
                logger.warning("SQLite writer rollback failed", exc_info=True)
        finally:
            task.mark_done()
            task.event.set()
            self._write_queue.task_done()

    def _enqueue_write(self, fn: Callable[[sqlite3.Connection], Any]) -> Any:
        """
        Run ``fn(conn)`` on the writer thread and return its result.

        Blocks until the writer has committed or rolled back the operation. If
        it has not finished within ``self.timeout`` seconds, a task that has not
        started is cancelled; a running one is interrupted, and if even that
        fails the writer is marked tainted.

        Raises:
            RuntimeError: The writer failed to start, is not running, died while
                we waited, or the database is shutting down.
            WriterTaintedError: The writer is, or became, tainted.
            TimeoutError: The write was cancelled or interrupted after the timeout.
            BaseException: Whatever ``fn`` or the commit raised.
        """
        self._writer_ready.wait()
        if self._writer_error:
            raise RuntimeError(
                "SQLite writer thread failed to start."
            ) from self._writer_error

        # Check and enqueue atomically with respect to shutdown()/reset().
        with self._shutdown_lock:
            if self._writer_tainted:
                raise WriterTaintedError(
                    "SQLite writer thread is tainted (hung) and cannot process writes. "
                    "Call reset() or restart the process."
                )
            if self._shutdown_requested:
                raise RuntimeError("SQLiteTaskDB is shutting down.")
            if not self._writer_thread.is_alive():
                raise RuntimeError("SQLite writer thread is not running.")
            task = _WriteTask(fn=fn, event=threading.Event())
            self._write_queue.put(task)

        start = time.monotonic()
        while not task.event.wait(timeout=0.5):
            if not self._writer_thread.is_alive():
                raise RuntimeError("SQLite writer thread stopped unexpectedly.")
            if self._shutdown_requested:
                raise RuntimeError("SQLiteTaskDB is shutting down.")
            # Another caller may have tainted the writer while we were waiting.
            if self._writer_tainted:
                raise WriterTaintedError("Writer thread became tainted during wait.")

            if time.monotonic() - start <= self.timeout:
                continue

            if task.try_cancel():
                # Still PENDING: the writer will skip it when dequeued.
                raise TimeoutError(
                    f"SQLite write did not start within {self.timeout}s and was cancelled."
                )

            # RUNNING: try to abort the in-flight statement.
            with self._writer_conn_lock:
                if self._writer_conn:
                    self._writer_conn.interrupt()

            # Give the writer a moment to handle the interrupt and set the event.
            if task.event.wait(timeout=1.0):
                break

            # interrupt() had no effect: the writer is considered hung.
            self._writer_tainted = True
            raise WriterTaintedError(
                f"SQLite write timed out after {self.timeout}s and interrupt failed. "
                "Writer thread is now tainted."
            )

        if task.error is not None:
            if (
                isinstance(task.error, sqlite3.OperationalError)
                and "interrupted" in str(task.error).lower()
            ):
                raise TimeoutError(
                    f"SQLite write timed out after {self.timeout}s and was interrupted."
                ) from task.error
            raise task.error
        return task.result

    def shutdown(self, wait: bool = True) -> None:
        """
        Stop accepting writes, stop the writer thread and close read connections.

        Args:
            wait: If True, block until every queued write has been processed and
                the writer thread has exited. Not honoured when the writer is
                tainted: a hung writer never drains the queue, so waiting would
                block forever.

        Repeated calls are no-ops. Read-only connections are closed even when
        the writer thread is already dead.
        """
        with self._shutdown_lock:
            if self._shutdown_requested:
                return
            self._shutdown_requested = True

        try:
            self._stop_writer(wait)
        finally:
            # Open thread-local read connections would hinder WAL checkpoints.
            self._close_read_wrappers()

    def _stop_writer(self, wait: bool) -> None:
        """Optionally drain the queue, then tell the writer thread to exit."""
        if not self._writer_thread.is_alive():
            logger.error(
                "SQLite writer thread is not running; pending writes may be lost."
            )
            return

        if wait and self._writer_tainted:
            logger.warning(
                "SQLite writer thread is tainted; not waiting for pending writes."
            )
            wait = False

        if wait:
            self._write_queue.join()
        self._write_queue.put(_STOP)
        if wait:
            self._writer_thread.join()

    def _close_read_wrappers(self) -> None:
        """Close every tracked read-only connection; busy ones close themselves when released."""
        with self._read_conns_lock:
            for wrapper in list(self._read_wrappers):
                try:
                    wrapper.close(wait=False)
                except Exception:
                    pass  # best effort during shutdown
            self._read_wrappers.clear()

    def init_schema(self):
        """
        Create the ``tasks`` table, apply column migrations and create indexes.

        Everything runs on the writer thread's connection via ``_enqueue_write``:
        DDL issued from a separate connection could contend with the WAL write
        lock held by the writer thread. Safe to call repeatedly.
        """
        # Columns added after the first schema version, in migration order.
        # Names and types are fixed literals, never user input.
        migrations = (
            ("content_type", "TEXT"),
            ("version", "TEXT"),
            ("result_data", "BLOB"),
            ("func_identifier", "TEXT"),
            ("expires_at", "TIMESTAMP"),
        )

        def _op(conn: sqlite3.Connection) -> None:
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tasks (
                    cache_key TEXT PRIMARY KEY,
                    func_name TEXT,
                    func_identifier TEXT,
                    input_id TEXT,
                    result_type TEXT,
                    result_value TEXT,
                    result_data BLOB,
                    content_type TEXT,
                    version TEXT,
                    expires_at TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Auto migration for databases created by older versions.
            existing = {row[1] for row in conn.execute("PRAGMA table_info(tasks)").fetchall()}
            for column, sql_type in migrations:
                if column in existing:
                    continue
                try:
                    conn.execute(f"ALTER TABLE tasks ADD COLUMN {column} {sql_type};")
                except sqlite3.OperationalError as e:
                    # Another process may have added the column concurrently.
                    if "duplicate column name" not in str(e).lower():
                        raise

            # Both columns exist at this point; IF NOT EXISTS makes this idempotent,
            # so the indexes are guaranteed whether or not a migration ran.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_func_identifier ON tasks(func_identifier);"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_expires_at ON tasks(expires_at);"
            )

        self._enqueue_write(_op)

    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """
        Fetch the cached result for *cache_key*, or None on a miss.

        Expired entries count as a miss unless ``include_expired`` is True.
        They are not deleted here; physical deletion is deferred to
        ``beautyspot gc``. An unparsable ``expires_at`` is treated as not expired.
        """
        with self._read_connect() as conn:
            row = conn.execute(
                "SELECT result_type, result_value, result_data, expires_at FROM tasks WHERE cache_key=?",
                (cache_key,),
            ).fetchone()

        if row is None:
            return None

        r_type, r_val, r_data, exp_str = row

        # Lazy expiration check (skipped when include_expired=True).
        if exp_str and not include_expired:
            try:
                # SQLite returns stored timestamps as strings; normalise the
                # separator to the canonical ISO 8601 "T" before parsing.
                expires_at = datetime.fromisoformat(exp_str.replace(" ", "T"))
                # Naive values stored before timezone support are treated as UTC.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=timezone.utc)
                if expires_at < datetime.now(timezone.utc):
                    return None
            except (ValueError, TypeError):
                pass  # unparsable expiry: treat the entry as valid

        return TaskRecord(
            result_type=r_type,
            result_value=r_val,
            result_data=r_data,
            expires_at=exp_str,
        )

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ):
        """
        Insert or replace the record for *cache_key*, blocking until committed.

        An empty ``func_identifier`` falls back to ``func_name``. ``updated_at``
        is written explicitly in the same UTC format as ``expires_at`` (via
        ``_ensure_utc_isoformat``) instead of relying on SQLite's
        ``DEFAULT CURRENT_TIMESTAMP``, whose second-precision format differs,
        so that the string comparisons in ``prune()`` and
        ``get_outdated_tasks()`` line up.
        """

        def _op(conn: sqlite3.Connection) -> None:
            effective_identifier = func_identifier or func_name
            now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))
            conn.execute(
                """
                INSERT OR REPLACE INTO tasks
                (cache_key, func_name, func_identifier, input_id, version, result_type, content_type, result_value, result_data, expires_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    cache_key,
                    func_name,
                    effective_identifier,
                    input_id,
                    version,
                    result_type,
                    content_type,
                    result_value,
                    result_data,
                    _ensure_utc_isoformat(expires_at),
                    now_str,
                ),
            )

        self._enqueue_write(_op)

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Return up to *limit* most recently updated tasks as a DataFrame.

        Returns an empty DataFrame when the database file does not exist.

        Raises:
            ImportError: If pandas is not installed.
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e

        if not os.path.exists(self.db_path):
            return pd.DataFrame()

        query = """
            SELECT
                cache_key, func_name, func_identifier, input_id, version, result_type,
                content_type, result_value, result_data, updated_at, expires_at
            FROM tasks
            ORDER BY updated_at DESC LIMIT ?
        """
        with self._read_connect() as conn:
            return pd.read_sql_query(query, conn, params=[limit])

    def delete(self, cache_key: str) -> bool:
        """Delete the record for *cache_key*; return True if a row was removed."""

        def _op(conn: sqlite3.Connection) -> bool:
            cursor = conn.execute("DELETE FROM tasks WHERE cache_key=?", (cache_key,))
            return cursor.rowcount > 0

        return bool(self._enqueue_write(_op))

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """
        Delete every task, or only those whose ``func_name`` or ``func_identifier``
        equals *func_name* when it is non-empty. Returns the number of rows deleted.
        """

        def _op(conn: sqlite3.Connection) -> int:
            if func_name:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE func_name = ? OR func_identifier = ?",
                    (func_name, func_name),
                )
            else:
                cursor = conn.execute("DELETE FROM tasks")
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """
        Delete tasks whose ``updated_at`` is before *older_than*, optionally
        limited to one function (matched on ``func_name`` or ``func_identifier``).
        Returns the number of rows deleted.
        """
        cutoff_str = _ensure_utc_isoformat(older_than)

        def _op(conn: sqlite3.Connection) -> int:
            if func_name:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
                    (cutoff_str, func_name, func_name),
                )
            else:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE updated_at < ?",
                    (cutoff_str,),
                )
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """
        Preview ``prune()``: return ``(cache_key, func_identifier or func_name,
        updated_at)`` for tasks updated before *older_than*, with the same
        optional function filter. Returns [] if the database file does not exist.
        """
        cutoff_str = _ensure_utc_isoformat(older_than)
        if not os.path.exists(self.db_path):
            return []

        with self._read_connect() as conn:
            if func_name:
                cursor = conn.execute(
                    "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks "
                    "WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
                    (cutoff_str, func_name, func_name),
                )
            else:
                cursor = conn.execute(
                    "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks WHERE updated_at < ?",
                    (cutoff_str,),
                )
            return [(row[0], row[1], str(row[2])) for row in cursor.fetchall()]

    def delete_expired(self) -> int:
        """Delete tasks whose ``expires_at`` is in the past; return the number deleted."""
        if not os.path.exists(self.db_path):
            return 0

        # Same formatter as save(), so the string comparison is consistent.
        now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))

        def _op(conn: sqlite3.Connection) -> int:
            cursor = conn.execute(
                "DELETE FROM tasks WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now_str,),
            )
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def get_blob_refs(self) -> Optional[set[str]]:
        """
        Return the ``result_value`` of every row whose ``result_type`` is
        ``'FILE'``; an empty set if the database file does not exist.
        """
        if not os.path.exists(self.db_path):
            return set()

        with self._read_connect() as conn:
            cursor = conn.execute(
                "SELECT result_value FROM tasks WHERE result_type = 'FILE' AND result_value IS NOT NULL"
            )
            # Full location strings are returned for exact matching.
            return {row[0] for row in cursor.fetchall() if row[0]}

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Return up to 50 cache keys beginning with *prefix*."""
        if not os.path.exists(self.db_path):
            return []

        with self._read_connect() as conn:
            # Escape LIKE wildcards so the prefix is matched literally.
            # NOTE: SQLite's LIKE is case-insensitive for ASCII by default.
            escaped = (
                prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            cursor = conn.execute(
                "SELECT cache_key FROM tasks WHERE cache_key LIKE ? ESCAPE '\\' LIMIT 50",
                (f"{escaped}%",),
            )
            return [row[0] for row in cursor.fetchall()]

    @staticmethod
    def count_tasks(db_path: Path, timeout: float = 5.0) -> int:
        """
        Return the number of rows in ``tasks`` without starting a writer thread.

        Lightweight, read-only helper for uses such as CLI listings. Returns -1
        on any error, including a missing database file. The existence check
        comes first because ``sqlite3.connect`` would otherwise create an empty
        file as a side effect.
        """
        try:
            if not Path(db_path).is_file():
                return -1
            conn = sqlite3.connect(str(db_path), timeout=timeout)
            try:
                # journal_mode=WAL is deliberately not set here: it is not needed
                # for a read-only query and combined with query_only it could be
                # ambiguous.
                conn.execute("PRAGMA query_only = ON;")
                result = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()
                return result[0] if result else 0
            finally:
                conn.close()
        except Exception:
            return -1

    def flush(self, timeout: Optional[float] = None) -> bool:
        """
        Wait until every write queued so far has completed.

        A no-op task is appended to the queue. The single writer thread
        processes tasks in FIFO order, so its completion implies that all
        earlier tasks have completed.

        Args:
            timeout: Maximum seconds to wait; None waits indefinitely.

        Returns:
            True once the marker task has been processed. False on timeout,
            after shutdown, or when the writer thread is dead or tainted. A
            tainted writer is alive but stuck, so waiting on it would never end.
        """
        self._writer_ready.wait()

        def _noop_op(conn: sqlite3.Connection) -> None:
            pass

        task = _WriteTask(fn=_noop_op, event=threading.Event())

        # Check and put atomically to avoid a TOCTOU race with shutdown().
        with self._shutdown_lock:
            if (
                self._shutdown_requested
                or self._writer_tainted
                or not self._writer_thread.is_alive()
            ):
                return False
            self._write_queue.put(task)

        # Poll instead of a single unbounded event.wait(), so a dead or hung
        # writer cannot make this hang forever.
        poll_interval = 0.5
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            if deadline is None:
                wait_time = poll_interval
            else:
                wait_time = min(poll_interval, max(0.0, deadline - time.monotonic()))

            if task.event.wait(timeout=wait_time):
                return True

            if not self._writer_thread.is_alive():
                logger.error(
                    "SQLite writer thread died unexpectedly while waiting for flush."
                )
                return False

            if self._writer_tainted:
                return False

            if deadline is not None and time.monotonic() >= deadline:
                return False