Coverage for src / beautyspot / core.py: 88%
456 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
1"""BeautySpot コアモジュール。
3タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス群を提供します。
4"""
5# src/beautyspot/core.py
7import atexit
8import asyncio
9import logging
10import functools
11import inspect
12import random
13import threading
14import warnings
15import weakref
16import time
17from concurrent.futures import Executor, Future, ThreadPoolExecutor, wait
18from contextlib import contextmanager
19from typing import (
20 Any,
21 Coroutine,
22 Callable,
23 Iterator,
24 NamedTuple,
25 Optional,
26 Union,
27 Type,
28 overload,
29 TypeVar,
30 TypeVarTuple,
31 ParamSpec,
32 Sequence,
33 ContextManager,
34)
36from beautyspot.maintenance import MaintenanceService
37from beautyspot.limiter import LimiterProtocol
38from beautyspot.types import SaveErrorContext
39from beautyspot.lifecycle import (
40 RetentionSpec,
41)
42from beautyspot.db import TaskDBCore, Flushable, Shutdownable
43from beautyspot.serializer import SerializerProtocol, TypeRegistryProtocol
44from beautyspot.cachekey import KeyGenPolicy
45from beautyspot.exceptions import (
46 ConfigurationError,
47 IncompatibleProviderError,
48 ValidationError,
49)
50from beautyspot.hooks import HookBase
51from beautyspot.types import PreExecuteContext, CacheHitContext, CacheMissContext
52from beautyspot.content_types import ContentType
53from beautyspot.cache import CacheManager, CACHE_MISS
55# ジェネリクスの定義
56P = ParamSpec("P")
57R = TypeVar("R")
58T = TypeVar("T")
59Ts = TypeVarTuple("Ts")
61# --- ロガー ---
62logger = logging.getLogger(__name__)
63logger.addHandler(logging.NullHandler())
66class _ExecutionContext(NamedTuple):
67 """_execute_sync / _execute_async の初期化フェーズで共通する解決済み値。"""
69 save_blob: bool | None
70 version: str | None
71 content_type: str | None
72 save_sync: bool
73 func_identifier: str
74 input_id: str
75 cache_key: str
76 hook_kwargs: dict
79class _BackgroundLoop:
80 """バックグラウンドで非同期IOタスクを処理するイベントループ。
82 明示的なタスク追跡とスレッドロックにより、シャットダウン時の競合状態を完全に排除します。
84 Args:
85 flush_timeout (float, optional): シャットダウン時のタスク完了待機タイムアウト(秒)。デフォルトは5.0。
86 """
88 def __init__(self, flush_timeout: float = 5.0):
89 self._flush_timeout = flush_timeout
91 # メインスレッドで loop オブジェクトを生成
92 self._loop = asyncio.new_event_loop()
94 self._lock = threading.Lock()
95 self._is_shutting_down = False
96 self._active_tasks = 0 # 実行中(またはスケジュール待ち)のタスク数
98 # 新しい Thread を設定
99 # daemon=True により、プロセス終了時の Python の無限ハングアップを防ぐ
100 self._thread = threading.Thread(
101 target=self._run_event_loop, daemon=True, name="BeautySpot-BGLoop"
102 )
104 # 設定した Thread を実行
105 self._thread.start()
107 # インスタンス自身が atexit を管理するため、グローバルな _active_loops 管理は不要
108 atexit.register(self._shutdown)
110 def _run_event_loop(self):
111 """スレッドローカルでイベントループを実行する"""
112 asyncio.set_event_loop(self._loop)
113 try:
114 self._loop.run_forever()
115 finally:
116 self._loop.close()
118 async def _task_wrapper(self, coro: Coroutine) -> Any:
119 """タスクの完了を確実にフックし、必要ならループを停止するラッパー"""
120 try:
121 return await coro
122 finally:
123 with self._lock:
124 self._active_tasks -= 1
125 # シャットダウン中で、かつ最後のタスクが終わった瞬間ならループを止める
126 if self._is_shutting_down and self._active_tasks == 0:
127 self._loop.call_soon_threadsafe(self._loop.stop)
129 def submit(self, coro: Coroutine) -> Optional[Future[Any]]:
130 """スレッドセーフにタスクを投入する"""
131 with self._lock:
132 # 実行時シャットダウンフラグをチェックすることで、タスクの異常タイミングでの追加を防止する
133 if self._is_shutting_down:
134 logger.debug("Background loop is shutting down. Task rejected.")
135 try:
136 coro.close()
137 except Exception:
138 # Best-effort cleanup; avoid raising during rejection.
139 pass
140 return None
142 # ロック内でカウンタを増やすことで、確実にインフライトとして追跡される
143 self._active_tasks += 1
145 try:
146 # マルチスレッドのループで、タスクのスレッドセーフな実行をスケジュールする
147 return asyncio.run_coroutine_threadsafe(
148 self._task_wrapper(coro),
149 self._loop,
150 )
151 except BaseException:
152 # 万が一スケジュールに失敗した場合はカウンタを戻す
153 try:
154 coro.close()
155 except Exception:
156 pass
157 with self._lock:
158 self._active_tasks -= 1
159 # シャットダウン中かつ最後のタスクだった場合、ループ停止を通知する
160 if self._is_shutting_down and self._active_tasks == 0:
161 try:
162 self._loop.call_soon_threadsafe(self._loop.stop)
163 except RuntimeError:
164 pass # ループは既に停止/クローズ済み
165 raise
167 def stop(self, save_sync: bool = True):
168 """
169 ループに対して新規タスクの受付停止を通知し、シャットダウンシーケンスを開始する。
170 Spot.shutdown() や GCの _shutdown_resources() から呼び出される統一されたAPI。
171 """
172 # atexit ハンドラの蓄積を防止
173 atexit.unregister(self._shutdown)
175 with self._lock:
176 # 既にシャットダウン中であれば二重実行を避ける
177 if self._is_shutting_down: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 return
180 # まずシャットダウンフラグを立てつつ、ループを止めたり、残タスクをjoinするのがメイン処理
181 self._is_shutting_down = True
183 # 現在アクティブなタスクがゼロなら、即座にループ停止をスケジュール
184 if self._active_tasks == 0:
185 self._loop.call_soon_threadsafe(self._loop.stop)
187 if save_sync:
188 # アクティブなタスクが残っている場合は、最後の _task_wrapper が stop() を呼ぶ
189 # タイムアウト付きでスレッドの終了(=ループの停止)を待つ
190 self._thread.join(timeout=self._flush_timeout)
192 if self._thread.is_alive(): 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 logger.warning(
194 f"BeautySpot background loop did not finish within {self._flush_timeout}s. "
195 "Pending IO tasks have been abruptly terminated."
196 )
198 def _shutdown(self):
199 """
200 [atexit フック]
201 プロセス終了時に呼ばれる安全網。タイムアウト付きの待機を実行する。
202 """
203 self.stop(save_sync=True)
206class Spot:
207 """タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス。
209 依存オブジェクト(CacheManagerやLimiterProtocolなど)を注入して初期化されます。
210 通常は直接インスタンス化せず、`bs.Spot(...)` ファクトリ関数を通じて使用することが推奨されます。
212 Args:
213 name (str): Spotインスタンスの名前。
214 cache (CacheManager): キャッシュマネージャーのインスタンス。
215 limiter (LimiterProtocol): レートリミッターのインスタンス。
216 save_sync (bool, optional): キャッシュ保存のデフォルト同期動作。デフォルトはTrue。
217 gc_probability (float, optional): キャッシュの自動GCを実行する確率(0.0〜1.0)。デフォルトは0.0。
218 flush_timeout (float, optional): バックグラウンドタスク完了待機のタイムアウト(秒)。デフォルトは5.0。
219 flush_poll_interval (float, optional): バックグラウンドタスク待機時のポーリング間隔(秒)。デフォルトは0.5。
220 on_save_error (Optional[Callable[[Exception, SaveErrorContext], None]], optional): キャッシュ保存時のエラーハンドラ(同期・非同期問わず発火)。
221 """
223 def __init__(
224 self,
225 name: str,
226 cache: CacheManager,
227 limiter: LimiterProtocol,
228 save_sync: bool = True,
229 gc_probability: float = 0.0,
230 flush_timeout: float = 5.0,
231 flush_poll_interval: float = 0.5,
232 on_save_error: Optional[
233 Callable[[Exception | BaseException, SaveErrorContext], None]
234 ] = None,
235 _owns_db: bool = False,
236 ) -> None:
237 self.name = name
238 if not (0.0 <= gc_probability <= 1.0): 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true
239 raise ValueError("gc_probability must be between 0.0 and 1.0")
240 self.gc_probability = gc_probability
241 if flush_timeout <= 0: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 raise ValueError("flush_timeout must be positive")
243 if flush_poll_interval <= 0: 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true
244 raise ValueError("flush_poll_interval must be positive")
245 self._flush_timeout = flush_timeout
246 self._flush_poll_interval = flush_poll_interval
248 # --- コンポーネントの保持 ---
249 self.cache = cache
250 self.limiter = limiter
252 # --- オプション設定の適用 ---
253 self._save_sync = save_sync
254 self.on_save_error = on_save_error
256 # --- バックグラウンド IO 管理 ---
257 self._bg_loop: _BackgroundLoop | None = None
258 self._executor: Executor | None = None
259 self._finalizer: weakref.finalize | None = None
261 self._bg_init_lock = threading.Lock()
262 self._shutdown_called = False
264 self._owns_db = _owns_db
266 self._active_futures: set = set()
267 self._futures_lock = threading.Lock()
269 self.maintenance_service: MaintenanceService | None = None
270 self._maintenance_lock = threading.Lock()
271 self._eviction_guard_lock = threading.Lock()
272 self._eviction_running = False
273 self._last_eviction_time = 0.0
275 def __enter__(self) -> "Spot":
276 return self
278 def __exit__(self, exc_type, exc_value, traceback):
279 self.flush()
281 def _track_future(self, future: Any):
282 if future is None: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 return
284 with self._futures_lock:
285 self._active_futures.add(future)
287 def _on_done(f):
288 with self._futures_lock:
289 self._active_futures.discard(f)
291 future.add_done_callback(_on_done)
293 @property
294 def maintenance(self) -> MaintenanceService:
295 svc = self.maintenance_service
296 if svc is None: 296 ↛ 305line 296 didn't jump to line 305 because the condition on line 296 was always true
297 with self._maintenance_lock:
298 if self.maintenance_service is None: 298 ↛ 304line 298 didn't jump to line 304 because the condition on line 298 was always true
299 self.maintenance_service = MaintenanceService(
300 db=self.cache.db,
301 storage=self.cache.storage,
302 serializer=self.cache.serializer,
303 )
304 svc = self.maintenance_service
305 assert svc is not None
306 return svc
308 def _ensure_bg_resources(self) -> tuple[_BackgroundLoop, Executor]:
309 bg, ex = self._bg_loop, self._executor
310 if bg is not None and ex is not None:
311 return bg, ex
313 with self._bg_init_lock:
314 if self._shutdown_called:
315 raise RuntimeError(
316 "Cannot submit background tasks after shutdown() has been called."
317 )
318 if self._bg_loop is None or self._executor is None: 318 ↛ 333line 318 didn't jump to line 333 because the condition on line 318 was always true
319 if self._bg_loop is None: 319 ↛ 321line 319 didn't jump to line 321 because the condition on line 319 was always true
320 self._bg_loop = _BackgroundLoop(flush_timeout=self._flush_timeout)
321 if self._executor is None: 321 ↛ 323line 321 didn't jump to line 323 because the condition on line 321 was always true
322 self._executor = ThreadPoolExecutor()
323 if self._finalizer is None: 323 ↛ 333line 323 didn't jump to line 333 because the condition on line 323 was always true
324 self._finalizer = weakref.finalize(
325 self,
326 Spot._shutdown_resources,
327 self._bg_loop,
328 self._executor,
329 self.cache.db,
330 self._owns_db,
331 )
332 self._finalizer.atexit = False
333 return self._bg_loop, self._executor
335 @staticmethod
336 def _shutdown_resources(
337 bg_loop: _BackgroundLoop,
338 executor: Executor,
339 db: TaskDBCore,
340 owns_db: bool,
341 ) -> None:
342 bg_loop.stop(save_sync=False)
343 executor.shutdown(wait=False, cancel_futures=True)
344 if owns_db and isinstance(db, Shutdownable):
345 db.shutdown(wait=False)
347 def shutdown(self, save_sync: bool = True):
348 """Spotインスタンスをシャットダウンし、バックグラウンドリソースを解放する。
350 Args:
351 save_sync (bool, optional): 同期的に未完了の保存タスクを待機するかどうか。Trueの場合は完了を待つ。デフォルトはTrue。
352 """
353 with self._bg_init_lock:
354 self._shutdown_called = True
355 if self._finalizer is not None and self._finalizer.alive:
356 self._finalizer.detach()
357 if save_sync:
358 self.flush()
360 if self._bg_loop is not None:
361 self._bg_loop.stop(save_sync=save_sync)
363 if self._executor is not None:
364 self._executor.shutdown(wait=save_sync, cancel_futures=not save_sync)
366 def flush(self, timeout: Optional[float] = None) -> None:
367 """バックグラウンドで実行中のすべての保存タスクとDBの書き込みの完了を待機する。
369 Args:
370 timeout (Optional[float], optional): 待機する最大時間(秒)。指定しない場合は初期化時の `flush_timeout` が使用される。
371 """
372 timeout_val = timeout if timeout is not None else self._flush_timeout
373 deadline = time.monotonic() + timeout_val
375 while True:
376 with self._futures_lock:
377 snapshot = list(self._active_futures)
378 if not snapshot:
379 break
380 remaining = deadline - time.monotonic()
381 if remaining <= 0: 381 ↛ 382line 381 didn't jump to line 382 because the condition on line 381 was never true
382 break
383 wait_timeout = min(self._flush_poll_interval, remaining)
384 wait(snapshot, timeout=wait_timeout)
386 db_remaining = deadline - time.monotonic()
387 if db_remaining > 0 and isinstance(self.cache.db, Flushable):
388 self.cache.db.flush(timeout=db_remaining)
390 def _get_func_identifier(self, func: Callable) -> str:
391 module = getattr(func, "__module__", None) or func.__class__.__module__
392 qualname = getattr(func, "__qualname__", None) or func.__class__.__qualname__
393 return f"{module}.{qualname}"
395 def _trigger_auto_eviction(self) -> None:
396 if self.gc_probability <= 0.0:
397 return
398 if random.random() >= self.gc_probability: 398 ↛ 399line 398 didn't jump to line 399 because the condition on line 398 was never true
399 return
401 with self._eviction_guard_lock:
402 if self._eviction_running: 402 ↛ 403line 402 didn't jump to line 403 because the condition on line 402 was never true
403 return
404 now = time.monotonic()
405 if now - self._last_eviction_time < 60.0: 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true
406 return
407 self._eviction_running = True
409 logger.debug(f"Triggering auto-eviction (gc_probability: {self.gc_probability})")
411 with self._futures_lock:
412 pending_futures = list(self._active_futures)
414 def _run_clean_safe():
415 try:
416 self.maintenance.clean_garbage(orphan_grace_seconds=60.0)
417 except Exception as e:
418 logger.error(f"Auto-eviction failed: {e}", exc_info=True)
420 def _clear_eviction_flag():
421 with self._eviction_guard_lock:
422 self._last_eviction_time = time.monotonic()
423 self._eviction_running = False
425 try:
426 bg_loop, executor = self._ensure_bg_resources()
428 async def _run_clean_coro():
429 loop = asyncio.get_running_loop()
430 if pending_futures:
431 await asyncio.wait(
432 [asyncio.wrap_future(f) for f in pending_futures],
433 timeout=self._flush_timeout,
434 )
435 await loop.run_in_executor(executor, _run_clean_safe)
437 future = bg_loop.submit(_run_clean_coro())
438 if future:
439 self._track_future(future)
440 future.add_done_callback(lambda f: _clear_eviction_flag())
441 else:
442 _clear_eviction_flag()
443 except BaseException:
444 # KeyboardInterrupt 等でもフラグを確実にクリアし、
445 # 以降の eviction がスキップされ続けるのを防ぐ。
446 _clear_eviction_flag()
448 def _resolve_key_fn(
449 self,
450 func: Callable,
451 keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
452 input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
453 ) -> Optional[Callable]:
454 if keygen is not None and input_key_fn is not None:
455 raise IncompatibleProviderError("Cannot specify both 'keygen' and 'input_key_fn'.")
456 if input_key_fn is not None:
457 warnings.warn("`input_key_fn` is deprecated, use `keygen` instead.", DeprecationWarning, stacklevel=3)
458 target = keygen or input_key_fn
459 if isinstance(target, KeyGenPolicy):
460 return target.bind(func)
461 return target
463 def register(
464 self,
465 code: int,
466 encoder: Callable[[T], Any],
467 decoder: Optional[Callable[[Any], T]] = None,
468 decoder_factory: Optional[Callable[[Type[T]], Callable[[Any], T]]] = None,
469 ) -> Callable[[Type[T]], Type[T]]:
470 """カスタム型をシリアライザに登録するためのデコレータ。
472 `decoder` または `decoder_factory` のいずれかを必ず提供する必要があります。
474 Args:
475 code (int): カスタム型の一意な識別コード。
476 encoder (Callable[[T], Any]): カスタム型オブジェクトからシリアライズ可能な形式(辞書など)に変換する関数。
477 decoder (Optional[Callable[[Any], T]], optional): デシリアライズ時にデータをカスタム型オブジェクトに復元する関数。
478 decoder_factory (Optional[Callable[[Type[T]], Callable[[Any], T]]], optional): 型に基づいてデコーダ関数を生成するファクトリ関数。
480 Returns:
481 Callable[[Type[T]], Type[T]]: クラスデコレータ。
483 Raises:
484 IncompatibleProviderError: `decoder` と `decoder_factory` の両方が未指定の場合に発生。
485 """
486 if decoder is None and decoder_factory is None:
487 raise IncompatibleProviderError(
488 "Must provide either `decoder` or `decoder_factory`."
489 )
491 def decorator(cls: Type) -> Type:
492 actual_decoder = decoder
493 if decoder_factory:
494 actual_decoder = decoder_factory(cls)
496 if actual_decoder is None:
497 raise ValueError("Decoder resolution failed.")
499 self.register_type(cls, code, encoder, actual_decoder)
500 return cls
502 return decorator
504 def register_type(
505 self,
506 type_class: Type[T],
507 code: int,
508 encoder: Callable[[T], Any],
509 decoder: Callable[[Any], T],
510 ):
511 """カスタム型を直接シリアライザに登録する。
513 Args:
514 type_class (Type[T]): 登録するカスタム型のクラス。
515 code (int): カスタム型の一意な識別コード。
516 encoder (Callable[[T], Any]): エンコーダ関数。
517 decoder (Callable[[Any], T]): デコーダ関数。
519 Raises:
520 NotImplementedError: 現在のシリアライザが型登録をサポートしていない場合。
521 """
522 if isinstance(self.cache.serializer, TypeRegistryProtocol): 522 ↛ 525line 522 didn't jump to line 525 because the condition on line 522 was always true
523 self.cache.serializer.register(type_class, code, encoder, decoder)
524 else:
525 raise NotImplementedError(
526 "Current serializer does not support type registration."
527 )
529 @staticmethod
530 def _dispatch_hooks(
531 hooks: Optional[Sequence[HookBase]], method_name: str, context: Any
532 ) -> None:
533 if not hooks:
534 return
535 for hook in hooks:
536 try:
537 getattr(hook, method_name)(context)
538 except Exception as e:
539 logger.error(
540 f"Error in hook '{type(hook).__name__}.{method_name}': {e}",
541 exc_info=True,
542 )
544 async def _dispatch_hooks_async(
545 self,
546 hooks: Optional[Sequence[HookBase]],
547 method_name: str,
548 context: Any,
549 loop: asyncio.AbstractEventLoop,
550 executor: Executor,
551 ) -> None:
552 if not hooks:
553 return
554 await loop.run_in_executor(
555 executor, self._dispatch_hooks, hooks, method_name, context
556 )
558 # --- Core Logic ---
560 def _resolve_settings(
561 self,
562 save_blob: bool | None,
563 version: str | None,
564 content_type: str | ContentType | None,
565 save_sync: bool | None,
566 ) -> tuple[bool | None, str | None, str | None, bool]:
567 return (
568 save_blob,
569 version,
570 content_type,
571 (save_sync if save_sync is not None else self._save_sync),
572 )
574 def _prepare_execution(
575 self,
576 func: Callable,
577 args: tuple,
578 kwargs: dict,
579 save_blob: bool | None,
580 effective_key_fn: Optional[Callable],
581 version: str | None,
582 content_type: Optional[str | ContentType],
583 save_sync: bool | None,
584 hooks: Optional[Sequence[HookBase]],
585 ) -> _ExecutionContext:
586 s_blob, s_ver, s_ct, s_save_sync = self._resolve_settings(
587 save_blob, version, content_type, save_sync
588 )
589 func_identifier = self._get_func_identifier(func)
590 iid, ck = self.cache.make_cache_key(
591 func_identifier, args, kwargs, effective_key_fn, s_ver
592 )
593 return _ExecutionContext(
594 s_blob,
595 s_ver,
596 s_ct,
597 s_save_sync,
598 func_identifier,
599 iid,
600 ck,
601 dict(kwargs) if hooks else kwargs,
602 )
604 def _build_cache_hit_context(
605 self,
606 func_name: str,
607 input_id: str,
608 cache_key: str,
609 args: tuple,
610 hook_kwargs: dict,
611 result: Any,
612 version: str | None,
613 ) -> CacheHitContext:
614 return CacheHitContext(
615 func_name=func_name,
616 input_id=str(input_id),
617 cache_key=cache_key,
618 args=args,
619 kwargs=hook_kwargs,
620 result=result,
621 version=version,
622 )
624 def _build_save_kwargs(
625 self,
626 cache_key: str,
627 func: Callable,
628 func_identifier: str,
629 input_id: str,
630 version: str | None,
631 result: Any,
632 content_type: str | None,
633 save_blob: bool | None,
634 serializer: Optional[SerializerProtocol],
635 retention: RetentionSpec,
636 ) -> dict:
637 expires_at = self.cache.calculate_expires_at(
638 func_identifier, func.__name__, retention
639 )
640 return {
641 "cache_key": cache_key,
642 "func_name": func.__name__,
643 "func_identifier": func_identifier,
644 "input_id": str(input_id),
645 "version": version,
646 "result": result,
647 "content_type": content_type,
648 "save_blob": save_blob,
649 "serializer": serializer,
650 "expires_at": expires_at,
651 }
653 def _persist_result_sync(self, save_sync: bool, save_kwargs: dict) -> None:
654 if save_sync:
655 try:
656 self.cache.set(**save_kwargs)
657 except Exception as e:
658 self._handle_save_error(e, save_kwargs)
659 raise
660 else:
661 try:
662 self._submit_background_save(**save_kwargs)
663 except Exception as e:
664 self._handle_save_error(e, save_kwargs)
665 raise
667 async def _persist_result_async(self, save_sync: bool, save_kwargs: dict) -> None:
668 if save_sync:
669 try:
670 bg_loop, exec_pool = self._ensure_bg_resources()
671 coro = self._save_result_async(
672 executor=exec_pool, safe=False, **save_kwargs
673 )
674 future = bg_loop.submit(coro)
675 if future is None: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true
676 self._notify_save_discarded(save_kwargs)
677 raise RuntimeError(
678 f"Cache save for '{save_kwargs.get('func_name')}' "
679 "was discarded because the background loop is shutting down."
680 )
681 else:
682 await asyncio.wrap_future(future)
683 except Exception as e:
684 self._handle_save_error(e, save_kwargs)
685 raise
686 else:
687 try:
688 self._submit_background_save(**save_kwargs)
689 except Exception as e:
690 self._handle_save_error(e, save_kwargs)
691 raise
693 def _execute_sync(
694 self,
695 func: Callable,
696 args: tuple,
697 kwargs: dict,
698 save_blob: bool | None,
699 effective_key_fn: Optional[Callable],
700 version: str | None,
701 content_type: Optional[str | ContentType],
702 serializer: Optional[SerializerProtocol],
703 retention: RetentionSpec,
704 save_sync: bool | None,
705 hooks: Optional[Sequence[HookBase]] = None,
706 ) -> Any:
707 ctx = self._prepare_execution(
708 func,
709 args,
710 kwargs,
711 save_blob,
712 effective_key_fn,
713 version,
714 content_type,
715 save_sync,
716 hooks,
717 )
718 self._dispatch_hooks(
719 hooks,
720 "pre_execute",
721 PreExecuteContext(
722 func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs
723 ),
724 )
725 cached = self.cache.get(ctx.cache_key, serializer)
726 if cached is not CACHE_MISS:
727 self._dispatch_hooks(
728 hooks,
729 "on_cache_hit",
730 self._build_cache_hit_context(
731 func.__name__,
732 ctx.input_id,
733 ctx.cache_key,
734 args,
735 ctx.hook_kwargs,
736 cached,
737 ctx.version,
738 ),
739 )
740 return cached
742 with self.cache.herd_sync(ctx.cache_key, serializer) as herd:
743 if not herd.is_executor:
744 if herd.is_error:
745 raise herd.result
746 self._dispatch_hooks(
747 hooks,
748 "on_cache_hit",
749 self._build_cache_hit_context(
750 func.__name__,
751 ctx.input_id,
752 ctx.cache_key,
753 args,
754 ctx.hook_kwargs,
755 herd.result,
756 ctx.version,
757 ),
758 )
759 return herd.result
761 try:
762 res = func(*args, **kwargs)
763 self._dispatch_hooks(
764 hooks,
765 "on_cache_miss",
766 CacheMissContext(
767 func.__name__,
768 str(ctx.input_id),
769 ctx.cache_key,
770 args,
771 ctx.hook_kwargs,
772 res,
773 ctx.version,
774 ),
775 )
776 herd.result_box.append((True, res))
778 # 実行成功後、同期モード(save_sync=True)の場合はキャッシュ保存エラーを伝播させる
779 try:
780 save_kwargs = self._build_save_kwargs(
781 ctx.cache_key,
782 func,
783 ctx.func_identifier,
784 ctx.input_id,
785 ctx.version,
786 res,
787 ctx.content_type,
788 ctx.save_blob,
789 serializer,
790 retention,
791 )
792 self._persist_result_sync(ctx.save_sync, save_kwargs)
793 except Exception as e:
794 if ctx.save_sync:
795 raise
796 logger.error(f"Failed to submit background cache save, but execution succeeded: {e}")
798 return res
800 except BaseException as e:
801 if not herd.result_box:
802 herd.result_box.append((False, e))
803 raise
804 finally:
805 self._trigger_auto_eviction()
807 async def _execute_async(
808 self,
809 func: Callable,
810 args: tuple,
811 kwargs: dict,
812 save_blob: bool | None,
813 effective_key_fn: Optional[Callable],
814 version: str | None,
815 content_type: Optional[str | ContentType],
816 serializer: Optional[SerializerProtocol],
817 retention: RetentionSpec,
818 save_sync: bool | None,
819 hooks: Optional[Sequence[HookBase]] = None,
820 ) -> Any:
821 ctx = self._prepare_execution(
822 func,
823 args,
824 kwargs,
825 save_blob,
826 effective_key_fn,
827 version,
828 content_type,
829 save_sync,
830 hooks,
831 )
832 loop = asyncio.get_running_loop()
833 _, executor = self._ensure_bg_resources()
834 await self._dispatch_hooks_async(
835 hooks,
836 "pre_execute",
837 PreExecuteContext(
838 func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs
839 ),
840 loop,
841 executor,
842 )
843 cached = await loop.run_in_executor(
844 executor, self.cache.get, ctx.cache_key, serializer
845 )
846 if cached is not CACHE_MISS:
847 await self._dispatch_hooks_async(
848 hooks,
849 "on_cache_hit",
850 self._build_cache_hit_context(
851 func.__name__,
852 ctx.input_id,
853 ctx.cache_key,
854 args,
855 ctx.hook_kwargs,
856 cached,
857 ctx.version,
858 ),
859 loop,
860 executor,
861 )
862 return cached
864 async with self.cache.herd_async(
865 ctx.cache_key, serializer, loop, executor
866 ) as herd:
867 if not herd.is_executor:
868 if herd.is_error:
869 raise herd.result
870 await self._dispatch_hooks_async(
871 hooks,
872 "on_cache_hit",
873 self._build_cache_hit_context(
874 func.__name__,
875 ctx.input_id,
876 ctx.cache_key,
877 args,
878 ctx.hook_kwargs,
879 herd.result,
880 ctx.version,
881 ),
882 loop,
883 executor,
884 )
885 return herd.result
887 try:
888 res = await func(*args, **kwargs)
889 await self._dispatch_hooks_async(
890 hooks,
891 "on_cache_miss",
892 CacheMissContext(
893 func.__name__,
894 str(ctx.input_id),
895 ctx.cache_key,
896 args,
897 ctx.hook_kwargs,
898 res,
899 ctx.version,
900 ),
901 loop,
902 executor,
903 )
904 herd.result_box.append((True, res))
906 # 実行成功後、同期モード(save_sync=True)の場合はキャッシュ保存エラーを伝播させる
907 try:
908 save_kwargs = self._build_save_kwargs(
909 ctx.cache_key,
910 func,
911 ctx.func_identifier,
912 ctx.input_id,
913 ctx.version,
914 res,
915 ctx.content_type,
916 ctx.save_blob,
917 serializer,
918 retention,
919 )
920 await self._persist_result_async(ctx.save_sync, save_kwargs)
921 except Exception as e:
922 if ctx.save_sync: 922 ↛ 924line 922 didn't jump to line 924 because the condition on line 922 was always true
923 raise
924 logger.error(f"Failed to persist cache asynchronously, but execution succeeded: {e}")
926 return res
928 except BaseException as e:
929 if not herd.result_box:
930 herd.result_box.append((False, e))
931 raise
932 finally:
933 self._trigger_auto_eviction()
935 def _handle_save_error(self, err: BaseException | Exception, save_kwargs: dict) -> None:
936 logger.error(
937 f"Cache save failed for '{save_kwargs.get('func_name')}': {err}",
938 exc_info=True,
939 )
940 if self.on_save_error:
941 try:
942 import sys
944 res = save_kwargs.get("result")
945 self.on_save_error(
946 err,
947 SaveErrorContext(
948 func_name=save_kwargs.get("func_name", "unknown"),
949 cache_key=save_kwargs.get("cache_key", ""),
950 input_id=save_kwargs.get("input_id", ""),
951 version=save_kwargs.get("version"),
952 content_type=save_kwargs.get("content_type"),
953 save_blob=save_kwargs.get("save_blob"),
954 expires_at=save_kwargs.get("expires_at"),
955 result_type=type(res).__name__,
956 result_size=sys.getsizeof(res) if res is not None else None,
957 ),
958 )
959 except Exception:
960 logger.error(
961 "Error occurred within the 'on_save_error' callback",
962 exc_info=True,
963 )
965 def _notify_save_discarded(self, save_kwargs: dict) -> None:
966 msg = f"Background save for '{save_kwargs.get('func_name')}' discarded during shutdown."
967 logger.warning(msg)
968 warnings.warn(msg, ResourceWarning)
969 self._handle_save_error(RuntimeError(msg), save_kwargs)
971 def _submit_background_save(self, **save_kwargs) -> None:
972 bg_loop, executor = self._ensure_bg_resources()
973 coro = self._save_result_async(executor=executor, **save_kwargs)
974 future = bg_loop.submit(coro)
975 if future:
976 self._track_future(future)
977 else:
978 self._notify_save_discarded(save_kwargs)
980 async def _save_result_async(
981 self, /, executor: Executor, safe: bool = True, **kwargs
982 ) -> None:
983 loop = asyncio.get_running_loop()
984 target = (lambda **kw: self._save_result_safe(**kw)) if safe else self.cache.set
985 try:
986 await loop.run_in_executor(executor, functools.partial(target, **kwargs))
987 except (asyncio.CancelledError, RuntimeError) as e:
988 # Executor might be forcibly shut down during program exit or Spot.shutdown(save_sync=False)
989 msg = f"Background save for '{kwargs.get('func_name')}' cancelled during shutdown."
990 logger.warning(msg)
991 self._handle_save_error(e, kwargs)
992 if not safe:
993 raise
995 def _save_result_safe(self, **kwargs):
996 try:
997 self.cache.set(**kwargs)
998 except Exception as e:
999 self._handle_save_error(e, kwargs)
1001 def consume(self, cost: Union[int, Callable] = 1):
1002 """関数実行前にレートリミッターのトークンを消費するデコレータ。
1004 Args:
1005 cost (Union[int, Callable], optional): 消費するコスト。整数、または実行時の引数を受け取りコストを計算する関数を指定可能。デフォルトは1。
1007 Returns:
1008 Callable: デコレートされた関数。
1009 """
1010 def decorator(func):
1011 is_async = inspect.iscoroutinefunction(func)
1013 @functools.wraps(func)
1014 def sync_wrapper(*args, **kwargs):
1015 self.limiter.consume(cost(*args, **kwargs) if callable(cost) else cost)
1016 return func(*args, **kwargs)
1018 @functools.wraps(func)
1019 async def async_wrapper(*args, **kwargs):
1020 await self.limiter.consume_async(
1021 cost(*args, **kwargs) if callable(cost) else cost
1022 )
1023 return await func(*args, **kwargs)
1025 return async_wrapper if is_async else sync_wrapper
1027 return decorator
1029 @overload
1030 def mark(self, _func: Callable[P, R]) -> Callable[P, R]: ...
1032 @overload
1033 def mark(
1034 self,
1035 *,
1036 save_blob: Optional[bool] = None,
1037 keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
1038 input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
1039 version: str | None = None,
1040 content_type: Optional[str | ContentType] = None,
1041 serializer: Optional[SerializerProtocol] = None,
1042 save_sync: Optional[bool] = None,
1043 retention: RetentionSpec = None,
1044 hooks: Optional[Sequence[HookBase]] = None,
1045 ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
1047 def mark(self, _func: Optional[Callable] = None, **kwargs) -> Any:
1048 """関数を修飾し、実行結果のキャッシュ機能とメタデータ管理を追加するデコレータ。
1050 関数の実行結果は、引数とその他の設定から計算されたキャッシュキーに基づいて保存・再利用されます。
1052 Args:
1053 _func (Optional[Callable], optional): デコレート対象の関数。
1054 save_blob (Optional[bool], optional): 大きな戻り値をBlobストレージに保存するかどうか。
1055 keygen (Optional[Union[Callable, KeyGenPolicy]], optional): キャッシュキーの生成ロジックを指定する。
1056 input_key_fn (Optional[Union[Callable, KeyGenPolicy]], optional): 非推奨。`keygen` を使用すること。
1057 version (Optional[str], optional): 関数のキャッシュバージョン。ロジック変更時にインクリメントすることでキャッシュを無効化できる。
1058 content_type (Optional[Union[str, ContentType]], optional): 戻り値のMIMEタイプ。
1059 serializer (Optional[SerializerProtocol], optional): この関数に適用するカスタムシリアライザ。
1060 save_sync (Optional[bool], optional): 保存処理を同期的に行うかどうか。
1061 retention (RetentionSpec, optional): キャッシュの保持ポリシー。
1062 hooks (Optional[Sequence[HookBase]], optional): 実行前後やキャッシュヒット時に発火するフックのリスト。
1064 Returns:
1065 Any: デコレートされた関数、またはデコレータ関数。
1066 """
1067 def decorator(func):
1068 if inspect.isgeneratorfunction(func) or inspect.isasyncgenfunction(func): 1068 ↛ 1069line 1068 didn't jump to line 1069 because the condition on line 1068 was never true
1069 raise ConfigurationError(f"Generators not supported: {func.__name__}")
1070 key_fn = self._resolve_key_fn(
1071 func, kwargs.get("keygen"), kwargs.get("input_key_fn")
1072 )
1073 is_async = inspect.iscoroutinefunction(func)
1075 @functools.wraps(func)
1076 def sync_wrapper(*args, **kw):
1077 return self._execute_sync(
1078 func,
1079 args,
1080 kw,
1081 kwargs.get("save_blob"),
1082 key_fn,
1083 kwargs.get("version"),
1084 kwargs.get("content_type"),
1085 kwargs.get("serializer"),
1086 kwargs.get("retention"),
1087 kwargs.get("save_sync"),
1088 kwargs.get("hooks"),
1089 )
1091 @functools.wraps(func)
1092 async def async_wrapper(*args, **kw):
1093 return await self._execute_async(
1094 func,
1095 args,
1096 kw,
1097 kwargs.get("save_blob"),
1098 key_fn,
1099 kwargs.get("version"),
1100 kwargs.get("content_type"),
1101 kwargs.get("serializer"),
1102 kwargs.get("retention"),
1103 kwargs.get("save_sync"),
1104 kwargs.get("hooks"),
1105 )
1107 return async_wrapper if is_async else sync_wrapper
1109 return decorator(_func) if _func else decorator
1111 @overload
1112 def cached_run(self, __func: Callable[P, R], **kwargs: Any) -> ContextManager[Callable[P, R]]: ...
1114 @overload
1115 def cached_run(self, *funcs: *Ts, **kwargs: Any) -> ContextManager[tuple[*Ts]]: ...
1117 @contextmanager
1118 def cached_run(self, *funcs: Any, **kwargs) -> Iterator[Any]:
1119 """コンテキストマネージャ内で一時的に関数を `mark` し、キャッシュ機能を適用する。
1121 デコレータを直接付与できない外部ライブラリの関数などをキャッシュする際に使用します。
1123 Args:
1124 *funcs (Any): キャッシュ対象にする関数(複数可)。
1125 **kwargs: `mark` デコレータに渡すオプションパラメータ。
1127 Yields:
1128 Callable | tuple[Callable, ...]: キャッシュ機能が付与された関数。複数の場合はタプルで返る。
1130 Raises:
1131 ValidationError: 関数が1つも指定されなかった場合。
1132 """
1133 if not funcs:
1134 raise ValidationError(
1135 "At least one function must be provided to cached_run."
1136 )
1137 wrappers = [self.mark(**kwargs)(f) for f in funcs]
1138 yield wrappers[0] if len(wrappers) == 1 else tuple(wrappers)