Coverage for src/beautyspot/db.py: 74%
472 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
1# src/beautyspot/db.py
3import sqlite3
4import os
5import logging
6import queue
7import threading
8import time
9from collections.abc import Iterator
10from contextlib import contextmanager
11import dataclasses
12from dataclasses import dataclass
13from datetime import datetime, timezone
14from pathlib import Path
15from abc import ABC, abstractmethod
16from typing import Optional, TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable
17import weakref
18from beautyspot.types import TaskRecord
@runtime_checkable
class TaskDBCore(Protocol):
    """
    Minimal storage contract needed while tasks are being executed.

    Being runtime-checkable, ``isinstance(obj, TaskDBCore)`` only verifies
    that the four methods below exist; signatures are not checked.
    """

    def init_schema(self) -> None:
        """Prepare the backing store before first use."""

    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the stored record for ``cache_key``, or None."""

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Persist the record for ``cache_key``."""

    def delete(self, cache_key: str) -> bool:
        """Remove the record for ``cache_key`` and return a success flag."""
@runtime_checkable
class Flushable(Protocol):
    """Structural type for backends that can drain their queued writes on demand."""

    def flush(self, timeout: Optional[float] = None) -> bool:
        """Block until pending writes are applied; the result reports success."""
@runtime_checkable
class Shutdownable(Protocol):
    """Structural type for backends that must be stopped gracefully."""

    def shutdown(self, wait: bool = True) -> None:
        """Release background resources; ``wait`` controls whether to block until done."""
@runtime_checkable
class Maintenable(Protocol):
    """
    Optional maintenance surface used by housekeeping tools (GC, CLI, dashboard).
    """

    def delete_expired(self) -> int:
        """Physically remove entries whose expiry has passed; return how many."""

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Remove entries older than ``older_than`` (optionally for one function)."""

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Preview for ``prune``: the entries it would remove."""

    def get_blob_refs(self) -> Optional[set[str]]:
        """References to externally stored results, or None if unavailable."""

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Remove every entry (or only those for ``func_name``); return how many."""

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Cache keys that begin with ``prefix``."""

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """Task history as a pandas DataFrame, at most ``limit`` rows."""
@runtime_checkable
class TaskDBMaintenable(TaskDBCore, Maintenable, Protocol):
    """Union of the execution-time core operations and the maintenance operations."""
93class _ReadConnWrapper:
94 def __init__(self, conn: sqlite3.Connection):
95 self.conn = conn
96 self.lock = threading.RLock()
97 self._closed = False
98 self._shutdown_requested = False
100 def close(self, wait: bool = True):
101 """
102 Args:
103 wait: True の場合はロック解放を待機。
104 False (シャットダウン時) の場合は即座に試行し、他が使用中ならスキップする。
105 """
106 # wait=False の場合は blocking=False になり、取得できなければ直ちに False を返す
107 if not self.lock.acquire(blocking=wait):
108 # 誰かがクエリ実行中なので、強制クローズによるクラッシュを防ぐために諦める。
109 # 代わりにシャットダウン要求フラグを立てて、クエリ完了後に自身でクローズさせる。
110 if not wait: 110 ↛ 112line 110 didn't jump to line 112 because the condition on line 110 was always true
111 self._shutdown_requested = True
112 return
114 try:
115 if not self._closed:
116 try:
117 self.conn.close()
118 except Exception:
119 pass
120 self._closed = True
121 finally:
122 self.lock.release()
124 def __del__(self):
125 self.close()
if TYPE_CHECKING:
    # pandas is imported only for static type checking (the "pd.DataFrame"
    # string annotations); at runtime it is imported lazily where needed.
    import pandas as pd

logger = logging.getLogger(__name__)
# Library logging practice: attach a NullHandler so records are not printed
# by logging's last-resort handler when the application has not configured
# logging itself.
logger.addHandler(logging.NullHandler())
135def _ensure_utc_isoformat(dt: Optional[datetime]) -> Optional[str]:
136 """datetime を UTC 保証の ISO 8601 文字列に変換する。None はそのまま返す。"""
137 if dt is None:
138 return None
139 if dt.tzinfo is None: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true
140 dt = dt.replace(tzinfo=timezone.utc)
141 else:
142 dt = dt.astimezone(timezone.utc)
143 return dt.isoformat(" ")
146@dataclass
147class _WriteTask:
148 fn: Callable[[sqlite3.Connection], Any]
149 event: threading.Event
150 result: Any = None
151 error: Exception | BaseException | None = None
152 state: str = "PENDING" # "PENDING", "RUNNING", "DONE", "CANCELLED"
153 _state_lock: threading.Lock = dataclasses.field(default_factory=threading.Lock)
155 def try_cancel(self) -> bool:
156 """PENDING 状態のタスクをキャンセルする。成功時 True。"""
157 with self._state_lock:
158 if self.state == "PENDING":
159 self.state = "CANCELLED"
160 return True
161 return False
163 def try_start(self) -> bool:
164 """PENDING → RUNNING に遷移する。CANCELLED なら False を返す。"""
165 with self._state_lock:
166 if self.state == "CANCELLED":
167 return False
168 self.state = "RUNNING"
169 return True
171 def mark_done(self) -> None:
172 """RUNNING → DONE に遷移する。"""
173 with self._state_lock:
174 if self.state != "CANCELLED": 174 ↛ exitline 174 didn't jump to the function exit
175 self.state = "DONE"
# Sentinel put on the write queue by shutdown(); the writer loop stops when
# it dequeues this exact object (compared by identity with `is`).
_STOP = object()
class TaskDBBase(ABC):
    """
    Optional abstract base class for task-metadata backends.

    Subclasses must implement the core operations (``init_schema``, ``get``,
    ``save``, ``delete``). The maintenance operations described by the
    ``Maintenable`` protocol get harmless defaults here (nothing deleted,
    nothing found), so a backend only overrides the ones it supports.
    """

    @abstractmethod
    def init_schema(self) -> None:
        """Prepare the backing store before first use."""

    @abstractmethod
    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the record stored under ``cache_key``, or None."""

    @abstractmethod
    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Persist the record for ``cache_key``."""

    @abstractmethod
    def delete(self, cache_key: str) -> bool:
        """Remove the record for ``cache_key`` and return a success flag."""

    # --- Maintenance methods (no-op defaults) ---

    def delete_expired(self) -> int:
        """Delete tasks that have passed their expiration time. Default: none, returns 0."""
        return 0

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete tasks older than the specified datetime. Default: none, returns 0."""
        return 0

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Retrieve tasks older than the specified datetime (preview for prune). Default: []."""
        return []

    def get_blob_refs(self) -> Optional[set[str]]:
        """
        Retrieve all 'result_value' entries that point to external storage.

        Default: None, meaning "this backend cannot enumerate references".
        """
        return None

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete all tasks, optionally filtered by function name. Default: none, returns 0."""
        return 0

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Retrieve cache keys that start with the given prefix. Default: []."""
        return []

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Get task history. Default: an empty DataFrame.

        Raises:
            ImportError: If pandas is not installed (chained to the original
                import error, matching ``SQLiteTaskDB.get_history``).
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e
        return pd.DataFrame()
class WriterTaintedError(RuntimeError):
    """
    Raised when the SQLite writer thread is stuck beyond recovery.

    A write exceeded its timeout and interrupting it did not help. Further
    writes are refused until ``SQLiteTaskDB.reset()`` is called or the
    process is restarted.
    """
262class SQLiteTaskDB(TaskDBMaintenable, Flushable, Shutdownable):
263 """
264 Default implementation using SQLite.
265 """
267 def __init__(self, db_path: str | Path | None = None, timeout: float = 30.0):
268 if db_path is not None: 268 ↛ 273line 268 didn't jump to line 273 because the condition on line 268 was always true
269 self.db_path = Path(db_path).resolve()
270 else:
271 # hash(self) は PYTHONHASHSEED に依存し実行ごとに変わるため、
272 # 決定的なデフォルトパスを使用する。
273 self.db_path = Path(".beautyspot/default.db").resolve()
274 logger.warning(
275 "db_path not specified; defaulting to '%s'. "
276 "Use bs.Spot(name=...) for automatic path management.",
277 self.db_path,
278 )
279 self._ensure_cache_dir(self.db_path.parent)
280 self.timeout = timeout
281 self._local = threading.local()
282 self._write_queue: queue.Queue[object] = queue.Queue()
283 self._shutdown_lock = threading.Lock()
284 self._shutdown_requested = False
285 self._writer_ready = threading.Event()
286 self._writer_error: Exception | None = None
287 self._writer_tainted = False
288 self._writer_generation = 0
289 # ライタースレッドの接続を保持(interrupt() 用)
290 self._writer_conn: sqlite3.Connection | None = None
291 self._writer_conn_lock = threading.Lock()
292 # 読み取り専用スレッドローカル接続を追跡し、
293 # shutdown() 時に一括クローズする。WAL チェックポイントの妨げを防ぐ。
294 self._read_wrappers = weakref.WeakSet()
295 self._read_conns_lock = threading.Lock()
296 self._writer_thread = threading.Thread(
297 target=self._writer_loop,
298 args=(self._writer_generation,),
299 daemon=True,
300 name=f"BeautySpot-SQLiteWriter-{self._writer_generation}",
301 )
302 self._writer_thread.start()
303 self._writer_ready.wait()
304 if self._writer_error: 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true
305 raise self._writer_error
307 self.init_schema()
    def reset(self) -> None:
        """
        Forcefully restart the writer thread. Use this if the writer is tainted or hung.

        Everything happens under ``_shutdown_lock``:

        1. Interrupt the current writer connection, if one is registered.
        2. Bump ``_writer_generation`` so the previous writer loop exits at
           its next generation check. A writer stuck inside a task notices
           only after that task returns.
        3. Clear the tainted flag.
        4. Start a new writer thread for the new generation and wait until it
           is ready.

        Returns without doing anything if shutdown has been requested.

        Raises:
            Exception: The error the new writer thread recorded while opening
                its connection.
        """
        with self._shutdown_lock:
            if self._shutdown_requested:
                return

            # 1. Best-effort interrupt of the current connection (if it is still alive).
            with self._writer_conn_lock:
                if self._writer_conn:
                    try:
                        self._writer_conn.interrupt()
                    except Exception:
                        pass

            # 2. Advance the generation and clear the tainted flag.
            # NOTE(review): an old writer already blocked in queue.get() may
            # still take one more task before it sees the new generation; that
            # task then runs on the old connection.
            self._writer_generation += 1
            self._writer_tainted = False
            self._writer_ready = threading.Event()
            self._writer_error = None

            # 3. Start the replacement writer thread and wait for it.
            self._writer_thread = threading.Thread(
                target=self._writer_loop,
                args=(self._writer_generation,),
                daemon=True,
                name=f"BeautySpot-SQLiteWriter-{self._writer_generation}",
            )
            self._writer_thread.start()
            self._writer_ready.wait()
            if self._writer_error:
                raise self._writer_error
            logger.info(
                f"SQLiteTaskDB writer thread reset to generation {self._writer_generation}"
            )
346 @staticmethod
347 def _ensure_cache_dir(directory: Path) -> None:
348 """
349 データベース格納用の親ディレクトリを作成し、.gitignore を配置する。
350 """
351 directory.mkdir(parents=True, exist_ok=True)
352 gitignore_path = directory / ".gitignore"
353 if not gitignore_path.exists():
354 try:
355 gitignore_path.write_text("*\n")
356 except OSError as e:
357 logging.warning(f"Failed to create .gitignore in {directory}: {e}")
359 @contextmanager
360 def _read_connect(self) -> Iterator[sqlite3.Connection]:
361 """
362 Thread-safe connection context manager for read-only operations.
363 Uses a thread-local pool to reuse connections and reduce overhead.
364 PRAGMA query_only = ON により、誤った書き込みを接続レベルで防止する。
366 新規接続を _read_wrappers に登録し、
367 shutdown() 時の一括クローズで WAL チェックポイント妨害を防ぐ。
368 また、_ReadConnWrapper によってスレッド終了時に接続がクローズされ、メモリリークを防止。
369 """
370 if self._shutdown_requested: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true
371 raise RuntimeError("SQLiteTaskDB is shutting down.")
373 wrapper = getattr(self._local, "read_conn_wrapper", None)
374 if wrapper is None or wrapper._closed:
375 # シャットダウン後に新しい接続がリークするのを防ぐため再チェック。
376 # 最初のチェック通過後に別スレッドが shutdown() を呼び出し、
377 # 全ラッパーをクローズした場合にここに到達する。
378 if self._shutdown_requested: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true
379 raise RuntimeError("SQLiteTaskDB is shutting down.")
381 conn = sqlite3.connect(
382 self.db_path,
383 timeout=self.timeout,
384 check_same_thread=False,
385 )
387 try:
388 conn.execute("PRAGMA query_only = ON;")
389 except Exception:
390 conn.close()
391 raise
392 wrapper = _ReadConnWrapper(conn)
393 with self._read_conns_lock:
394 # ロック内で再度チェックし、shutdown() による _read_wrappers.clear() と
395 # 新規追加の間の競合を完全に排除する。
396 if self._shutdown_requested: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true
397 conn.close()
398 raise RuntimeError("SQLiteTaskDB is shutting down.")
399 self._read_wrappers.add(wrapper)
400 self._local.read_conn_wrapper = wrapper
402 with wrapper.lock:
403 if wrapper._closed: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true
404 raise RuntimeError("Database connection was closed")
405 try:
406 yield wrapper.conn
407 except sqlite3.Error:
408 # 接続が壊れた場合等のリカバリ (BUG-3)
409 # 現在の接続を破棄し、次回のアクセス時に新しく作り直す
410 wrapper.close()
411 with self._read_conns_lock:
412 self._read_wrappers.discard(wrapper)
413 self._local.read_conn_wrapper = None
414 raise
416 # クエリ実行中にシャットダウン要求があった場合、自身でクローズする
417 if getattr(wrapper, "_shutdown_requested", False): 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true
418 wrapper.close()
419 with self._read_conns_lock:
420 self._read_wrappers.discard(wrapper)
421 self._local.read_conn_wrapper = None
423 def _writer_loop(self, generation: int) -> None:
424 conn: sqlite3.Connection | None = None
425 try:
426 conn = sqlite3.connect(self.db_path, timeout=self.timeout)
427 conn.execute("PRAGMA journal_mode=WAL;")
428 with self._writer_conn_lock:
429 if generation == self._writer_generation: 429 ↛ 437line 429 didn't jump to line 437
430 self._writer_conn = conn
431 except Exception as e:
432 if generation == self._writer_generation:
433 self._writer_error = e
434 self._writer_ready.set()
435 return
437 self._writer_ready.set()
438 try:
439 while True:
440 # 世代チェック: 自分が最新でないなら終了
441 if generation != self._writer_generation:
442 break
444 try:
445 task = self._write_queue.get(timeout=1.0)
446 except queue.Empty:
447 continue
449 if task is _STOP:
450 self._write_queue.task_done()
451 break
452 assert isinstance(task, _WriteTask)
453 if not task.try_start():
454 # CANCELLED 状態 — スキップ
455 task.event.set()
456 self._write_queue.task_done()
457 continue
459 try:
460 task.result = task.fn(conn)
461 conn.commit()
462 except sqlite3.OperationalError as e:
463 # interrupt() によって中断された場合も含めロールバック
464 conn.rollback()
465 task.error = e
466 except BaseException as e:
467 conn.rollback()
468 task.error = e
469 finally:
470 task.mark_done()
471 task.event.set()
472 self._write_queue.task_done()
473 finally:
474 with self._writer_conn_lock:
475 if self._writer_conn is conn:
476 self._writer_conn = None
477 if conn is not None: 477 ↛ exitline 477 didn't return from function '_writer_loop' because the condition on line 477 was always true
478 try:
479 conn.close()
480 except Exception:
481 pass
483 def _enqueue_write(self, fn: Callable[[sqlite3.Connection], Any]) -> Any:
484 self._writer_ready.wait()
485 if self._writer_error: 485 ↛ 486line 485 didn't jump to line 486 because the condition on line 485 was never true
486 raise RuntimeError(
487 "SQLite writer thread failed to start."
488 ) from self._writer_error
490 with self._shutdown_lock:
491 if self._writer_tainted:
492 raise WriterTaintedError(
493 "SQLite writer thread is tainted (hung) and cannot process writes. "
494 "Call reset() or restart the process."
495 )
496 if self._shutdown_requested: 496 ↛ 497line 496 didn't jump to line 497 because the condition on line 496 was never true
497 raise RuntimeError("SQLiteTaskDB is shutting down.")
498 if not self._writer_thread.is_alive(): 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true
499 raise RuntimeError("SQLite writer thread is not running.")
500 task = _WriteTask(fn=fn, event=threading.Event())
501 self._write_queue.put(task)
503 start = time.monotonic()
504 while not task.event.wait(timeout=0.5):
505 if not self._writer_thread.is_alive(): 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true
506 raise RuntimeError("SQLite writer thread stopped unexpectedly.")
507 if self._shutdown_requested: 507 ↛ 508line 507 didn't jump to line 508 because the condition on line 507 was never true
508 raise RuntimeError("SQLiteTaskDB is shutting down.")
510 # 待機中に汚染された場合
511 if self._writer_tainted: 511 ↛ 512line 511 didn't jump to line 512 because the condition on line 511 was never true
512 raise WriterTaintedError("Writer thread became tainted during wait.")
514 elapsed = time.monotonic() - start
515 if elapsed > self.timeout: 515 ↛ 504line 515 didn't jump to line 504 because the condition on line 515 was always true
516 if task.try_cancel():
517 # PENDING(未着手)のタスクはキャンセル可能
518 raise TimeoutError(
519 f"SQLite write did not start within {self.timeout}s and was cancelled."
520 )
521 else:
522 # RUNNING(実行中)のタスクは interrupt() を試みる。
523 with self._writer_conn_lock:
524 if self._writer_conn: 524 ↛ 528line 524 didn't jump to line 528
525 self._writer_conn.interrupt()
527 # interrupt() 後、ライタースレッドが例外を処理して event を set するのを少し待つ。
528 if task.event.wait(timeout=1.0):
529 break
530 else:
531 # interrupt() が効かない場合、Writerスレッドを「汚染」としてマークする。
532 self._writer_tainted = True
533 raise WriterTaintedError(
534 f"SQLite write timed out after {self.timeout}s and interrupt failed. "
535 "Writer thread is now tainted."
536 )
537 if task.error:
538 if isinstance(task.error, sqlite3.OperationalError) and "interrupted" in str(task.error).lower(): 538 ↛ 542line 538 didn't jump to line 542 because the condition on line 538 was always true
539 raise TimeoutError(
540 f"SQLite write timed out after {self.timeout}s and was interrupted."
541 ) from task.error
542 raise task.error
543 return task.result
545 def shutdown(self, wait: bool = True) -> None:
546 with self._shutdown_lock:
547 if self._shutdown_requested: 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true
548 return
549 self._shutdown_requested = True
551 if not self._writer_thread.is_alive(): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true
552 logger.error(
553 "SQLite writer thread is not running; pending writes may be lost."
554 )
555 return
557 if wait:
558 self._write_queue.join()
559 self._write_queue.put(_STOP)
560 if wait:
561 self._writer_thread.join()
563 # 全スレッドのread-only接続を一括クローズ。
564 # スレッドローカル接続が開いたままだと WAL チェックポイントを妨げるため。
565 with self._read_conns_lock:
566 for wrapper in self._read_wrappers:
567 try:
568 wrapper.close(wait=False)
569 except Exception:
570 pass
571 self._read_wrappers.clear()
573 def init_schema(self):
574 # スキーマ初期化および全マイグレーションを Writer Thread の接続で実行する。
575 # _connect() による別コネクション経由の DDL は、Writer Thread が保持する
576 # WAL ライターロックと競合するリスクがあるため、_enqueue_write に委譲する。
577 def _op(conn: sqlite3.Connection) -> None:
578 conn.execute("PRAGMA journal_mode=WAL;")
579 conn.execute("""
580 CREATE TABLE IF NOT EXISTS tasks (
581 cache_key TEXT PRIMARY KEY,
582 func_name TEXT,
583 func_identifier TEXT,
584 input_id TEXT,
585 result_type TEXT,
586 result_value TEXT,
587 result_data BLOB,
588 content_type TEXT,
589 version TEXT,
590 expires_at TIMESTAMP,
591 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
592 )
593 """)
595 # Auto Migration
596 cursor = conn.execute("PRAGMA table_info(tasks)")
597 columns = [row[1] for row in cursor.fetchall()]
599 if "content_type" not in columns: 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true
600 try:
601 conn.execute("ALTER TABLE tasks ADD COLUMN content_type TEXT;")
602 except sqlite3.OperationalError as e:
603 if "duplicate column name" not in str(e).lower():
604 raise
605 pass
606 if "version" not in columns: 606 ↛ 607line 606 didn't jump to line 607 because the condition on line 606 was never true
607 try:
608 conn.execute("ALTER TABLE tasks ADD COLUMN version TEXT;")
609 except sqlite3.OperationalError as e:
610 if "duplicate column name" not in str(e).lower():
611 raise
612 pass
613 if "result_data" not in columns: 613 ↛ 614line 613 didn't jump to line 614 because the condition on line 613 was never true
614 try:
615 conn.execute("ALTER TABLE tasks ADD COLUMN result_data BLOB;")
616 except sqlite3.OperationalError as e:
617 if "duplicate column name" not in str(e).lower():
618 raise
619 pass
621 if "func_identifier" not in columns: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true
622 try:
623 conn.execute("ALTER TABLE tasks ADD COLUMN func_identifier TEXT;")
624 except sqlite3.OperationalError as e:
625 if "duplicate column name" not in str(e).lower():
626 raise
627 pass
628 conn.execute(
629 "CREATE INDEX IF NOT EXISTS idx_func_identifier ON tasks(func_identifier);"
630 )
632 if "expires_at" not in columns:
633 try:
634 conn.execute("ALTER TABLE tasks ADD COLUMN expires_at TIMESTAMP;")
635 except sqlite3.OperationalError as e:
636 if "duplicate column name" not in str(e).lower():
637 raise
638 pass
639 conn.execute(
640 "CREATE INDEX IF NOT EXISTS idx_expires_at ON tasks(expires_at);"
641 )
643 self._enqueue_write(_op)
645 def get(
646 self, cache_key: str, *, include_expired: bool = False
647 ) -> Optional[TaskRecord]:
648 with self._read_connect() as conn:
649 # [MOD] Include expires_at in query
650 row = conn.execute(
651 "SELECT result_type, result_value, result_data, expires_at FROM tasks WHERE cache_key=?",
652 (cache_key,),
653 ).fetchone()
655 if row:
656 r_type, r_val, r_data, exp_str = row
658 # [ADD] Lazy Expiration Check (skip when include_expired=True)
659 if exp_str and not include_expired:
660 try:
661 # SQLite returns timestamps as strings usually
662 # Replace space with T for compatibility with Python <= 3.10
663 expires_at = datetime.fromisoformat(exp_str.replace(" ", "T"))
664 # Naive datetimes stored before timezone support are treated as UTC
665 if expires_at.tzinfo is None: 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 expires_at = expires_at.replace(tzinfo=timezone.utc)
667 if expires_at < datetime.now(timezone.utc):
668 # Expired -> Treat as Cache Miss
669 # (Physical deletion is deferred to `beautyspot gc`)
670 return None
671 except (ValueError, TypeError):
672 pass # Ignore parsing errors, treat as valid
674 return TaskRecord(
675 result_type=r_type,
676 result_value=r_val,
677 result_data=r_data,
678 expires_at=exp_str,
679 )
680 return None
682 def save(
683 self,
684 cache_key: str,
685 func_name: str,
686 func_identifier: Optional[str],
687 input_id: str,
688 version: Optional[str],
689 result_type: str,
690 content_type: Optional[str],
691 result_value: Optional[str] = None,
692 result_data: Optional[bytes] = None,
693 expires_at: Optional[datetime] = None, # [ADD] Added argument
694 ):
695 def _op(conn: sqlite3.Connection) -> None:
696 effective_identifier = func_identifier or func_name
697 # updated_at を明示的に設定し、expires_at と同じ形式
698 # (_ensure_utc_isoformat) で統一する。
699 # SQLite の DEFAULT CURRENT_TIMESTAMP は秒精度でフォーマットが異なるため、
700 # prune/get_outdated_tasks との比較精度を揃える。
701 now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))
702 conn.execute(
703 """
704 INSERT OR REPLACE INTO tasks
705 (cache_key, func_name, func_identifier, input_id, version, result_type, content_type, result_value, result_data, expires_at, updated_at)
706 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
707 """,
708 (
709 cache_key,
710 func_name,
711 effective_identifier,
712 input_id,
713 version,
714 result_type,
715 content_type,
716 result_value,
717 result_data,
718 _ensure_utc_isoformat(expires_at),
719 now_str,
720 ),
721 )
723 self._enqueue_write(_op)
725 def get_history(self, limit: int = 1000) -> "pd.DataFrame":
726 try:
727 import pandas as pd
728 except ImportError as e:
729 raise ImportError("Pandas is required for this feature.") from e
731 if not os.path.exists(self.db_path): 731 ↛ 732line 731 didn't jump to line 732 because the condition on line 731 was never true
732 return pd.DataFrame()
734 with self._read_connect() as conn:
735 query = """
736 SELECT
737 cache_key, func_name, func_identifier, input_id, version, result_type,
738 content_type, result_value, result_data, updated_at, expires_at
739 FROM tasks
740 ORDER BY updated_at DESC LIMIT ?
741 """
742 return pd.read_sql_query(query, conn, params=[limit])
744 def delete(self, cache_key: str) -> bool:
745 def _op(conn: sqlite3.Connection) -> bool:
746 cursor = conn.execute("DELETE FROM tasks WHERE cache_key=?", (cache_key,))
747 return cursor.rowcount > 0
749 return bool(self._enqueue_write(_op))
751 def delete_all(self, func_name: Optional[str] = None) -> int:
752 def _op(conn: sqlite3.Connection) -> int:
753 if func_name:
754 cursor = conn.execute(
755 "DELETE FROM tasks WHERE func_name = ? OR func_identifier = ?",
756 (func_name, func_name),
757 )
758 else:
759 cursor = conn.execute("DELETE FROM tasks")
760 return cursor.rowcount
762 return int(self._enqueue_write(_op))
764 def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
765 cutoff_str = _ensure_utc_isoformat(older_than)
767 def _op(conn: sqlite3.Connection) -> int:
768 if func_name:
769 cursor = conn.execute(
770 "DELETE FROM tasks WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
771 (cutoff_str, func_name, func_name),
772 )
773 else:
774 cursor = conn.execute(
775 "DELETE FROM tasks WHERE updated_at < ?",
776 (cutoff_str,),
777 )
778 return cursor.rowcount
780 return int(self._enqueue_write(_op))
782 def get_outdated_tasks(
783 self, older_than: datetime, func_name: Optional[str] = None
784 ) -> list[tuple[str, str, str]]:
785 cutoff_str = _ensure_utc_isoformat(older_than)
786 if not os.path.exists(self.db_path): 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true
787 return []
789 with self._read_connect() as conn:
790 if func_name:
791 cursor = conn.execute(
792 "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks "
793 "WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
794 (cutoff_str, func_name, func_name),
795 )
796 else:
797 cursor = conn.execute(
798 "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks WHERE updated_at < ?",
799 (cutoff_str,),
800 )
801 return [(row[0], row[1], str(row[2])) for row in cursor.fetchall()]
803 def delete_expired(self) -> int:
804 if not os.path.exists(self.db_path): 804 ↛ 805line 804 didn't jump to line 805 because the condition on line 804 was never true
805 return 0
807 # save() と同じ _ensure_utc_isoformat を使い、フォーマットを統一する
808 now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))
810 def _op(conn: sqlite3.Connection) -> int:
811 cursor = conn.execute(
812 "DELETE FROM tasks WHERE expires_at IS NOT NULL AND expires_at < ?",
813 (now_str,),
814 )
815 return cursor.rowcount
817 return int(self._enqueue_write(_op))
819 def get_blob_refs(self) -> Optional[set[str]]:
820 if not os.path.exists(self.db_path): 820 ↛ 821line 820 didn't jump to line 821 because the condition on line 820 was never true
821 return set()
823 with self._read_connect() as conn:
824 cursor = conn.execute(
825 "SELECT result_value FROM tasks WHERE result_type = 'FILE' AND result_value IS NOT NULL"
826 )
827 # Return full location strings for precise matching
828 return {row[0] for row in cursor.fetchall() if row[0]}
830 def get_keys_start_with(self, prefix: str) -> list[str]:
831 if not os.path.exists(self.db_path): 831 ↛ 832line 831 didn't jump to line 832 because the condition on line 831 was never true
832 return []
834 with self._read_connect() as conn:
835 # LIKE ワイルドカード文字をエスケープしてプレフィックス検索
836 escaped = (
837 prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
838 )
839 cursor = conn.execute(
840 "SELECT cache_key FROM tasks WHERE cache_key LIKE ? ESCAPE '\\' LIMIT 50",
841 (f"{escaped}%",),
842 )
843 return [row[0] for row in cursor.fetchall()]
845 @staticmethod
846 def count_tasks(db_path: Path, timeout: float = 5.0) -> int:
847 """
848 Writer Thread を起動せずに tasks テーブルの件数を返す軽量ユーティリティ。
849 CLI の一覧表示など、読み込みのみを目的とした用途向け。
850 エラー時は -1 を返す。
851 """
852 try:
853 conn = sqlite3.connect(str(db_path), timeout=timeout)
854 try:
855 # 読み取り専用ユーティリティに journal_mode=WAL 設定は不要。
856 # query_only=ON との組み合わせで動作が曖昧になる可能性もあるため削除。
857 conn.execute("PRAGMA query_only = ON;")
858 cursor = conn.execute("SELECT COUNT(*) FROM tasks")
859 result = cursor.fetchone()
860 return result[0] if result else 0
861 finally:
862 conn.close()
863 except Exception:
864 return -1
866 def flush(self, timeout: Optional[float] = None) -> bool:
867 """
868 キューに溜まっているすべての書き込み操作が完了するまで待機します。
870 No-op(何もしない)タスクをキューの末尾に挿入し、そのタスクが処理されるまで
871 待機することで、先行するすべてのタスクの完了を保証します。
873 Args:
874 timeout: 待機する最大秒数。タイムアウトした場合は False を返します。
875 None の場合は無期限に待機しますが、ライタースレッドの
876 死活監視ループにより永久ハングは防止されます。
877 """
878 self._writer_ready.wait()
880 # キューをフラッシュするためのダミータスク
881 def _noop_op(conn: sqlite3.Connection) -> None:
882 pass
884 task = _WriteTask(fn=_noop_op, event=threading.Event())
886 # shutdown との TOCTOU を防ぐため、ロック内でチェックと put を原子的に行う
887 with self._shutdown_lock:
888 if self._shutdown_requested or not self._writer_thread.is_alive(): 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true
889 return False
890 self._write_queue.put(task)
892 # ライタースレッドの死活を定期確認しながら待機する。
893 # timeout=None で event.wait() を直接呼ぶとスレッド死亡時に永久ハングするため、
894 # ポーリングループで代替する。
895 _POLL = 0.5
896 deadline = (time.monotonic() + timeout) if timeout is not None else None
898 while True:
899 remaining = (
900 max(0.0, deadline - time.monotonic()) if deadline is not None else None
901 )
902 wait_time = _POLL if remaining is None else min(_POLL, remaining)
904 if task.event.wait(timeout=wait_time):
905 return True
907 if not self._writer_thread.is_alive(): 907 ↛ 908line 907 didn't jump to line 908 because the condition on line 907 was never true
908 logger.error(
909 "SQLite writer thread died unexpectedly while waiting for flush."
910 )
911 return False
913 if deadline is not None and time.monotonic() >= deadline:
914 return False