Coverage for src / beautyspot / maintenance.py: 84%
196 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
# src/beautyspot/maintenance.py
import logging
import shutil
import threading
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import Any, Optional

from beautyspot.db import Flushable, Shutdownable, TaskDBMaintenable
from beautyspot.serializer import SerializerProtocol
from beautyspot.storage import BlobStorageMaintenable
# Module-level logger for this file. A NullHandler is attached so that the
# logger always has a handler of its own; where records actually end up is
# left to whatever logging configuration the host application installs.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
class MaintenanceService:
    """
    Service layer for administrative tasks, dashboard support, and system assembly.

    The service combines three collaborators: a task database (``db``), a blob
    storage backend (``storage``) and a serializer used to decode stored
    results. It can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(
        self,
        db: TaskDBMaintenable,
        storage: BlobStorageMaintenable,
        serializer: SerializerProtocol,
    ):
        """
        Store the injected collaborators.

        Args:
            db: Task database used for all record queries and deletions.
            storage: Blob storage used to load, list and delete blobs.
            serializer: Serializer used to decode stored result payloads.
        """
        self.db = db
        self.storage = storage
        self.serializer = serializer
        # Taken without blocking by clean_garbage() so that overlapping
        # clean-up runs are skipped instead of queued.
        self._cleaning_lock = threading.Lock()
        # True only when the instance was created through from_path().
        self._owns_db = False

    def close(self) -> None:
        """
        Shut down the DB backend.

        The DB is shut down only when this service was created through
        ``from_path``. A DB injected from outside remains the caller's
        responsibility to shut down.
        """
        if self._owns_db and isinstance(self.db, Shutdownable):
            self.db.shutdown(wait=True)

    def __enter__(self) -> "MaintenanceService":
        """Return the service itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the service when the ``with`` block ends."""
        self.close()

    @classmethod
    def from_path(
        cls, db_path: str | Path, blob_dir: Optional[str | Path] = None
    ) -> "MaintenanceService":
        """
        Factory method to assemble the system components (SQLite + Msgpack + Storage)
        from a database path.

        Args:
            db_path: Path of the SQLite database file.
            blob_dir: Blob directory. When omitted (or falsy) it is inferred
                from ``db_path``: ``<parent>/blobs/<stem>`` is used if it
                exists, otherwise the legacy ``<parent>/<stem>/blobs`` if that
                exists, otherwise ``<parent>/blobs/<stem>``.

        Returns:
            A service that owns its DB, so ``close()`` shuts the DB down.
        """
        # Deferred imports to resolve the dependency between modules.
        from beautyspot.db import SQLiteTaskDB
        from beautyspot.storage import create_storage
        from beautyspot.serializer import MsgpackSerializer

        path = Path(db_path)

        # Blob directory inference.
        if blob_dir:
            b_path = Path(blob_dir)
        else:
            # Default placement for bs.Spot(name="foo"):
            # .beautyspot/foo.db -> .beautyspot/blobs/foo/
            parent = path.parent
            stem = path.stem

            # Current layout takes priority: .beautyspot/blobs/{name}/
            candidate = parent / "blobs" / stem
            if candidate.exists():
                b_path = candidate
            else:
                # Fall back to the legacy layout: .beautyspot/{name}/blobs/
                legacy = parent / stem / "blobs"
                b_path = legacy if legacy.exists() else candidate

        db = SQLiteTaskDB(path)

        service = cls(
            db=db,
            storage=create_storage(str(b_path)),
            serializer=MsgpackSerializer(),
        )
        service._owns_db = True
        return service

    # --- Dashboard Support ---

    def get_history(self, limit: int = 1000):
        """Get task history from DB, passing ``limit`` through to the backend."""
        return self.db.get_history(limit=limit)

    def get_task_detail(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[dict[str, Any]]:
        """
        Retrieve task details and decode the blob data if available.

        Args:
            cache_key: Key of the record to look up.
            include_expired: If True, return expired records as well
                (for dashboard/debugging).

        Returns:
            A copy of the record dict with an extra ``decoded_data`` key, or
            ``None`` when no record is found. ``decoded_data`` is ``None``
            when there is nothing to decode or decoding failed.
        """
        record = self.db.get(cache_key, include_expired=include_expired)
        if not record:
            return None

        result_record: dict[str, Any] = dict(record)

        decoded_data = None
        r_type = record.get("result_type")
        r_val = record.get("result_value")
        r_blob = record.get("result_data")

        try:
            if r_type == "DIRECT_BLOB":
                if r_blob is not None:
                    decoded_data = self.serializer.loads(r_blob)
            elif r_type == "FILE":
                if r_val:
                    # Load the bytes from storage, then deserialize them.
                    data_bytes = self.storage.load(r_val)
                    decoded_data = self.serializer.loads(data_bytes)
        except Exception as e:
            # Best effort: a decode failure is logged and decoded_data
            # simply stays None.
            logger.error(f"Failed to decode data for key '{cache_key}': {e}")

        result_record["decoded_data"] = decoded_data
        return result_record

    def delete_expired_tasks(self) -> int:
        """Physically delete expired tasks (used by GC) and return the count."""
        # Goes through the DB interface method rather than DB internals.
        return self.db.delete_expired()

    def delete_task(self, cache_key: str) -> bool:
        """
        Delete a task record and its associated blob file.

        Returns:
            ``False`` when the record does not exist or the DB reports that
            nothing was deleted; ``True`` otherwise, even if the blob itself
            could not be removed.
        """
        record = self.db.get(cache_key, include_expired=True)
        if not record:
            return False

        # Snapshot taken before the DB row is removed.
        result_record: dict[str, Any] = dict(record)

        # The DB record is deleted first, the blob second. With the opposite
        # order, a failed DB delete would leave a record pointing at a blob
        # that no longer exists. A blob orphaned by this order can still be
        # collected later by scan_garbage(), so this order is the safer one.
        deleted = self.db.delete(cache_key)
        if not deleted:
            return False

        # Blob deletion.
        if record.get("result_type") == "FILE" and record.get("result_value"):
            try:
                self.storage.delete(result_record["result_value"])
            except Exception as e:
                logger.warning(
                    f"Failed to delete blob for key '{cache_key}': {e}. "
                    "The orphaned blob will be collected by GC."
                )

        return True

    # --- Maintenance Operations ---

    def get_prunable_tasks(
        self, days: int, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """
        Return what the DB reports as outdated for a cutoff of now minus ``days``.

        The cutoff is a timezone-aware UTC datetime; ``func_name`` is passed
        through to the DB unchanged.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        return self.db.get_outdated_tasks(cutoff, func_name)

    def prune(self, days: int, func_name: Optional[str] = None) -> int:
        """
        Ask the DB to prune with a cutoff of now (UTC) minus ``days``.

        Returns:
            The count reported by the DB.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        logger.info(f"Pruning tasks older than {cutoff} (func={func_name})...")
        count = self.db.prune(cutoff, func_name)
        logger.info(f"Deleted {count} tasks.")
        return count

    def clear(self, func_name: Optional[str] = None) -> int:
        """
        Ask the DB to delete all tasks, passing ``func_name`` through.

        Returns:
            The count reported by the DB.
        """
        logger.info(f"Clearing all tasks (func={func_name})...")
        count = self.db.delete_all(func_name)
        logger.info(f"Cleared {count} tasks.")
        return count

    def scan_garbage(self, grace_period: float = 60.0) -> list[str]:
        """
        Scan for orphaned blob files that are not referenced in the database.

        A storage key is kept (not an orphan) when its normalized location
        matches a DB reference, or when its basename matches the basename of
        a DB reference that is an absolute path (legacy records).

        Args:
            grace_period: Minimum age of a blob (in seconds) to be considered an orphan.
                          This prevents deleting blobs that were just created but
                          not yet registered in the database (background saves).
                          A value of 0 or less disables the age check.

        Returns:
            Storage keys considered orphaned, in the order storage lists them.
            An empty list when the DB returns ``None`` for its references.
        """
        ref_filenames = self.db.get_blob_refs()
        if ref_filenames is None:
            return []

        def _normalize_location(location: str) -> str:
            # Compare locations with forward slashes only.
            return location.replace("\\", "/")

        def _basename(location: str) -> str:
            return _normalize_location(location).split("/")[-1]

        def _is_absolute(location: str) -> bool:
            # Checked under both path flavours so the result does not depend
            # on the OS running the scan: a reference written as "C:/..." must
            # still count as absolute when scanned on POSIX, and "/var/..."
            # when scanned on Windows. Otherwise a referenced blob would be
            # reported as an orphan.
            return (
                PurePosixPath(location).is_absolute()
                or PureWindowsPath(location).is_absolute()
            )

        ref_locations = {_normalize_location(loc) for loc in ref_filenames}
        # Legacy support: basename matching is allowed only for references
        # that were stored in the DB as absolute paths.
        ref_basenames = {_basename(loc) for loc in ref_locations if _is_absolute(loc)}

        now = time.time()
        orphans: list[str] = []
        for location in self.storage.list_keys():
            norm = _normalize_location(location)
            if norm in ref_locations:
                continue
            if _basename(norm) in ref_basenames:
                continue

            # Check grace period
            if grace_period > 0:
                try:
                    mtime = self.storage.get_mtime(location)
                    if now - mtime < grace_period:
                        # Too new; potentially in-flight background save.
                        continue
                except Exception as e:
                    logger.debug(f"Failed to check mtime for {location} (ignored): {e}")
                    # Skip files that we can't check, to be safe.
                    continue

            orphans.append(location)

        return orphans

    def clean_garbage(
        self,
        orphans: Optional[list[str]] = None,
        tmp_max_age_seconds: int = 86400,
        orphan_grace_seconds: float = 60.0,
    ) -> tuple[int, int]:
        """
        Delete expired tasks (DB records) and orphaned blob files.

        When the storage backend provides them, ``clean_temp_files`` (stale
        temporary files left behind by writes) and ``prune_empty_dirs`` are
        also invoked. If another clean-up is already running on this
        instance, the call returns ``(0, 0)`` immediately.

        Args:
            orphans: Pre-scanned list of orphaned files. When ``None``, the
                list is obtained from ``scan_garbage``.
            tmp_max_age_seconds: Age in seconds passed to the storage as the
                threshold for removing temporary files. Defaults to 24 hours.
            orphan_grace_seconds: Grace period in seconds passed to
                ``scan_garbage``; only used when ``orphans`` is ``None``.
                Defaults to 60 seconds.

        Returns:
            tuple[int, int]: (number of expired tasks deleted,
            number of orphaned files deleted).
        """
        if not self._cleaning_lock.acquire(blocking=False):
            logger.debug("Another eviction task is currently running. Skipping.")
            return 0, 0

        try:
            # Phase -1: flush the DB write queue.
            # If writes are still uncommitted, the scan_garbage() call below
            # could mistake their blobs for orphans and delete them; flushing
            # first guards against that race.
            if isinstance(self.db, Flushable):
                try:
                    self.db.flush(timeout=10.0)
                except Exception as e:
                    logger.warning(f"DB flush before garbage scan failed: {e}")

            # Phase 0: delete expired tasks.
            deleted_expired_count = self.delete_expired_tasks()
            if deleted_expired_count > 0:
                logger.info(f"Deleted {deleted_expired_count} expired tasks from DB.")

            # Phase 1: identify orphaned files.
            if orphans is None:
                orphans = self.scan_garbage(grace_period=orphan_grace_seconds)

            deleted_orphan_count = 0

            # Phase 2: delete the files themselves.
            if orphans:
                for location in orphans:
                    try:
                        self.storage.delete(location)
                        deleted_orphan_count += 1
                    except Exception as e:
                        logger.warning(f"Failed to delete orphan blob {location}: {e}")

                if deleted_orphan_count > 0:
                    logger.info(f"Deleted {deleted_orphan_count} orphaned blob files.")

            # Phase 2.5: clean up old temporary files (optional capability).
            if hasattr(self.storage, "clean_temp_files"):
                try:
                    tmp_count = self.storage.clean_temp_files(  # type: ignore
                        max_age_seconds=tmp_max_age_seconds
                    )
                    if tmp_count > 0:
                        logger.info(f"Removed {tmp_count} abandoned temporary files.")
                except Exception as e:
                    logger.warning(f"Failed to clean temporary files: {e}")

            # Phase 3: sweep empty directories (optional capability).
            if hasattr(self.storage, "prune_empty_dirs"):
                try:
                    dir_count = self.storage.prune_empty_dirs()  # type: ignore
                    if dir_count > 0:
                        logger.info(f"Removed {dir_count} empty directories.")
                except Exception as e:
                    logger.warning(f"Failed to prune empty directories: {e}")

            return deleted_expired_count, deleted_orphan_count

        finally:
            self._cleaning_lock.release()

    def resolve_key_prefix(self, prefix: str) -> str | list[str] | None:
        """
        Resolve a potentially shortened key to a full cache key.

        Returns:
            str: The single matching full key (Exact match or unique prefix match).
            list[str]: A list of conflicting candidates (Ambiguous).
            None: No match found.
        """
        # 1. An exact match (expired records included) takes priority.
        if self.db.get(prefix, include_expired=True):
            return prefix

        # 2. Prefix search.
        candidates = self.db.get_keys_start_with(prefix)

        if not candidates:
            return None

        if len(candidates) == 1:
            return candidates[0]

        # 3. Ambiguous: hand back every candidate.
        return candidates

    # --- Zombie Project Cleanup (gc command) ---

    @staticmethod
    def scan_orphan_projects(workspace_dir: Path) -> list[Path]:
        """
        Scan for blob directories in ``<workspace_dir>/blobs/`` that have no
        corresponding ``<workspace_dir>/<name>.db`` file.

        Returns:
            A list of Path objects for the orphan directories. The list is
            empty when ``<workspace_dir>/blobs`` is missing or is not a
            directory.
        """
        blobs_root = workspace_dir / "blobs"
        # is_dir() rather than exists(): a regular file named "blobs" cannot
        # be iterated and simply means there is nothing to scan.
        if not blobs_root.is_dir():
            return []

        orphans = []
        for entry in blobs_root.iterdir():
            if entry.is_dir():
                # For blobs/{name}, check whether {name}.db exists.
                db_path = workspace_dir / f"{entry.name}.db"
                if not db_path.exists():
                    orphans.append(entry)

        return orphans

    @staticmethod
    def delete_project_storage(path: Path) -> None:
        """
        Recursively delete a project storage directory.

        Does nothing when ``path`` is not an existing directory.

        Raises:
            Exception: Whatever ``shutil.rmtree`` raises; the failure is
                logged and then re-raised.
        """
        if path.exists() and path.is_dir():
            try:
                shutil.rmtree(path)
            except Exception as e:
                logger.error(f"Failed to delete directory {path}: {e}")
                raise