# Coverage report for src/beautyspot/maintenance.py: 84% (200 statements)
# Generated by coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
# src/beautyspot/maintenance.py

import logging
import shutil
import threading
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional

from beautyspot.db import Flushable, Shutdownable, TaskDBMaintenable
from beautyspot.storage import BlobStorageMaintenable
from beautyspot.serializer import SerializerProtocol

# Module-level logger. The NullHandler discards records itself, so this
# module emits nothing unless the application configures logging.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
19class MaintenanceService:
20 """
21 Service layer for administrative tasks, dashboard support, and system assembly.
22 """
24 def __init__(
25 self,
26 db: TaskDBMaintenable,
27 storage: BlobStorageMaintenable,
28 serializer: SerializerProtocol,
29 ):
30 self.db = db
31 self.storage = storage
32 self.serializer = serializer
33 self._cleaning_lock = threading.Lock()
34 self._owns_db = False # from_path で作成した場合のみ True
36 def close(self) -> None:
37 """DB バックエンドをシャットダウンする。
39 ``from_path`` で作成された場合のみ DB をシャットダウンします。
40 外部から注入された DB は呼び出し元の責務でシャットダウンしてください。
41 """
42 if self._owns_db and isinstance(self.db, Shutdownable): 42 ↛ exitline 42 didn't return from function 'close' because the condition on line 42 was always true
43 self.db.shutdown(wait=True)
45 def __enter__(self) -> "MaintenanceService":
46 return self
48 def __exit__(self, *args: object) -> None:
49 self.close()
51 @classmethod
52 def from_path(
53 cls, db_path: str | Path, blob_dir: Optional[str | Path] = None
54 ) -> "MaintenanceService":
55 """
56 Factory method to assemble the system components (SQLite + Msgpack + Storage)
57 from a database path.
58 """
59 # 遅延インポートで依存関係を解決
60 from beautyspot.db import SQLiteTaskDB
61 from beautyspot.storage import create_storage
62 from beautyspot.serializer import MsgpackSerializer
64 path = Path(db_path)
66 # Blobディレクトリの推論ロジック
67 if blob_dir:
68 b_path = Path(blob_dir)
69 else:
70 # bs.Spot(name="foo") のデフォルト配置:
71 # .beautyspot/foo.db -> .beautyspot/blobs/foo/
72 parent = path.parent
73 stem = path.stem
75 # 現行レイアウト優先: .beautyspot/blobs/{name}/
76 candidate = parent / "blobs" / stem
77 if candidate.exists():
78 b_path = candidate
79 else:
80 # 旧レイアウトへのフォールバック: .beautyspot/{name}/blobs/
81 legacy = parent / stem / "blobs"
82 b_path = legacy if legacy.exists() else candidate
84 db = SQLiteTaskDB(path)
85 try:
86 db.init_schema()
87 except Exception as e:
88 logger.warning(
89 f"Failed to initialize schema for {path}: {e}. Proceeding with existing schema."
90 )
92 service = cls(
93 db=db,
94 storage=create_storage(str(b_path)),
95 serializer=MsgpackSerializer(),
96 )
97 service._owns_db = True
98 return service
100 # --- Dashboard Support ---
102 def get_history(self, limit: int = 1000):
103 """Get task history from DB."""
104 return self.db.get_history(limit=limit)
106 def get_task_detail(
107 self, cache_key: str, *, include_expired: bool = False
108 ) -> Optional[dict[str, Any]]:
109 """
110 Retrieve task details and decode the blob data if available.
111 Returns the record dict with an extra 'decoded_data' key.
113 Args:
114 include_expired: If True, return expired records as well (for dashboard/debugging).
115 """
116 record = self.db.get(cache_key, include_expired=include_expired)
117 if not record:
118 return None
120 result_record: dict[str, Any] = dict(record)
122 decoded_data = None
123 r_type = record.get("result_type")
124 r_val = record.get("result_value")
125 r_blob = record.get("result_data")
127 try:
128 if r_type == "DIRECT_BLOB":
129 if r_blob is not None:
130 decoded_data = self.serializer.loads(r_blob)
132 elif r_type == "FILE": 132 ↛ 142line 132 didn't jump to line 142 because the condition on line 132 was always true
133 if r_val: 133 ↛ 142line 133 didn't jump to line 142 because the condition on line 133 was always true
134 # Storage からロードしてデシリアライズ
135 data_bytes = self.storage.load(r_val)
136 decoded_data = self.serializer.loads(data_bytes)
138 except Exception as e:
139 logger.error(f"Failed to decode data for key '{cache_key}': {e}")
140 # デコード失敗時は decoded_data は None のまま
142 result_record["decoded_data"] = decoded_data
143 return result_record
145 def delete_expired_tasks(self) -> int:
146 """期限切れタスクの物理削除 (GC用)"""
147 # [FIX] 内部実装(_connect)への依存を排除し、インターフェースメソッドを使用
148 return self.db.delete_expired()
150 def delete_task(self, cache_key: str) -> bool:
151 """
152 Delete a task record and its associated blob file.
153 """
154 record = self.db.get(cache_key, include_expired=True)
155 if not record: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 return False
158 result_record: dict[str, Any] = dict(record)
160 # Bug Fix (Bug6): DB レコードを先に削除してからブロブを削除する。
161 # 旧実装(ブロブ先削除)では DB レコード削除が失敗すると
162 # 「DB に参照が残るがブロブが消えた」状態になり、次のアクセスで
163 # CacheCorruptedError が発生していた。
164 # ブロブが孤立した場合は GC (scan_garbage) で回収可能なため、
165 # この順序の方が安全。
166 deleted = self.db.delete(cache_key)
167 if not deleted: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 return False
170 # Blob削除
171 if record.get("result_type") == "FILE" and record.get("result_value"):
172 try:
173 self.storage.delete(result_record["result_value"])
174 except Exception as e:
175 logger.warning(
176 f"Failed to delete blob for key '{cache_key}': {e}. "
177 "The orphaned blob will be collected by GC."
178 )
180 return True
182 # --- Maintenance Operations ---
184 def get_prunable_tasks(
185 self, days: int, func_name: Optional[str] = None
186 ) -> list[tuple[str, str, str]]:
187 cutoff = datetime.now(timezone.utc) - timedelta(days=days)
188 return self.db.get_outdated_tasks(cutoff, func_name)
190 def prune(self, days: int, func_name: Optional[str] = None) -> int:
191 cutoff = datetime.now(timezone.utc) - timedelta(days=days)
192 logger.info(f"Pruning tasks older than {cutoff} (func={func_name})...")
193 count = self.db.prune(cutoff, func_name)
194 logger.info(f"Deleted {count} tasks.")
195 return count
197 def clear(self, func_name: Optional[str] = None) -> int:
198 logger.info(f"Clearing all tasks (func={func_name})...")
199 count = self.db.delete_all(func_name)
200 logger.info(f"Cleared {count} tasks.")
201 return count
203 def scan_garbage(self, grace_period: float = 60.0) -> list[str]:
204 """
205 Scan for orphaned blob files that are not referenced in the database.
207 Args:
208 grace_period: Minimum age of a blob (in seconds) to be considered an orphan.
209 This prevents deleting blobs that were just created but
210 not yet registered in the database (background saves).
211 """
212 ref_filenames = self.db.get_blob_refs()
213 if ref_filenames is None: 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true
214 return []
216 def _normalize_location(location: str) -> str:
217 return location.replace("\\", "/")
219 def _basename(location: str) -> str:
220 return _normalize_location(location).split("/")[-1]
222 ref_locations = {_normalize_location(loc) for loc in ref_filenames}
223 # Legacy support: DBに絶対パスが保存されていた場合のみbasenameを許容
224 ref_basenames = {
225 _basename(loc) for loc in ref_locations if Path(loc).is_absolute()
226 }
228 now = time.time()
229 orphans = []
230 for location in self.storage.list_keys():
231 norm = _normalize_location(location)
232 if norm in ref_locations:
233 continue
234 if _basename(norm) in ref_basenames:
235 continue
237 # Check grace period
238 if grace_period > 0:
239 try:
240 mtime = self.storage.get_mtime(location)
241 if now - mtime < grace_period:
242 # Too new; potentially in-flight background save.
243 continue
244 except Exception as e:
245 logger.debug(f"Failed to check mtime for {location} (ignored): {e}")
246 # Skip files that we can't check, to be safe.
247 continue
249 orphans.append(location)
251 return orphans
253 def clean_garbage(
254 self,
255 orphans: Optional[list[str]] = None,
256 tmp_max_age_seconds: int = 86400,
257 orphan_grace_seconds: float = 60.0,
258 ) -> tuple[int, int]:
259 """
260 期限切れのタスク(DBレコード)と孤立したBlobファイルを削除します。
261 また、アトミック書き込み時に残留した古い一時ファイル (.spot_tmp) の
262 クリーンアップも同時に行います。
264 Args:
265 orphans: 事前にスキャンされた孤立ファイルのリスト。
266 tmp_max_age_seconds: 一時ファイルを削除対象とするまでの猶予期間(秒)。デフォルトは24時間。
267 orphan_grace_seconds: 孤立ファイルを判定する際の猶予期間(秒)。デフォルトは60秒。
269 Returns:
270 tuple[int, int]: (削除された期限切れタスクの数, 削除された孤立ファイルの数)
271 """
272 if not self._cleaning_lock.acquire(blocking=False): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 logger.debug("Another eviction task is currently running. Skipping.")
274 return 0, 0
276 try:
277 # Phase -1: DBの書き込みキューをフラッシュ
278 # save_sync=False で投入された書き込みが未コミットの場合、
279 # 直後の scan_garbage() がその blob を孤立ファイルと誤判定して
280 # 削除してしまうレースコンディションを防ぐ。
281 if isinstance(self.db, Flushable): 281 ↛ 288line 281 didn't jump to line 288 because the condition on line 281 was always true
282 try:
283 self.db.flush(timeout=10.0)
284 except Exception as e:
285 logger.warning(f"DB flush before garbage scan failed: {e}")
287 # Phase 0: 期限切れタスクの削除
288 deleted_expired_count = self.delete_expired_tasks()
289 if deleted_expired_count > 0: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 logger.info(f"Deleted {deleted_expired_count} expired tasks from DB.")
292 # Phase 1: 孤立したファイルの特定
293 if orphans is None:
294 orphans = self.scan_garbage(grace_period=orphan_grace_seconds)
296 deleted_orphan_count = 0
298 # Phase 2: ファイルの実体削除
299 if orphans:
300 for location in orphans:
301 try:
302 self.storage.delete(location)
303 deleted_orphan_count += 1
304 except Exception as e:
305 logger.warning(f"Failed to delete orphan blob {location}: {e}")
307 if deleted_orphan_count > 0: 307 ↛ 311line 307 didn't jump to line 311 because the condition on line 307 was always true
308 logger.info(f"Deleted {deleted_orphan_count} orphaned blob files.")
310 # [ADD] Phase 2.5: 古い一時ファイルのクリーンアップ
311 if hasattr(self.storage, "clean_temp_files"): 311 ↛ 322line 311 didn't jump to line 322 because the condition on line 311 was always true
312 try:
313 tmp_count = self.storage.clean_temp_files( # type: ignore
314 max_age_seconds=tmp_max_age_seconds
315 )
316 if tmp_count > 0:
317 logger.info(f"Removed {tmp_count} abandoned temporary files.")
318 except Exception as e:
319 logger.warning(f"Failed to clean temporary files: {e}")
321 # Phase 3: 空ディレクトリ掃除
322 if hasattr(self.storage, "prune_empty_dirs"): 322 ↛ 330line 322 didn't jump to line 330 because the condition on line 322 was always true
323 try:
324 dir_count = self.storage.prune_empty_dirs() # type: ignore
325 if dir_count > 0: 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true
326 logger.info(f"Removed {dir_count} empty directories.")
327 except Exception as e:
328 logger.warning(f"Failed to prune empty directories: {e}")
330 return deleted_expired_count, deleted_orphan_count
332 finally:
333 self._cleaning_lock.release()
335 def resolve_key_prefix(self, prefix: str) -> str | list[str] | None:
336 """
337 Resolve a potentially shortened key to a full cache key.
339 Returns:
340 str: The single matching full key (Exact match or unique prefix match).
341 list[str]: A list of conflicting candidates (Ambiguous).
342 None: No match found.
343 """
344 # 1. 完全一致を最優先でチェック
345 if self.db.get(prefix, include_expired=True):
346 return prefix
348 # 2. プレフィックス検索
349 candidates = self.db.get_keys_start_with(prefix)
351 if not candidates: 351 ↛ 354line 351 didn't jump to line 354 because the condition on line 351 was always true
352 return None
354 if len(candidates) == 1:
355 return candidates[0]
357 # 3. 曖昧な場合 (複数の候補を返す)
358 return candidates
360 # --- Zombie Project Cleanup (gc command) ---
362 @staticmethod
363 def scan_orphan_projects(workspace_dir: Path) -> list[Path]:
364 """
365 Scan for blob directories in .beautyspot/blobs/ that have no corresponding .db file.
366 Returns a list of Path objects for the orphan directories.
367 """
368 blobs_root = workspace_dir / "blobs"
369 if not blobs_root.exists():
370 return []
372 orphans = []
373 for entry in blobs_root.iterdir():
374 if entry.is_dir(): 374 ↛ 373line 374 didn't jump to line 373 because the condition on line 374 was always true
375 # blobs/{name} に対して {name}.db が存在するか確認
376 db_path = workspace_dir / f"{entry.name}.db"
377 if not db_path.exists():
378 orphans.append(entry)
380 return orphans
382 @staticmethod
383 def delete_project_storage(path: Path) -> None:
384 """
385 Recursively delete a project storage directory.
386 """
387 if path.exists() and path.is_dir(): 387 ↛ exitline 387 didn't return from function 'delete_project_storage' because the condition on line 387 was always true
388 try:
389 shutil.rmtree(path)
390 except Exception as e:
391 logger.error(f"Failed to delete directory {path}: {e}")
392 raise