Coverage for src / beautyspot / core.py: 87%
461 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
1"""BeautySpot コアモジュール。
3タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス群を提供します。
4"""
5# src/beautyspot/core.py
7import atexit
8import asyncio
9import logging
10import functools
11import inspect
12import random
13import threading
14import warnings
15import weakref
16import time
17from concurrent.futures import Executor, Future, ThreadPoolExecutor, wait
18from contextlib import contextmanager
19from typing import (
20 Any,
21 Coroutine,
22 Callable,
23 Iterator,
24 NamedTuple,
25 Optional,
26 Union,
27 Type,
28 overload,
29 TypeVar,
30 TypeVarTuple,
31 ParamSpec,
32 Sequence,
33 ContextManager,
34)
36from beautyspot.maintenance import MaintenanceService
37from beautyspot.limiter import LimiterProtocol
38from beautyspot.types import SaveErrorContext
39from beautyspot.lifecycle import (
40 RetentionSpec,
41)
42from beautyspot.db import TaskDBCore, Flushable, Shutdownable
43from beautyspot.serializer import SerializerProtocol, TypeRegistryProtocol
44from beautyspot.cachekey import KeyGenPolicy
45from beautyspot.exceptions import (
46 ConfigurationError,
47 IncompatibleProviderError,
48 ValidationError,
49)
50from beautyspot.hooks import HookBase
51from beautyspot.types import PreExecuteContext, CacheHitContext, CacheMissContext
52from beautyspot.content_types import ContentType
53from beautyspot.cache import CacheManager, CACHE_MISS
# Generic type parameters
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
Ts = TypeVarTuple("Ts")

# --- Logger ---
logger = logging.getLogger(__name__)
# A NullHandler is attached to this module's logger.
logger.addHandler(logging.NullHandler())
class _ExecutionContext(NamedTuple):
    """Resolved values shared by the setup phase of _execute_sync / _execute_async."""

    save_blob: bool | None
    version: str | None
    content_type: str | None
    save_sync: bool  # effective save mode after applying the instance default
    func_identifier: str
    input_id: str
    cache_key: str
    hook_kwargs: dict  # keyword arguments handed to hook contexts
class _BackgroundLoop:
    """Event loop that processes asynchronous I/O tasks on a background thread.

    In-flight tasks are counted explicitly and the counter is guarded by a
    thread lock; this is intended to avoid race conditions during shutdown.

    Args:
        drain_timeout (float, optional): Seconds to wait for tasks to finish
            at shutdown. Defaults to 5.0.
    """

    def __init__(self, drain_timeout: float = 5.0):
        self._drain_timeout = drain_timeout

        # Create the loop object on the constructing thread; the worker thread
        # started below is the one that runs it.
        self._loop = asyncio.new_event_loop()

        self._lock = threading.Lock()
        self._is_shutting_down = False
        self._active_tasks = 0  # number of tasks running (or waiting to be scheduled)

        # Configure the worker thread.
        # daemon=True keeps Python from hanging forever at process exit.
        self._thread = threading.Thread(
            target=self._run_event_loop, daemon=True, name="BeautySpot-BGLoop"
        )

        # Start the configured thread.
        self._thread.start()

        # The instance manages its own atexit hook, so no global
        # `_active_loops` bookkeeping is needed.
        atexit.register(self._shutdown)

    def _run_event_loop(self):
        """Run the event loop on the worker thread and close it once it stops."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    async def _task_wrapper(self, coro: Coroutine) -> Any:
        """Await ``coro`` and always decrement the in-flight counter afterwards.

        Stops the loop when shutdown is in progress and this was the last task.
        """
        try:
            return await coro
        finally:
            with self._lock:
                self._active_tasks -= 1
                # Stop the loop the moment the last task finishes during shutdown.
                if self._is_shutting_down and self._active_tasks == 0:
                    self._loop.call_soon_threadsafe(self._loop.stop)

    def submit(self, coro: Coroutine) -> Optional[Future[Any]]:
        """Submit a coroutine to the background loop from any thread.

        Returns:
            The future from ``asyncio.run_coroutine_threadsafe``, or ``None``
            when the loop is shutting down (the coroutine is closed instead).
        """
        with self._lock:
            if self._is_shutting_down:
                logger.debug("Background loop is shutting down. Task rejected.")
                try:
                    coro.close()
                except Exception:
                    # Best-effort cleanup; avoid raising during rejection.
                    pass
                return None

            # Incrementing inside the lock guarantees the task is tracked as
            # in-flight before a concurrent stop() can observe the counter.
            self._active_tasks += 1

        try:
            return asyncio.run_coroutine_threadsafe(
                self._task_wrapper(coro), self._loop
            )
        except BaseException:
            # Scheduling failed: roll the counter back.
            try:
                coro.close()
            except Exception:
                pass
            with self._lock:
                self._active_tasks -= 1
                # If shutting down and this was the last task, request a loop stop.
                if self._is_shutting_down and self._active_tasks == 0:
                    try:
                        self._loop.call_soon_threadsafe(self._loop.stop)
                    except RuntimeError:
                        pass  # the loop is already stopped/closed
            raise

    def stop(self, save_sync: bool = True):
        """
        Stop accepting new tasks and begin the shutdown sequence.

        Called from Spot.shutdown() and from the Spot._shutdown_resources()
        finalizer callback.

        Args:
            save_sync: When True, join the worker thread for up to
                ``drain_timeout`` seconds and log a warning if it is still alive.
        """
        # Unregister so atexit handlers do not accumulate.
        atexit.unregister(self._shutdown)

        with self._lock:
            # Avoid running the sequence twice.
            if self._is_shutting_down:
                return
            self._is_shutting_down = True

            # No active tasks: schedule the loop stop immediately.
            if self._active_tasks == 0:
                self._loop.call_soon_threadsafe(self._loop.stop)

        if save_sync:
            # If tasks remain, the last _task_wrapper schedules the loop stop.
            # Wait, with a timeout, for the thread to exit (= loop stopped).
            self._thread.join(timeout=self._drain_timeout)

            if self._thread.is_alive():
                logger.warning(
                    f"BeautySpot background loop did not finish within {self._drain_timeout}s. "
                    "Pending IO tasks have been abruptly terminated."
                )

    def _shutdown(self):
        """
        [atexit hook]
        Safety net invoked at process exit; performs the wait-with-timeout stop.
        """
        self.stop(save_sync=True)
201class Spot:
202 """タスク管理、シリアライズ、キャッシュとストレージを含むリソース管理を行うメインクラス。
204 依存オブジェクト(CacheManagerやLimiterProtocolなど)を注入して初期化されます。
205 通常は直接インスタンス化せず、`bs.Spot(...)` ファクトリ関数を通じて使用することが推奨されます。
207 Args:
208 name (str): Spotインスタンスの名前。
209 cache (CacheManager): キャッシュマネージャーのインスタンス。
210 limiter (LimiterProtocol): レートリミッターのインスタンス。
211 save_sync (bool, optional): キャッシュ保存のデフォルト同期動作。デフォルトはTrue。
212 eviction_rate (float, optional): キャッシュの自動破棄を実行する確率(0.0〜1.0)。デフォルトは0.0。
213 drain_timeout (float, optional): バックグラウンドタスク完了待機のタイムアウト(秒)。デフォルトは5.0。
214 drain_poll_interval (float, optional): バックグラウンドタスク待機時のポーリング間隔(秒)。デフォルトは0.5。
215 on_background_error (Optional[Callable[[Exception, SaveErrorContext], None]], optional): バックグラウンド保存時のエラーハンドラ。
216 """
def __init__(
    self,
    name: str,
    cache: CacheManager,
    limiter: LimiterProtocol,
    save_sync: bool = True,
    eviction_rate: float = 0.0,
    drain_timeout: float = 5.0,
    drain_poll_interval: float = 0.5,
    on_background_error: Optional[
        Callable[[Exception | BaseException, SaveErrorContext], None]
    ] = None,
) -> None:
    """Validate the settings, store the injected components and init the DB schema.

    Raises:
        ValueError: If ``eviction_rate`` is outside [0.0, 1.0], or if
            ``drain_timeout`` / ``drain_poll_interval`` is not positive.
    """
    self.name = name

    # --- Validation of numeric settings ---
    rate_in_range = 0.0 <= eviction_rate <= 1.0
    if not rate_in_range:
        raise ValueError("eviction_rate must be between 0.0 and 1.0")
    self.eviction_rate = eviction_rate

    if drain_timeout <= 0:
        raise ValueError("drain_timeout must be positive")
    if drain_poll_interval <= 0:
        raise ValueError("drain_poll_interval must be positive")
    self._drain_timeout, self._drain_poll_interval = (
        drain_timeout,
        drain_poll_interval,
    )

    # --- Injected components and options ---
    self.cache, self.limiter = cache, limiter
    self._save_sync = save_sync
    self.on_background_error = on_background_error

    # --- Database schema initialisation ---
    self.cache.db.init_schema()

    # --- Background I/O state (created on first use) ---
    self._bg_loop: _BackgroundLoop | None = None
    self._executor: Executor | None = None
    self._finalizer: weakref.finalize | None = None

    self._bg_init_lock = threading.Lock()
    self._shutdown_called = False
    self._owns_db = False

    # --- Tracking of in-flight background futures ---
    self._active_futures: set = set()
    self._futures_lock = threading.Lock()

    # --- Maintenance / auto-eviction state ---
    self._maintenance_service: MaintenanceService | None = None
    self._maintenance_lock = threading.Lock()
    self._eviction_guard_lock = threading.Lock()
    self._eviction_running = False
    self._last_eviction_time = 0.0
def __enter__(self) -> "Spot":
    """Enter the context manager; the instance itself is the managed value."""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Leave the context manager after draining tracked background futures."""
    self._drain_futures()
def _track_future(self, future: Any):
    """Add ``future`` to the active set and drop it again once it completes.

    ``None`` is ignored.
    """
    if future is None:
        return

    def _forget(finished):
        # Runs when the future completes; removes it from the tracked set.
        with self._futures_lock:
            self._active_futures.discard(finished)

    with self._futures_lock:
        self._active_futures.add(future)

    future.add_done_callback(_forget)
@property
def maintenance(self) -> MaintenanceService:
    """MaintenanceService for this Spot's db/storage/serializer, created on first access."""
    service = self._maintenance_service
    if service is None:
        with self._maintenance_lock:
            # Re-check under the lock before constructing the service.
            if self._maintenance_service is None:
                cache = self.cache
                self._maintenance_service = MaintenanceService(
                    db=cache.db,
                    storage=cache.storage,
                    serializer=cache.serializer,
                )
            service = self._maintenance_service
    assert service is not None
    return service
def _ensure_bg_resources(self) -> tuple[_BackgroundLoop, Executor]:
    """Return the background loop and executor, creating them on first use.

    Raises:
        RuntimeError: If creation is needed after shutdown() has been called.
    """
    loop_obj = self._bg_loop
    pool = self._executor
    if not (loop_obj is None or pool is None):
        # Both already exist; no lock is taken on this path.
        return loop_obj, pool

    with self._bg_init_lock:
        if self._shutdown_called:
            raise RuntimeError(
                "Cannot submit background tasks after shutdown() has been called."
            )
        needs_init = self._bg_loop is None or self._executor is None
        if needs_init:
            if self._bg_loop is None:
                self._bg_loop = _BackgroundLoop(drain_timeout=self._drain_timeout)
            if self._executor is None:
                self._executor = ThreadPoolExecutor()
            if self._finalizer is None:
                # Release the resources when this Spot is garbage collected.
                finalizer = weakref.finalize(
                    self,
                    Spot._shutdown_resources,
                    self._bg_loop,
                    self._executor,
                    self.cache.db,
                    self._owns_db,
                )
                self._finalizer = finalizer
                self._finalizer.atexit = False
        return self._bg_loop, self._executor
331 @staticmethod
332 def _shutdown_resources(
333 bg_loop: _BackgroundLoop,
334 executor: Executor,
335 db: TaskDBCore,
336 owns_db: bool,
337 ) -> None:
338 bg_loop.stop(save_sync=False)
339 executor.shutdown(wait=False, cancel_futures=True)
340 if owns_db and isinstance(db, Shutdownable):
341 db.shutdown(wait=False)
def shutdown(self, save_sync: bool = True):
    """Shut the Spot down and release its background resources.

    Args:
        save_sync (bool, optional): When True, wait for pending save tasks
            before stopping; when False, stop without waiting and cancel
            queued executor work. Defaults to True.
    """
    with self._bg_init_lock:
        self._shutdown_called = True
        finalizer = self._finalizer
        if finalizer is not None and finalizer.alive:
            # Explicit shutdown replaces the GC-time finalizer.
            finalizer.detach()

    if save_sync:
        self._drain_futures()

    loop_obj = self._bg_loop
    if loop_obj is not None:
        loop_obj.stop(save_sync=save_sync)

    pool = self._executor
    if pool is not None:
        pool.shutdown(wait=save_sync, cancel_futures=not save_sync)
def flush(self, timeout: Optional[float] = None) -> None:
    """Wait for tracked background futures, then flush the DB if it is ``Flushable``.

    Args:
        timeout (Optional[float], optional): Maximum seconds to wait. When
            omitted, the ``drain_timeout`` given at construction is used.
    """
    budget = self._drain_timeout if timeout is None else timeout
    deadline = time.monotonic() + budget

    def _snapshot() -> list:
        # Copy under the lock so waiting happens without holding it.
        with self._futures_lock:
            return list(self._active_futures)

    pending = _snapshot()
    while pending:
        left = deadline - time.monotonic()
        if left <= 0:
            break
        # Wait in short slices so newly tracked futures are picked up.
        wait(pending, timeout=min(self._drain_poll_interval, left))
        pending = _snapshot()

    # Whatever time is left goes to the DB flush.
    db_budget = deadline - time.monotonic()
    if db_budget > 0 and isinstance(self.cache.db, Flushable):
        self.cache.db.flush(timeout=db_budget)
def _drain_futures(self) -> None:
    """Drain pending background work by delegating to flush() with its default timeout."""
    self.flush()
def _get_func_identifier(self, func: Callable) -> str:
    """Build the ``module.qualname`` identifier for ``func``.

    When the callable lacks ``__module__`` / ``__qualname__`` (or they are
    empty), the values of its class are used instead.
    """
    owner = func.__class__
    module = getattr(func, "__module__", None) or owner.__module__
    qualname = getattr(func, "__qualname__", None) or owner.__qualname__
    return "{}.{}".format(module, qualname)
def _trigger_auto_eviction(self) -> None:
    """Probabilistically schedule a background garbage-cleaning pass.

    Nothing happens when ``eviction_rate`` is not positive, when the random
    draw is at or above the rate, when a pass is already running, or when
    the last attempt finished less than 60 seconds ago. Otherwise a
    coroutine is submitted to the background loop that first waits (up to
    ``drain_timeout``) for the futures pending at trigger time and then
    runs ``maintenance.clean_garbage`` in the executor.

    This method is best-effort: scheduling failures are logged at debug
    level and never propagate to the caller.
    """
    if self.eviction_rate <= 0.0:
        return
    if random.random() >= self.eviction_rate:
        return

    with self._eviction_guard_lock:
        if self._eviction_running:
            return
        now = time.monotonic()
        # Throttle: at most one attempt per 60 seconds.
        if now - self._last_eviction_time < 60.0:
            return
        self._eviction_running = True

    def _run_clean_safe():
        try:
            self.maintenance.clean_garbage(orphan_grace_seconds=60.0)
        except Exception as e:
            logger.error(f"Auto-eviction failed: {e}", exc_info=True)

    def _clear_eviction_flag():
        with self._eviction_guard_lock:
            self._last_eviction_time = time.monotonic()
            self._eviction_running = False

    # Everything after the flag is set runs inside the try block so that a
    # failure can never leave `_eviction_running` stuck at True.
    try:
        logger.debug(f"Triggering auto-eviction (rate: {self.eviction_rate})")

        with self._futures_lock:
            pending_futures = list(self._active_futures)

        bg_loop, executor = self._ensure_bg_resources()

        async def _run_clean_coro():
            loop = asyncio.get_running_loop()
            if pending_futures:
                # Let the saves that were in flight at trigger time settle first.
                await asyncio.wait(
                    [asyncio.wrap_future(f) for f in pending_futures],
                    timeout=self._drain_timeout,
                )
            await loop.run_in_executor(executor, _run_clean_safe)

        future = bg_loop.submit(_run_clean_coro())
        if future:
            self._track_future(future)
            future.add_done_callback(lambda f: _clear_eviction_flag())
        else:
            # The loop rejected the task (it is shutting down).
            _clear_eviction_flag()
    except Exception as exc:
        # Previously swallowed silently; keep it best-effort but leave a trace.
        logger.debug(
            "Auto-eviction could not be scheduled: %s", exc, exc_info=True
        )
        _clear_eviction_flag()
def _resolve_key_fn(
    self,
    func: Callable,
    keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
    input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
) -> Optional[Callable]:
    """Resolve the key-generation callable for ``func``.

    Args:
        func: The function being decorated.
        keygen: Key generator, or a ``KeyGenPolicy`` that is bound to ``func``.
        input_key_fn: Deprecated alias of ``keygen``.

    Returns:
        The resolved callable, or ``None`` when neither option was given.

    Raises:
        IncompatibleProviderError: If both ``keygen`` and ``input_key_fn``
            are given.
    """
    if keygen is not None and input_key_fn is not None:
        raise IncompatibleProviderError("Cannot specify both 'keygen' and 'input_key_fn'.")
    if input_key_fn is not None:
        warnings.warn("`input_key_fn` is deprecated, use `keygen` instead.", DeprecationWarning, stacklevel=3)
    # Select by `is not None`, not truthiness: a key generator object that
    # happens to be falsy must not be silently discarded.
    target = keygen if keygen is not None else input_key_fn
    if isinstance(target, KeyGenPolicy):
        return target.bind(func)
    return target
def register(
    self,
    code: int,
    encoder: Callable[[T], Any],
    decoder: Optional[Callable[[Any], T]] = None,
    decoder_factory: Optional[Callable[[Type[T]], Callable[[Any], T]]] = None,
) -> Callable[[Type[T]], Type[T]]:
    """Class decorator that registers a custom type with the serializer.

    Either ``decoder`` or ``decoder_factory`` must be supplied; when a
    factory is given it is called with the decorated class to produce the
    decoder.

    Args:
        code (int): Identification code for the custom type.
        encoder (Callable[[T], Any]): Converts an instance into a serializable form.
        decoder (Optional[Callable[[Any], T]], optional): Rebuilds an instance from decoded data.
        decoder_factory (Optional[Callable[[Type[T]], Callable[[Any], T]]], optional):
            Builds the decoder from the class.

    Returns:
        Callable[[Type[T]], Type[T]]: The class decorator.

    Raises:
        IncompatibleProviderError: If neither ``decoder`` nor ``decoder_factory`` is given.
    """
    nothing_given = decoder is None and decoder_factory is None
    if nothing_given:
        raise IncompatibleProviderError(
            "Must provide either `decoder` or `decoder_factory`."
        )

    def decorator(cls: Type) -> Type:
        # A factory, when present, takes precedence over a plain decoder.
        resolved = decoder_factory(cls) if decoder_factory else decoder
        if resolved is None:
            raise ValueError("Decoder resolution failed.")
        self.register_type(cls, code, encoder, resolved)
        return cls

    return decorator
def register_type(
    self,
    type_class: Type[T],
    code: int,
    encoder: Callable[[T], Any],
    decoder: Callable[[Any], T],
):
    """Register a custom type directly with the cache's serializer.

    Args:
        type_class (Type[T]): The class to register.
        code (int): Identification code for the custom type.
        encoder (Callable[[T], Any]): Encoder function.
        decoder (Callable[[Any], T]): Decoder function.

    Raises:
        NotImplementedError: If the serializer is not a ``TypeRegistryProtocol``.
    """
    if not isinstance(self.cache.serializer, TypeRegistryProtocol):
        raise NotImplementedError(
            "Current serializer does not support type registration."
        )
    self.cache.serializer.register(type_class, code, encoder, decoder)
526 @staticmethod
527 def _dispatch_hooks(
528 hooks: Optional[Sequence[HookBase]], method_name: str, context: Any
529 ) -> None:
530 if not hooks:
531 return
532 for hook in hooks:
533 try:
534 getattr(hook, method_name)(context)
535 except Exception as e:
536 logger.error(
537 f"Error in hook '{type(hook).__name__}.{method_name}': {e}",
538 exc_info=True,
539 )
async def _dispatch_hooks_async(
    self,
    hooks: Optional[Sequence[HookBase]],
    method_name: str,
    context: Any,
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run ``_dispatch_hooks`` in ``executor`` and await it; no-op without hooks."""
    if hooks:
        await loop.run_in_executor(
            executor, self._dispatch_hooks, hooks, method_name, context
        )
555 # --- Core Logic ---
def _resolve_settings(
    self,
    save_blob: bool | None,
    version: str | None,
    content_type: str | ContentType | None,
    save_sync: bool | None,
) -> tuple[bool | None, str | None, str | None, bool]:
    """Return the per-call settings, filling ``save_sync`` from the instance default.

    The first three values are passed through unchanged.
    """
    effective_sync = self._save_sync if save_sync is None else save_sync
    return save_blob, version, content_type, effective_sync
def _prepare_execution(
    self,
    func: Callable,
    args: tuple,
    kwargs: dict,
    save_blob: bool | None,
    effective_key_fn: Optional[Callable],
    version: str | None,
    content_type: Optional[str | ContentType],
    save_sync: bool | None,
    hooks: Optional[Sequence[HookBase]],
) -> _ExecutionContext:
    """Resolve settings and cache keys shared by the sync and async execution paths.

    When hooks are present the keyword arguments are shallow-copied for
    the hook contexts; otherwise the original dict is reused.
    """
    resolved = self._resolve_settings(save_blob, version, content_type, save_sync)
    blob_flag, resolved_version, resolved_ct, effective_sync = resolved

    identifier = self._get_func_identifier(func)
    input_id, cache_key = self.cache.make_cache_key(
        identifier, args, kwargs, effective_key_fn, resolved_version
    )
    kwargs_for_hooks = dict(kwargs) if hooks else kwargs

    return _ExecutionContext(
        save_blob=blob_flag,
        version=resolved_version,
        content_type=resolved_ct,
        save_sync=effective_sync,
        func_identifier=identifier,
        input_id=input_id,
        cache_key=cache_key,
        hook_kwargs=kwargs_for_hooks,
    )
def _build_cache_hit_context(
    self,
    func_name: str,
    input_id: str,
    cache_key: str,
    args: tuple,
    hook_kwargs: dict,
    result: Any,
    version: str | None,
) -> CacheHitContext:
    """Assemble the ``CacheHitContext`` passed to ``on_cache_hit`` hooks.

    ``input_id`` is converted with ``str()``.
    """
    fields = {
        "func_name": func_name,
        "input_id": str(input_id),
        "cache_key": cache_key,
        "args": args,
        "kwargs": hook_kwargs,
        "result": result,
        "version": version,
    }
    return CacheHitContext(**fields)
def _build_save_kwargs(
    self,
    cache_key: str,
    func: Callable,
    func_identifier: str,
    input_id: str,
    version: str | None,
    result: Any,
    content_type: str | None,
    save_blob: bool | None,
    serializer: Optional[SerializerProtocol],
    retention: RetentionSpec,
) -> dict:
    """Build the keyword arguments used to persist a result.

    The expiry is obtained from ``cache.calculate_expires_at`` and
    ``input_id`` is converted with ``str()``.
    """
    expiry = self.cache.calculate_expires_at(
        func_identifier, func.__name__, retention
    )
    return dict(
        cache_key=cache_key,
        func_name=func.__name__,
        func_identifier=func_identifier,
        input_id=str(input_id),
        version=version,
        result=result,
        content_type=content_type,
        save_blob=save_blob,
        serializer=serializer,
        expires_at=expiry,
    )
def _persist_result_sync(self, save_sync: bool, save_kwargs: dict) -> None:
    """Persist a result from the synchronous execution path.

    With ``save_sync`` the value is written via ``cache.set`` and any
    failure is reported and re-raised. Otherwise the save is submitted to
    the background; a submission failure is reported and re-raised only
    when no ``on_background_error`` callback is configured.
    """
    if not save_sync:
        try:
            self._submit_background_save(**save_kwargs)
        except Exception as exc:
            self._handle_save_error(exc, save_kwargs)
            if self.on_background_error is None:
                raise
        return

    try:
        self.cache.set(**save_kwargs)
    except Exception as exc:
        self._handle_save_error(exc, save_kwargs)
        raise
async def _persist_result_async(self, save_sync: bool, save_kwargs: dict) -> None:
    """Persist a result from the asynchronous execution path.

    With ``save_sync`` the save coroutine is submitted to the background
    loop and awaited; failures are reported via ``_handle_save_error`` and
    re-raised. If the loop rejects the task (``submit`` returns ``None``)
    the discard is reported exactly once through ``_notify_save_discarded``
    and a ``RuntimeError`` is raised.

    Without ``save_sync`` the save is submitted fire-and-forget; a
    submission failure is reported and re-raised only when no
    ``on_background_error`` callback is configured.

    Raises:
        RuntimeError: In ``save_sync`` mode, when the background loop is
            shutting down and the save was discarded.
    """
    if save_sync:
        try:
            bg_loop, exec_pool = self._ensure_bg_resources()
            coro = self._save_result_async(
                executor=exec_pool, safe=False, **save_kwargs
            )
            future = bg_loop.submit(coro)
            if future is not None:
                await asyncio.wrap_future(future)
        except Exception as e:
            self._handle_save_error(e, save_kwargs)
            raise
        if future is None:
            # Handled outside the try block: _notify_save_discarded already
            # reports the error, so the RuntimeError below must not pass
            # through the except handler and be reported a second time.
            self._notify_save_discarded(save_kwargs)
            raise RuntimeError(
                f"Cache save for '{save_kwargs.get('func_name')}' "
                "was discarded because the background loop is shutting down."
            )
    else:
        try:
            self._submit_background_save(**save_kwargs)
        except Exception as e:
            self._handle_save_error(e, save_kwargs)
            if self.on_background_error is None:
                raise
def _execute_sync(
    self,
    func: Callable,
    args: tuple,
    kwargs: dict,
    save_blob: bool | None,
    effective_key_fn: Optional[Callable],
    version: str | None,
    content_type: Optional[str | ContentType],
    serializer: Optional[SerializerProtocol],
    retention: RetentionSpec,
    save_sync: bool | None,
    hooks: Optional[Sequence[HookBase]] = None,
) -> Any:
    """Run ``func`` synchronously with cache lookup, herd coordination and persistence.

    Flow: resolve settings and keys, fire ``pre_execute`` hooks, return a
    cached value if one exists, otherwise enter ``cache.herd_sync`` for the
    key. A non-executor participant re-raises or returns the shared result;
    the executor calls ``func``, publishes the outcome to
    ``herd.result_box``, persists it and returns it.

    Returns:
        The cached, shared or freshly computed result.

    Raises:
        BaseException: Whatever ``func`` raises, the error shared through
            the herd, or a save error when the effective ``save_sync`` is true.
    """
    ctx = self._prepare_execution(
        func,
        args,
        kwargs,
        save_blob,
        effective_key_fn,
        version,
        content_type,
        save_sync,
        hooks,
    )
    self._dispatch_hooks(
        hooks,
        "pre_execute",
        PreExecuteContext(
            func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs
        ),
    )
    # Cache lookup before entering the herd.
    cached = self.cache.get(ctx.cache_key, serializer)
    if cached is not CACHE_MISS:
        self._dispatch_hooks(
            hooks,
            "on_cache_hit",
            self._build_cache_hit_context(
                func.__name__,
                ctx.input_id,
                ctx.cache_key,
                args,
                ctx.hook_kwargs,
                cached,
                ctx.version,
            ),
        )
        return cached

    with self.cache.herd_sync(ctx.cache_key, serializer) as herd:
        if not herd.is_executor:
            # Another participant produced the outcome: re-raise its error
            # or report its result as a cache hit.
            if herd.is_error:
                raise herd.result
            self._dispatch_hooks(
                hooks,
                "on_cache_hit",
                self._build_cache_hit_context(
                    func.__name__,
                    ctx.input_id,
                    ctx.cache_key,
                    args,
                    ctx.hook_kwargs,
                    herd.result,
                    ctx.version,
                ),
            )
            return herd.result

        try:
            res = func(*args, **kwargs)
            self._dispatch_hooks(
                hooks,
                "on_cache_miss",
                CacheMissContext(
                    func.__name__,
                    str(ctx.input_id),
                    ctx.cache_key,
                    args,
                    ctx.hook_kwargs,
                    res,
                    ctx.version,
                ),
            )
            # Publish the success before attempting to persist it.
            herd.result_box.append((True, res))

            # After a successful execution, propagate cache-save errors only
            # in synchronous mode (save_sync=True).
            try:
                save_kwargs = self._build_save_kwargs(
                    ctx.cache_key,
                    func,
                    ctx.func_identifier,
                    ctx.input_id,
                    ctx.version,
                    res,
                    ctx.content_type,
                    ctx.save_blob,
                    serializer,
                    retention,
                )
                self._persist_result_sync(ctx.save_sync, save_kwargs)
            except Exception as e:
                if ctx.save_sync:
                    raise
                logger.error(f"Failed to submit background cache save, but execution succeeded: {e}")

            return res

        except BaseException as e:
            # Record the failure only if no outcome was published yet
            # (a save error after success leaves the success in place).
            if not herd.result_box:
                herd.result_box.append((False, e))
            raise
        finally:
            self._trigger_auto_eviction()
async def _execute_async(
    self,
    func: Callable,
    args: tuple,
    kwargs: dict,
    save_blob: bool | None,
    effective_key_fn: Optional[Callable],
    version: str | None,
    content_type: Optional[str | ContentType],
    serializer: Optional[SerializerProtocol],
    retention: RetentionSpec,
    save_sync: bool | None,
    hooks: Optional[Sequence[HookBase]] = None,
) -> Any:
    """Run the coroutine function ``func`` with cache lookup, herd coordination and persistence.

    Asynchronous counterpart of ``_execute_sync``: hooks and the cache
    lookup are run through the executor obtained from
    ``_ensure_bg_resources``, and coordination uses ``cache.herd_async``.

    Returns:
        The cached, shared or freshly computed result.

    Raises:
        BaseException: Whatever ``func`` raises, the error shared through
            the herd, or a save error when the effective ``save_sync`` is true.
    """
    ctx = self._prepare_execution(
        func,
        args,
        kwargs,
        save_blob,
        effective_key_fn,
        version,
        content_type,
        save_sync,
        hooks,
    )
    loop = asyncio.get_running_loop()
    # Only the executor is needed here; the background loop is ignored.
    _, executor = self._ensure_bg_resources()
    await self._dispatch_hooks_async(
        hooks,
        "pre_execute",
        PreExecuteContext(
            func.__name__, str(ctx.input_id), ctx.cache_key, args, ctx.hook_kwargs
        ),
        loop,
        executor,
    )
    # Cache lookup runs in the executor.
    cached = await loop.run_in_executor(
        executor, self.cache.get, ctx.cache_key, serializer
    )
    if cached is not CACHE_MISS:
        await self._dispatch_hooks_async(
            hooks,
            "on_cache_hit",
            self._build_cache_hit_context(
                func.__name__,
                ctx.input_id,
                ctx.cache_key,
                args,
                ctx.hook_kwargs,
                cached,
                ctx.version,
            ),
            loop,
            executor,
        )
        return cached

    async with self.cache.herd_async(
        ctx.cache_key, serializer, loop, executor
    ) as herd:
        if not herd.is_executor:
            # Another participant produced the outcome: re-raise its error
            # or report its result as a cache hit.
            if herd.is_error:
                raise herd.result
            await self._dispatch_hooks_async(
                hooks,
                "on_cache_hit",
                self._build_cache_hit_context(
                    func.__name__,
                    ctx.input_id,
                    ctx.cache_key,
                    args,
                    ctx.hook_kwargs,
                    herd.result,
                    ctx.version,
                ),
                loop,
                executor,
            )
            return herd.result

        try:
            res = await func(*args, **kwargs)
            await self._dispatch_hooks_async(
                hooks,
                "on_cache_miss",
                CacheMissContext(
                    func.__name__,
                    str(ctx.input_id),
                    ctx.cache_key,
                    args,
                    ctx.hook_kwargs,
                    res,
                    ctx.version,
                ),
                loop,
                executor,
            )
            # Publish the success before attempting to persist it.
            herd.result_box.append((True, res))

            # After a successful execution, propagate cache-save errors only
            # in synchronous mode (save_sync=True).
            try:
                save_kwargs = self._build_save_kwargs(
                    ctx.cache_key,
                    func,
                    ctx.func_identifier,
                    ctx.input_id,
                    ctx.version,
                    res,
                    ctx.content_type,
                    ctx.save_blob,
                    serializer,
                    retention,
                )
                await self._persist_result_async(ctx.save_sync, save_kwargs)
            except Exception as e:
                if ctx.save_sync:
                    raise
                logger.error(f"Failed to persist cache asynchronously, but execution succeeded: {e}")

            return res

        except BaseException as e:
            # Record the failure only if no outcome was published yet
            # (a save error after success leaves the success in place).
            if not herd.result_box:
                herd.result_box.append((False, e))
            raise
        finally:
            self._trigger_auto_eviction()
def _handle_save_error(self, err: BaseException | Exception, save_kwargs: dict) -> None:
    """Log a cache-save failure and forward it to ``on_background_error``.

    Args:
        err: The exception describing the failure.
        save_kwargs: The keyword arguments of the failed save; used to
            build the ``SaveErrorContext`` given to the callback.

    Exceptions raised by the callback itself are logged and suppressed.
    """
    # Pass the exception object itself: this method is also called outside
    # of an ``except`` block (e.g. for discarded saves), where
    # ``exc_info=True`` would log an empty "NoneType: None" traceback.
    logger.error(
        f"Cache save failed for '{save_kwargs.get('func_name')}': {err}",
        exc_info=err,
    )
    callback = self.on_background_error
    if not callback:
        return

    import sys

    res = save_kwargs.get("result")
    # Size is informational only; a failing __sizeof__ must not prevent the
    # callback from being invoked.
    try:
        result_size = sys.getsizeof(res) if res is not None else None
    except Exception:
        result_size = None

    try:
        callback(
            err,
            SaveErrorContext(
                func_name=save_kwargs.get("func_name", "unknown"),
                cache_key=save_kwargs.get("cache_key", ""),
                input_id=save_kwargs.get("input_id", ""),
                version=save_kwargs.get("version"),
                content_type=save_kwargs.get("content_type"),
                save_blob=save_kwargs.get("save_blob"),
                expires_at=save_kwargs.get("expires_at"),
                result_type=type(res).__name__,
                result_size=result_size,
            ),
        )
    except Exception:
        logger.error(
            "Error occurred within the 'on_background_error' callback",
            exc_info=True,
        )
def _notify_save_discarded(self, save_kwargs: dict) -> None:
    """Report a save that was dropped during shutdown.

    Emits a log warning and a ``ResourceWarning``, then forwards a
    ``RuntimeError`` carrying the same message to ``_handle_save_error``.
    """
    func_name = save_kwargs.get('func_name')
    notice = f"Background save for '{func_name}' discarded during shutdown."
    logger.warning(notice)
    warnings.warn(notice, ResourceWarning)
    self._handle_save_error(RuntimeError(notice), save_kwargs)
def _submit_background_save(self, **save_kwargs) -> None:
    """Submit a save to the background loop and track the resulting future.

    If the loop rejects the task, the discard is reported through
    ``_notify_save_discarded`` instead.
    """
    loop_obj, pool = self._ensure_bg_resources()
    pending = loop_obj.submit(self._save_result_async(executor=pool, **save_kwargs))
    if not pending:
        self._notify_save_discarded(save_kwargs)
        return
    self._track_future(pending)
async def _save_result_async(
    self, /, executor: Executor, safe: bool = True, **kwargs
) -> None:
    """Run the cache write in ``executor`` from the running event loop.

    With ``safe`` the write goes through ``_save_result_safe``; otherwise
    ``cache.set`` is called directly. ``asyncio.CancelledError`` and
    ``RuntimeError`` are logged and reported, and re-raised only when
    ``safe`` is false.
    """
    running_loop = asyncio.get_running_loop()
    writer = self._save_result_safe if safe else self.cache.set
    job = functools.partial(writer, **kwargs)
    try:
        await running_loop.run_in_executor(executor, job)
    except (asyncio.CancelledError, RuntimeError) as exc:
        # Executor might be forcibly shut down during program exit or Spot.shutdown(save_sync=False)
        notice = f"Background save for '{kwargs.get('func_name')}' cancelled during shutdown."
        logger.warning(notice)
        self._handle_save_error(exc, kwargs)
        if not safe:
            raise
def _save_result_safe(self, **kwargs):
    """Write to the cache; failures are passed to ``_handle_save_error`` instead of raised."""
    try:
        self.cache.set(**kwargs)
    except Exception as exc:
        self._handle_save_error(exc, kwargs)
def consume(self, cost: Union[int, Callable] = 1):
    """Decorator that consumes rate-limiter tokens before each call.

    Args:
        cost (Union[int, Callable], optional): Tokens to consume. Either an
            integer, or a callable that receives the call's arguments and
            returns the cost. Defaults to 1.

    Returns:
        Callable: The decorator. Coroutine functions get an async wrapper
        that uses ``limiter.consume_async``; others use ``limiter.consume``.
    """

    def _cost_for(call_args, call_kwargs):
        # A callable cost is evaluated against the actual call arguments.
        if callable(cost):
            return cost(*call_args, **call_kwargs)
        return cost

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                await self.limiter.consume_async(_cost_for(args, kwargs))
                return await func(*args, **kwargs)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            self.limiter.consume(_cost_for(args, kwargs))
            return func(*args, **kwargs)

        return sync_wrapper

    return decorator
@overload
def mark(self, _func: Callable[P, R]) -> Callable[P, R]: ...

@overload
def mark(
    self,
    *,
    save_blob: Optional[bool] = None,
    keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
    input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
    version: str | None = None,
    content_type: Optional[str | ContentType] = None,
    serializer: Optional[SerializerProtocol] = None,
    save_sync: Optional[bool] = None,
    retention: RetentionSpec = None,
    hooks: Optional[Sequence[HookBase]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...

def mark(self, _func: Optional[Callable] = None, **kwargs) -> Any:
    """Decorator that adds result caching and metadata handling to a function.

    Results are stored and reused under a cache key computed from the
    call arguments and the options below. Usable both bare (``@spot.mark``)
    and with options (``@spot.mark(version="2")``).

    Args:
        _func (Optional[Callable], optional): The function to decorate (bare usage).
        save_blob (Optional[bool], optional): Whether to store the return value as a blob.
        keygen (Optional[Union[Callable, KeyGenPolicy]], optional): Cache-key generation logic.
        input_key_fn (Optional[Union[Callable, KeyGenPolicy]], optional): Deprecated; use ``keygen``.
        version (Optional[str], optional): Cache version of the function.
        content_type (Optional[Union[str, ContentType]], optional): Content type of the return value.
        serializer (Optional[SerializerProtocol], optional): Serializer used for this function.
        save_sync (Optional[bool], optional): Whether the save is performed synchronously.
        retention (RetentionSpec, optional): Retention policy for the cached entry.
        hooks (Optional[Sequence[HookBase]], optional): Hooks fired around execution and on cache hits.

    Returns:
        Any: The decorated function, or the decorator when called with options only.

    Raises:
        ConfigurationError: If the target is a generator or async generator function.
    """
    # Read the options once at decoration time instead of on every call.
    save_blob = kwargs.get("save_blob")
    version = kwargs.get("version")
    content_type = kwargs.get("content_type")
    serializer = kwargs.get("serializer")
    retention = kwargs.get("retention")
    save_sync = kwargs.get("save_sync")
    hooks = kwargs.get("hooks")

    def decorator(func):
        if inspect.isgeneratorfunction(func) or inspect.isasyncgenfunction(func):
            raise ConfigurationError(f"Generators not supported: {func.__name__}")
        key_fn = self._resolve_key_fn(
            func, kwargs.get("keygen"), kwargs.get("input_key_fn")
        )
        is_async = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        def sync_wrapper(*args, **kw):
            return self._execute_sync(
                func,
                args,
                kw,
                save_blob,
                key_fn,
                version,
                content_type,
                serializer,
                retention,
                save_sync,
                hooks,
            )

        @functools.wraps(func)
        async def async_wrapper(*args, **kw):
            return await self._execute_async(
                func,
                args,
                kw,
                save_blob,
                key_fn,
                version,
                content_type,
                serializer,
                retention,
                save_sync,
                hooks,
            )

        return async_wrapper if is_async else sync_wrapper

    # Compare against None rather than truthiness: a callable object that
    # evaluates as falsy must still be decorated when passed directly.
    return decorator(_func) if _func is not None else decorator
1110 @overload
1111 def cached_run(self, __func: Callable[P, R], **kwargs: Any) -> ContextManager[Callable[P, R]]: ...
1113 @overload
1114 def cached_run(self, *funcs: *Ts, **kwargs: Any) -> ContextManager[tuple[*Ts]]: ...
1116 @contextmanager
1117 def cached_run(self, *funcs: Any, **kwargs) -> Iterator[Any]:
1118 """コンテキストマネージャ内で一時的に関数を `mark` し、キャッシュ機能を適用する。
1120 デコレータを直接付与できない外部ライブラリの関数などをキャッシュする際に使用します。
1122 Args:
1123 *funcs (Any): キャッシュ対象にする関数(複数可)。
1124 **kwargs: `mark` デコレータに渡すオプションパラメータ。
1126 Yields:
1127 Callable | tuple[Callable, ...]: キャッシュ機能が付与された関数。複数の場合はタプルで返る。
1129 Raises:
1130 ValidationError: 関数が1つも指定されなかった場合。
1131 """
1132 if not funcs:
1133 raise ValidationError(
1134 "At least one function must be provided to cached_run."
1135 )
1136 wrappers = [self.mark(**kwargs)(f) for f in funcs]
1137 yield wrappers[0] if len(wrappers) == 1 else tuple(wrappers)