Coverage for src / beautyspot / db.py: 74%
468 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:14 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:14 +0900
1# src/beautyspot/db.py
3import sqlite3
4import os
5import logging
6import queue
7import threading
8import time
9from collections.abc import Iterator
10from contextlib import contextmanager
11import dataclasses
12from dataclasses import dataclass
13from datetime import datetime, timezone
14from pathlib import Path
15from abc import ABC, abstractmethod
16from typing import Optional, TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable
17import weakref
18from beautyspot.types import TaskRecord
@runtime_checkable
class TaskDBCore(Protocol):
    """
    Core interface for task metadata storage required during execution.

    Because the protocol is ``@runtime_checkable``, ``isinstance`` only checks
    that these four methods exist; their signatures are not verified.
    """

    def init_schema(self) -> None:
        """Prepare the backing store (create or migrate its schema)."""

    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the record stored under ``cache_key``, or ``None`` on a miss.

        ``include_expired=True`` asks the backend to return the record even if
        its expiration time has passed.
        """

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Insert or replace the record stored under ``cache_key``."""

    def delete(self, cache_key: str) -> bool:
        """Remove the record stored under ``cache_key``; True if one existed."""
@runtime_checkable
class Flushable(Protocol):
    """Protocol for backends that support flushing pending writes."""

    def flush(self, timeout: Optional[float] = None) -> bool:
        """Wait for queued writes to complete; False if that did not happen in time."""
@runtime_checkable
class Shutdownable(Protocol):
    """Protocol for backends that require graceful shutdown."""

    def shutdown(self, wait: bool = True) -> None:
        """Stop background work; with ``wait=True`` block until it has finished."""
@runtime_checkable
class Maintenable(Protocol):
    """
    Extended interface for maintenance tasks (GC, CLI, Dashboard).

    Destructive operations return the number of records they removed; query
    operations return snapshots and never modify the store.
    """

    def delete_expired(self) -> int:
        """Remove records whose expiration time has passed; return how many."""

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Remove records last updated before ``older_than``; return how many."""

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Preview what ``prune`` would remove, as (key, function, updated_at)."""

    def get_blob_refs(self) -> Optional[set[str]]:
        """Return locations of externally stored results (None if not tracked)."""

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Remove every record, optionally only one function's; return how many."""

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Return cache keys that begin with ``prefix``."""

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """Return up to ``limit`` records as a pandas DataFrame."""
@runtime_checkable
class TaskDBMaintenable(TaskDBCore, Maintenable, Protocol):
    """Combined protocol: core task storage plus the maintenance operations."""
93class _ReadConnWrapper:
94 def __init__(self, conn: sqlite3.Connection):
95 self.conn = conn
96 self.lock = threading.RLock()
97 self._closed = False
98 self._shutdown_requested = False
100 def close(self, wait: bool = True):
101 """
102 Args:
103 wait: True の場合はロック解放を待機。
104 False (シャットダウン時) の場合は即座に試行し、他が使用中ならスキップする。
105 """
106 # wait=False の場合は blocking=False になり、取得できなければ直ちに False を返す
107 if not self.lock.acquire(blocking=wait): 107 ↛ 110line 107 didn't jump to line 110 because the condition on line 107 was never true
108 # 誰かがクエリ実行中なので、強制クローズによるクラッシュを防ぐために諦める。
109 # 代わりにシャットダウン要求フラグを立てて、クエリ完了後に自身でクローズさせる。
110 if not wait:
111 self._shutdown_requested = True
112 return
114 try:
115 if not self._closed:
116 try:
117 self.conn.close()
118 except Exception:
119 pass
120 self._closed = True
121 finally:
122 self.lock.release()
124 def __del__(self):
125 self.close()
# pandas only appears in string annotations ("pd.DataFrame"); importing it
# under TYPE_CHECKING keeps it an optional dependency at runtime.
if TYPE_CHECKING:
    import pandas as pd

logger: logging.Logger = logging.getLogger(__name__)
# Library convention: with a NullHandler attached, Python's last-resort handler
# does not print this module's records when the application has not
# configured logging.
logger.addHandler(logging.NullHandler())
135def _ensure_utc_isoformat(dt: Optional[datetime]) -> Optional[str]:
136 """datetime を UTC 保証の ISO 8601 文字列に変換する。None はそのまま返す。"""
137 if dt is None:
138 return None
139 if dt.tzinfo is None: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true
140 dt = dt.replace(tzinfo=timezone.utc)
141 else:
142 dt = dt.astimezone(timezone.utc)
143 return dt.isoformat(" ")
146@dataclass
147class _WriteTask:
148 fn: Callable[[sqlite3.Connection], Any]
149 event: threading.Event
150 result: Any = None
151 error: Exception | BaseException | None = None
152 state: str = "PENDING" # "PENDING", "RUNNING", "DONE", "CANCELLED"
153 _state_lock: threading.Lock = dataclasses.field(default_factory=threading.Lock)
155 def try_cancel(self) -> bool:
156 """PENDING 状態のタスクをキャンセルする。成功時 True。"""
157 with self._state_lock:
158 if self.state == "PENDING":
159 self.state = "CANCELLED"
160 return True
161 return False
163 def try_start(self) -> bool:
164 """PENDING → RUNNING に遷移する。CANCELLED なら False を返す。"""
165 with self._state_lock:
166 if self.state == "CANCELLED":
167 return False
168 self.state = "RUNNING"
169 return True
171 def mark_done(self) -> None:
172 """RUNNING → DONE に遷移する。"""
173 with self._state_lock:
174 if self.state != "CANCELLED": 174 ↛ exitline 174 didn't jump to the function exit
175 self.state = "DONE"
178_STOP = object()
class TaskDBBase(ABC):
    """
    Abstract base class providing default no-op implementations for maintenance methods.

    Subclasses must implement the core operations (``init_schema``, ``get``,
    ``save``, ``delete``). The maintenance methods report "nothing to do"
    (zero counts, empty results), so a minimal backend still provides every
    method of the ``TaskDBMaintenable`` protocol.
    """

    @abstractmethod
    def init_schema(self) -> None:
        """Prepare the backing store (create or migrate its schema)."""
        pass

    @abstractmethod
    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """Return the record stored under ``cache_key``, or ``None`` on a miss."""
        pass

    @abstractmethod
    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Insert or replace the record stored under ``cache_key``."""
        pass

    @abstractmethod
    def delete(self, cache_key: str) -> bool:
        """Remove the record stored under ``cache_key``; True if one existed."""
        pass

    # --- Maintenance Methods (Default implementations) ---
    def delete_expired(self) -> int:
        """Delete tasks that have passed their expiration time."""
        return 0

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete tasks older than the specified datetime."""
        return 0

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Retrieve tasks older than the specified datetime (Preview for prune)."""
        return []

    def get_blob_refs(self) -> Optional[set[str]]:
        """Retrieve all 'result_value' entries that point to external storage."""
        return None

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete all tasks, optionally filtered by function name."""
        return 0

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Retrieve cache keys that start with the given prefix."""
        return []

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Get task history. Returns an empty DataFrame by default.

        Raises:
            ImportError: If pandas is not installed (chained to the original
                import failure).
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e
        return pd.DataFrame()
class WriterTaintedError(RuntimeError):
    """
    Raised when the SQLite writer thread is stuck and can no longer process writes.

    Recovery requires restarting the writer manually (``SQLiteTaskDB.reset()``)
    or restarting the process.
    """
class SQLiteTaskDB(TaskDBCore, Flushable, Shutdownable, Maintenable):
    """
    Default implementation using SQLite.

    Concurrency model:

    * Every write (schema setup included) runs on a single writer thread that
      owns one WAL-mode connection. Callers submit work through
      ``_enqueue_write`` and block until it finishes, fails, or times out.
    * Reads use one read-only connection per calling thread
      (``PRAGMA query_only = ON``). It is reused across calls and closed by
      ``shutdown()`` so idle readers do not hold back WAL checkpoints.
    * When a running write exceeds ``timeout`` and ``interrupt()`` does not
      stop it, the writer is marked tainted. Writes then raise
      ``WriterTaintedError`` until ``reset()`` starts a new writer generation.
    """

    def __init__(self, db_path: str | Path | None = None, timeout: float = 30.0):
        """
        Open (or create) the database and start the writer thread.

        Args:
            db_path: Database file. When omitted, a per-instance file named
                after ``hash(self)`` is placed under ``.beautyspot/`` in the
                current working directory.
            timeout: Seconds used as SQLite's busy timeout for every connection
                and as the maximum duration of a single write.

        Raises:
            Exception: Whatever the writer thread raised while opening its
                connection.
        """
        # Resolve the default path as well. Read connections are opened lazily
        # per thread, and a relative path would silently point at a different
        # file after a working-directory change.
        self.db_path = (
            Path(db_path).resolve()
            if db_path
            else Path(f".beautyspot/{hash(self)}.db").resolve()
        )
        self._ensure_cache_dir(self.db_path.parent)
        self.timeout = timeout
        self._local = threading.local()
        self._write_queue: queue.Queue[object] = queue.Queue()
        self._shutdown_lock = threading.Lock()
        self._shutdown_requested = False
        self._writer_ready = threading.Event()
        self._writer_error: Exception | None = None
        self._writer_tainted = False
        self._writer_generation = 0
        # The writer thread's connection, kept so other threads can interrupt() it.
        self._writer_conn: sqlite3.Connection | None = None
        self._writer_conn_lock = threading.Lock()
        # Every thread-local read connection, tracked so shutdown() can close
        # them all; open readers would otherwise prevent WAL checkpoints.
        self._read_wrappers = weakref.WeakSet()
        self._read_conns_lock = threading.Lock()
        self._writer_thread = threading.Thread(
            target=self._writer_loop,
            args=(self._writer_generation,),
            daemon=True,
            name=f"BeautySpot-SQLiteWriter-{self._writer_generation}",
        )
        self._writer_thread.start()
        self._writer_ready.wait()
        if self._writer_error:
            raise self._writer_error

    def reset(self) -> None:
        """
        Forcefully restart the writer thread. Use this if the writer is tainted or hung.

        The old thread is not killed. Its connection is interrupted and its
        generation is retired, so it exits the next time it checks. Does
        nothing after shutdown().

        Raises:
            Exception: Whatever the new writer thread raised while starting.
        """
        with self._shutdown_lock:
            if self._shutdown_requested:
                return

            # 1. Try to interrupt the current connection (if it is still alive).
            with self._writer_conn_lock:
                if self._writer_conn:
                    try:
                        self._writer_conn.interrupt()
                    except Exception:
                        pass

            # 2. Bump the generation (retiring the old thread) and clear the taint.
            self._writer_generation += 1
            self._writer_tainted = False
            self._writer_ready = threading.Event()
            self._writer_error = None

            # 3. Start the new writer thread.
            self._writer_thread = threading.Thread(
                target=self._writer_loop,
                args=(self._writer_generation,),
                daemon=True,
                name=f"BeautySpot-SQLiteWriter-{self._writer_generation}",
            )
            self._writer_thread.start()
            self._writer_ready.wait()
            if self._writer_error:
                raise self._writer_error
            logger.info(
                f"SQLiteTaskDB writer thread reset to generation {self._writer_generation}"
            )

    @staticmethod
    def _ensure_cache_dir(directory: Path) -> None:
        """
        Create the database's parent directory and drop a ``.gitignore`` into it.

        Failure to write the ``.gitignore`` is logged, not raised.
        """
        directory.mkdir(parents=True, exist_ok=True)
        gitignore_path = directory / ".gitignore"
        if not gitignore_path.exists():
            try:
                gitignore_path.write_text("*\n")
            except OSError as e:
                # Use the module logger (not the root logger) so the warning
                # honours the application's configuration for this library.
                logger.warning(f"Failed to create .gitignore in {directory}: {e}")

    def _discard_read_wrapper(self, wrapper: "_ReadConnWrapper") -> None:
        """Close ``wrapper`` and forget it so this thread's next read reconnects."""
        wrapper.close()
        with self._read_conns_lock:
            self._read_wrappers.discard(wrapper)
        self._local.read_conn_wrapper = None

    @contextmanager
    def _read_connect(self) -> Iterator[sqlite3.Connection]:
        """
        Yield this thread's read-only connection, opening it on first use.

        ``PRAGMA query_only = ON`` rejects accidental writes at the connection
        level. New connections are registered in ``_read_wrappers`` so
        shutdown() can close them. The wrapper closes its connection when the
        thread-local storage is released.

        On a ``sqlite3.Error`` the connection is discarded and recreated on the
        next call. If shutdown() asked to close the connection while the query
        held it, the connection is closed here once the query finishes, however
        the with-body exits.

        Raises:
            RuntimeError: If the database is shutting down or the connection
                was already closed.
        """
        if self._shutdown_requested:
            raise RuntimeError("SQLiteTaskDB is shutting down.")

        wrapper = getattr(self._local, "read_conn_wrapper", None)
        if wrapper is None or wrapper._closed:
            # Re-check: shutdown() may have closed every wrapper after the first
            # check; opening a new connection now would leak it.
            if self._shutdown_requested:
                raise RuntimeError("SQLiteTaskDB is shutting down.")
            conn = sqlite3.connect(
                self.db_path, timeout=self.timeout, check_same_thread=False
            )
            try:
                conn.execute("PRAGMA query_only = ON;")
            except Exception:
                conn.close()
                raise
            wrapper = _ReadConnWrapper(conn)
            with self._read_conns_lock:
                # Checked under the lock so registration cannot interleave with
                # shutdown() clearing _read_wrappers.
                if self._shutdown_requested:
                    wrapper.close()
                    raise RuntimeError("SQLiteTaskDB is shutting down.")
                self._read_wrappers.add(wrapper)
            self._local.read_conn_wrapper = wrapper

        with wrapper.lock:
            if wrapper._closed:
                raise RuntimeError("Database connection was closed")
            try:
                yield wrapper.conn
            except sqlite3.Error:
                # The connection may be broken: drop it and reconnect next time.
                self._discard_read_wrapper(wrapper)
                raise
            finally:
                # shutdown() could not close the connection while we held the
                # lock and flagged it instead; honour that on every exit path.
                if wrapper._shutdown_requested and not wrapper._closed:
                    self._discard_read_wrapper(wrapper)

    def _writer_loop(self, generation: int) -> None:
        """
        Body of the writer thread for ``generation``.

        Opens the WAL-mode connection, signals ``_writer_ready``, then runs
        queued tasks one at a time. Each task is committed on success or
        rolled back on any exception, which is stored on the task. The loop
        exits on ``_STOP`` or once a newer generation has been started by
        reset().
        """
        conn: sqlite3.Connection | None = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=self.timeout)
            conn.execute("PRAGMA journal_mode=WAL;")
            with self._writer_conn_lock:
                if generation == self._writer_generation:
                    self._writer_conn = conn
        except Exception as e:
            # Do not leak a connection whose setup failed half-way.
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
            if generation == self._writer_generation:
                self._writer_error = e
                self._writer_ready.set()
            return

        self._writer_ready.set()
        try:
            while True:
                # A newer generation exists: this thread has been retired.
                if generation != self._writer_generation:
                    break

                try:
                    task = self._write_queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                if task is _STOP:
                    if generation != self._writer_generation:
                        # A retired thread grabbed the shutdown sentinel; hand
                        # it on so the current writer still stops.
                        self._write_queue.put(_STOP)
                    self._write_queue.task_done()
                    break
                assert isinstance(task, _WriteTask)
                if not task.try_start():
                    # Cancelled by the submitter before it started: skip it.
                    task.event.set()
                    self._write_queue.task_done()
                    continue

                try:
                    task.result = task.fn(conn)
                    conn.commit()
                except BaseException as e:
                    # Record the error before rolling back. If rollback itself
                    # fails, the caller must still see the write as failed.
                    # This also covers interrupt() from _enqueue_write.
                    task.error = e
                    try:
                        conn.rollback()
                    except Exception:
                        logger.exception(
                            "SQLite writer failed to roll back after an error."
                        )
                finally:
                    task.mark_done()
                    task.event.set()
                    self._write_queue.task_done()
        finally:
            with self._writer_conn_lock:
                if self._writer_conn is conn:
                    self._writer_conn = None
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass

    def _enqueue_write(self, fn: Callable[[sqlite3.Connection], Any]) -> Any:
        """
        Run ``fn(conn)`` on the writer thread and return its result.

        The writer commits after ``fn`` returns and rolls back if it raises.

        Raises:
            RuntimeError: The writer failed to start, is not running, stopped
                while waiting, or the database is shutting down.
            WriterTaintedError: The writer is, or becomes, stuck.
            TimeoutError: The write did not start within ``timeout`` (it is
                cancelled), or it ran too long and was interrupted.
            BaseException: Whatever ``fn`` raised.
        """
        self._writer_ready.wait()
        if self._writer_error:
            raise RuntimeError(
                "SQLite writer thread failed to start."
            ) from self._writer_error

        with self._shutdown_lock:
            if self._writer_tainted:
                raise WriterTaintedError(
                    "SQLite writer thread is tainted (hung) and cannot process writes. "
                    "Call reset() or restart the process."
                )
            if self._shutdown_requested:
                raise RuntimeError("SQLiteTaskDB is shutting down.")
            if not self._writer_thread.is_alive():
                raise RuntimeError("SQLite writer thread is not running.")
            task = _WriteTask(fn=fn, event=threading.Event())
            self._write_queue.put(task)

        start = time.monotonic()
        while not task.event.wait(timeout=0.5):
            if not self._writer_thread.is_alive():
                raise RuntimeError("SQLite writer thread stopped unexpectedly.")
            if self._shutdown_requested:
                raise RuntimeError("SQLiteTaskDB is shutting down.")

            # The writer was marked tainted while we were waiting.
            if self._writer_tainted:
                raise WriterTaintedError("Writer thread became tainted during wait.")

            elapsed = time.monotonic() - start
            if elapsed > self.timeout:
                if task.try_cancel():
                    # Still PENDING (not started): cancellation is safe.
                    raise TimeoutError(
                        f"SQLite write did not start within {self.timeout}s and was cancelled."
                    )
                # RUNNING: try to abort the statement in progress.
                with self._writer_conn_lock:
                    if self._writer_conn:
                        self._writer_conn.interrupt()

                # Give the writer a moment to handle the interrupt and set the event.
                if task.event.wait(timeout=1.0):
                    break
                # interrupt() had no effect: mark the writer thread as tainted.
                self._writer_tainted = True
                raise WriterTaintedError(
                    f"SQLite write timed out after {self.timeout}s and interrupt failed. "
                    "Writer thread is now tainted."
                )

        if task.error:
            if isinstance(task.error, sqlite3.OperationalError) and "interrupted" in str(task.error).lower():
                raise TimeoutError(
                    f"SQLite write timed out after {self.timeout}s and was interrupted."
                ) from task.error
            raise task.error
        return task.result

    def _close_read_connections(self) -> None:
        """Close every tracked read connection without blocking on busy ones."""
        with self._read_conns_lock:
            # Iterate over a snapshot: the WeakSet may shrink during the loop.
            for wrapper in list(self._read_wrappers):
                try:
                    wrapper.close(wait=False)
                except Exception:
                    pass
            self._read_wrappers.clear()

    def shutdown(self, wait: bool = True) -> None:
        """
        Stop accepting work, stop the writer, and close read connections.

        Idempotent: only the first call has an effect.

        Args:
            wait: If True, block until the writer has processed every write
                queued before shutdown and exited.
        """
        with self._shutdown_lock:
            if self._shutdown_requested:
                return
            self._shutdown_requested = True

        if self._writer_thread.is_alive():
            # _STOP is queued behind every write accepted before the flag was
            # set (the queue is FIFO), so joining the thread also waits for
            # those writes. Unlike Queue.join(), it cannot hang forever if the
            # writer dies with items still queued.
            self._write_queue.put(_STOP)
            if wait:
                self._writer_thread.join()
        else:
            logger.error(
                "SQLite writer thread is not running; pending writes may be lost."
            )

        # Always close the per-thread read connections, even if the writer is
        # gone: open connections prevent WAL checkpoints.
        self._close_read_connections()

    def init_schema(self) -> None:
        """
        Create the ``tasks`` table, add columns missing from older databases,
        and ensure its indexes exist.

        Runs on the writer thread's connection. DDL issued from a separate
        connection could contend with the WAL write lock the writer holds.
        """
        # Columns introduced after the first schema version, in migration order.
        migrated_columns = (
            ("content_type", "TEXT"),
            ("version", "TEXT"),
            ("result_data", "BLOB"),
            ("func_identifier", "TEXT"),
            ("expires_at", "TIMESTAMP"),
        )

        def _op(conn: sqlite3.Connection) -> None:
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                cache_key TEXT PRIMARY KEY,
                func_name TEXT,
                func_identifier TEXT,
                input_id TEXT,
                result_type TEXT,
                result_value TEXT,
                result_data BLOB,
                content_type TEXT,
                version TEXT,
                expires_at TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """)

            # Auto migration for databases created by older versions.
            existing = {
                row[1] for row in conn.execute("PRAGMA table_info(tasks)").fetchall()
            }
            for column, decl in migrated_columns:
                if column in existing:
                    continue
                try:
                    # Column names and types are fixed constants above.
                    conn.execute(f"ALTER TABLE tasks ADD COLUMN {column} {decl};")
                except sqlite3.OperationalError as e:
                    # Tolerate a column that already exists (e.g. added
                    # concurrently by another process).
                    if "duplicate column name" not in str(e).lower():
                        raise

            # Idempotent, so run unconditionally: fresh databases need them too.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_func_identifier ON tasks(func_identifier);"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_expires_at ON tasks(expires_at);"
            )

        self._enqueue_write(_op)

    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        """
        Look up the record stored under ``cache_key``.

        An expired record is reported as a miss (``None``) unless
        ``include_expired`` is True. It is not deleted here; physical deletion
        is deferred to ``beautyspot gc``. An unparsable ``expires_at`` is
        treated as not expired.
        """
        with self._read_connect() as conn:
            row = conn.execute(
                "SELECT result_type, result_value, result_data, expires_at FROM tasks WHERE cache_key=?",
                (cache_key,),
            ).fetchone()

        if not row:
            return None

        r_type, r_val, r_data, exp_str = row
        # Lazy expiration check (skipped when include_expired=True).
        if exp_str and not include_expired:
            try:
                # Timestamps come back as strings. Normalise the space separator
                # to "T" (the original code cites Python <= 3.10 compatibility).
                expires_at = datetime.fromisoformat(exp_str.replace(" ", "T"))
                # Naive values stored before timezone support are treated as UTC.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=timezone.utc)
                if expires_at < datetime.now(timezone.utc):
                    return None
            except (ValueError, TypeError):
                pass  # Unparsable: treat the record as still valid.

        return TaskRecord(
            result_type=r_type,
            result_value=r_val,
            result_data=r_data,
            expires_at=exp_str,
        )

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """
        Insert or replace the record for ``cache_key`` (via the writer thread).

        ``func_identifier`` falls back to ``func_name`` when empty. Both
        ``updated_at`` and ``expires_at`` are stored through
        ``_ensure_utc_isoformat``.
        """

        def _op(conn: sqlite3.Connection) -> None:
            effective_identifier = func_identifier or func_name
            # Set updated_at explicitly in the same format as expires_at.
            # SQLite's DEFAULT CURRENT_TIMESTAMP has second precision and a
            # different format, which would skew prune/get_outdated_tasks.
            now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))
            conn.execute(
                """
                INSERT OR REPLACE INTO tasks
                (cache_key, func_name, func_identifier, input_id, version, result_type, content_type, result_value, result_data, expires_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    cache_key,
                    func_name,
                    effective_identifier,
                    input_id,
                    version,
                    result_type,
                    content_type,
                    result_value,
                    result_data,
                    _ensure_utc_isoformat(expires_at),
                    now_str,
                ),
            )

        self._enqueue_write(_op)

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """
        Return up to ``limit`` records, most recently updated first.

        Raises:
            ImportError: If pandas is not installed.
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for this feature.") from e

        if not os.path.exists(self.db_path):
            return pd.DataFrame()

        with self._read_connect() as conn:
            query = """
            SELECT
                cache_key, func_name, func_identifier, input_id, version, result_type,
                content_type, result_value, result_data, updated_at, expires_at
            FROM tasks
            ORDER BY updated_at DESC LIMIT ?
            """
            return pd.read_sql_query(query, conn, params=[limit])

    def delete(self, cache_key: str) -> bool:
        """Delete the record for ``cache_key``; True if a row was removed."""

        def _op(conn: sqlite3.Connection) -> bool:
            cursor = conn.execute("DELETE FROM tasks WHERE cache_key=?", (cache_key,))
            return cursor.rowcount > 0

        return bool(self._enqueue_write(_op))

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """
        Delete all records and return the number removed.

        A truthy ``func_name`` limits deletion to rows whose ``func_name`` or
        ``func_identifier`` matches it. ``None`` or ``""`` deletes everything.
        """

        def _op(conn: sqlite3.Connection) -> int:
            if func_name:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE func_name = ? OR func_identifier = ?",
                    (func_name, func_name),
                )
            else:
                cursor = conn.execute("DELETE FROM tasks")
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """
        Delete records with ``updated_at`` before ``older_than``.

        A truthy ``func_name`` restricts the delete to rows whose
        ``func_name`` or ``func_identifier`` matches it. Returns the number
        of rows removed.
        """
        cutoff_str = _ensure_utc_isoformat(older_than)

        def _op(conn: sqlite3.Connection) -> int:
            if func_name:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
                    (cutoff_str, func_name, func_name),
                )
            else:
                cursor = conn.execute(
                    "DELETE FROM tasks WHERE updated_at < ?",
                    (cutoff_str,),
                )
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """
        Preview what ``prune`` would delete.

        Returns a list of ``(cache_key, func_identifier or func_name,
        updated_at)`` tuples.
        """
        cutoff_str = _ensure_utc_isoformat(older_than)
        if not os.path.exists(self.db_path):
            return []

        with self._read_connect() as conn:
            if func_name:
                cursor = conn.execute(
                    "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks "
                    "WHERE updated_at < ? AND (func_name = ? OR func_identifier = ?)",
                    (cutoff_str, func_name, func_name),
                )
            else:
                cursor = conn.execute(
                    "SELECT cache_key, COALESCE(func_identifier, func_name), updated_at FROM tasks WHERE updated_at < ?",
                    (cutoff_str,),
                )
            return [(row[0], row[1], str(row[2])) for row in cursor.fetchall()]

    def delete_expired(self) -> int:
        """Delete records whose ``expires_at`` is in the past; return how many."""
        if not os.path.exists(self.db_path):
            return 0

        # Same formatter as save(), so the string comparison is consistent.
        now_str = _ensure_utc_isoformat(datetime.now(timezone.utc))

        def _op(conn: sqlite3.Connection) -> int:
            cursor = conn.execute(
                "DELETE FROM tasks WHERE expires_at IS NOT NULL AND expires_at < ?",
                (now_str,),
            )
            return cursor.rowcount

        return int(self._enqueue_write(_op))

    def get_blob_refs(self) -> Optional[set[str]]:
        """Return the full ``result_value`` of every ``FILE`` result (precise matching)."""
        if not os.path.exists(self.db_path):
            return set()

        with self._read_connect() as conn:
            cursor = conn.execute(
                "SELECT result_value FROM tasks WHERE result_type = 'FILE' AND result_value IS NOT NULL"
            )
            return {row[0] for row in cursor.fetchall() if row[0]}

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Return up to 50 cache keys that start with ``prefix`` (matched literally)."""
        if not os.path.exists(self.db_path):
            return []

        with self._read_connect() as conn:
            # Escape LIKE wildcards so the prefix is matched literally.
            escaped = (
                prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            cursor = conn.execute(
                "SELECT cache_key FROM tasks WHERE cache_key LIKE ? ESCAPE '\\' LIMIT 50",
                (f"{escaped}%",),
            )
            return [row[0] for row in cursor.fetchall()]

    @staticmethod
    def count_tasks(db_path: Path, timeout: float = 5.0) -> int:
        """
        Return the number of rows in ``tasks`` without starting a writer thread.

        Lightweight helper for read-only uses such as CLI listings. Returns -1
        on any error, including a missing database file.
        """
        # sqlite3.connect() creates an empty database for a missing path; a
        # read-only helper must not leave such a file behind.
        if not os.path.exists(db_path):
            return -1
        try:
            conn = sqlite3.connect(str(db_path), timeout=timeout)
            try:
                # No journal_mode=WAL here: a read-only helper does not need it,
                # and combining it with query_only could behave ambiguously.
                conn.execute("PRAGMA query_only = ON;")
                cursor = conn.execute("SELECT COUNT(*) FROM tasks")
                result = cursor.fetchone()
                return result[0] if result else 0
            finally:
                conn.close()
        except Exception:
            return -1

    def flush(self, timeout: Optional[float] = None) -> bool:
        """
        Block until every write queued before this call has completed.

        A no-op task is appended to the write queue. The writer processes the
        queue in FIFO order, so the no-op completing means earlier writes are
        done.

        Args:
            timeout: Maximum seconds to wait; ``None`` waits without a limit.

        Returns:
            True once the no-op has run. False in any of these cases:
                - the timeout expires;
                - shutdown was requested;
                - the writer thread is not running or dies;
                - the writer is tainted (stuck in an earlier write).
            The tainted check keeps ``timeout=None`` from hanging forever on
            a writer that is alive but stuck.
        """
        self._writer_ready.wait()

        # Dummy task used to drain the queue up to this point.
        def _noop_op(conn: sqlite3.Connection) -> None:
            pass

        task = _WriteTask(fn=_noop_op, event=threading.Event())

        # Check and put atomically under the lock to avoid a TOCTOU race with shutdown().
        with self._shutdown_lock:
            if (
                self._shutdown_requested
                or self._writer_tainted
                or not self._writer_thread.is_alive()
            ):
                return False
            self._write_queue.put(task)

        # Poll instead of a single event.wait(timeout=None) so we can notice a
        # dead or tainted writer rather than waiting forever.
        _POLL = 0.5
        deadline = (time.monotonic() + timeout) if timeout is not None else None

        while True:
            remaining = (
                max(0.0, deadline - time.monotonic()) if deadline is not None else None
            )
            wait_time = _POLL if remaining is None else min(_POLL, remaining)

            if task.event.wait(timeout=wait_time):
                return True

            if not self._writer_thread.is_alive():
                logger.error(
                    "SQLite writer thread died unexpectedly while waiting for flush."
                )
                return False

            if self._writer_tainted:
                # The writer is stuck in an earlier write; the no-op cannot run.
                task.try_cancel()
                return False

            if deadline is not None and time.monotonic() >= deadline:
                task.try_cancel()
                return False