Coverage for src/beautyspot/maintenance.py: 84%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-18 18:20 +0900

1# src/beautyspot/maintenance.py 

2 

3import logging 

4import shutil 

5import time 

6from datetime import datetime, timedelta, timezone 

7from pathlib import Path 

8import threading 

9from typing import Optional, Any 

10 

11from beautyspot.db import Flushable, Shutdownable, TaskDBMaintenable 

12from beautyspot.storage import BlobStorageMaintenable 

13from beautyspot.serializer import SerializerProtocol 

14 

# Module-level logger named after this module. A NullHandler is attached so
# that records emitted here are swallowed when the host application has not
# configured logging (the usual convention for library code).
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

17 

18 

19class MaintenanceService: 

20 """ 

21 Service layer for administrative tasks, dashboard support, and system assembly. 

22 """ 

23 

24 def __init__( 

25 self, 

26 db: TaskDBMaintenable, 

27 storage: BlobStorageMaintenable, 

28 serializer: SerializerProtocol, 

29 ): 

30 self.db = db 

31 self.storage = storage 

32 self.serializer = serializer 

33 self._cleaning_lock = threading.Lock() 

34 self._owns_db = False # from_path で作成した場合のみ True 

35 

36 def close(self) -> None: 

37 """DB バックエンドをシャットダウンする。 

38 

39 ``from_path`` で作成された場合のみ DB をシャットダウンします。 

40 外部から注入された DB は呼び出し元の責務でシャットダウンしてください。 

41 """ 

42 if self._owns_db and isinstance(self.db, Shutdownable): 42 ↛ exitline 42 didn't return from function 'close' because the condition on line 42 was always true

43 self.db.shutdown(wait=True) 

44 

45 def __enter__(self) -> "MaintenanceService": 

46 return self 

47 

48 def __exit__(self, *args: object) -> None: 

49 self.close() 

50 

51 @classmethod 

52 def from_path( 

53 cls, db_path: str | Path, blob_dir: Optional[str | Path] = None 

54 ) -> "MaintenanceService": 

55 """ 

56 Factory method to assemble the system components (SQLite + Msgpack + Storage) 

57 from a database path. 

58 """ 

59 # 遅延インポートで依存関係を解決 

60 from beautyspot.db import SQLiteTaskDB 

61 from beautyspot.storage import create_storage 

62 from beautyspot.serializer import MsgpackSerializer 

63 

64 path = Path(db_path) 

65 

66 # Blobディレクトリの推論ロジック 

67 if blob_dir: 

68 b_path = Path(blob_dir) 

69 else: 

70 # bs.Spot(name="foo") のデフォルト配置: 

71 # .beautyspot/foo.db -> .beautyspot/blobs/foo/ 

72 parent = path.parent 

73 stem = path.stem 

74 

75 # 現行レイアウト優先: .beautyspot/blobs/{name}/ 

76 candidate = parent / "blobs" / stem 

77 if candidate.exists(): 

78 b_path = candidate 

79 else: 

80 # 旧レイアウトへのフォールバック: .beautyspot/{name}/blobs/ 

81 legacy = parent / stem / "blobs" 

82 b_path = legacy if legacy.exists() else candidate 

83 

84 db = SQLiteTaskDB(path) 

85 

86 service = cls( 

87 db=db, 

88 storage=create_storage(str(b_path)), 

89 serializer=MsgpackSerializer(), 

90 ) 

91 service._owns_db = True 

92 return service 

93 

94 # --- Dashboard Support --- 

95 

96 def get_history(self, limit: int = 1000): 

97 """Get task history from DB.""" 

98 return self.db.get_history(limit=limit) 

99 

100 def get_task_detail( 

101 self, cache_key: str, *, include_expired: bool = False 

102 ) -> Optional[dict[str, Any]]: 

103 """ 

104 Retrieve task details and decode the blob data if available. 

105 Returns the record dict with an extra 'decoded_data' key. 

106 

107 Args: 

108 include_expired: If True, return expired records as well (for dashboard/debugging). 

109 """ 

110 record = self.db.get(cache_key, include_expired=include_expired) 

111 if not record: 

112 return None 

113 

114 result_record: dict[str, Any] = dict(record) 

115 

116 decoded_data = None 

117 r_type = record.get("result_type") 

118 r_val = record.get("result_value") 

119 r_blob = record.get("result_data") 

120 

121 try: 

122 if r_type == "DIRECT_BLOB": 

123 if r_blob is not None: 

124 decoded_data = self.serializer.loads(r_blob) 

125 

126 elif r_type == "FILE": 126 ↛ 136line 126 didn't jump to line 136 because the condition on line 126 was always true

127 if r_val: 127 ↛ 136line 127 didn't jump to line 136 because the condition on line 127 was always true

128 # Storage からロードしてデシリアライズ 

129 data_bytes = self.storage.load(r_val) 

130 decoded_data = self.serializer.loads(data_bytes) 

131 

132 except Exception as e: 

133 logger.error(f"Failed to decode data for key '{cache_key}': {e}") 

134 # デコード失敗時は decoded_data は None のまま 

135 

136 result_record["decoded_data"] = decoded_data 

137 return result_record 

138 

139 def delete_expired_tasks(self) -> int: 

140 """期限切れタスクの物理削除 (GC用)""" 

141 # [FIX] 内部実装(_connect)への依存を排除し、インターフェースメソッドを使用 

142 return self.db.delete_expired() 

143 

144 def delete_task(self, cache_key: str) -> bool: 

145 """ 

146 Delete a task record and its associated blob file. 

147 """ 

148 record = self.db.get(cache_key, include_expired=True) 

149 if not record: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 return False 

151 

152 result_record: dict[str, Any] = dict(record) 

153 

154 # Bug Fix (Bug6): DB レコードを先に削除してからブロブを削除する。 

155 # 旧実装(ブロブ先削除)では DB レコード削除が失敗すると 

156 # 「DB に参照が残るがブロブが消えた」状態になり、次のアクセスで 

157 # CacheCorruptedError が発生していた。 

158 # ブロブが孤立した場合は GC (scan_garbage) で回収可能なため、 

159 # この順序の方が安全。 

160 deleted = self.db.delete(cache_key) 

161 if not deleted: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true

162 return False 

163 

164 # Blob削除 

165 if record.get("result_type") == "FILE" and record.get("result_value"): 

166 try: 

167 self.storage.delete(result_record["result_value"]) 

168 except Exception as e: 

169 logger.warning( 

170 f"Failed to delete blob for key '{cache_key}': {e}. " 

171 "The orphaned blob will be collected by GC." 

172 ) 

173 

174 return True 

175 

176 # --- Maintenance Operations --- 

177 

178 def get_prunable_tasks( 

179 self, days: int, func_name: Optional[str] = None 

180 ) -> list[tuple[str, str, str]]: 

181 cutoff = datetime.now(timezone.utc) - timedelta(days=days) 

182 return self.db.get_outdated_tasks(cutoff, func_name) 

183 

184 def prune(self, days: int, func_name: Optional[str] = None) -> int: 

185 cutoff = datetime.now(timezone.utc) - timedelta(days=days) 

186 logger.info(f"Pruning tasks older than {cutoff} (func={func_name})...") 

187 count = self.db.prune(cutoff, func_name) 

188 logger.info(f"Deleted {count} tasks.") 

189 return count 

190 

191 def clear(self, func_name: Optional[str] = None) -> int: 

192 logger.info(f"Clearing all tasks (func={func_name})...") 

193 count = self.db.delete_all(func_name) 

194 logger.info(f"Cleared {count} tasks.") 

195 return count 

196 

197 def scan_garbage(self, grace_period: float = 60.0) -> list[str]: 

198 """ 

199 Scan for orphaned blob files that are not referenced in the database. 

200 

201 Args: 

202 grace_period: Minimum age of a blob (in seconds) to be considered an orphan. 

203 This prevents deleting blobs that were just created but 

204 not yet registered in the database (background saves). 

205 """ 

206 ref_filenames = self.db.get_blob_refs() 

207 if ref_filenames is None: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 return [] 

209 

210 def _normalize_location(location: str) -> str: 

211 return location.replace("\\", "/") 

212 

213 def _basename(location: str) -> str: 

214 return _normalize_location(location).split("/")[-1] 

215 

216 ref_locations = {_normalize_location(loc) for loc in ref_filenames} 

217 # Legacy support: DBに絶対パスが保存されていた場合のみbasenameを許容 

218 ref_basenames = { 

219 _basename(loc) for loc in ref_locations if Path(loc).is_absolute() 

220 } 

221 

222 now = time.time() 

223 orphans = [] 

224 for location in self.storage.list_keys(): 

225 norm = _normalize_location(location) 

226 if norm in ref_locations: 

227 continue 

228 if _basename(norm) in ref_basenames: 

229 continue 

230 

231 # Check grace period 

232 if grace_period > 0: 

233 try: 

234 mtime = self.storage.get_mtime(location) 

235 if now - mtime < grace_period: 

236 # Too new; potentially in-flight background save. 

237 continue 

238 except Exception as e: 

239 logger.debug(f"Failed to check mtime for {location} (ignored): {e}") 

240 # Skip files that we can't check, to be safe. 

241 continue 

242 

243 orphans.append(location) 

244 

245 return orphans 

246 

247 def clean_garbage( 

248 self, 

249 orphans: Optional[list[str]] = None, 

250 tmp_max_age_seconds: int = 86400, 

251 orphan_grace_seconds: float = 60.0, 

252 ) -> tuple[int, int]: 

253 """ 

254 期限切れのタスク(DBレコード)と孤立したBlobファイルを削除します。 

255 また、アトミック書き込み時に残留した古い一時ファイル (.spot_tmp) の 

256 クリーンアップも同時に行います。 

257 

258 Args: 

259 orphans: 事前にスキャンされた孤立ファイルのリスト。 

260 tmp_max_age_seconds: 一時ファイルを削除対象とするまでの猶予期間(秒)。デフォルトは24時間。 

261 orphan_grace_seconds: 孤立ファイルを判定する際の猶予期間(秒)。デフォルトは60秒。 

262 

263 Returns: 

264 tuple[int, int]: (削除された期限切れタスクの数, 削除された孤立ファイルの数) 

265 """ 

266 if not self._cleaning_lock.acquire(blocking=False): 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true

267 logger.debug("Another eviction task is currently running. Skipping.") 

268 return 0, 0 

269 

270 try: 

271 # Phase -1: DBの書き込みキューをフラッシュ 

272 # save_sync=False で投入された書き込みが未コミットの場合、 

273 # 直後の scan_garbage() がその blob を孤立ファイルと誤判定して 

274 # 削除してしまうレースコンディションを防ぐ。 

275 if isinstance(self.db, Flushable): 275 ↛ 282line 275 didn't jump to line 282 because the condition on line 275 was always true

276 try: 

277 self.db.flush(timeout=10.0) 

278 except Exception as e: 

279 logger.warning(f"DB flush before garbage scan failed: {e}") 

280 

281 # Phase 0: 期限切れタスクの削除 

282 deleted_expired_count = self.delete_expired_tasks() 

283 if deleted_expired_count > 0: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true

284 logger.info(f"Deleted {deleted_expired_count} expired tasks from DB.") 

285 

286 # Phase 1: 孤立したファイルの特定 

287 if orphans is None: 

288 orphans = self.scan_garbage(grace_period=orphan_grace_seconds) 

289 

290 deleted_orphan_count = 0 

291 

292 # Phase 2: ファイルの実体削除 

293 if orphans: 

294 for location in orphans: 

295 try: 

296 self.storage.delete(location) 

297 deleted_orphan_count += 1 

298 except Exception as e: 

299 logger.warning(f"Failed to delete orphan blob {location}: {e}") 

300 

301 if deleted_orphan_count > 0: 301 ↛ 305line 301 didn't jump to line 305 because the condition on line 301 was always true

302 logger.info(f"Deleted {deleted_orphan_count} orphaned blob files.") 

303 

304 # [ADD] Phase 2.5: 古い一時ファイルのクリーンアップ 

305 if hasattr(self.storage, "clean_temp_files"): 305 ↛ 316line 305 didn't jump to line 316 because the condition on line 305 was always true

306 try: 

307 tmp_count = self.storage.clean_temp_files( # type: ignore 

308 max_age_seconds=tmp_max_age_seconds 

309 ) 

310 if tmp_count > 0: 

311 logger.info(f"Removed {tmp_count} abandoned temporary files.") 

312 except Exception as e: 

313 logger.warning(f"Failed to clean temporary files: {e}") 

314 

315 # Phase 3: 空ディレクトリ掃除 

316 if hasattr(self.storage, "prune_empty_dirs"): 316 ↛ 324line 316 didn't jump to line 324 because the condition on line 316 was always true

317 try: 

318 dir_count = self.storage.prune_empty_dirs() # type: ignore 

319 if dir_count > 0: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 logger.info(f"Removed {dir_count} empty directories.") 

321 except Exception as e: 

322 logger.warning(f"Failed to prune empty directories: {e}") 

323 

324 return deleted_expired_count, deleted_orphan_count 

325 

326 finally: 

327 self._cleaning_lock.release() 

328 

329 def resolve_key_prefix(self, prefix: str) -> str | list[str] | None: 

330 """ 

331 Resolve a potentially shortened key to a full cache key. 

332 

333 Returns: 

334 str: The single matching full key (Exact match or unique prefix match). 

335 list[str]: A list of conflicting candidates (Ambiguous). 

336 None: No match found. 

337 """ 

338 # 1. 完全一致を最優先でチェック 

339 if self.db.get(prefix, include_expired=True): 

340 return prefix 

341 

342 # 2. プレフィックス検索 

343 candidates = self.db.get_keys_start_with(prefix) 

344 

345 if not candidates: 345 ↛ 348line 345 didn't jump to line 348 because the condition on line 345 was always true

346 return None 

347 

348 if len(candidates) == 1: 

349 return candidates[0] 

350 

351 # 3. 曖昧な場合 (複数の候補を返す) 

352 return candidates 

353 

354 # --- Zombie Project Cleanup (gc command) --- 

355 

356 @staticmethod 

357 def scan_orphan_projects(workspace_dir: Path) -> list[Path]: 

358 """ 

359 Scan for blob directories in .beautyspot/blobs/ that have no corresponding .db file. 

360 Returns a list of Path objects for the orphan directories. 

361 """ 

362 blobs_root = workspace_dir / "blobs" 

363 if not blobs_root.exists(): 

364 return [] 

365 

366 orphans = [] 

367 for entry in blobs_root.iterdir(): 

368 if entry.is_dir(): 368 ↛ 367line 368 didn't jump to line 367 because the condition on line 368 was always true

369 # blobs/{name} に対して {name}.db が存在するか確認 

370 db_path = workspace_dir / f"{entry.name}.db" 

371 if not db_path.exists(): 

372 orphans.append(entry) 

373 

374 return orphans 

375 

376 @staticmethod 

377 def delete_project_storage(path: Path) -> None: 

378 """ 

379 Recursively delete a project storage directory. 

380 """ 

381 if path.exists() and path.is_dir(): 381 ↛ exitline 381 didn't return from function 'delete_project_storage' because the condition on line 381 was always true

382 try: 

383 shutil.rmtree(path) 

384 except Exception as e: 

385 logger.error(f"Failed to delete directory {path}: {e}") 

386 raise