本ドキュメントは、『beautyspot』の各機能における詳細な仕様、インターフェース、およびアルゴリズムを定義する。
各コンポーネントが提供するAPI、内部状態遷移、および例外ハンドリングの詳細を定義する。
@spot.mark(
save_blob: Optional[bool] = None,
keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
version: str | None = None,
content_type: Optional[str | ContentType] = None,
serializer: Optional[SerializerProtocol] = None,
save_sync: Optional[bool] = None,
retention: RetentionSpec = None,
hooks: Optional[Sequence[HookBase]] = None,
)
def my_func(x, y): ...
inspect.iscoroutinefunction(fn) で判定する_execute_sync() に委譲する同期ラッパーを返す_execute_async() に委譲する非同期ラッパーを返す| パラメータ | デフォルト | 説明 |
|---|---|---|
save_blob |
None |
None: StoragePolicy に委譲、True: 強制Blob保存、False: DB内保存 |
keygen |
None |
キャッシュキー生成のカスタマイズ。引数やポリシーを指定可能 |
input_key_fn |
None |
(非推奨)keygen を使用すること |
version |
None |
バージョン文字列。変更するとキャッシュキーが変わり無効化される |
content_type |
None |
戻り値のMIMEタイプ |
serializer |
None |
None: Spotのデフォルトを使用。関数単位でオーバーライド可能 |
save_sync |
None |
保存処理を同期的に行うかどうか(Noneの場合はデフォルトに従う) |
retention |
None |
キャッシュの保持ポリシー("30d" などの文字列や期間オブジェクト等) |
hooks |
None |
関数単位のフックリスト(実行前後やキャッシュヒット時に発火) |
isgeneratorfunction または isasyncgenfunction)を渡した場合 → デコレーション時に ConfigurationError を送出するcached_run 利用時に引数として関数が一つも指定されなかった場合 → ValidationError を送出する__name__, __doc__ は functools.wraps によって保持される@mark (括弧なし) と @mark() (括弧あり) の両方の記法をサポートする@contextmanager
def cached_run(
self,
*funcs: Any,
**kwargs: Any
) -> Iterator[Any]: ...
Spot インスタンスからコンテキストマネージャとして呼び出される。funcs に渡された関数が0個の場合はエラーを送出する。Spot.mark(**kwargs) で生成したキャッシュデコレータを適用し、ラップされた関数を生成する。yield してコンテキストブロック内のユーザーコードに提供する。@spot.mark 適用済み関数と同様にキャッシュ機構を経由する。| パラメータ | 必須 | 説明 |
|---|---|---|
*funcs |
はい | キャッシュ機能を適用したい対象の関数。外部ライブラリの関数なども指定可能。複数指定可能。 |
**kwargs |
いいえ | Spot.mark デコレータに渡すオプション引数(save_blob, keygen, version, content_type, serializer, retention, save_sync, hooks 等)。 |
funcs が空(0個)の場合、beautyspot.exceptions.ValidationError を送出し、コンテキストに入らない。mark デコレータの制約に完全に依存する。KeyGen のスコープ規則に従う。@spot.mark() デコレータは同期関数と非同期(async def)関数の両方をサポートし、関数定義に応じて適切な実行・保存・キャッシュ検索フローを透過的に切り替える。
inspect.iscoroutinefunction(func) を用いて判定する。_execute_sync() に委譲する。ThreadPoolExecutor) にタスクを投げて非同期化される(save_sync=False の場合)。_execute_async() に委譲する。inspect.isgeneratorfunction または inspect.isasyncgenfunction で真となるジェネレータ・非同期ジェネレータが渡された場合、サポート対象外として即座に ConfigurationError を送出する。on_background_error コールバックが設定されていればそこに処理が委譲される。class KeyGen:
@staticmethod
def _default(args: tuple, kwargs: dict) -> str: ...
@staticmethod
def hash_items(items: list) -> str: ...
_default)args と kwargs を canonicalize() により再帰的に正規化する[args, kwargs] を MessagePack でバイナリにシリアライズするhash_items)KeyGenPolicy で使用される。| パラメータ | 型 | 説明 |
|---|---|---|
args |
tuple |
関数の位置引数 |
kwargs |
dict |
関数のキーワード引数 |
items |
list |
バインド済み引数の正規化値リスト |
RecursionError 発生時は、安全のため str((args, kwargs)) のハッシュをフォールバックとして使用するstr() ベースのハッシュを返す[(), {}] の構造として一定のハッシュ値を生成するstr() はオブジェクトによっては実行ごとに変わる可能性があるため、明示的な正規化が推奨される@singledispatch
def canonicalize(obj: Any) -> Any: ...
singledispatch を使用し、型に応じた最適な正規化形式へ再帰的に変換する。
("__bool__", value) のタプルに変換(1 と True の衝突を防止)[[k1, v1], [k2, v2], ...] のリストに変換"__list__") を付与し、要素を再帰的に正規化。集合型はソートを行う("__enum__", module, qualname, value) に変換__dict__ または __slots__ から属性を取得し、型名と共に正規化| オブジェクト型 | 正規化後の形式 | 備考 |
|---|---|---|
numpy.ndarray |
("__numpy__", shape, dtype, bytes) |
高速かつ正確なハッシュを保証 |
OrderedDict |
("__ordered_dict__", items) |
挿入順序を保持したまま正規化 |
Pydantic Model |
("__pydantic_v2__", schema) |
スキーマ情報をベースに判定 |
str(obj) にフォールバックするRecursionError をキャッチし、その階層で str() による正規化に切り替える_safe_sort_key により、Python 3 で比較不可能な型同士(例: int と str)が辞書のキーに含まれていても安定したソート順を保証するclass Strategy(Enum):
DEFAULT = auto() # 再帰的正規化
IGNORE = auto() # キー計算から除外
FILE_CONTENT = auto() # ファイルの内容をハッシュ
PATH_STAT = auto() # パス+サイズ+更新時刻をハッシュ
class KeyGenPolicy:
def bind(self, func: Callable) -> Callable[..., str]: ...
bind(func) 時に inspect.signature を解析し、引数名とデフォルト値を把握するStrategy を適用し、正規化された値のリストを作成するKeyGen.hash_items() で一つのハッシュにまとめる| 戦略 | 内容 | 用途 |
|---|---|---|
IGNORE |
キャッシュキーに含めない | verbose や logger 等、結果に影響しない引数 |
FILE_CONTENT |
ファイルを全読込してハッシュ化 | 入力ファイルの厳密な変更検知 |
PATH_STAT |
os.stat 情報を使用 |
大容量ファイルの高速な変更検知 |
FILE_CONTENT / PATH_STAT 指定時にファイルがない場合、"MISSING_{path}" という固定文字列をハッシュ対象とする"ERROR_{path}" を返すTypeError を送出する(実行前バリデーション)class SQLiteTaskDB(TaskDBBase):
def __init__(self, db_path: str | Path, timeout: float = 30.0): ...
def save(self, task: TaskRecord) -> None: ...
def get(self, input_key: str) -> TaskRecord | None: ...
_WriteTask キューに投入され、専用のバックグラウンドスレッドで直列に実行される。これにより SQLite の database is locked エラーを回避する。PRAGMA query_only = ON 状態で並行して実行される。tasks テーブルの存在を確認し、必要に応じて自動的に ALTER TABLE によるカラム追加(マイグレーション)を行う。| 設定 | デフォルト | 説明 |
|---|---|---|
journal_mode |
WAL |
書き込みと読み取りの並行性を高める設定 |
synchronous |
NORMAL |
パフォーマンスと耐久性のバランス設定 |
timeout |
30.0 |
データベースロック待機時間の閾値 |
sqlite3.Error をキャッチし、適切にラップして再送出する:memory: 指定時は、プロセスの生存期間中のみキャッシュが保持される@runtime_checkable
class TaskDBCore(Protocol):
def get(self, input_key: str) -> TaskRecord | None: ...
def save(self, record: TaskRecord) -> None: ...
class TaskDBBase(ABC):
# TaskDBCore を満たしつつ、メンテナンスのデフォルト実装を提供
| メソッド | 役割 |
|---|---|
get_blob_refs() |
全レコードの blob_key を列挙する(GC用) |
delete_expired() |
expires_at を経過したレコードを一括削除する |
get_history() |
実行履歴(統計)を取得する |
NotImplementedError を送出する代わりに、runtime_checkable による事前チェックを推奨する。TaskDBMaintenable 構成プロトコルとして扱う。class LocalStorage(BlobStorageMaintenable):
def __init__(self, base_dir: str | Path): ...
def save(self, key: str, data: bytes) -> str: ...
def load(self, location: str) -> bytes: ...
save)tempfile.mkstemp で一時ファイルを作成flush() および os.fsync() でディスク到達を保証os.replace で最終的なパスへリネーム(アトミックな置換)load / delete)base_dir の配下にあることを is_relative_to で厳密にチェックし、範囲外へのアクセスを遮断する。| パラメータ | 説明 | 制限 |
|---|---|---|
key |
保存ファイル名のベース | .. や / を含む場合は ValidationError |
location |
save が返した識別子 |
相対パス形式を推奨 |
load 時にファイルがない場合 CacheCorruptedError を送出PermissionError 発生時は、GCでの回収に委ねるため警告ログのみ出力OSError を送出/ と \ の両方を検証対象とするclass S3Storage(BlobStorageMaintenable):
def __init__(self, s3_uri: str, s3_opts: dict | None = None): ...
s3://bucket/prefix/key.bin 形式のURIをロケーション識別子として使用する。save)boto3.upload_fileobj を使用。5GBを超えるデータに対しては、自動的にマルチパートアップロードに切り替えられ、PutObjectの制限を回避する。get_mtime)head_object を使用し、データ本体をダウンロードせずに最終更新時刻(LastModified)を取得する。| 設定 | 内容 |
|---|---|
s3_uri |
s3://my-bucket/my-prefix 形式 |
s3_opts |
boto3.client('s3', **s3_opts) に渡される設定 |
boto3 がインポートできない環境でインスタンス化を試みると ImportError を送出botocore.exceptions.ClientError をキャッチし、状況に応じて CacheCorruptedError に変換class StoragePolicyProtocol(Protocol):
def should_save_as_blob(self, data: bytes) -> bool: ...
len(data) が threshold を超えた場合に True を返す。False を返すが、閾値を超えた場合にログ出力のみ行う(互換モード)。True を返す。| ポリシー型 | パラメータ | デフォルト | 説明 |
|---|---|---|---|
Threshold |
threshold |
なし | バイト単位の閾値。10MB等を推奨 |
Warning |
warning_threshold |
なし | 警告を出すサイズ閾値 |
False (DB保存) とみなし、エラーをログに記録する。len(data) > threshold のため、閾値と等しい場合はDB保存となる。class MsgpackSerializer(SerializerProtocol, TypeRegistryProtocol):
def dumps(self, obj: Any) -> bytes: ...
def loads(self, data: bytes) -> Any: ...
dumps)type(obj) で取得ExtType としてパックSerializationError)loads)ExtType を発見した場合、コードに対応するカスタムデコーダを呼び出す| 設定 | デフォルト | 説明 |
|---|---|---|
max_cache_size |
1024 |
スレッドごとの型解決LRUキャッシュの最大サイズ |
SerializationError を送出SerializationError を送出registry_generation カウンタを使用して、スレッドローカルキャッシュの整合性を維持def register(
code: int,
encoder: Callable[[Any], Any],
decoder: Callable[[Any], Any],
) -> Callable: ... # デコレータ形式
def register_type(
type_class: Type,
code: int,
encoder: Callable[[Any], Any],
decoder: Callable[[Any], Any],
) -> None: ...
_registry_generation をインクリメントし、全スレッドのLRUキャッシュを無効化対象とする| パラメータ | 制限 | 説明 |
|---|---|---|
code |
0 〜 127 |
MessagePack ExtType のユーザー定義領域コード |
encoder |
引数1、戻り値シリアライズ可能 | オブジェクトをプリミティブ構造へ変換する関数 |
decoder |
引数1、戻り値オブジェクト | プリミティブ構造からオブジェクトを復元する関数 |
ValueError を送出し、レジストリの破損を防ぐValueErrorclass _BackgroundLoop:
def submit(self, coro: Coroutine) -> None: ...
def stop(self) -> None: ...
def is_running(self) -> bool: ...
submit 時にデーモンスレッドを作成し、asyncio.run() を開始するrun_coroutine_threadsafe を使用して、メインスレッドからコルーチンをイベントループへ投入| 内部属性 | 型 | 説明 |
|---|---|---|
_loop |
AbstractEventLoop |
スレッド内で稼働するループ本体 |
_thread |
Thread |
daemon=True 設定のスレッド |
on_background_error コールバックで処理されるdef flush(timeout: float | None = None) -> bool: ...
@spot.mark(save_sync=False)
def my_func(): ...
save_sync=True)save_sync=False)BackgroundLoop に委譲し、即座に関数の戻り値を返す。flush)_inflight タスク(保存中)のリストを確認し、それら全てが完了するまで、またはタイムアウトまで待機する。| パラメータ | デフォルト | 説明 |
|---|---|---|
timeout |
None |
flush の最大待機時間(秒)。None は無限待機 |
on_background_error |
logger.error |
非同期保存失敗時のエラーハンドラ |
flush がタイムアウトした場合、False を返し、一部のデータが未保存である可能性を通知する。with spot: 終了時に自動的に flush が呼ばれ、プロセス終了前のデータ保存を確実にする。class LifecyclePolicy:
def add_rule(self, pattern: str, retention: Any) -> None: ...
def resolve_with_fallback(self, func_name: str) -> float | None: ...
順次照合: 登録された順にルールを走査する
パターンマッチ: fnmatch.fnmatch を使用し、関数名がパターンに合致するか判定
二段階解決:
最初に「完全修飾名 (module.qualname)」で照合
マッチしなければ「短縮名 (qualname)」で照合
結果返却: 最初にマッチしたルールの保持期間を秒数に変換して返す
パラメータ例説明pattern"my_mod.*", "*_test"Glob形式のパターンretention"30d", 3600保持期間の定義
default_retention パラメータで、どのルールにもマッチしない場合のデフォルト保持期間を指定できるLifecyclePolicy.default() のデフォルト値は "30d"(30日)None を指定すると無期限保持(従来互換)default_retention で指定された保持期間を返す(デフォルト: 30日)def parse_retention(val: Any) -> float | None: ...
"7d" (7日), "12h" (12時間), "30m" (30分), "10s" (10秒) 形式を秒数に変換する。大文字小文字は区別しない。int, float: 秒数としてそのまま扱う。timedelta: total_seconds() を取得。None: ポリシーに従う(または無期限)を意味する。| 単位 | 倍率 (秒) | 備考 |
|---|---|---|
d |
86400 | 日数 |
h |
3600 | 時間 |
m |
60 | 分 |
s |
1 | 秒 |
ValidationError を送出する。ValidationError となる。Retention.FOREVER (無限大の秒数) を提供し、ポリシーに関わらず永続化を指示できる。class TokenBucket(LimiterProtocol):
def consume(self, cost: float = 1.0) -> None: ...
async def consume_async(self, cost: float = 1.0) -> None: ...
TAT - 許容バースト 以前の場合、リクエストを拒否または待機させる。consume: 待機が必要な場合は time.sleep() でブロック。consume_async: 待機が必要な場合は asyncio.sleep() で非占有待機。| 設定 | デフォルト | 説明 |
|---|---|---|
tokens_per_minute |
必須 | 1分間に許可する平均リクエスト数(コスト合計) |
max_burst |
1.0 |
許容されるバースト数(トークン単位) |
RateLimitError (ValueErrorのサブクラス) を送出する。max_burst を超えるコストを持つ単一リクエストは、常に拒否される。TAT は現在時刻より過去に一定以上離れないよう補正される(過度なバーストの防止)。class HookBase:
def pre_execute(self, ctx: PreExecuteContext) -> None: ...
def on_cache_hit(self, ctx: CacheHitContext) -> None: ...
def on_cache_miss(self, ctx: CacheMissContext) -> None: ...
ThreadSafeHookBase を継承した場合、メタクラスが自動的に各メソッドを with self._lock: で包む| コンテキスト | 保持データ |
|---|---|
PreExecuteContext |
func, args, kwargs, cache_key |
CacheHitContext |
cache_key, result, metadata |
CacheMissContext |
cache_key, result, execution_time |
hooks=[h1, h2] と指定した場合、h1 -> h2 の順で実行される。class MaintenanceService:
def clean_garbage(self, grace_period: int = 3600) -> GarbageStats: ...
def prune(self, days: int, func_name: str | None = None) -> int: ...
def clear(self, func_name: str | None = None) -> int: ...
clean_garbage)delete_expired() でDBから古いタスクを除去blob_key とストレージ内の全ファイルを比較grace_period 以上経過したものを削除| パラメータ | デフォルト | 説明 |
|---|---|---|
grace_period |
3600 |
孤立Blobを削除するまでの猶予期間(秒)。並行実行中の保存を保護する |
days |
なし | prune で削除対象とする経過日数 |
clear(func_name) は前方一致ではなく、完全一致(または glob)で判定する。beautyspot list [--db DB_PATH]
beautyspot gc [--name PROJECT_NAME] [--force]
beautyspot stats
beautyspot ui
--db 指定がない場合、デフォルトの .beautyspot/ ディレクトリを探索するMaintenanceService または Spot インスタンスを生成し、対応するメソッドを呼び出すRich ライブラリを使用して、結果をテーブルやプログレスバーで表示する| コマンド | 説明 |
|---|---|
list |
保存されているキャッシュキーと関数名の一覧を表示 |
gc |
期限切れタスクと孤立Blobのクリーンアップを実行 |
stats |
キャッシュヒット率やストレージ使用量の統計を表示 |
ui |
ターミナルベースのインタラクティブダッシュボードを起動 |
--force)に対応する。def Spot(
name: str = "default",
storage_path: str | Path | None = None,
serializer: SerializerProtocol | None = None,
limiter: LimiterProtocol | None = None,
# ...その他の依存
) -> core.Spot: ...
storage_path が未指定の場合、.beautyspot/ 以下のプロジェクト名ディレクトリを使用するSQLiteTaskDB を生成し、マイグレーションを実行s3://等)を判定し、適切な BlobStorage 実装を生成core.Spot のコンストラクタに注入して返す| パラメータ | デフォルト値の導出 |
|---|---|
db |
.beautyspot/{name}.db |
blobs |
.beautyspot/blobs/{name}/ |
serializer |
MsgpackSerializer() |
policy |
WarningOnlyPolicy (閾値10MB) |
OSError を送出する。name で複数回呼び出した場合も、原則として新しいインスタンスを返すが、DBファイルへの接続は共有される(SQLiteのWALモードを活用)。class CacheManager:
def get_or_create_inflight(
self, key: str, is_async: bool
) -> ExecutionState: ...
_inflight 辞書をチェックするExecutionState を新規作成し、自分が「実行者 (Executor)」となるExecutionState を返し、自分は「待機者 (Waiter)」となるExecutionState の Event (または Future) をセットし、結果を共有する実行中のタスク状態(イベント・結果・Future)が GC によって消失しないよう、
_inflight 辞書を通じて強参照で保持する。WeakRef は使用しない。
_inflight 辞書はキャッシュキーをキーとし、以下の3要素タプルを値として保持する:
_inflight: dict[str, tuple[threading.Event, list[asyncio.Future], list]]
| 要素 | 型 | 役割 |
|---|---|---|
event |
threading.Event |
同期待機者への完了通知シグナル |
futures |
list[asyncio.Future] |
非同期待機者への結果配信チャネル |
result_box |
list |
結果の共有ボックス。[(success: bool, value: Any)] 形式 |
wait_herd_sync / wait_herd_async で _inflight にキーが存在しない場合、
_inflight_lock を保持した状態で新規タプルを挿入する_inflight 辞書がタプルへの唯一の管理参照を保持する。
待機者はロック取得時にタプル要素への参照を取得するが、_inflight エントリが正規の所有者であるnotify_and_cleanup_inflight で以下の順序で解放する:
a. _inflight_lock を取得し、エントリの同一性を event の identity (is) で確認する
b. 辞書からエントリを削除する(del _inflight[cache_key])
c. ロックを解放した後、event.set() で同期待機者に通知する
d. futures リスト内の各 asyncio.Future に結果またはエラーを伝播する_inflight 辞書への挿入・削除は常に _inflight_lock の保護下で行う_inflight 内には高々1つのエントリしか存在しないevent と同一オブジェクト(is 比較)を持つスレッドのみが行えるresult_box への書き込みは実行者スレッドのみが行い、待機者は読み取り専用とするfutures リストへの追加は _inflight_lock の保護下でのみ行う_inflight 辞書は CacheManager インスタンスの属性であり、CacheManager が
生存している限り、全てのインフライトエントリは GC の対象にならない。
CacheManager 自体は Spot インスタンスが所有するため、with spot: ブロック内での
安全性が保証される。
| 内部定数 | 値 | 説明 |
|---|---|---|
HERD_TIMEOUT |
300.0 |
待機者が実行者の完了を待つ最大秒数 |
HERD_MAX_RETRIES |
3 |
実行者が失敗またはタイムアウトした際のリトライ回数 |
class BlobStorageBase(ABC):
@abstractmethod
def save(self, key: str, data: ReadableBuffer) -> str: ...
@abstractmethod
def load(self, location: str) -> bytes: ...
@abstractmethod
def delete(self, location: str) -> None: ...
@abstractmethod
def list_keys(self) -> Iterator[str]: ...
@abstractmethod
def get_mtime(self, location: str) -> float: ...
| パラメータ | 型 | 説明 |
|---|---|---|
key |
str |
キャッシュキー(通常はハッシュ値) |
data |
bytes / memoryview |
シリアライズ済みバイナリ |
location |
str |
実体へのポインタ(ファイルパス、S3 URI等) |
CacheCorruptedError を送出すべきである。ReadableBuffer を受け入れることで、memoryview 等を使用したゼロコピー書き込みを可能にする。