Coverage for src / beautyspot / db.py: 74%

472 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-18 18:20 +0900

1# src/beautyspot/db.py 

2 

3import sqlite3 

4import os 

5import logging 

6import queue 

7import threading 

8import time 

9from collections.abc import Iterator 

10from contextlib import contextmanager 

11import dataclasses 

12from dataclasses import dataclass 

13from datetime import datetime, timezone 

14from pathlib import Path 

15from abc import ABC, abstractmethod 

16from typing import Optional, TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable 

17import weakref 

18from beautyspot.types import TaskRecord 

19 

20 

@runtime_checkable
class TaskDBCore(Protocol):
    """
    Minimal task-metadata storage contract needed while tasks execute.

    Any object exposing ``init_schema``, ``get``, ``save`` and ``delete`` with
    these signatures satisfies this protocol structurally.
    """

    def init_schema(self) -> None:
        """Prepare the backing storage schema."""

    def get(self, cache_key: str, *, include_expired: bool = False) -> Optional[TaskRecord]:
        """Look up the record stored under ``cache_key``."""

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Persist a task result under ``cache_key``."""

    def delete(self, cache_key: str) -> bool:
        """Remove the record stored under ``cache_key``."""

48 

49 

@runtime_checkable
class Flushable(Protocol):
    """Structural type for backends that can drain their queued writes on demand."""

    def flush(self, timeout: Optional[float] = None) -> bool:
        """Wait for pending writes; the bool reports whether that succeeded."""

55 

56 

@runtime_checkable
class Shutdownable(Protocol):
    """Structural type for backends that must be shut down gracefully."""

    def shutdown(self, wait: bool = True) -> None:
        """Release the backend's resources, optionally waiting for in-flight work."""

62 

63 

@runtime_checkable
class Maintenable(Protocol):
    """
    Optional maintenance API used by garbage collection, the CLI and the dashboard.
    """

    def delete_expired(self) -> int:
        """Remove expired entries and return how many were removed."""

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Remove entries older than ``older_than`` and return how many were removed."""

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Preview the entries ``prune`` would remove, as 3-tuples of strings."""

    def get_blob_refs(self) -> Optional[set[str]]:
        """Return references to externally stored results (None if not tracked)."""

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Remove every entry (optionally only one function's) and return the count."""

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Return cache keys beginning with ``prefix``."""

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """Return up to ``limit`` history rows as a pandas DataFrame."""

85 

86 

@runtime_checkable
class TaskDBMaintenable(TaskDBCore, Maintenable, Protocol):
    """Combined protocol: the core task API plus the maintenance API."""

90 

91 

92 

class _ReadConnWrapper:
    """
    Own one read-only SQLite connection plus the lock that serialises its use.

    ``close(wait=False)`` can be called from another thread (e.g. during
    shutdown). If the connection is busy, it only sets ``_shutdown_requested``
    so the current user can close the connection once it is done.
    """

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        # Reentrant so the thread that holds it may call close() itself.
        self.lock = threading.RLock()
        self._closed = False
        self._shutdown_requested = False

    def close(self, wait: bool = True) -> None:
        """
        Close the wrapped connection once; later calls are no-ops.

        Args:
            wait: If True, block until the lock is free. If False (used during
                shutdown), try once. When another user holds the lock, only set
                ``_shutdown_requested`` and return without closing.
        """
        # acquire(blocking=False) returns False immediately when the lock is busy;
        # with blocking=True it always succeeds, so this branch implies wait=False.
        if not self.lock.acquire(blocking=wait):
            # Someone is running a query; force-closing under them could crash it.
            # Leave a flag so the current holder closes the connection afterwards.
            self._shutdown_requested = True
            return

        try:
            if not self._closed:
                try:
                    self.conn.close()
                except Exception:
                    # Best effort: a failing close must not break shutdown or GC.
                    pass
                self._closed = True
        finally:
            self.lock.release()

    def __del__(self):
        # A partially-initialised instance (``__init__`` did not finish) has no
        # ``_closed`` attribute; treat it like an already-closed wrapper instead
        # of raising AttributeError from the finalizer. Never block in __del__.
        if getattr(self, "_closed", True):
            return
        self.close(wait=False)

126 

127 

# pandas is imported only for static type checkers, so that the "pd.DataFrame"
# return annotations resolve. At runtime it is imported lazily inside
# get_history(), which keeps pandas an optional dependency.
if TYPE_CHECKING:
    import pandas as pd

# Module-level logger. Attaching a NullHandler is the standard practice for
# libraries: when the application has not configured logging, records from
# this logger are not routed to logging's "last resort" stderr handler.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

133 

134 

def _ensure_utc_isoformat(dt: Optional[datetime]) -> Optional[str]:
    """
    Render ``dt`` as an ISO 8601 string in UTC, with a space as date/time separator.

    Naive datetimes are assumed to already be UTC; aware ones are converted.
    ``None`` passes through unchanged.
    """
    if dt is None:
        return None
    as_utc = (
        dt.astimezone(timezone.utc)
        if dt.tzinfo is not None
        else dt.replace(tzinfo=timezone.utc)
    )
    return as_utc.isoformat(" ")

144 

145 

146@dataclass 

147class _WriteTask: 

148 fn: Callable[[sqlite3.Connection], Any] 

149 event: threading.Event 

150 result: Any = None 

151 error: Exception | BaseException | None = None 

152 state: str = "PENDING" # "PENDING", "RUNNING", "DONE", "CANCELLED" 

153 _state_lock: threading.Lock = dataclasses.field(default_factory=threading.Lock) 

154 

155 def try_cancel(self) -> bool: 

156 """PENDING 状態のタスクをキャンセルする。成功時 True。""" 

157 with self._state_lock: 

158 if self.state == "PENDING": 

159 self.state = "CANCELLED" 

160 return True 

161 return False 

162 

163 def try_start(self) -> bool: 

164 """PENDING → RUNNING に遷移する。CANCELLED なら False を返す。""" 

165 with self._state_lock: 

166 if self.state == "CANCELLED": 

167 return False 

168 self.state = "RUNNING" 

169 return True 

170 

171 def mark_done(self) -> None: 

172 """RUNNING → DONE に遷移する。""" 

173 with self._state_lock: 

174 if self.state != "CANCELLED": 174 ↛ exitline 174 didn't jump to the function exit

175 self.state = "DONE" 

176 

177 

# Sentinel that shutdown() puts on the write queue. The writer loop compares
# dequeued items against it by identity and exits when it sees it.
_STOP = object()

179 

180 

class TaskDBBase(ABC):
    """
    Abstract base class providing default no-op implementations for maintenance methods.

    Subclasses must implement the core operations (``init_schema``, ``get``,
    ``save``, ``delete``), matching the ``TaskDBCore`` protocol. The maintenance
    methods (the ``Maintenable`` protocol) do nothing by default. Backends that
    support maintenance override them.
    """

    @abstractmethod
    def init_schema(self):
        pass

    @abstractmethod
    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        pass

    @abstractmethod
    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ):
        pass

    @abstractmethod
    def delete(self, cache_key: str) -> bool:
        pass

    # --- Maintenance Methods (Default implementations) ---
    def delete_expired(self) -> int:
        """Delete tasks that have passed their expiration time. Default: deletes nothing."""
        return 0

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete tasks older than the specified datetime. Default: deletes nothing."""
        return 0

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Retrieve tasks older than the specified datetime (preview for prune). Default: none."""
        return []

    def get_blob_refs(self) -> Optional[set[str]]:
        """Retrieve all 'result_value' entries that point to external storage. Default: None."""
        return None

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete all tasks, optionally filtered by function name. Default: deletes nothing."""
        return 0

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Retrieve cache keys that start with the given prefix. Default: none."""
        return []

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Get task history. Returns an empty DataFrame by default.

        Raises:
            ImportError: if pandas is not installed (the original error is chained).
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e
        return pd.DataFrame()

252 

253 

class WriterTaintedError(RuntimeError):
    """
    Raised when the SQLite writer thread is stuck and could not be interrupted.

    Once the writer is tainted, further writes are refused until it is restarted
    with ``SQLiteTaskDB.reset()`` or the process is restarted.
    """

260 

261 

262class SQLiteTaskDB(TaskDBMaintenable, Flushable, Shutdownable): 

263 """ 

264 Default implementation using SQLite. 

265 """ 

266 

267 def __init__(self, db_path: str | Path | None = None, timeout: float = 30.0): 

268 if db_path is not None: 268 ↛ 273line 268 didn't jump to line 273 because the condition on line 268 was always true

269 self.db_path = Path(db_path).resolve() 

270 else: 

271 # hash(self) は PYTHONHASHSEED に依存し実行ごとに変わるため、 

272 # 決定的なデフォルトパスを使用する。 

273 self.db_path = Path(".beautyspot/default.db").resolve() 

274 logger.warning( 

275 "db_path not specified; defaulting to '%s'. " 

276 "Use bs.Spot(name=...) for automatic path management.", 

277 self.db_path, 

278 ) 

279 self._ensure_cache_dir(self.db_path.parent) 

280 self.timeout = timeout 

281 self._local = threading.local() 

282 self._write_queue: queue.Queue[object] = queue.Queue() 

283 self._shutdown_lock = threading.Lock() 

284 self._shutdown_requested = False 

285 self._writer_ready = threading.Event() 

286 self._writer_error: Exception | None = None 

287 self._writer_tainted = False 

288 self._writer_generation = 0 

289 # ライタースレッドの接続を保持(interrupt() 用) 

290 self._writer_conn: sqlite3.Connection | None = None 

291 self._writer_conn_lock = threading.Lock() 

292 # 読み取り専用スレッドローカル接続を追跡し、 

293 # shutdown() 時に一括クローズする。WAL チェックポイントの妨げを防ぐ。 

294 self._read_wrappers = weakref.WeakSet() 

295 self._read_conns_lock = threading.Lock() 

296 self._writer_thread = threading.Thread( 

297 target=self._writer_loop, 

298 args=(self._writer_generation,), 

299 daemon=True, 

300 name=f"BeautySpot-SQLiteWriter-{self._writer_generation}", 

301 ) 

302 self._writer_thread.start() 

303 self._writer_ready.wait() 

304 if self._writer_error: 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true

305 raise self._writer_error 

306 

307 self.init_schema() 

308 

309 def reset(self) -> None: 

310 """ 

311 Forcefully restart the writer thread. Use this if the writer is tainted or hung. 

312 """ 

313 with self._shutdown_lock: 

314 if self._shutdown_requested: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 return 

316 

317 # 1. 現在の接続に interrupt を試みる(もし生きていれば) 

318 with self._writer_conn_lock: 

319 if self._writer_conn: 319 ↛ 326line 319 didn't jump to line 326

320 try: 

321 self._writer_conn.interrupt() 

322 except Exception: 

323 pass 

324 

325 # 2. 世代を上げ、汚染フラグを下げる 

326 self._writer_generation += 1 

327 self._writer_tainted = False 

328 self._writer_ready = threading.Event() 

329 self._writer_error = None 

330 

331 # 3. 新しいスレッドを起動 

332 self._writer_thread = threading.Thread( 

333 target=self._writer_loop, 

334 args=(self._writer_generation,), 

335 daemon=True, 

336 name=f"BeautySpot-SQLiteWriter-{self._writer_generation}", 

337 ) 

338 self._writer_thread.start() 

339 self._writer_ready.wait() 

340 if self._writer_error: 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true

341 raise self._writer_error 

342 logger.info( 

343 f"SQLiteTaskDB writer thread reset to generation {self._writer_generation}" 

344 ) 

345 

346 @staticmethod 

347 def _ensure_cache_dir(directory: Path) -> None: 

348 """ 

349 データベース格納用の親ディレクトリを作成し、.gitignore を配置する。 

350 """ 

351 directory.mkdir(parents=True, exist_ok=True) 

352 gitignore_path = directory / ".gitignore" 

353 if not gitignore_path.exists(): 

354 try: 

355 gitignore_path.write_text("*\n") 

356 except OSError as e: 

357 logging.warning(f"Failed to create .gitignore in {directory}: {e}") 

358 

359 @contextmanager 

360 def _read_connect(self) -> Iterator[sqlite3.Connection]: 

361 """ 

362 Thread-safe connection context manager for read-only operations. 

363 Uses a thread-local pool to reuse connections and reduce overhead. 

364 PRAGMA query_only = ON により、誤った書き込みを接続レベルで防止する。 

365 

366 新規接続を _read_wrappers に登録し、 

367 shutdown() 時の一括クローズで WAL チェックポイント妨害を防ぐ。 

368 また、_ReadConnWrapper によってスレッド終了時に接続がクローズされ、メモリリークを防止。 

369 """ 

370 if self._shutdown_requested: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true

371 raise RuntimeError("SQLiteTaskDB is shutting down.") 

372 

373 wrapper = getattr(self._local, "read_conn_wrapper", None) 

374 if wrapper is None or wrapper._closed: 

375 # シャットダウン後に新しい接続がリークするのを防ぐため再チェック。 

376 # 最初のチェック通過後に別スレッドが shutdown() を呼び出し、 

377 # 全ラッパーをクローズした場合にここに到達する。 

378 if self._shutdown_requested: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true

379 raise RuntimeError("SQLiteTaskDB is shutting down.") 

380 

381 conn = sqlite3.connect( 

382 self.db_path, 

383 timeout=self.timeout, 

384 check_same_thread=False, 

385 ) 

386 

387 try: 

388 conn.execute("PRAGMA query_only = ON;") 

389 except Exception: 

390 conn.close() 

391 raise 

392 wrapper = _ReadConnWrapper(conn) 

393 with self._read_conns_lock: 

394 # ロック内で再度チェックし、shutdown() による _read_wrappers.clear() と 

395 # 新規追加の間の競合を完全に排除する。 

396 if self._shutdown_requested: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true

397 conn.close() 

398 raise RuntimeError("SQLiteTaskDB is shutting down.") 

399 self._read_wrappers.add(wrapper) 

400 self._local.read_conn_wrapper = wrapper 

401 

402 with wrapper.lock: 

403 if wrapper._closed: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true

404 raise RuntimeError("Database connection was closed") 

405 try: 

406 yield wrapper.conn 

407 except sqlite3.Error: 

408 # 接続が壊れた場合等のリカバリ (BUG-3) 

409 # 現在の接続を破棄し、次回のアクセス時に新しく作り直す 

410 wrapper.close() 

411 with self._read_conns_lock: 

412 self._read_wrappers.discard(wrapper) 

413 self._local.read_conn_wrapper = None 

414 raise 

415 

416 # クエリ実行中にシャットダウン要求があった場合、自身でクローズする 

417 if getattr(wrapper, "_shutdown_requested", False): 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 wrapper.close() 

419 with self._read_conns_lock: 

420 self._read_wrappers.discard(wrapper) 

421 self._local.read_conn_wrapper = None 

422 

423 def _writer_loop(self, generation: int) -> None: 

424 conn: sqlite3.Connection | None = None 

425 try: 

426 conn = sqlite3.connect(self.db_path, timeout=self.timeout) 

427 conn.execute("PRAGMA journal_mode=WAL;") 

428 with self._writer_conn_lock: 

429 if generation == self._writer_generation: 429 ↛ 437line 429 didn't jump to line 437

430 self._writer_conn = conn 

431 except Exception as e: 

432 if generation == self._writer_generation: 

433 self._writer_error = e 

434 self._writer_ready.set() 

435 return 

436 

437 self._writer_ready.set() 

438 try: 

439 while True: 

440 # 世代チェック: 自分が最新でないなら終了 

441 if generation != self._writer_generation: 

442 break 

443 

444 try: 

445 task = self._write_queue.get(timeout=1.0) 

446 except queue.Empty: 

447 continue 

448 

449 if task is _STOP: 

450 self._write_queue.task_done() 

451 break 

452 assert isinstance(task, _WriteTask) 

453 if not task.try_start(): 

454 # CANCELLED 状態 — スキップ 

455 task.event.set() 

456 self._write_queue.task_done() 

457 continue 

458 

459 try: 

460 task.result = task.fn(conn) 

461 conn.commit() 

462 except sqlite3.OperationalError as e: 

463 # interrupt() によって中断された場合も含めロールバック 

464 conn.rollback() 

465 task.error = e 

466 except BaseException as e: 

467 conn.rollback() 

468 task.error = e 

469 finally: 

470 task.mark_done() 

471 task.event.set() 

472 self._write_queue.task_done() 

473 finally: 

474 with self._writer_conn_lock: 

475 if self._writer_conn is conn: 

476 self._writer_conn = None 

477 if conn is not None: 477 ↛ exitline 477 didn't return from function '_writer_loop' because the condition on line 477 was always true

478 try: 

479 conn.close() 

480 except Exception: 

481 pass 

482 

483 def _enqueue_write(self, fn: Callable[[sqlite3.Connection], Any]) -> Any: 

484 self._writer_ready.wait() 

485 if self._writer_error: 485 ↛ 486line 485 didn't jump to line 486 because the condition on line 485 was never true

486 raise RuntimeError( 

487 "SQLite writer thread failed to start." 

488 ) from self._writer_error 

489 

490 with self._shutdown_lock: 

491 if self._writer_tainted: 

492 raise WriterTaintedError( 

493 "SQLite writer thread is tainted (hung) and cannot process writes. " 

494 "Call reset() or restart the process." 

495 ) 

496 if self._shutdown_requested: 496 ↛ 497line 496 didn't jump to line 497 because the condition on line 496 was never true

497 raise RuntimeError("SQLiteTaskDB is shutting down.") 

498 if not self._writer_thread.is_alive(): 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 raise RuntimeError("SQLite writer thread is not running.") 

500 task = _WriteTask(fn=fn, event=threading.Event()) 

501 self._write_queue.put(task) 

502 

503 start = time.monotonic() 

504 while not task.event.wait(timeout=0.5): 

505 if not self._writer_thread.is_alive(): 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true

506 raise RuntimeError("SQLite writer thread stopped unexpectedly.") 

507 if self._shutdown_requested: 507 ↛ 508line 507 didn't jump to line 508 because the condition on line 507 was never true

508 raise RuntimeError("SQLiteTaskDB is shutting down.") 

509 

510 # 待機中に汚染された場合 

511 if self._writer_tainted: 511 ↛ 512line 511 didn't jump to line 512 because the condition on line 511 was never true

512 raise WriterTaintedError("Writer thread became tainted during wait.") 

513 

514 elapsed = time.monotonic() - start 

515 if elapsed > self.timeout: 515 ↛ 504line 515 didn't jump to line 504 because the condition on line 515 was always true

516 if task.try_cancel(): 

517 # PENDING(未着手)のタスクはキャンセル可能 

518 raise TimeoutError( 

519 f"SQLite write did not start within {self.timeout}s and was cancelled." 

520 ) 

521 else: 

522 # RUNNING(実行中)のタスクは interrupt() を試みる。 

523 with self._writer_conn_lock: 

524 if self._writer_conn: 524 ↛ 528line 524 didn't jump to line 528

525 self._writer_conn.interrupt() 

526 

527 # interrupt() 後、ライタースレッドが例外を処理して event を set するのを少し待つ。 

528 if task.event.wait(timeout=1.0): 

529 break 

530 else: 

531 # interrupt() が効かない場合、Writerスレッドを「汚染」としてマークする。 

532 self._writer_tainted = True 

533 raise WriterTaintedError( 

534 f"SQLite write timed out after {self.timeout}s and interrupt failed. " 

535 "Writer thread is now tainted." 

536 ) 

537 if task.error: 

538 if isinstance(task.error, sqlite3.OperationalError) and "interrupted" in str(task.error).lower(): 538 ↛ 542line 538 didn't jump to line 542 because the condition on line 538 was always true

539 raise TimeoutError( 

540 f"SQLite write timed out after {self.timeout}s and was interrupted." 

541 ) from task.error 

542 raise task.error 

543 return task.result 

544 

545 def shutdown(self, wait: bool = True) -> None: 

546 with self._shutdown_lock: 

547 if self._shutdown_requested: 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true

548 return 

549 self._shutdown_requested = True 

550 

551 if not self._writer_thread.is_alive(): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true

552 logger.error( 

553 "SQLite writer thread is not running; pending writes may be lost." 

554 ) 

555 return 

556 

557 if wait: 

558 self._write_queue.join() 

559 self._write_queue.put(_STOP) 

560 if wait: 

561 self._writer_thread.join() 

562 

563 # 全スレッドのread-only接続を一括クローズ。 

564 # スレッドローカル接続が開いたままだと WAL チェックポイントを妨げるため。 

565 with self._read_conns_lock: 

566 for wrapper in self._read_wrappers: 

567 try: 

568 wrapper.close(wait=False) 

569 except Exception: 

570 pass 

571 self._read_wrappers.clear() 

572 

573 def init_schema(self): 

574 # スキーマ初期化および全マイグレーションを Writer Thread の接続で実行する。 

575 # _connect() による別コネクション経由の DDL は、Writer Thread が保持する 

576 # WAL ライターロックと競合するリスクがあるため、_enqueue_write に委譲する。 

577 def _op(conn: sqlite3.Connection) -> None: 

578 conn.execute("PRAGMA journal_mode=WAL;") 

579 conn.execute(""" 

580 CREATE TABLE IF NOT EXISTS tasks ( 

581 cache_key TEXT PRIMARY KEY, 

582 func_name TEXT, 

583 func_identifier TEXT, 

584 input_id TEXT, 

585 result_type TEXT, 

586 result_value TEXT, 

587 result_data BLOB, 

588 content_type TEXT, 

589 version TEXT, 

590 expires_at TIMESTAMP, 

591 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 

592 ) 

593 """) 

594 

595 # Auto Migration 

596 cursor = conn.execute("PRAGMA table_info(tasks)") 

597 columns = [row[1] for row in cursor.fetchall()] 

598 

599 if "content_type" not in columns: 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true

600 try: 

601 conn.execute("ALTER TABLE tasks ADD COLUMN content_type TEXT;") 

602 except sqlite3.OperationalError as e: 

603 if "duplicate column name" not in str(e).lower(): 

604 raise 

605 pass 

606 if "version" not in columns: 606 ↛ 607line 606 didn't jump to line 607 because the condition on line 606 was never true

607 try: 

608 conn.execute("ALTER TABLE tasks ADD COLUMN version TEXT;") 

609 except sqlite3.OperationalError as e: 

610 if "duplicate column name" not in str(e).lower(): 

611 raise 

612 pass 

613 if "result_data" not in columns: 613 ↛ 614line 613 didn't jump to line 614 because the condition on line 613 was never true

614 try: 

615 conn.execute("ALTER TABLE tasks ADD COLUMN result_data BLOB;") 

616 except sqlite3.OperationalError as e: 

617 if "duplicate column name" not in str(e).lower(): 

618 raise 

619 pass 

620 

621 if "func_identifier" not in columns: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true

622 try: 

623 conn.execute("ALTER TABLE tasks ADD COLUMN func_identifier TEXT;") 

624 except sqlite3.OperationalError as e: 

625 if "duplicate column name" not in str(e).lower(): 

626 raise 

627 pass 

628 conn.execute( 

629 "CREATE INDEX IF NOT EXISTS idx_func_identifier ON tasks(func_identifier);" 

630 ) 

631 

632 if "expires_at" not in columns: 

633 try: 

634 conn.execute("ALTER TABLE tasks ADD COLUMN expires_at TIMESTAMP;") 

635 except sqlite3.OperationalError as e: 

636 if "duplicate column name" not in str(e).lower(): 

637 raise 

638 pass 

639 conn.execute( 

640 "CREATE INDEX IF NOT EXISTS idx_expires_at ON tasks(expires_at);" 

641 ) 

642 

643 self._enqueue_write(_op) 

644 

645 def get( 

646 self, cache_key: str, *, include_expired: bool = False 

647 ) -> Optional[TaskRecord]: 

648 with self._read_connect() as conn: 

649 # [MOD] Include expires_at in query 

650 row = conn.execute( 

651 "SELECT result_type, result_value, result_data, expires_at FROM tasks WHERE cache_key=?", 

652 (cache_key,), 

653 ).fetchone() 

654 

655 if row: 

656 r_type, r_val, r_data, exp_str = row 

657 

658 # [ADD] Lazy Expiration Check (skip when include_expired=True) 

659 if exp_str and not include_expired: 

660 try: 

661 # SQLite returns timestamps as strings usually 

662 # Replace space with T for compatibility with Python <= 3.10 

663 expires_at = datetime.fromisoformat(exp_str.replace(" ", "T")) 

664 # Naive datetimes stored before timezone support are treated as UTC 

665 if expires_at.tzinfo is None: 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 expires_at = expires_at.replace(tzinfo=timezone.utc) 

667 if expires_at < datetime.now(timezone.utc): 

668 # Expired -> Treat as Cache Miss 

669 # (Physical deletion is deferred to `beautyspot gc`) 

670 return None 

671 except (ValueError, TypeError): 

672 pass # Ignore parsing errors, treat as valid 

673 

674 return TaskRecord( 

675 result_type=r_type, 

676 result_value=r_val, 

677 result_data=r_data, 

678 expires_at=exp_str, 

679 ) 

680 return None 

681 

682 def save( 

683 self, 

684 cache_key: str, 

685 func_name: str, 

686 func_identifier: Optional[str], 

687 input_id: str, 

688 version: Optional[str], 

689 result_type: str, 

690 content_type: Optional[str], 

691 result_value: Optional[str] = None, 

692 result_data: Optional[bytes] = None, 

693 expires_at: Optional[datetime] = None, # [ADD] Added argument 

694 ): 

695 def _op(conn: sqlite3.Connection) -> None: 

696 effective_identifier = func_identifier or func_name 

697 # updated_at を明示的に設定し、expires_at と同じ形式 

698 # (_ensure_utc_isoformat) で統一する。 

699 # SQLite の DEFAULT CURRENT_TIMESTAMP は秒精度でフォーマットが異なるため、 

700 # prune/get_outdated_tasks との比較精度を揃える。 

701 now_str = _ensure_utc_isoformat(datetime.now(timezone.utc)) 

702 conn.execute( 

703 """ 

704 INSERT OR REPLACE INTO tasks 

705 (cache_key, func_name, func_identifier, input_id, version, result_type, content_type, result_value, result_data, expires_at, updated_at) 

706 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

707 """, 

708 ( 

709 cache_key, 

710 func_name, 

711 effective_identifier, 

712 input_id, 

713 version, 

714 result_type, 

715 content_type, 

716 result_value, 

717 result_data, 

718 _ensure_utc_isoformat(expires_at), 

719 now_str, 

720 ), 

721 ) 

722 

723 self._enqueue_write(_op) 

724 

725 def get_history(self, limit: int = 1000) -> "pd.DataFrame": 

726 try: 

727 import pandas as pd 

728 except ImportError as e: 

729 raise ImportError("Pandas is required for this feature.") from e 

730 

731 if not os.path.exists(self.db_path): 731 ↛ 732line 731 didn't jump to line 732 because the condition on line 731 was never true

732 return pd.DataFrame() 

733 

734 with self._read_connect() as conn: 

735 query = """ 

736 SELECT 

737 cache_key, func_name, func_identifier, input_id, version, result_type, 

738 content_type, result_value, result_data, updated_at, expires_at 

739 FROM tasks 

740 ORDER BY updated_at DESC LIMIT ? 

741 """ 

742 return pd.read_sql_query(query, conn, params=[limit]) 

743 

744 def delete(self, cache_key: str) -> bool: 

745 def _op(conn: sqlite3.Connection) -> bool: 

746 cursor = conn.execute("DELETE FROM tasks WHERE cache_key=?", (cache_key,)) 

747 return cursor.rowcount > 0 

748 

749 return bool(self._enqueue_write(_op)) 

750 

751 def delete_all(self, func_name: Optional[str] = None) -> int: 

752 def _op(conn: sqlite3.Connection) -> int: 

753 if func_name: 

754 cursor = conn.execute( 

755 "DELETE FROM tasks WHERE func_name = ? OR func_identifier = ?", 

756 (func_name, func_name), 

757 ) 

758 else: 

759 cursor = conn.execute("DELETE FROM tasks") 

760 return cursor.rowcount 

761 

762 return int(self._enqueue_write(_op)) 

763 

764 def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int: 

765 cutoff_str = _ensure_utc_isoformat(older_than) 

766 

767 def _op(conn: sqlite3.Connection) -> int: 

768 if func_name: 

769 cursor = conn.execute( 

770 "DELETE FROM tasks WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)", 

771 (cutoff_str, func_name, func_name), 

772 ) 

773 else: 

774 cursor = conn.execute( 

775 "DELETE FROM tasks WHERE updated_at < ?", 

776 (cutoff_str,), 

777 ) 

778 return cursor.rowcount 

779 

780 return int(self._enqueue_write(_op)) 

781 

782 def get_outdated_tasks( 

783 self, older_than: datetime, func_name: Optional[str] = None 

784 ) -> list[tuple[str, str, str]]: 

785 cutoff_str = _ensure_utc_isoformat(older_than) 

786 if not os.path.exists(self.db_path): 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true

787 return [] 

788 

789 with self._read_connect() as conn: 

790 if func_name: 

791 cursor = conn.execute( 

792 "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks " 

793 "WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)", 

794 (cutoff_str, func_name, func_name), 

795 ) 

796 else: 

797 cursor = conn.execute( 

798 "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks WHERE updated_at < ?", 

799 (cutoff_str,), 

800 ) 

801 return [(row[0], row[1], str(row[2])) for row in cursor.fetchall()] 

802 

803 def delete_expired(self) -> int: 

804 if not os.path.exists(self.db_path): 804 ↛ 805line 804 didn't jump to line 805 because the condition on line 804 was never true

805 return 0 

806 

807 # save() と同じ _ensure_utc_isoformat を使い、フォーマットを統一する 

808 now_str = _ensure_utc_isoformat(datetime.now(timezone.utc)) 

809 

810 def _op(conn: sqlite3.Connection) -> int: 

811 cursor = conn.execute( 

812 "DELETE FROM tasks WHERE expires_at IS NOT NULL AND expires_at < ?", 

813 (now_str,), 

814 ) 

815 return cursor.rowcount 

816 

817 return int(self._enqueue_write(_op)) 

818 

819 def get_blob_refs(self) -> Optional[set[str]]: 

820 if not os.path.exists(self.db_path): 820 ↛ 821line 820 didn't jump to line 821 because the condition on line 820 was never true

821 return set() 

822 

823 with self._read_connect() as conn: 

824 cursor = conn.execute( 

825 "SELECT result_value FROM tasks WHERE result_type = 'FILE' AND result_value IS NOT NULL" 

826 ) 

827 # Return full location strings for precise matching 

828 return {row[0] for row in cursor.fetchall() if row[0]} 

829 

830 def get_keys_start_with(self, prefix: str) -> list[str]: 

831 if not os.path.exists(self.db_path): 831 ↛ 832line 831 didn't jump to line 832 because the condition on line 831 was never true

832 return [] 

833 

834 with self._read_connect() as conn: 

835 # LIKE ワイルドカード文字をエスケープしてプレフィックス検索 

836 escaped = ( 

837 prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

838 ) 

839 cursor = conn.execute( 

840 "SELECT cache_key FROM tasks WHERE cache_key LIKE ? ESCAPE '\\' LIMIT 50", 

841 (f"{escaped}%",), 

842 ) 

843 return [row[0] for row in cursor.fetchall()] 

844 

845 @staticmethod 

846 def count_tasks(db_path: Path, timeout: float = 5.0) -> int: 

847 """ 

848 Writer Thread を起動せずに tasks テーブルの件数を返す軽量ユーティリティ。 

849 CLI の一覧表示など、読み込みのみを目的とした用途向け。 

850 エラー時は -1 を返す。 

851 """ 

852 try: 

853 conn = sqlite3.connect(str(db_path), timeout=timeout) 

854 try: 

855 # 読み取り専用ユーティリティに journal_mode=WAL 設定は不要。 

856 # query_only=ON との組み合わせで動作が曖昧になる可能性もあるため削除。 

857 conn.execute("PRAGMA query_only = ON;") 

858 cursor = conn.execute("SELECT COUNT(*) FROM tasks") 

859 result = cursor.fetchone() 

860 return result[0] if result else 0 

861 finally: 

862 conn.close() 

863 except Exception: 

864 return -1 

865 

866 def flush(self, timeout: Optional[float] = None) -> bool: 

867 """ 

868 キューに溜まっているすべての書き込み操作が完了するまで待機します。 

869 

870 No-op(何もしない)タスクをキューの末尾に挿入し、そのタスクが処理されるまで 

871 待機することで、先行するすべてのタスクの完了を保証します。 

872 

873 Args: 

874 timeout: 待機する最大秒数。タイムアウトした場合は False を返します。 

875 None の場合は無期限に待機しますが、ライタースレッドの 

876 死活監視ループにより永久ハングは防止されます。 

877 """ 

878 self._writer_ready.wait() 

879 

880 # キューをフラッシュするためのダミータスク 

881 def _noop_op(conn: sqlite3.Connection) -> None: 

882 pass 

883 

884 task = _WriteTask(fn=_noop_op, event=threading.Event()) 

885 

886 # shutdown との TOCTOU を防ぐため、ロック内でチェックと put を原子的に行う 

887 with self._shutdown_lock: 

888 if self._shutdown_requested or not self._writer_thread.is_alive(): 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true

889 return False 

890 self._write_queue.put(task) 

891 

892 # ライタースレッドの死活を定期確認しながら待機する。 

893 # timeout=None で event.wait() を直接呼ぶとスレッド死亡時に永久ハングするため、 

894 # ポーリングループで代替する。 

895 _POLL = 0.5 

896 deadline = (time.monotonic() + timeout) if timeout is not None else None 

897 

898 while True: 

899 remaining = ( 

900 max(0.0, deadline - time.monotonic()) if deadline is not None else None 

901 ) 

902 wait_time = _POLL if remaining is None else min(_POLL, remaining) 

903 

904 if task.event.wait(timeout=wait_time): 

905 return True 

906 

907 if not self._writer_thread.is_alive(): 907 ↛ 908line 907 didn't jump to line 908 because the condition on line 907 was never true

908 logger.error( 

909 "SQLite writer thread died unexpectedly while waiting for flush." 

910 ) 

911 return False 

912 

913 if deadline is not None and time.monotonic() >= deadline: 

914 return False