Coverage for src / beautyspot / core.py: 87%

461 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-11 19:10 +0900

1"""BeautySpot コアモジュール。 

2 

3タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス群を提供します。 

4""" 

5# src/beautyspot/core.py 

6 

7import atexit 

8import asyncio 

9import logging 

10import functools 

11import inspect 

12import random 

13import threading 

14import warnings 

15import weakref 

16import time 

17from concurrent.futures import Executor, Future, ThreadPoolExecutor, wait 

18from contextlib import contextmanager 

19from typing import ( 

20 Any, 

21 Coroutine, 

22 Callable, 

23 Iterator, 

24 NamedTuple, 

25 Optional, 

26 Union, 

27 Type, 

28 overload, 

29 TypeVar, 

30 TypeVarTuple, 

31 ParamSpec, 

32 Sequence, 

33 ContextManager, 

34) 

35 

36from beautyspot.maintenance import MaintenanceService 

37from beautyspot.limiter import LimiterProtocol 

38from beautyspot.types import SaveErrorContext 

39from beautyspot.lifecycle import ( 

40 RetentionSpec, 

41) 

42from beautyspot.db import TaskDBCore, Flushable, Shutdownable 

43from beautyspot.serializer import SerializerProtocol, TypeRegistryProtocol 

44from beautyspot.cachekey import KeyGenPolicy 

45from beautyspot.exceptions import ( 

46 ConfigurationError, 

47 IncompatibleProviderError, 

48 ValidationError, 

49) 

50from beautyspot.hooks import HookBase 

51from beautyspot.types import PreExecuteContext, CacheHitContext, CacheMissContext 

52from beautyspot.content_types import ContentType 

53from beautyspot.cache import CacheManager, CACHE_MISS 

54 

55# ジェネリクスの定義 

56P = ParamSpec("P") 

57R = TypeVar("R") 

58T = TypeVar("T") 

59Ts = TypeVarTuple("Ts") 

60 

61# --- ロガー --- 

62logger = logging.getLogger(__name__) 

63logger.addHandler(logging.NullHandler()) 

64 

65 

66class _ExecutionContext(NamedTuple): 

67 """_execute_sync / _execute_async の初期化フェーズで共通する解決済み値。""" 

68 

69 save_blob: bool | None 

70 version: str | None 

71 content_type: str | None 

72 save_sync: bool 

73 func_identifier: str 

74 input_id: str 

75 cache_key: str 

76 hook_kwargs: dict 

77 

78 

79class _BackgroundLoop: 

80 """バックグラウンドで非同期IOタスクを処理するイベントループ。 

81 

82 明示的なタスク追跡とスレッドロックにより、シャットダウン時の競合状態を完全に排除します。 

83 

84 Args: 

85 drain_timeout (float, optional): シャットダウン時のタスク完了待機タイムアウト(秒)。デフォルトは5.0。 

86 """ 

87 

88 def __init__(self, drain_timeout: float = 5.0): 

89 self._drain_timeout = drain_timeout 

90 

91 # メインスレッドで loop オブジェクトを生成 

92 self._loop = asyncio.new_event_loop() 

93 

94 self._lock = threading.Lock() 

95 self._is_shutting_down = False 

96 self._active_tasks = 0 # 実行中(またはスケジュール待ち)のタスク数 

97 

98 # 新しい Thread を設定 

99 # daemon=True により、プロセス終了時の Python の無限ハングアップを防ぐ 

100 self._thread = threading.Thread( 

101 target=self._run_event_loop, daemon=True, name="BeautySpot-BGLoop" 

102 ) 

103 

104 # 設定した Thread を実行 

105 self._thread.start() 

106 

107 # インスタンス自身が atexit を管理するため、グローバルな _active_loops 管理は不要 

108 atexit.register(self._shutdown) 

109 

110 def _run_event_loop(self): 

111 """スレッドローカルでイベントループを実行する""" 

112 asyncio.set_event_loop(self._loop) 

113 try: 

114 self._loop.run_forever() 

115 finally: 

116 self._loop.close() 

117 

118 async def _task_wrapper(self, coro: Coroutine) -> Any: 

119 """タスクの完了を確実にフックし、必要ならループを停止するラッパー""" 

120 try: 

121 return await coro 

122 finally: 

123 with self._lock: 

124 self._active_tasks -= 1 

125 # シャットダウン中で、かつ最後のタスクが終わった瞬間ならループを止める 

126 if self._is_shutting_down and self._active_tasks == 0: 

127 self._loop.call_soon_threadsafe(self._loop.stop) 

128 

129 def submit(self, coro: Coroutine) -> Optional[Future[Any]]: 

130 """スレッドセーフにタスクを投入する""" 

131 with self._lock: 

132 if self._is_shutting_down: 

133 logger.debug("Background loop is shutting down. Task rejected.") 

134 try: 

135 coro.close() 

136 except Exception: 

137 # Best-effort cleanup; avoid raising during rejection. 

138 pass 

139 return None 

140 

141 # ロック内でカウンタを増やすことで、確実にインフライトとして追跡される 

142 self._active_tasks += 1 

143 

144 try: 

145 return asyncio.run_coroutine_threadsafe( 

146 self._task_wrapper(coro), self._loop 

147 ) 

148 except BaseException: 

149 # 万が一スケジュールに失敗した場合はカウンタを戻す 

150 try: 

151 coro.close() 

152 except Exception: 

153 pass 

154 with self._lock: 

155 self._active_tasks -= 1 

156 # シャットダウン中かつ最後のタスクだった場合、ループ停止を通知する 

157 if self._is_shutting_down and self._active_tasks == 0: 

158 try: 

159 self._loop.call_soon_threadsafe(self._loop.stop) 

160 except RuntimeError: 

161 pass # ループは既に停止/クローズ済み 

162 raise 

163 

164 def stop(self, save_sync: bool = True): 

165 """ 

166 ループに対して新規タスクの受付停止を通知し、シャットダウンシーケンスを開始する。 

167 Spot.shutdown() や GCの _shutdown_resources() から呼び出される統一されたAPI。 

168 """ 

169 # atexit ハンドラの蓄積を防止 

170 atexit.unregister(self._shutdown) 

171 

172 with self._lock: 

173 # 既にシャットダウン中であれば二重実行を避ける 

174 if self._is_shutting_down: 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true

175 return 

176 self._is_shutting_down = True 

177 

178 # 現在アクティブなタスクがゼロなら、即座にループ停止をスケジュール 

179 if self._active_tasks == 0: 

180 self._loop.call_soon_threadsafe(self._loop.stop) 

181 

182 if save_sync: 

183 # アクティブなタスクが残っている場合は、最後の _task_wrapper が stop() を呼ぶ 

184 # タイムアウト付きでスレッドの終了(=ループの停止)を待つ 

185 self._thread.join(timeout=self._drain_timeout) 

186 

187 if self._thread.is_alive(): 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 logger.warning( 

189 f"BeautySpot background loop did not finish within {self._drain_timeout}s. " 

190 "Pending IO tasks have been abruptly terminated." 

191 ) 

192 

193 def _shutdown(self): 

194 """ 

195 [atexit フック] 

196 プロセス終了時に呼ばれる安全網。タイムアウト付きの待機を実行する。 

197 """ 

198 self.stop(save_sync=True) 

199 

200 

201class Spot: 

202 """タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス。 

203 

204 依存オブジェクト(CacheManagerやLimiterProtocolなど)を注入して初期化されます。 

205 通常は直接インスタンス化せず、`bs.Spot(...)` ファクトリ関数を通じて使用することが推奨されます。 

206 

207 Args: 

208 name (str): Spotインスタンスの名前。 

209 cache (CacheManager): キャッシュマネージャーのインスタンス。 

210 limiter (LimiterProtocol): レートリミッターのインスタンス。 

211 save_sync (bool, optional): キャッシュ保存のデフォルト同期動作。デフォルトはTrue。 

212 eviction_rate (float, optional): キャッシュの自動破棄を実行する確率(0.0〜1.0)。デフォルトは0.0。 

213 drain_timeout (float, optional): バックグラウンドタスク完了待機のタイムアウト(秒)。デフォルトは5.0。 

214 drain_poll_interval (float, optional): バックグラウンドタスク待機時のポーリング間隔(秒)。デフォルトは0.5。 

215 on_background_error (Optional[Callable[[Exception, SaveErrorContext], None]], optional): バックグラウンド保存時のエラーハンドラ。 

216 """ 

217 

218 def __init__( 

219 self, 

220 name: str, 

221 cache: CacheManager, 

222 limiter: LimiterProtocol, 

223 save_sync: bool = True, 

224 eviction_rate: float = 0.0, 

225 drain_timeout: float = 5.0, 

226 drain_poll_interval: float = 0.5, 

227 on_background_error: Optional[ 

228 Callable[[Exception | BaseException, SaveErrorContext], None] 

229 ] = None, 

230 ) -> None: 

231 self.name = name 

232 if not (0.0 <= eviction_rate <= 1.0): 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 raise ValueError("eviction_rate must be between 0.0 and 1.0") 

234 self.eviction_rate = eviction_rate 

235 if drain_timeout <= 0: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 raise ValueError("drain_timeout must be positive") 

237 if drain_poll_interval <= 0: 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true

238 raise ValueError("drain_poll_interval must be positive") 

239 self._drain_timeout = drain_timeout 

240 self._drain_poll_interval = drain_poll_interval 

241 

242 # --- コンポーネントの保持 --- 

243 self.cache = cache 

244 self.limiter = limiter 

245 

246 # --- オプション設定の適用 --- 

247 self._save_sync = save_sync 

248 self.on_background_error = on_background_error 

249 

250 # --- DBの初期化 --- 

251 self.cache.db.init_schema() 

252 

253 # --- バックグラウンド IO 管理 --- 

254 self._bg_loop: _BackgroundLoop | None = None 

255 self._executor: Executor | None = None 

256 self._finalizer: weakref.finalize | None = None 

257 

258 self._bg_init_lock = threading.Lock() 

259 self._shutdown_called = False 

260 self._owns_db = False 

261 

262 self._active_futures: set = set() 

263 self._futures_lock = threading.Lock() 

264 

265 self._maintenance_service: MaintenanceService | None = None 

266 self._maintenance_lock = threading.Lock() 

267 self._eviction_guard_lock = threading.Lock() 

268 self._eviction_running = False 

269 self._last_eviction_time = 0.0 

270 

271 def __enter__(self) -> "Spot": 

272 return self 

273 

274 def __exit__(self, exc_type, exc_value, traceback): 

275 self._drain_futures() 

276 

277 def _track_future(self, future: Any): 

278 if future is None: 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true

279 return 

280 with self._futures_lock: 

281 self._active_futures.add(future) 

282 

283 def _on_done(f): 

284 with self._futures_lock: 

285 self._active_futures.discard(f) 

286 

287 future.add_done_callback(_on_done) 

288 

289 @property 

290 def maintenance(self) -> MaintenanceService: 

291 svc = self._maintenance_service 

292 if svc is None: 292 ↛ 301line 292 didn't jump to line 301 because the condition on line 292 was always true

293 with self._maintenance_lock: 

294 if self._maintenance_service is None: 294 ↛ 300line 294 didn't jump to line 300 because the condition on line 294 was always true

295 self._maintenance_service = MaintenanceService( 

296 db=self.cache.db, 

297 storage=self.cache.storage, 

298 serializer=self.cache.serializer, 

299 ) 

300 svc = self._maintenance_service 

301 assert svc is not None 

302 return svc 

303 

304 def _ensure_bg_resources(self) -> tuple[_BackgroundLoop, Executor]: 

305 bg, ex = self._bg_loop, self._executor 

306 if bg is not None and ex is not None: 

307 return bg, ex 

308 

309 with self._bg_init_lock: 

310 if self._shutdown_called: 

311 raise RuntimeError( 

312 "Cannot submit background tasks after shutdown() has been called." 

313 ) 

314 if self._bg_loop is None or self._executor is None: 314 ↛ 329line 314 didn't jump to line 329 because the condition on line 314 was always true

315 if self._bg_loop is None: 315 ↛ 317line 315 didn't jump to line 317 because the condition on line 315 was always true

316 self._bg_loop = _BackgroundLoop(drain_timeout=self._drain_timeout) 

317 if self._executor is None: 317 ↛ 319line 317 didn't jump to line 319 because the condition on line 317 was always true

318 self._executor = ThreadPoolExecutor() 

319 if self._finalizer is None: 319 ↛ 329line 319 didn't jump to line 329 because the condition on line 319 was always true

320 self._finalizer = weakref.finalize( 

321 self, 

322 Spot._shutdown_resources, 

323 self._bg_loop, 

324 self._executor, 

325 self.cache.db, 

326 self._owns_db, 

327 ) 

328 self._finalizer.atexit = False 

329 return self._bg_loop, self._executor 

330 

331 @staticmethod 

332 def _shutdown_resources( 

333 bg_loop: _BackgroundLoop, 

334 executor: Executor, 

335 db: TaskDBCore, 

336 owns_db: bool, 

337 ) -> None: 

338 bg_loop.stop(save_sync=False) 

339 executor.shutdown(wait=False, cancel_futures=True) 

340 if owns_db and isinstance(db, Shutdownable): 

341 db.shutdown(wait=False) 

342 

343 def shutdown(self, save_sync: bool = True): 

344 """Spotインスタンスをシャットダウンし、バックグラウンドリソースを解放する。 

345 

346 Args: 

347 save_sync (bool, optional): 同期的に未完了の保存タスクを待機するかどうか。Trueの場合は完了を待つ。デフォルトはTrue。 

348 """ 

349 with self._bg_init_lock: 

350 self._shutdown_called = True 

351 if self._finalizer is not None and self._finalizer.alive: 

352 self._finalizer.detach() 

353 if save_sync: 

354 self._drain_futures() 

355 

356 if self._bg_loop is not None: 

357 self._bg_loop.stop(save_sync=save_sync) 

358 

359 if self._executor is not None: 

360 self._executor.shutdown(wait=save_sync, cancel_futures=not save_sync) 

361 

362 def flush(self, timeout: Optional[float] = None) -> None: 

363 """バックグラウンドで実行中のすべての保存タスクとDBの書き込みの完了を待機する。 

364 

365 Args: 

366 timeout (Optional[float], optional): 待機する最大時間(秒)。指定しない場合は初期化時の `drain_timeout` が使用される。 

367 """ 

368 timeout_val = timeout if timeout is not None else self._drain_timeout 

369 deadline = time.monotonic() + timeout_val 

370 

371 while True: 

372 with self._futures_lock: 

373 snapshot = list(self._active_futures) 

374 if not snapshot: 

375 break 

376 remaining = deadline - time.monotonic() 

377 if remaining <= 0: 377 ↛ 378line 377 didn't jump to line 378 because the condition on line 377 was never true

378 break 

379 wait_timeout = min(self._drain_poll_interval, remaining) 

380 wait(snapshot, timeout=wait_timeout) 

381 

382 db_remaining = deadline - time.monotonic() 

383 if db_remaining > 0 and isinstance(self.cache.db, Flushable): 

384 self.cache.db.flush(timeout=db_remaining) 

385 

386 def _drain_futures(self) -> None: 

387 self.flush() 

388 

389 def _get_func_identifier(self, func: Callable) -> str: 

390 module = getattr(func, "__module__", None) or func.__class__.__module__ 

391 qualname = getattr(func, "__qualname__", None) or func.__class__.__qualname__ 

392 return f"{module}.{qualname}" 

393 

394 def _trigger_auto_eviction(self) -> None: 

395 if self.eviction_rate <= 0.0: 

396 return 

397 if random.random() >= self.eviction_rate: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true

398 return 

399 

400 with self._eviction_guard_lock: 

401 if self._eviction_running: 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true

402 return 

403 now = time.monotonic() 

404 if now - self._last_eviction_time < 60.0: 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 return 

406 self._eviction_running = True 

407 

408 logger.debug(f"Triggering auto-eviction (rate: {self.eviction_rate})") 

409 

410 with self._futures_lock: 

411 pending_futures = list(self._active_futures) 

412 

413 def _run_clean_safe(): 

414 try: 

415 self.maintenance.clean_garbage(orphan_grace_seconds=60.0) 

416 except Exception as e: 

417 logger.error(f"Auto-eviction failed: {e}", exc_info=True) 

418 

419 def _clear_eviction_flag(): 

420 with self._eviction_guard_lock: 

421 self._last_eviction_time = time.monotonic() 

422 self._eviction_running = False 

423 

424 try: 

425 bg_loop, executor = self._ensure_bg_resources() 

426 

427 async def _run_clean_coro(): 

428 loop = asyncio.get_running_loop() 

429 if pending_futures: 

430 await asyncio.wait( 

431 [asyncio.wrap_future(f) for f in pending_futures], 

432 timeout=self._drain_timeout, 

433 ) 

434 await loop.run_in_executor(executor, _run_clean_safe) 

435 

436 future = bg_loop.submit(_run_clean_coro()) 

437 if future: 

438 self._track_future(future) 

439 future.add_done_callback(lambda f: _clear_eviction_flag()) 

440 else: 

441 _clear_eviction_flag() 

442 except Exception: 

443 _clear_eviction_flag() 

444 

445 def _resolve_key_fn( 

446 self, 

447 func: Callable, 

448 keygen: Optional[Union[Callable, KeyGenPolicy]] = None, 

449 input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None, 

450 ) -> Optional[Callable]: 

451 if keygen is not None and input_key_fn is not None: 

452 raise IncompatibleProviderError("Cannot specify both 'keygen' and 'input_key_fn'.") 

453 if input_key_fn is not None: 

454 warnings.warn("`input_key_fn` is deprecated, use `keygen` instead.", DeprecationWarning, stacklevel=3) 

455 target = keygen or input_key_fn 

456 if isinstance(target, KeyGenPolicy): 

457 return target.bind(func) 

458 return target 

459 

460 def register( 

461 self, 

462 code: int, 

463 encoder: Callable[[T], Any], 

464 decoder: Optional[Callable[[Any], T]] = None, 

465 decoder_factory: Optional[Callable[[Type[T]], Callable[[Any], T]]] = None, 

466 ) -> Callable[[Type[T]], Type[T]]: 

467 """カスタム型をシリアライザに登録するためのデコレータ。 

468 

469 `decoder` または `decoder_factory` のいずれかを必ず提供する必要があります。 

470 

471 Args: 

472 code (int): カスタム型の一意な識別コード。 

473 encoder (Callable[[T], Any]): カスタム型オブジェクトからシリアライズ可能な形式(辞書など)に変換する関数。 

474 decoder (Optional[Callable[[Any], T]], optional): デシリアライズ時にデータをカスタム型オブジェクトに復元する関数。 

475 decoder_factory (Optional[Callable[[Type[T]], Callable[[Any], T]]], optional): 型に基づいてデコーダ関数を生成するファクトリ関数。 

476 

477 Returns: 

478 Callable[[Type[T]], Type[T]]: クラスデコレータ。 

479 

480 Raises: 

481 IncompatibleProviderError: `decoder` と `decoder_factory` の両方が未指定の場合に発生。 

482 """ 

483 if decoder is None and decoder_factory is None: 

484 raise IncompatibleProviderError( 

485 "Must provide either `decoder` or `decoder_factory`." 

486 ) 

487 

488 def decorator(cls: Type) -> Type: 

489 actual_decoder = decoder 

490 if decoder_factory: 

491 actual_decoder = decoder_factory(cls) 

492 

493 if actual_decoder is None: 

494 raise ValueError("Decoder resolution failed.") 

495 

496 self.register_type(cls, code, encoder, actual_decoder) 

497 return cls 

498 

499 return decorator 

500 

501 def register_type( 

502 self, 

503 type_class: Type[T], 

504 code: int, 

505 encoder: Callable[[T], Any], 

506 decoder: Callable[[Any], T], 

507 ): 

508 """カスタム型を直接シリアライザに登録する。 

509 

510 Args: 

511 type_class (Type[T]): 登録するカスタム型のクラス。 

512 code (int): カスタム型の一意な識別コード。 

513 encoder (Callable[[T], Any]): エンコーダ関数。 

514 decoder (Callable[[Any], T]): デコーダ関数。 

515 

516 Raises: 

517 NotImplementedError: 現在のシリアライザが型登録をサポートしていない場合。 

518 """ 

519 if isinstance(self.cache.serializer, TypeRegistryProtocol): 519 ↛ 522line 519 didn't jump to line 522 because the condition on line 519 was always true

520 self.cache.serializer.register(type_class, code, encoder, decoder) 

521 else: 

522 raise NotImplementedError( 

523 "Current serializer does not support type registration." 

524 ) 

525 

526 @staticmethod 

527 def _dispatch_hooks( 

528 hooks: Optional[Sequence[HookBase]], method_name: str, context: Any 

529 ) -> None: 

530 if not hooks: 

531 return 

532 for hook in hooks: 

533 try: 

534 getattr(hook, method_name)(context) 

535 except Exception as e: 

536 logger.error( 

537 f"Error in hook '{type(hook).__name__}.{method_name}': {e}", 

538 exc_info=True, 

539 ) 

540 

541 async def _dispatch_hooks_async( 

542 self, 

543 hooks: Optional[Sequence[HookBase]], 

544 method_name: str, 

545 context: Any, 

546 loop: asyncio.AbstractEventLoop, 

547 executor: Executor, 

548 ) -> None: 

549 if not hooks: 

550 return 

551 await loop.run_in_executor( 

552 executor, self._dispatch_hooks, hooks, method_name, context 

553 ) 

554 

555 # --- Core Logic --- 

556 

557 def _resolve_settings( 

558 self, 

559 save_blob: bool | None, 

560 version: str | None, 

561 content_type: str | ContentType | None, 

562 save_sync: bool | None, 

563 ) -> tuple[bool | None, str | None, str | None, bool]: 

564 return ( 

565 save_blob, 

566 version, 

567 content_type, 

568 (save_sync if save_sync is not None else self._save_sync), 

569 ) 

570 

571 def _prepare_execution( 

572 self, 

573 func: Callable, 

574 args: tuple, 

575 kwargs: dict, 

576 save_blob: bool | None, 

577 effective_key_fn: Optional[Callable], 

578 version: str | None, 

579 content_type: Optional[str | ContentType], 

580 save_sync: bool | None, 

581 hooks: Optional[Sequence[HookBase]], 

582 ) -> _ExecutionContext: 

583 s_blob, s_ver, s_ct, s_save_sync = self._resolve_settings( 

584 save_blob, version, content_type, save_sync 

585 ) 

586 func_identifier = self._get_func_identifier(func) 

587 iid, ck = self.cache.make_cache_key( 

588 func_identifier, args, kwargs, effective_key_fn, s_ver 

589 ) 

590 return _ExecutionContext( 

591 s_blob, 

592 s_ver, 

593 s_ct, 

594 s_save_sync, 

595 func_identifier, 

596 iid, 

597 ck, 

598 dict(kwargs) if hooks else kwargs, 

599 ) 

600 

601 def _build_cache_hit_context( 

602 self, 

603 func_name: str, 

604 input_id: str, 

605 cache_key: str, 

606 args: tuple, 

607 hook_kwargs: dict, 

608 result: Any, 

609 version: str | None, 

610 ) -> CacheHitContext: 

611 return CacheHitContext( 

612 func_name=func_name, 

613 input_id=str(input_id), 

614 cache_key=cache_key, 

615 args=args, 

616 kwargs=hook_kwargs, 

617 result=result, 

618 version=version, 

619 ) 

620 

621 def _build_save_kwargs( 

622 self, 

623 cache_key: str, 

624 func: Callable, 

625 func_identifier: str, 

626 input_id: str, 

627 version: str | None, 

628 result: Any, 

629 content_type: str | None, 

630 save_blob: bool | None, 

631 serializer: Optional[SerializerProtocol], 

632 retention: RetentionSpec, 

633 ) -> dict: 

634 expires_at = self.cache.calculate_expires_at( 

635 func_identifier, func.__name__, retention 

636 ) 

637 return { 

638 "cache_key": cache_key, 

639 "func_name": func.__name__, 

640 "func_identifier": func_identifier, 

641 "input_id": str(input_id), 

642 "version": version, 

643 "result": result, 

644 "content_type": content_type, 

645 "save_blob": save_blob, 

646 "serializer": serializer, 

647 "expires_at": expires_at, 

648 } 

649 

650 def _persist_result_sync(self, save_sync: bool, save_kwargs: dict) -> None: 

651 if save_sync: 

652 try: 

653 self.cache.set(**save_kwargs) 

654 except Exception as e: 

655 self._handle_save_error(e, save_kwargs) 

656 raise 

657 else: 

658 try: 

659 self._submit_background_save(**save_kwargs) 

660 except Exception as e: 

661 self._handle_save_error(e, save_kwargs) 

662 if self.on_background_error is None: 662 ↛ 663line 662 didn't jump to line 663 because the condition on line 662 was never true

663 raise 

664 

665 async def _persist_result_async(self, save_sync: bool, save_kwargs: dict) -> None: 

666 if save_sync: 

667 try: 

668 bg_loop, exec_pool = self._ensure_bg_resources() 

669 coro = self._save_result_async( 

670 executor=exec_pool, safe=False, **save_kwargs 

671 ) 

672 future = bg_loop.submit(coro) 

673 if future is None: 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true

674 self._notify_save_discarded(save_kwargs) 

675 raise RuntimeError( 

676 f"Cache save for '{save_kwargs.get('func_name')}' " 

677 "was discarded because the background loop is shutting down." 

678 ) 

679 else: 

680 await asyncio.wrap_future(future) 

681 except Exception as e: 

682 self._handle_save_error(e, save_kwargs) 

683 raise 

684 else: 

685 try: 

686 self._submit_background_save(**save_kwargs) 

687 except Exception as e: 

688 self._handle_save_error(e, save_kwargs) 

689 if self.on_background_error is None: 

690 raise 

691 

692 def _execute_sync( 

693 self, 

694 func: Callable, 

695 args: tuple, 

696 kwargs: dict, 

697 save_blob: bool | None, 

698 effective_key_fn: Optional[Callable], 

699 version: str | None, 

700 content_type: Optional[str | ContentType], 

701 serializer: Optional[SerializerProtocol], 

702 retention: RetentionSpec, 

703 save_sync: bool | None, 

704 hooks: Optional[Sequence[HookBase]] = None, 

705 ) -> Any: 

706 ctx = self._prepare_execution( 

707 func, 

708 args, 

709 kwargs, 

710 save_blob, 

711 effective_key_fn, 

712 version, 

713 content_type, 

714 save_sync, 

715 hooks, 

716 ) 

717 self._dispatch_hooks( 

718 hooks, 

719 "pre_execute", 

720 PreExecuteContext( 

721 func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs 

722 ), 

723 ) 

724 cached = self.cache.get(ctx.cache_key, serializer) 

725 if cached is not CACHE_MISS: 

726 self._dispatch_hooks( 

727 hooks, 

728 "on_cache_hit", 

729 self._build_cache_hit_context( 

730 func.__name__, 

731 ctx.input_id, 

732 ctx.cache_key, 

733 args, 

734 ctx.hook_kwargs, 

735 cached, 

736 ctx.version, 

737 ), 

738 ) 

739 return cached 

740 

741 with self.cache.herd_sync(ctx.cache_key, serializer) as herd: 

742 if not herd.is_executor: 

743 if herd.is_error: 

744 raise herd.result 

745 self._dispatch_hooks( 

746 hooks, 

747 "on_cache_hit", 

748 self._build_cache_hit_context( 

749 func.__name__, 

750 ctx.input_id, 

751 ctx.cache_key, 

752 args, 

753 ctx.hook_kwargs, 

754 herd.result, 

755 ctx.version, 

756 ), 

757 ) 

758 return herd.result 

759 

760 try: 

761 res = func(*args, **kwargs) 

762 self._dispatch_hooks( 

763 hooks, 

764 "on_cache_miss", 

765 CacheMissContext( 

766 func.__name__, 

767 str(ctx.input_id), 

768 ctx.cache_key, 

769 args, 

770 ctx.hook_kwargs, 

771 res, 

772 ctx.version, 

773 ), 

774 ) 

775 herd.result_box.append((True, res)) 

776 

777 # 実行成功後、同期モード(save_sync=True)の場合はキャッシュ保存エラーを伝播させる 

778 try: 

779 save_kwargs = self._build_save_kwargs( 

780 ctx.cache_key, 

781 func, 

782 ctx.func_identifier, 

783 ctx.input_id, 

784 ctx.version, 

785 res, 

786 ctx.content_type, 

787 ctx.save_blob, 

788 serializer, 

789 retention, 

790 ) 

791 self._persist_result_sync(ctx.save_sync, save_kwargs) 

792 except Exception as e: 

793 if ctx.save_sync: 793 ↛ 795line 793 didn't jump to line 795 because the condition on line 793 was always true

794 raise 

795 logger.error(f"Failed to submit background cache save, but execution succeeded: {e}") 

796 

797 return res 

798 

799 except BaseException as e: 

800 if not herd.result_box: 

801 herd.result_box.append((False, e)) 

802 raise 

803 finally: 

804 self._trigger_auto_eviction() 

805 

806 async def _execute_async( 

807 self, 

808 func: Callable, 

809 args: tuple, 

810 kwargs: dict, 

811 save_blob: bool | None, 

812 effective_key_fn: Optional[Callable], 

813 version: str | None, 

814 content_type: Optional[str | ContentType], 

815 serializer: Optional[SerializerProtocol], 

816 retention: RetentionSpec, 

817 save_sync: bool | None, 

818 hooks: Optional[Sequence[HookBase]] = None, 

819 ) -> Any: 

820 ctx = self._prepare_execution( 

821 func, 

822 args, 

823 kwargs, 

824 save_blob, 

825 effective_key_fn, 

826 version, 

827 content_type, 

828 save_sync, 

829 hooks, 

830 ) 

831 loop = asyncio.get_running_loop() 

832 _, executor = self._ensure_bg_resources() 

833 await self._dispatch_hooks_async( 

834 hooks, 

835 "pre_execute", 

836 PreExecuteContext( 

837 func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs 

838 ), 

839 loop, 

840 executor, 

841 ) 

842 cached = await loop.run_in_executor( 

843 executor, self.cache.get, ctx.cache_key, serializer 

844 ) 

845 if cached is not CACHE_MISS: 

846 await self._dispatch_hooks_async( 

847 hooks, 

848 "on_cache_hit", 

849 self._build_cache_hit_context( 

850 func.__name__, 

851 ctx.input_id, 

852 ctx.cache_key, 

853 args, 

854 ctx.hook_kwargs, 

855 cached, 

856 ctx.version, 

857 ), 

858 loop, 

859 executor, 

860 ) 

861 return cached 

862 

863 async with self.cache.herd_async( 

864 ctx.cache_key, serializer, loop, executor 

865 ) as herd: 

866 if not herd.is_executor: 

867 if herd.is_error: 

868 raise herd.result 

869 await self._dispatch_hooks_async( 

870 hooks, 

871 "on_cache_hit", 

872 self._build_cache_hit_context( 

873 func.__name__, 

874 ctx.input_id, 

875 ctx.cache_key, 

876 args, 

877 ctx.hook_kwargs, 

878 herd.result, 

879 ctx.version, 

880 ), 

881 loop, 

882 executor, 

883 ) 

884 return herd.result 

885 

886 try: 

887 res = await func(*args, **kwargs) 

888 await self._dispatch_hooks_async( 

889 hooks, 

890 "on_cache_miss", 

891 CacheMissContext( 

892 func.__name__, 

893 str(ctx.input_id), 

894 ctx.cache_key, 

895 args, 

896 ctx.hook_kwargs, 

897 res, 

898 ctx.version, 

899 ), 

900 loop, 

901 executor, 

902 ) 

903 herd.result_box.append((True, res)) 

904 

905 # 実行成功後、同期モード(save_sync=True)の場合はキャッシュ保存エラーを伝播させる 

906 try: 

907 save_kwargs = self._build_save_kwargs( 

908 ctx.cache_key, 

909 func, 

910 ctx.func_identifier, 

911 ctx.input_id, 

912 ctx.version, 

913 res, 

914 ctx.content_type, 

915 ctx.save_blob, 

916 serializer, 

917 retention, 

918 ) 

919 await self._persist_result_async(ctx.save_sync, save_kwargs) 

920 except Exception as e: 

921 if ctx.save_sync: 921 ↛ 923line 921 didn't jump to line 923 because the condition on line 921 was always true

922 raise 

923 logger.error(f"Failed to persist cache asynchronously, but execution succeeded: {e}") 

924 

925 return res 

926 

927 except BaseException as e: 

928 if not herd.result_box: 

929 herd.result_box.append((False, e)) 

930 raise 

931 finally: 

932 self._trigger_auto_eviction() 

933 

934 def _handle_save_error(self, err: BaseException | Exception, save_kwargs: dict) -> None: 

935 logger.error( 

936 f"Cache save failed for '{save_kwargs.get('func_name')}': {err}", 

937 exc_info=True, 

938 ) 

939 if self.on_background_error: 

940 try: 

941 import sys 

942 

943 res = save_kwargs.get("result") 

944 self.on_background_error( 

945 err, 

946 SaveErrorContext( 

947 func_name=save_kwargs.get("func_name", "unknown"), 

948 cache_key=save_kwargs.get("cache_key", ""), 

949 input_id=save_kwargs.get("input_id", ""), 

950 version=save_kwargs.get("version"), 

951 content_type=save_kwargs.get("content_type"), 

952 save_blob=save_kwargs.get("save_blob"), 

953 expires_at=save_kwargs.get("expires_at"), 

954 result_type=type(res).__name__, 

955 result_size=sys.getsizeof(res) if res is not None else None, 

956 ), 

957 ) 

958 except Exception: 

959 logger.error( 

960 "Error occurred within the 'on_background_error' callback", 

961 exc_info=True, 

962 ) 

963 

964 def _notify_save_discarded(self, save_kwargs: dict) -> None: 

965 msg = f"Background save for '{save_kwargs.get('func_name')}' discarded during shutdown." 

966 logger.warning(msg) 

967 warnings.warn(msg, ResourceWarning) 

968 self._handle_save_error(RuntimeError(msg), save_kwargs) 

969 

970 def _submit_background_save(self, **save_kwargs) -> None: 

971 bg_loop, executor = self._ensure_bg_resources() 

972 coro = self._save_result_async(executor=executor, **save_kwargs) 

973 future = bg_loop.submit(coro) 

974 if future: 

975 self._track_future(future) 

976 else: 

977 self._notify_save_discarded(save_kwargs) 

978 

979 async def _save_result_async( 

980 self, /, executor: Executor, safe: bool = True, **kwargs 

981 ) -> None: 

982 loop = asyncio.get_running_loop() 

983 target = (lambda **kw: self._save_result_safe(**kw)) if safe else self.cache.set 

984 try: 

985 await loop.run_in_executor(executor, functools.partial(target, **kwargs)) 

986 except (asyncio.CancelledError, RuntimeError) as e: 

987 # Executor might be forcibly shut down during program exit or Spot.shutdown(save_sync=False) 

988 msg = f"Background save for '{kwargs.get('func_name')}' cancelled during shutdown." 

989 logger.warning(msg) 

990 self._handle_save_error(e, kwargs) 

991 if not safe: 

992 raise 

993 

994 def _save_result_safe(self, **kwargs): 

995 try: 

996 self.cache.set(**kwargs) 

997 except Exception as e: 

998 self._handle_save_error(e, kwargs) 

999 

1000 def consume(self, cost: Union[int, Callable] = 1): 

1001 """関数実行前にレートリミッターのトークンを消費するデコレータ。 

1002 

1003 Args: 

1004 cost (Union[int, Callable], optional): 消費するコスト。整数、または実行時の引数を受け取りコストを計算する関数を指定可能。デフォルトは1。 

1005 

1006 Returns: 

1007 Callable: デコレートされた関数。 

1008 """ 

1009 def decorator(func): 

1010 is_async = inspect.iscoroutinefunction(func) 

1011 

1012 @functools.wraps(func) 

1013 def sync_wrapper(*args, **kwargs): 

1014 self.limiter.consume(cost(*args, **kwargs) if callable(cost) else cost) 

1015 return func(*args, **kwargs) 

1016 

1017 @functools.wraps(func) 

1018 async def async_wrapper(*args, **kwargs): 

1019 await self.limiter.consume_async( 

1020 cost(*args, **kwargs) if callable(cost) else cost 

1021 ) 

1022 return await func(*args, **kwargs) 

1023 

1024 return async_wrapper if is_async else sync_wrapper 

1025 

1026 return decorator 

1027 

1028 @overload 

1029 def mark(self, _func: Callable[P, R]) -> Callable[P, R]: ... 

1030 

1031 @overload 

1032 def mark( 

1033 self, 

1034 *, 

1035 save_blob: Optional[bool] = None, 

1036 keygen: Optional[Union[Callable, KeyGenPolicy]] = None, 

1037 input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None, 

1038 version: str | None = None, 

1039 content_type: Optional[str | ContentType] = None, 

1040 serializer: Optional[SerializerProtocol] = None, 

1041 save_sync: Optional[bool] = None, 

1042 retention: RetentionSpec = None, 

1043 hooks: Optional[Sequence[HookBase]] = None, 

1044 ) -> Callable[[Callable[P, R]], Callable[P, R]]: ... 

1045 

1046 def mark(self, _func: Optional[Callable] = None, **kwargs) -> Any: 

1047 """関数を修飾し、実行結果のキャッシュ機能とメタデータ管理を追加するデコレータ。 

1048 

1049 関数の実行結果は、引数とその他の設定から計算されたキャッシュキーに基づいて保存・再利用されます。 

1050 

1051 Args: 

1052 _func (Optional[Callable], optional): デコレート対象の関数。 

1053 save_blob (Optional[bool], optional): 大きな戻り値をBlobストレージに保存するかどうか。 

1054 keygen (Optional[Union[Callable, KeyGenPolicy]], optional): キャッシュキーの生成ロジックを指定する。 

1055 input_key_fn (Optional[Union[Callable, KeyGenPolicy]], optional): 非推奨。`keygen` を使用すること。 

1056 version (Optional[str], optional): 関数のキャッシュバージョン。ロジック変更時にインクリメントすることでキャッシュを無効化できる。 

1057 content_type (Optional[Union[str, ContentType]], optional): 戻り値のMIMEタイプ。 

1058 serializer (Optional[SerializerProtocol], optional): この関数に適用するカスタムシリアライザ。 

1059 save_sync (Optional[bool], optional): 保存処理を同期的に行うかどうか。 

1060 retention (RetentionSpec, optional): キャッシュの保持ポリシー。 

1061 hooks (Optional[Sequence[HookBase]], optional): 実行前後やキャッシュヒット時に発火するフックのリスト。 

1062 

1063 Returns: 

1064 Any: デコレートされた関数、またはデコレータ関数。 

1065 """ 

1066 def decorator(func): 

1067 if inspect.isgeneratorfunction(func) or inspect.isasyncgenfunction(func): 1067 ↛ 1068line 1067 didn't jump to line 1068 because the condition on line 1067 was never true

1068 raise ConfigurationError(f"Generators not supported: {func.__name__}") 

1069 key_fn = self._resolve_key_fn( 

1070 func, kwargs.get("keygen"), kwargs.get("input_key_fn") 

1071 ) 

1072 is_async = inspect.iscoroutinefunction(func) 

1073 

1074 @functools.wraps(func) 

1075 def sync_wrapper(*args, **kw): 

1076 return self._execute_sync( 

1077 func, 

1078 args, 

1079 kw, 

1080 kwargs.get("save_blob"), 

1081 key_fn, 

1082 kwargs.get("version"), 

1083 kwargs.get("content_type"), 

1084 kwargs.get("serializer"), 

1085 kwargs.get("retention"), 

1086 kwargs.get("save_sync"), 

1087 kwargs.get("hooks"), 

1088 ) 

1089 

1090 @functools.wraps(func) 

1091 async def async_wrapper(*args, **kw): 

1092 return await self._execute_async( 

1093 func, 

1094 args, 

1095 kw, 

1096 kwargs.get("save_blob"), 

1097 key_fn, 

1098 kwargs.get("version"), 

1099 kwargs.get("content_type"), 

1100 kwargs.get("serializer"), 

1101 kwargs.get("retention"), 

1102 kwargs.get("save_sync"), 

1103 kwargs.get("hooks"), 

1104 ) 

1105 

1106 return async_wrapper if is_async else sync_wrapper 

1107 

1108 return decorator(_func) if _func else decorator 

1109 

1110 @overload 

1111 def cached_run(self, __func: Callable[P, R], **kwargs: Any) -> ContextManager[Callable[P, R]]: ... 

1112 

1113 @overload 

1114 def cached_run(self, *funcs: *Ts, **kwargs: Any) -> ContextManager[tuple[*Ts]]: ... 

1115 

1116 @contextmanager 

1117 def cached_run(self, *funcs: Any, **kwargs) -> Iterator[Any]: 

1118 """コンテキストマネージャ内で一時的に関数を `mark` し、キャッシュ機能を適用する。 

1119 

1120 デコレータを直接付与できない外部ライブラリの関数などをキャッシュする際に使用します。 

1121 

1122 Args: 

1123 *funcs (Any): キャッシュ対象にする関数(複数可)。 

1124 **kwargs: `mark` デコレータに渡すオプションパラメータ。 

1125 

1126 Yields: 

1127 Callable | tuple[Callable, ...]: キャッシュ機能が付与された関数。複数の場合はタプルで返る。 

1128 

1129 Raises: 

1130 ValidationError: 関数が1つも指定されなかった場合。 

1131 """ 

1132 if not funcs: 

1133 raise ValidationError( 

1134 "At least one function must be provided to cached_run." 

1135 ) 

1136 wrappers = [self.mark(**kwargs)(f) for f in funcs] 

1137 yield wrappers[0] if len(wrappers) == 1 else tuple(wrappers)