本ドキュメントは、Python関数キャッシュおよびレート制限ライブラリ『beautyspot』の要件を定義する。
データパイプラインや機械学習の実験、外部API呼び出し等において、同一入力での不要な再実行を防ぐことで、実行時間の短縮とリソースの節約を実現する。開発者が最小限のコード変更で強力なキャッシュ・レート制限機能を導入できることを目指す。
本プロジェクト(Beautyspot)で使用される主要な用語の定義です。
仕様書アイテムに付与される groups 属性の意味は以下の通りです。
| グループ名 | 意味 / 対象領域 |
|---|---|
CACHE |
関数キャッシュの基本機能(同期・非同期のデコレータなど) |
KEY |
キャッシュキーの生成ロジック(引数のハッシュ化など) |
DB |
メタデータDB(有効期限、メタ情報の管理) |
STORE |
Blobストレージ(ペイロードの保存) |
SERIAL |
シリアライゼーション(データの変換処理) |
BGIO |
バックグラウンドIO(非同期書き込み処理) |
LIFE |
ライフサイクル管理(有効期限の判定、無効化ルールなど) |
LIMIT |
レート制限(トークンバケット方式等) |
HOOK |
コールバックフックの仕組み |
MAINT |
メンテナンス(キャッシュの削除、ガベージコレクション) |
CLI |
コマンドラインインターフェース |
DI |
依存注入の仕組みとデフォルトファクトリ |
HERD |
Thundering Herd対策(直列化、重複実行防止) |
COMMON |
共通定義(用語集など、特定の機能に縛られないもの) |
本章ではシステムが提供すべき具体的な機能要件について定義する。
関数の実行結果をキャッシュし、同一入力に対して再実行せずにキャッシュから結果を返せること。同期関数・非同期関数の両方をサポートすること。
関数の引数から安定かつ一意なキャッシュキーを生成できること。引数の型・構造に応じた正規化を行い、辞書のキー順序やコレクション型の違いを正しく区別できること。
キャッシュのメタデータ(関数名、入力ID、バージョン、有効期限等)をデータベースに永続化できること。
抽象インターフェースにより、DBやストレージなどのバックエンド実装を差し替え可能であること。
大きなキャッシュデータを外部ストレージ(ローカルファイルシステムやS3等)に保存できること。
DB直接保存とBlob保存を自動判定できること。
関数の戻り値を安全にシリアライズ・デシリアライズできること。ユーザー定義型のカスタムエンコーダ・デコーダを登録可能であること。
キャッシュの保存処理をバックグラウンドで非同期に実行し、関数の応答レイテンシに影響を与えないモードを提供すること。
キャッシュの有効期限を関数名パターンに基づくルールで設定できること。個別の関数に対して保持期間を指定でき、期限切れのキャッシュを自動的に無効化できること。
関数の実行頻度をトークンバケット方式で制限し、外部APIの呼び出しレートを制御できること。同期・非同期の両方に対応すること。
関数の実行前、キャッシュヒット時、キャッシュミス時にカスタムコールバック(フック)を実行できること。スレッドセーフなフック実装を提供すること。
期限切れキャッシュの削除、孤立Blobファイルのガベージコレクション、時間ベースの一括削除(prune)等のメンテナンス操作ができること。
全コンポーネント(DB、ストレージ、シリアライザ、リミッター等)が Protocol/ABC に基づく依存注入により差し替え可能であること。ファクトリ関数がデフォルトの組み立てを提供すること。
同一キャッシュキーに対する並行リクエストを直列化し、1つのリクエストのみが関数を実行し、他のリクエストはその結果を共有することで重複実行を防止できること。
本ドキュメントは、『beautyspot』のアーキテクチャ設計を定義する。
システムを構成する主要コンポーネントとその責務、およびコンポーネント間の相互作用を定義する。
graph TD
A[bs.Spot Factory] -->|DI/Composition| B[core.Spot]
B --> C[TaskDBBase]
B --> D[SerializerProtocol]
B --> E[BlobStorageBase]
B --> F[StoragePolicyProtocol]
B --> G[LimiterProtocol]
B --> H[LifecyclePolicy]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| core.Spot | キャッシュエンジン。キー生成→検索→実行→保存の制御 | mark(), cached_run() |
| TaskDBBase | メタデータ永続化。キャッシュキーによる検索と保存 | find_by_key(), insert() |
| SerializerProtocol | データのシリアライズ/デシリアライズ | pack(), unpack() |
| BlobStorageBase | 大規模データの外部ストレージ保存 | save(), load(), delete() |
| StoragePolicyProtocol | Blob保存の判定ポリシー | should_save_as_blob() |
sequenceDiagram
participant User as ユーザーコード
participant Spot as core.Spot
participant DB as TaskDB
participant Ser as Serializer
participant Blob as BlobStorage
User->>Spot: fn(args)
Spot->>DB: find_by_key(cache_key)
alt キャッシュヒット
DB-->>Spot: cached_data
Spot->>Ser: unpack(data)
else キャッシュミス
Spot->>Spot: fn(args) 実行
Spot->>Ser: pack(result)
Spot->>DB: insert(metadata)
opt Blob保存
Spot->>Blob: save(key, data)
end
end
Spot-->>User: result
| 技術領域 | 選定 | 理由 |
|---|---|---|
| メタデータDB | SQLite | ゼロ設定、組み込み可能、十分な性能 |
| シリアライズ | MessagePack | JSONより高速・コンパクト、バイナリ対応 |
| ストレージ | ローカルファイル / クラウド | 小規模はローカル、大規模は外部ストレージ等で拡張可能 |
graph TD
A[core.Spot] -->|キャッシュキー生成要求| B[KeyGenPolicy]
B -->|バインド| C[bound_keygen]
C -->|引数別戦略| D[Strategy]
C -->|正規化| E[canonicalize]
C -->|ハッシュ計算| F[KeyGen.hash_items]
E --> F
| コンポーネント | 責務 | インターフェース |
|---|---|---|
KeyGenPolicy |
関数の引数に対するハッシュ化戦略(無視、ファイル内容等)の宣言的定義と保持。 | bind(func) |
Strategy |
引数ごとの処理方法(DEFAULT, IGNORE, FILE_CONTENT, PATH_STAT)の定義。 | - |
canonicalize |
Pythonオブジェクトを再帰的に正規化し、Msgpackで安定してシリアライズ可能な状態に変換。 | @singledispatch canonicalize(obj) |
KeyGen |
正規化されたリストからSHA-256ハッシュを生成。レガシーデフォルトのハッシュ生成も担う。 | hash_items(items), from_file_content(path) |
sequenceDiagram
participant Spot as core.Spot
participant KP as KeyGenPolicy
participant Sig as inspect.signature
participant Can as canonicalize
participant KG as KeyGen
Spot->>KP: bind(func)
KP->>Sig: signature(func)
Sig-->>KP: bound_keygen 生成
KP-->>Spot: bound_keygen
Spot->>Spot: bound_keygen(*args, **kwargs)
loop 各引数
alt Strategy.IGNORE
Spot->>Spot: スキップ
else Strategy.FILE_CONTENT
Spot->>KG: from_file_content(path)
else Strategy.PATH_STAT
Spot->>KG: from_path_stat(path)
else Strategy.DEFAULT
Spot->>Can: canonicalize(val)
Can-->>Spot: 正規化済みデータ
end
end
Spot->>KG: hash_items(items_to_hash)
KG-->>Spot: SHA-256 ハッシュ文字列 (キャッシュキー)
| 技術領域 | 選定 | 理由 |
|---|---|---|
| 引数のバインド | inspect.signature |
argsとkwargsを定義順に正確にマッピングし、デフォルト値も適用するため |
| 正規化 | functools.singledispatch |
型ごとの正規化ロジックを拡張可能かつクリーンに実装するため |
| シリアライズ | msgpack |
高速でバイナリセーフであり、一貫したバイト列表現を得るため |
| ハッシュ関数 | SHA-256 | 衝突確率が極めて低く、暗号学的に安全かつ広くサポートされているため |
KeyGenPolicy を定義可能にし、柔軟なキャッシュ戦略をサポートする。classDiagram
class TaskDBCore {
<<Protocol>>
+init_schema() void
+get(cache_key) TaskRecord
+save(cache_key, ...) void
+delete(cache_key) bool
}
class Maintenable {
<<Protocol>>
+delete_expired() int
+prune(older_than) int
+delete_all() int
}
class TaskDBMaintenable {
<<Protocol>>
}
TaskDBCore <|-- TaskDBMaintenable
Maintenable <|-- TaskDBMaintenable
class SQLiteTaskDB {
-db_path: Path
-_write_queue: Queue
+init_schema() void
+get(cache_key) TaskRecord
+save(cache_key, ...) void
}
TaskDBMaintenable <|-- SQLiteTaskDB
| コンポーネント | 責務 | インターフェース |
|---|---|---|
TaskDBCore |
キャッシュ実行時に必要な最小限のメタデータDBアクセス | init_schema(), get(), save(), delete() |
Maintenable |
GCやCLI等、運用・保守に必要な拡張操作 | delete_expired(), prune(), delete_all() |
TaskDBMaintenable |
実行用と保守用の両方を備えた上位Protocol | TaskDBCore + Maintenable |
SQLiteTaskDB |
SQLiteを用いたデフォルト実装。非同期書き込みキューを内包 | TaskDBMaintenable 実装 |
sequenceDiagram
participant Spot as core.Spot
participant DB as SQLiteTaskDB
participant Q as WriteQueue
participant Thread as WriterThread
participant SQLite as SQLiteDB
Spot->>DB: get(cache_key)
DB->>SQLite: SELECT
SQLite-->>DB: TaskRecord
DB-->>Spot: TaskRecord
Spot->>DB: save(metadata)
DB->>Q: put(WriteTask)
DB-->>Spot: void (Non-blocking)
Thread->>Q: get()
Thread->>SQLite: INSERT / UPDATE
| 技術領域 | 選定 | 理由 |
|---|---|---|
| デフォルトメタデータDB | SQLite | ゼロ設定、組み込み可能、ローカルキャッシュとして十分な性能 |
| バックグラウンド書き込み | キュー + 専用スレッド | SQLiteの排他制御によるメインスレッドのブロックを防ぐため |
| 抽象化 | ProtocolベースのDI |
TaskDBCoreを満たせばPostgreSQL等に容易に差し替え可能 |
TaskDBCore, Maintenable) で定義され、利用側(core.Spotや_CacheManager)は実装に依存しない。graph TD
A[core.Spot] -->|判定| B[StoragePolicyProtocol];
A -->|保存/読込| C[BlobStorageBase];
D[LocalStorage] --> C;
E[S3Storage] --> C;
A -->|メタデータ| F[TaskDBBase];
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| StoragePolicyProtocol | データのサイズ等に基づき、DB保存かBlob保存かを判定する | should_save_as_blob() |
| BlobStorageBase | 大規模データの外部ストレージ保存を抽象化する | save(), load(), delete() |
| LocalStorage | ローカルファイルシステムへのアトミックな保存と検証 | base_dir, os.replace |
| S3Storage | AWS S3互換オブジェクトストレージへの保存 | s3:// URI, boto3 |
sequenceDiagram
participant Spot as core.Spot
participant Policy as StoragePolicy
participant DB as TaskDB
participant Blob as BlobStorage
Spot->>Policy: should_save_as_blob(data)
alt Policy: True (Blob保存)
Spot->>Blob: save(key, data)
Blob-->>Spot: location (path/URI)
Spot->>DB: insert(metadata, storage_type=FILE, blob_key=location)
else Policy: False (Inline保存)
Spot->>DB: insert(metadata, storage_type=DIRECT_BLOB, result_data=data)
end
| 技術領域 | 選定 | 理由 |
|---|---|---|
| 保存判定 | 閾値ベース (Threshold) | DBの肥大化を防ぎつつ、小容量データのI/Oを最適化 |
| アトミック性 | 一時ファイル + Rename | 保存中のクラッシュや並行書き込みによる破損を完全に防止 |
| クラウド対応 | Boto3 (S3) | 業界標準のプロトコルによる高い互換性とスケーラビリティ |
安全性: LocalStorage では is_relative_to によるパストラバーサル防止を徹底
信頼性: 保存時は fsync を実行し、OSレベルでのデータ到達を確認
効率性: S3 保存時は upload_fileobj を使用し、5GB超のマルチパートアップロードに自動対応
graph TD
A[MsgpackSerializer] -->|Registry| B[TypeRegistryProtocol]
A -->|LRU Cache| C[Thread-Local Storage]
A -->|Core| D[msgpack-python]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| MsgpackSerializer | MessagePackベースの高速・コンパクトなバイナリ変換 | dumps(), loads() |
| TypeRegistryProtocol | ユーザー定義型(ExtType)のエンコーダ・デコーダ登録 | register() |
| Thread-Local Cache | free-threading環境でのロック競合を回避するMRO解決キャッシュ | OrderedDict (LRU) |
sequenceDiagram
participant User as ユーザーコード
participant Ser as MsgpackSerializer
participant Reg as TypeRegistry
participant MP as msgpack
User->>Ser: dumps(obj)
Ser->>Ser: 型チェック & MRO解決
opt カスタム型
Ser->>Reg: エンコーダ取得
Reg-->>Ser: encoder_fn
Ser->>MP: ExtType(code, encoder_fn(obj))
end
MP-->>Ser: binary
Ser-->>User: bytes
| 技術領域 | 選定 | 理由 |
|---|---|---|
| フォーマット | MessagePack | JSONより高速・コンパクト。バイナリをネイティブにサポート |
| スレッド安全 | Copy-on-Write (CoW) | 読み取りパスからロックを排除し、マルチスレッド性能を最大化 |
| 型拡張 | ExtType (0-127) | カスタム型を1バイトのオーバーヘッドで表現可能 |
ExtCode 未登録エラーを検知し、互換性問題を明示的に報告graph TD
A[core.Spot] -->|非同期タスク投入| B[_BackgroundLoop]
B -->|Daemon Thread| C[asyncio.AbstractEventLoop]
C -->|I/O| D[TaskDB / BlobStorage]
A -->|ライフサイクル制御| E[atexit / ContextManager]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| _BackgroundLoop | デーモンスレッドでのイベントループ管理とタスク投入 | submit(), run_forever() |
| core.Spot | 非同期保存のトリガーとエラーハンドリング | _save_metadata_async() |
| ContextManager | 処理終了時のタスク完了待機(Flush) | __exit__, flush() |
sequenceDiagram
participant Main as メインスレッド
participant BGLoop as _BackgroundLoop
participant Store as ストレージ
Main->>BGLoop: submit(save_coro)
Note over Main: 関数は即座に結果を返す (save_sync=False)
BGLoop->>Store: 書き込み実行 (I/O)
alt 成功
Store-->>BGLoop: OK
else 失敗
BGLoop->>Main: on_background_error(error)
end
| 技術領域 | 選定 | 理由 |
|---|---|---|
| 並行処理 | asyncio in Thread | GILを解放しつつ、多数のI/Oタスクを効率的に多重化 |
| スレッド種類 | Daemon Thread | アプリケーション終了時にプロセスをブロックしない |
| 終了制御 | atexit / drain | 強制終了時も、可能な限り保留中の書き込みを完了させる |
透過性: 保存失敗時もユーザーコードの例外にはせず、コールバック経由で通知
一貫性: flush() により、クリティカルな処理の直後での保存完了を保証
安全性: シャットダウンシーケンス中に新規タスクを受け付けない排他制御を導入
graph TD
A[core.Spot] -->|マッチング要求| B[LifecyclePolicy]
B -->|ルールリスト| C[Rule]
C -->|パターン照合| D[fnmatch]
B -->|パース| E[Retention]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| LifecyclePolicy | 複数のルールを管理し、関数名に最適な有効期限を決定する | resolve_with_fallback() |
| Rule | 1つのパターンとそれに対応する保持期間のペア | match(func_name) |
| Retention | 多様な時間表現(文字列/数値)を秒数に正規化する | parse_retention() |
sequenceDiagram
participant Spot as core.Spot
participant Policy as LifecyclePolicy
participant DB as TaskDB
Spot->>Policy: resolve_with_fallback("module.func")
Policy->>Policy: ルール順次照合 (fnmatch)
Policy-->>Spot: expires_at (timestamp)
Spot->>DB: save(..., expires_at)
Note over DB: get() 時に現在時刻と比較判定
| 技術領域 | 選定 | 理由 |
|---|---|---|
| パターンマッチ | fnmatch (Glob) | 正規表現よりも直感的で、開発者にとって馴染みのある指定方法 |
| 保持期間指定 | 文字列パーサー ("30d"等) | 設定ファイルやデコレータでの可読性を向上 |
| 判定タイミング | 遅延判定 (Lazy) | 書き込み・読み込み時のオーバーヘッドを最小化 |
Retention.FOREVER 等の定数により、グローバルポリシーの個別上書きに対応graph TD
A["@spot.consume"] -->|宣言的適用| B[LimiterProtocol]
C[core.Spot] -->|DI| B
D[TokenBucket] --> B
D -->|アルゴリズム| E[GCRA]
D -->|時刻同期| F[monotonic clock]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| TokenBucket | GCRAアルゴリズムによるスムースなトラフィック制御 | consume(), consume_async() |
| @spot.consume | 関数実行前のトークン消費を透過的に行うデコレータ | cost (int or callable) |
| LimiterProtocol | レートリミッターの差し替えを可能にするインターフェース | LimiterProtocol |
sequenceDiagram
participant User as ユーザーコード
participant Dec as @spot.consume
participant Bucket as TokenBucket
User->>Dec: call func()
Dec->>Bucket: consume(cost)
Bucket->>Bucket: 次回実行許可時刻の計算 (GCRA)
alt 許可
Bucket-->>Dec: OK
Dec->>User: execute func()
else 拒否 (Over rate)
Bucket-->>Dec: raise ValueError/RateLimitError
end
| 技術領域 | 選定 | 理由 |
|---|---|---|
| アルゴリズム | GCRA (Generic Cell Rate Algorithm) | 固定ウィンドウと違い「スムースな」流量制御が可能 |
| 待機制御 | asyncio.sleep / time.sleep | 同期・非同期の両方のコンテキストで正確なスロットリングを実現 |
| コスト計算 | ダイナミックコスト | 引数に応じて消費トークン数を動的に変更可能 |
正確性: time.monotonic() を使用し、システム時刻の変更(NTP同期等)の影響を受けない
バースト制御: max_burst 設定により、アイドル後の過度なバーストを防止
スレッド安全: threading.Lock により、マルチスレッド環境での二重消費を防止
graph TD
A[core.Spot] -->|イベント通知| B[HookBase]
C[ThreadSafeHookBase] --> B
B -->|コンテキスト| D[Context Objects]
E[PreExecuteContext] --> D
F[CacheHitContext] --> D
G[CacheMissContext] --> D
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| HookBase | 実行ライフサイクルの各段階で呼び出される抽象基底クラス | pre_execute, on_cache_hit, on_cache_miss |
| ThreadSafeHookBase | RLockを用いて各フックメソッドの実行を直列化する | __init_subclass__ による自動ラップ |
| Context Objects | フックに渡される実行時の引数、結果、例外、メタデータのコンテナ | ctx.args, ctx.result 等 |
sequenceDiagram
participant Spot as core.Spot
participant Hook as HookBase
participant Fn as Target Function
Spot->>Hook: pre_execute(ctx)
alt キャッシュヒット
Spot->>Hook: on_cache_hit(ctx)
else キャッシュミス
Spot->>Fn: execute()
Fn-->>Spot: result
Spot->>Hook: on_cache_miss(ctx)
end
| 技術領域 | 選定 | 理由 |
|---|---|---|
| パターン | オブザーバー | コアロジックを汚さずに、ログ出力や統計取得等の横断的関心を分離 |
| スレッド安全 | 自動装飾 (Decorator) | ユーザーが意識せずに、サブクラス化するだけで安全なフックを実装可能 |
| 実行制御 | 準同期実行 | フック内の例外をキャッチしログ出力に留めることで、主処理を保護 |
透過性: フックの実行失敗(例外)は主処理の結果に影響を与えない
柔軟性: 関数ごとに異なるフックリストを hooks=[...] で指定可能
パフォーマンス: フック未登録時のオーバーヘッドを最小化するガード条件を実装
graph TD
A[MaintenanceService] -->|操作集約| B[TaskDBBase]
A -->|操作集約| C[BlobStorageBase]
A -->|自動実行| D[Probabilistic Eviction]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| MaintenanceService | DBとストレージを跨ぐクリーンアップ操作のファサード | clean_garbage(), prune() |
| TaskDBBase | 参照カウントや期限切れレコードの提供 | get_blob_refs(), delete_expired() |
| BlobStorageBase | 物理ファイルの列挙と削除 | list_keys(), delete() |
sequenceDiagram
participant Service as MaintenanceService
participant DB as TaskDB
participant Storage as BlobStorage
Service->>DB: get_blob_refs()
DB-->>Service: 有効な参照リスト (Whitelist)
Service->>Storage: list_keys()
Storage-->>Service: 全ファイルリスト
Service->>Service: 差分抽出 (孤立ファイルの特定)
Service->>Storage: delete(orphan_keys)
Note over Service: 猶予期間 (grace_period) を考慮して削除
| 技術領域 | 選定 | 理由 |
|---|---|---|
| 削除ポリシー | ホワイトリスト方式 | DBに記録がないBlobを削除対象とすることで、データの整合性を担保 |
| 競合防止 | 更新時刻確認 (mtime) | 保存中のファイルを誤って消さないよう、一定時間経過した孤立のみ削除 |
| 実行頻度 | 確率的オートエビクション | 書き込み時に低確率でGCをキックし、手動メンテなしでの健康度を維持 |
graph TD
A[beautyspot CLI] -->|Command| B[Typer App]
B -->|Business Logic| C[MaintenanceService]
B -->|Output| D[Rich Console]
B -->|Dashboard| E[Dashboard App]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| Typer App | コマンドライン引数のパースとサブコマンドのディスパッチ | main(), gc(), list() |
| MaintenanceService | 実際のキャッシュ管理ロジックの実行 | clean_garbage(), clear() |
| Rich Console | ターミナルでの視覚的に分かりやすいテーブルやパネルの描画 | Console, Table |
| Dashboard App | キャッシュ状態をインタラクティブに閲覧するためのTUI | dashboard.py |
sequenceDiagram
participant User as ユーザー
participant CLI as CLI App
participant Service as MaintenanceService
participant Output as Rich Console
User->>CLI: beautyspot gc --name my-project
CLI->>Service: clean_garbage()
Service-->>CLI: result (removed count, etc.)
CLI->>Output: print summary table
Output-->>User: formatted output
| 技術領域 | 選定 | 理由 |
|---|---|---|
| フレームワーク | Typer | 型ヒントに基づいた堅牢なコマンドラインインターフェースを迅速に構築 |
| 出力装飾 | Rich | プログレスバーやステータス表示により、長時間処理の進捗を可視化 |
| 連携 | Factory DI | カレントディレクトリや設定から自動的に Spot インスタンスを組み立て |
clear等)には対話的な確認プロンプトを表示graph TD
A[bs.Spot Factory] -->|構築| B[core.Spot]
A -->|DI| C[TaskDBBase]
A -->|DI| D[BlobStorageBase]
A -->|DI| E[SerializerProtocol]
A -->|DI| F[StoragePolicyProtocol]
A -->|DI| G[LimiterProtocol]
| コンポーネント | 責務 | インターフェース |
|---|---|---|
| bs.Spot (Factory) | 環境(名前、パス等)に応じたデフォルトコンポーネントの選定と組み立て | bs.Spot() |
| core.Spot | 注入された依存関係を使用して、キャッシュロジックをオーケストレーションする | mark(), cached_run() |
| Protocols / ABCs | 各コンポーネントが満たすべき契約(インターフェース)の定義 | SerializerProtocol 等 |
| 技術領域 | 選定 | 理由 |
|---|---|---|
| パターン | Constructor Injection | 依存関係を明示的に渡し、テスト時のMock差し替えを容易にする |
| インターフェース | Protocol (Duck Typing) | 厳密な継承を強制せず、構造的部分型によりサードパーティ実装を受け入れ |
| デフォルト設定 | 規約より構成 (CoC) | .beautyspot/ ディレクトリを基点とした標準パスを自動生成 |
core.Spot は具体的なクラス名(SQLite等)を知らず、Protocolのみに依存するgraph TD
A[core.Spot] -->|管理要求| B[CacheManager]
B -->|In-flight追跡| C[_inflight Dict]
C -->|同期待機| D[threading.Event]
C -->|非同期待機| E[asyncio.Future]
コンポーネント責務インターフェースCacheManagerキャッシュキーごとの実行状態管理と実行のシリアライズget_or_create_inflight()_inflight実行中のタスクを保持し、後続リクエストをイベントで待機させるExecutionStatecore.Spot待機結果の受け取りとエラーの伝播cached_run()
sequenceDiagram
participant T1 as Thread 1 (First)
participant T2 as Thread 2 (Second)
participant CM as CacheManager
participant Fn as Target Function
T1->>CM: get_or_create(key)
Note over CM: 新規作成 (Executor)
T2->>CM: get_or_create(key)
Note over CM: 既存あり (Waiter)
T1->>Fn: execute()
Fn-->>T1: result
T1->>CM: set_result(key, result)
Note over CM: Eventをセット
CM-->>T2: notify result
T1-->>T1: return result
T2-->>T2: return result
技術領域選定理由待機機構Event / FutureOS/ランタイムレベルの待機を使用し、CPU負荷(ビジーループ)を回避スコープキャッシュキー単位異なる関数の実行は妨げず、同一入力のみを直列化安全策タイムアウト & リトライ実行者がハングした場合に、待機側がデッドロックしないよう保護
一貫性: 実行者が成功すれば全員に結果を、失敗すれば全員に同じ例外を伝播
効率性: メモリリークを防ぐため、結果配布後に _inflight エントリを即座に削除
信頼性: 強参照 (StrongReference) で実行中のタスクを保持し、GCによる消失を防止
本ドキュメントは、『beautyspot』の各機能における詳細な仕様、インターフェース、およびアルゴリズムを定義する。
各コンポーネントが提供するAPI、内部状態遷移、および例外ハンドリングの詳細を定義する。
@spot.mark(
save_blob: Optional[bool] = None,
keygen: Optional[Union[Callable, KeyGenPolicy]] = None,
input_key_fn: Optional[Union[Callable, KeyGenPolicy]] = None,
version: str | None = None,
content_type: Optional[str | ContentType] = None,
serializer: Optional[SerializerProtocol] = None,
save_sync: Optional[bool] = None,
retention: RetentionSpec = None,
hooks: Optional[Sequence[HookBase]] = None,
)
def my_func(x, y): ...
inspect.iscoroutinefunction(fn) で判定する_execute_sync() に委譲する同期ラッパーを返す_execute_async() に委譲する非同期ラッパーを返す| パラメータ | デフォルト | 説明 |
|---|---|---|
save_blob |
None |
None: StoragePolicy に委譲、True: 強制Blob保存、False: DB内保存 |
keygen |
None |
キャッシュキー生成のカスタマイズ。引数やポリシーを指定可能 |
input_key_fn |
None |
(非推奨)keygen を使用すること |
version |
None |
バージョン文字列。変更するとキャッシュキーが変わり無効化される |
content_type |
None |
戻り値のMIMEタイプ |
serializer |
None |
None: Spotのデフォルトを使用。関数単位でオーバーライド可能 |
save_sync |
None |
保存処理を同期的に行うかどうか(Noneの場合はデフォルトに従う) |
retention |
None |
キャッシュの保持ポリシー("30d" などの文字列や期間オブジェクト等) |
hooks |
None |
関数単位のフックリスト(実行前後やキャッシュヒット時に発火) |
isgeneratorfunction または isasyncgenfunction)を渡した場合 → デコレーション時に ConfigurationError を送出するcached_run 利用時に引数として関数が一つも指定されなかった場合 → ValidationError を送出する__name__, __doc__ は functools.wraps によって保持される@mark (括弧なし) と @mark() (括弧あり) の両方の記法をサポートする@contextmanager
def cached_run(
self,
*funcs: Any,
**kwargs: Any
) -> Iterator[Any]: ...
Spot インスタンスからコンテキストマネージャとして呼び出される。funcs に渡された関数が0個の場合はエラーを送出する。Spot.mark(**kwargs) で生成したキャッシュデコレータを適用し、ラップされた関数を生成する。yield してコンテキストブロック内のユーザーコードに提供する。@spot.mark 適用済み関数と同様にキャッシュ機構を経由する。| パラメータ | 必須 | 説明 |
|---|---|---|
*funcs |
はい | キャッシュ機能を適用したい対象の関数。外部ライブラリの関数なども指定可能。複数指定可能。 |
**kwargs |
いいえ | Spot.mark デコレータに渡すオプション引数(save_blob, keygen, version, content_type, serializer, retention, save_sync, hooks 等)。 |
funcs が空(0個)の場合、beautyspot.exceptions.ValidationError を送出し、コンテキストに入らない。mark デコレータの制約に完全に依存する。KeyGen のスコープ規則に従う。@spot.mark() デコレータは同期関数と非同期(async def)関数の両方をサポートし、関数定義に応じて適切な実行・保存・キャッシュ検索フローを透過的に切り替える。
inspect.iscoroutinefunction(func) を用いて判定する。_execute_sync() に委譲する。ThreadPoolExecutor) にタスクを投げて非同期化される(save_sync=False の場合)。_execute_async() に委譲する。inspect.isgeneratorfunction または inspect.isasyncgenfunction で真となるジェネレータ・非同期ジェネレータが渡された場合、サポート対象外として即座に ConfigurationError を送出する。on_background_error コールバックが設定されていればそこに処理が委譲される。class KeyGen:
@staticmethod
def _default(args: tuple, kwargs: dict) -> str: ...
@staticmethod
def hash_items(items: list) -> str: ...
_default)args と kwargs を canonicalize() により再帰的に正規化する[args, kwargs] を MessagePack でバイナリにシリアライズするhash_items)KeyGenPolicy で使用される。| パラメータ | 型 | 説明 |
|---|---|---|
args |
tuple |
関数の位置引数 |
kwargs |
dict |
関数のキーワード引数 |
items |
list |
バインド済み引数の正規化値リスト |
RecursionError 発生時は、安全のため str((args, kwargs)) のハッシュをフォールバックとして使用するstr() ベースのハッシュを返す[(), {}] の構造として一定のハッシュ値を生成するstr() はオブジェクトによっては実行ごとに変わる可能性があるため、明示的な正規化が推奨される@singledispatch
def canonicalize(obj: Any) -> Any: ...
singledispatch を使用し、型に応じた最適な正規化形式へ再帰的に変換する。
("__bool__", value) のタプルに変換(1 と True の衝突を防止)[[k1, v1], [k2, v2], ...] のリストに変換"__list__") を付与し、要素を再帰的に正規化。集合型はソートを行う("__enum__", module, qualname, value) に変換__dict__ または __slots__ から属性を取得し、型名と共に正規化| オブジェクト型 | 正規化後の形式 | 備考 |
|---|---|---|
numpy.ndarray |
("__numpy__", shape, dtype, bytes) |
高速かつ正確なハッシュを保証 |
OrderedDict |
("__ordered_dict__", items) |
挿入順序を保持したまま正規化 |
Pydantic Model |
("__pydantic_v2__", schema) |
スキーマ情報をベースに判定 |
str(obj) にフォールバックするRecursionError をキャッチし、その階層で str() による正規化に切り替える_safe_sort_key により、Python 3 で比較不可能な型同士(例: int と str)が辞書のキーに含まれていても安定したソート順を保証するclass Strategy(Enum):
DEFAULT = auto() # 再帰的正規化
IGNORE = auto() # キー計算から除外
FILE_CONTENT = auto() # ファイルの内容をハッシュ
PATH_STAT = auto() # パス+サイズ+更新時刻をハッシュ
class KeyGenPolicy:
def bind(self, func: Callable) -> Callable[..., str]: ...
bind(func) 時に inspect.signature を解析し、引数名とデフォルト値を把握するStrategy を適用し、正規化された値のリストを作成するKeyGen.hash_items() で一つのハッシュにまとめる| 戦略 | 内容 | 用途 |
|---|---|---|
IGNORE |
キャッシュキーに含めない | verbose や logger 等、結果に影響しない引数 |
FILE_CONTENT |
ファイルを全読込してハッシュ化 | 入力ファイルの厳密な変更検知 |
PATH_STAT |
os.stat 情報を使用 |
大容量ファイルの高速な変更検知 |
FILE_CONTENT / PATH_STAT 指定時にファイルがない場合、"MISSING_{path}" という固定文字列をハッシュ対象とする"ERROR_{path}" を返すTypeError を送出する(実行前バリデーション)class SQLiteTaskDB(TaskDBBase):
def __init__(self, db_path: str | Path, timeout: float = 30.0): ...
def save(self, task: TaskRecord) -> None: ...
def get(self, input_key: str) -> TaskRecord | None: ...
_WriteTask キューに投入され、専用のバックグラウンドスレッドで直列に実行される。これにより SQLite の database is locked エラーを回避する。PRAGMA query_only = ON 状態で並行して実行される。tasks テーブルの存在を確認し、必要に応じて自動的に ALTER TABLE によるカラム追加(マイグレーション)を行う。| 設定 | デフォルト | 説明 |
|---|---|---|
journal_mode |
WAL |
書き込みと読み取りの並行性を高める設定 |
synchronous |
NORMAL |
パフォーマンスと耐久性のバランス設定 |
timeout |
30.0 |
データベースロック待機時間の閾値 |
sqlite3.Error をキャッチし、適切にラップして再送出する:memory: 指定時は、プロセスの生存期間中のみキャッシュが保持される@runtime_checkable
class TaskDBCore(Protocol):
def get(self, input_key: str) -> TaskRecord | None: ...
def save(self, record: TaskRecord) -> None: ...
class TaskDBBase(ABC):
# TaskDBCore を満たしつつ、メンテナンスのデフォルト実装を提供
| メソッド | 役割 |
|---|---|
get_blob_refs() |
全レコードの blob_key を列挙する(GC用) |
delete_expired() |
expires_at を経過したレコードを一括削除する |
get_history() |
実行履歴(統計)を取得する |
NotImplementedError を送出する代わりに、runtime_checkable による事前チェックを推奨する。TaskDBMaintenable 構成プロトコルとして扱う。class LocalStorage(BlobStorageMaintenable):
def __init__(self, base_dir: str | Path): ...
def save(self, key: str, data: bytes) -> str: ...
def load(self, location: str) -> bytes: ...
save)tempfile.mkstemp で一時ファイルを作成flush() および os.fsync() でディスク到達を保証os.replace で最終的なパスへリネーム(アトミックな置換)load / delete)base_dir の配下にあることを is_relative_to で厳密にチェックし、範囲外へのアクセスを遮断する。| パラメータ | 説明 | 制限 |
|---|---|---|
key |
保存ファイル名のベース | .. や / を含む場合は ValidationError |
location |
save が返した識別子 |
相対パス形式を推奨 |
load 時にファイルがない場合 CacheCorruptedError を送出PermissionError 発生時は、GCでの回収に委ねるため警告ログのみ出力OSError を送出/ と \ の両方を検証対象とするclass S3Storage(BlobStorageMaintenable):
def __init__(self, s3_uri: str, s3_opts: dict | None = None): ...
s3://bucket/prefix/key.bin 形式のURIをロケーション識別子として使用する。save)boto3.upload_fileobj を使用。5GBを超えるデータに対しては、自動的にマルチパートアップロードに切り替えられ、PutObjectの制限を回避する。get_mtime)head_object を使用し、データ本体をダウンロードせずに最終更新時刻(LastModified)を取得する。| 設定 | 内容 |
|---|---|
s3_uri |
s3://my-bucket/my-prefix 形式 |
s3_opts |
boto3.client('s3', **s3_opts) に渡される設定 |
boto3 がインポートできない環境でインスタンス化を試みると ImportError を送出botocore.exceptions.ClientError をキャッチし、状況に応じて CacheCorruptedError に変換class StoragePolicyProtocol(Protocol):
def should_save_as_blob(self, data: bytes) -> bool: ...
len(data) が threshold を超えた場合に True を返す。False を返すが、閾値を超えた場合にログ出力のみ行う(互換モード)。True を返す。| ポリシー型 | パラメータ | デフォルト | 説明 |
|---|---|---|---|
Threshold |
threshold |
なし | バイト単位の閾値。10MB等を推奨 |
Warning |
warning_threshold |
なし | 警告を出すサイズ閾値 |
False (DB保存) とみなし、エラーをログに記録する。len(data) > threshold のため、閾値と等しい場合はDB保存となる。class MsgpackSerializer(SerializerProtocol, TypeRegistryProtocol):
def dumps(self, obj: Any) -> bytes: ...
def loads(self, data: bytes) -> Any: ...
dumps)type(obj) で取得ExtType としてパックSerializationError)loads)ExtType を発見した場合、コードに対応するカスタムデコーダを呼び出す| 設定 | デフォルト | 説明 |
|---|---|---|
max_cache_size |
1024 |
スレッドごとの型解決LRUキャッシュの最大サイズ |
SerializationError を送出SerializationError を送出registry_generation カウンタを使用して、スレッドローカルキャッシュの整合性を維持def register(
code: int,
encoder: Callable[[Any], Any],
decoder: Callable[[Any], Any],
) -> Callable: ... # デコレータ形式
def register_type(
type_class: Type,
code: int,
encoder: Callable[[Any], Any],
decoder: Callable[[Any], Any],
) -> None: ...
_registry_generation をインクリメントし、全スレッドのLRUキャッシュを無効化対象とする| パラメータ | 制限 | 説明 |
|---|---|---|
code |
0 〜 127 |
MessagePack ExtType のユーザー定義領域コード |
encoder |
引数1、戻り値シリアライズ可能 | オブジェクトをプリミティブ構造へ変換する関数 |
decoder |
引数1、戻り値オブジェクト | プリミティブ構造からオブジェクトを復元する関数 |
ValueError を送出し、レジストリの破損を防ぐValueErrorclass _BackgroundLoop:
def submit(self, coro: Coroutine) -> None: ...
def stop(self) -> None: ...
def is_running(self) -> bool: ...
submit 時にデーモンスレッドを作成し、asyncio.run() を開始するrun_coroutine_threadsafe を使用して、メインスレッドからコルーチンをイベントループへ投入| 内部属性 | 型 | 説明 |
|---|---|---|
_loop |
AbstractEventLoop |
スレッド内で稼働するループ本体 |
_thread |
Thread |
daemon=True 設定のスレッド |
on_background_error コールバックで処理されるdef flush(timeout: float | None = None) -> bool: ...
@spot.mark(save_sync=False)
def my_func(): ...
save_sync=True)save_sync=False)BackgroundLoop に委譲し、即座に関数の戻り値を返す。flush)_inflight タスク(保存中)のリストを確認し、それら全てが完了するまで、またはタイムアウトまで待機する。| パラメータ | デフォルト | 説明 |
|---|---|---|
timeout |
None |
flush の最大待機時間(秒)。None は無限待機 |
on_background_error |
logger.error |
非同期保存失敗時のエラーハンドラ |
flush がタイムアウトした場合、False を返し、一部のデータが未保存である可能性を通知する。with spot: 終了時に自動的に flush が呼ばれ、プロセス終了前のデータ保存を確実にする。class LifecyclePolicy:
def add_rule(self, pattern: str, retention: Any) -> None: ...
def resolve_with_fallback(self, func_name: str) -> float | None: ...
順次照合: 登録された順にルールを走査する
パターンマッチ: fnmatch.fnmatch を使用し、関数名がパターンに合致するか判定
二段階解決:
最初に「完全修飾名 (module.qualname)」で照合
マッチしなければ「短縮名 (qualname)」で照合
結果返却: 最初にマッチしたルールの保持期間を秒数に変換して返す
パラメータ例説明pattern"my_mod.*", "*_test"Glob形式のパターンretention"30d", 3600保持期間の定義
default_retention パラメータで、どのルールにもマッチしない場合のデフォルト保持期間を指定できるLifecyclePolicy.default() のデフォルト値は "30d"(30日)None を指定すると無期限保持(従来互換)default_retention で指定された保持期間を返す(デフォルト: 30日)def parse_retention(val: Any) -> float | None: ...
"7d" (7日), "12h" (12時間), "30m" (30分), "10s" (10秒) 形式を秒数に変換する。大文字小文字は区別しない。int, float: 秒数としてそのまま扱う。timedelta: total_seconds() を取得。None: ポリシーに従う(または無期限)を意味する。| 単位 | 倍率 (秒) | 備考 |
|---|---|---|
d |
86400 | 日数 |
h |
3600 | 時間 |
m |
60 | 分 |
s |
1 | 秒 |
ValidationError を送出する。ValidationError となる。Retention.FOREVER (無限大の秒数) を提供し、ポリシーに関わらず永続化を指示できる。class TokenBucket(LimiterProtocol):
def consume(self, cost: float = 1.0) -> None: ...
async def consume_async(self, cost: float = 1.0) -> None: ...
TAT - 許容バースト 以前の場合、リクエストを拒否または待機させる。consume: 待機が必要な場合は time.sleep() でブロック。consume_async: 待機が必要な場合は asyncio.sleep() で非占有待機。| 設定 | デフォルト | 説明 |
|---|---|---|
tokens_per_minute |
必須 | 1分間に許可する平均リクエスト数(コスト合計) |
max_burst |
1.0 |
許容されるバースト数(トークン単位) |
RateLimitError (ValueErrorのサブクラス) を送出する。max_burst を超えるコストを持つ単一リクエストは、常に拒否される。TAT は現在時刻より過去に一定以上離れないよう補正される(過度なバーストの防止)。class HookBase:
def pre_execute(self, ctx: PreExecuteContext) -> None: ...
def on_cache_hit(self, ctx: CacheHitContext) -> None: ...
def on_cache_miss(self, ctx: CacheMissContext) -> None: ...
ThreadSafeHookBase を継承した場合、メタクラスが自動的に各メソッドを with self._lock: で包む| コンテキスト | 保持データ |
|---|---|
PreExecuteContext |
func, args, kwargs, cache_key |
CacheHitContext |
cache_key, result, metadata |
CacheMissContext |
cache_key, result, execution_time |
hooks=[h1, h2] と指定した場合、h1 -> h2 の順で実行される。class MaintenanceService:
def clean_garbage(self, grace_period: int = 3600) -> GarbageStats: ...
def prune(self, days: int, func_name: str | None = None) -> int: ...
def clear(self, func_name: str | None = None) -> int: ...
clean_garbage)delete_expired() でDBから古いタスクを除去blob_key とストレージ内の全ファイルを比較grace_period 以上経過したものを削除| パラメータ | デフォルト | 説明 |
|---|---|---|
grace_period |
3600 |
孤立Blobを削除するまでの猶予期間(秒)。並行実行中の保存を保護する |
days |
なし | prune で削除対象とする経過日数 |
clear(func_name) は前方一致ではなく、完全一致(または glob)で判定する。beautyspot list [--db DB_PATH]
beautyspot gc [--name PROJECT_NAME] [--force]
beautyspot stats
beautyspot ui
--db 指定がない場合、デフォルトの .beautyspot/ ディレクトリを探索するMaintenanceService または Spot インスタンスを生成し、対応するメソッドを呼び出すRich ライブラリを使用して、結果をテーブルやプログレスバーで表示する| コマンド | 説明 |
|---|---|
list |
保存されているキャッシュキーと関数名の一覧を表示 |
gc |
期限切れタスクと孤立Blobのクリーンアップを実行 |
stats |
キャッシュヒット率やストレージ使用量の統計を表示 |
ui |
ターミナルベースのインタラクティブダッシュボードを起動 |
--force)に対応する。def Spot(
name: str = "default",
storage_path: str | Path | None = None,
serializer: SerializerProtocol | None = None,
limiter: LimiterProtocol | None = None,
# ...その他の依存
) -> core.Spot: ...
storage_path が未指定の場合、.beautyspot/ 以下のプロジェクト名ディレクトリを使用するSQLiteTaskDB を生成し、マイグレーションを実行s3://等)を判定し、適切な BlobStorage 実装を生成core.Spot のコンストラクタに注入して返す| パラメータ | デフォルト値の導出 |
|---|---|
db |
.beautyspot/{name}.db |
blobs |
.beautyspot/blobs/{name}/ |
serializer |
MsgpackSerializer() |
policy |
WarningOnlyPolicy (閾値10MB) |
OSError を送出する。name で複数回呼び出した場合も、原則として新しいインスタンスを返すが、DBファイルへの接続は共有される(SQLiteのWALモードを活用)。class CacheManager:
def get_or_create_inflight(
self, key: str, is_async: bool
) -> ExecutionState: ...
_inflight 辞書をチェックするExecutionState を新規作成し、自分が「実行者 (Executor)」となるExecutionState を返し、自分は「待機者 (Waiter)」となるExecutionState の Event (または Future) をセットし、結果を共有する実行中のタスク状態(イベント・結果・Future)が GC によって消失しないよう、
_inflight 辞書を通じて強参照で保持する。WeakRef は使用しない。
_inflight 辞書はキャッシュキーをキーとし、以下の3要素タプルを値として保持する:
_inflight: dict[str, tuple[threading.Event, list[asyncio.Future], list]]
| 要素 | 型 | 役割 |
|---|---|---|
event |
threading.Event |
同期待機者への完了通知シグナル |
futures |
list[asyncio.Future] |
非同期待機者への結果配信チャネル |
result_box |
list |
結果の共有ボックス。[(success: bool, value: Any)] 形式 |
wait_herd_sync / wait_herd_async で _inflight にキーが存在しない場合、
_inflight_lock を保持した状態で新規タプルを挿入する_inflight 辞書がタプルへの唯一の管理参照を保持する。
待機者はロック取得時にタプル要素への参照を取得するが、_inflight エントリが正規の所有者であるnotify_and_cleanup_inflight で以下の順序で解放する:
a. _inflight_lock を取得し、エントリの同一性を event の identity (is) で確認する
b. 辞書からエントリを削除する(del _inflight[cache_key])
c. ロックを解放した後、event.set() で同期待機者に通知する
d. futures リスト内の各 asyncio.Future に結果またはエラーを伝播する_inflight 辞書への挿入・削除は常に _inflight_lock の保護下で行う_inflight 内には高々1つのエントリしか存在しないevent と同一オブジェクト(is 比較)を持つスレッドのみが行えるresult_box への書き込みは実行者スレッドのみが行い、待機者は読み取り専用とするfutures リストへの追加は _inflight_lock の保護下でのみ行う_inflight 辞書は CacheManager インスタンスの属性であり、CacheManager が
生存している限り、全てのインフライトエントリは GC の対象にならない。
CacheManager 自体は Spot インスタンスが所有するため、with spot: ブロック内での
安全性が保証される。
| 内部定数 | 値 | 説明 |
|---|---|---|
HERD_TIMEOUT |
300.0 |
待機者が実行者の完了を待つ最大秒数 |
HERD_MAX_RETRIES |
3 |
実行者が失敗またはタイムアウトした際のリトライ回数 |
class BlobStorageBase(ABC):
@abstractmethod
def save(self, key: str, data: ReadableBuffer) -> str: ...
@abstractmethod
def load(self, location: str) -> bytes: ...
@abstractmethod
def delete(self, location: str) -> None: ...
@abstractmethod
def list_keys(self) -> Iterator[str]: ...
@abstractmethod
def get_mtime(self, location: str) -> float: ...
| パラメータ | 型 | 説明 |
|---|---|---|
key |
str |
キャッシュキー(通常はハッシュ値) |
data |
bytes / memoryview |
シリアライズ済みバイナリ |
location |
str |
実体へのポインタ(ファイルパス、S3 URI等) |
CacheCorruptedError を送出すべきである。ReadableBuffer を受け入れることで、memoryview 等を使用したゼロコピー書き込みを可能にする。本ドキュメントは、『beautyspot』のテスト計画および検証内容を定義する。
各機能が仕様通りに動作することを確認するための検証観点およびテストシナリオを定義する。
@spot.mark() はユーザーが最も頻繁に使う公開APIであり、キャッシュの正確性・透過性・拡張性の基盤となる。デコレーションによって元の関数の挙動やメタデータが損なわれると、ユーザーコードのデバッグや型チェックに支障をきたすため、ラッパーの透過性を厳密に検証する。
save_blob=True 指定時にDB直接保存ではなくBlobストレージ経由で保存されること__name__, __doc__, inspect.signature が元の関数と一致することserializer= パラメータでデフォルトの MsgpackSerializer をカスタム実装に置換できることversion= パラメータを変更するとキャッシュキーが変わり、既存キャッシュがヒットしなくなることcached_run はデコレータを直接付与できない外部ライブラリ関数にキャッシュを適用する唯一の手段であり、このAPIの不具合はサードパーティ統合シナリオ全体に影響する。戻り値の型が関数の数で変わるスマートリターン仕様は、誤実装すると型安全性を損なうため重点的に検証する。
ValidationError が送出されることbeautyspot は同期・非同期の両方の関数を透過的にキャッシュするが、asyncio 固有のイベントループ制約(スレッドをまたぐコルーチン実行、例外伝播のタイミング差異)により、同期版とは異なる不具合が発生しやすい。非同期パスの堅牢性を独立して検証する。
inspect.iscoroutinefunction() により同期/非同期が正しく識別され、適切なラッパーが適用されることawait が1回の実行結果を共有することsave_sync=False 時に _BackgroundLoop 経由でキャッシュ保存が非同期実行されることキャッシュキーはキャッシュの同一性判定の基盤であり、ハッシュが不安定だとキャッシュミスの頻発(性能劣化)や誤ヒット(データ不整合)を引き起こす。Python バージョンやプラットフォームに依存しないキー生成の安定性を保証する。
func_identifier:input_id[:version] 入力に対して、複数回の呼び出しで常に同じ SHA-256 ハッシュが生成されることmodule.qualname 形式の func_identifier を含む正しい構造であることcanonicalize() は引数の型ごとに正規化ルールを適用し、論理的に同値な入力が同じキャッシュキーを生成することを保証する。正規化の不備は「同じ入力なのにキャッシュミス」または「異なる入力なのに誤ヒット」という致命的なバグに直結する。
{"a": 1, "b": 2} と {"b": 2, "a": 1} が同一キーを生成すること[1, 2] (list) と (1, 2) (tuple) が異なるキーを生成すること(型タグ付き正規化)型名.メンバー名 で正規化されることshape + dtype + tobytes による正規化で、同値配列が同一キーを生成することTrue と 1 が異なるキーを生成すること(Python では True == 1 だが区別が必要)KeyGenPolicy は引数ごとのキー生成戦略をカスタマイズする機構であり、ログレベルやデバッグフラグのような非決定的引数をキーから除外したり、ファイルパスの代わりにファイル内容でキーを生成するユースケースを支える。戦略の誤適用はキャッシュの正確性を根本から損なう。
KeyGenPolicy.bind(func) が関数シグネチャ(位置引数・キーワード引数・デフォルト値)を正しく解釈すること@spot.mark(keygen=KeyGen.ignore("verbose")) のようにデコレータ経由で戦略が正しく適用されることSQLiteTaskDB はキャッシュメタデータの永続化を担う中核コンポーネントであり、DB の不整合はキャッシュの消失や重複実行を引き起こす。WALモード・専用ライタースレッドという独自アーキテクチャの信頼性を保証する。
TaskDBCore / TaskDBBase のプロトコル階層は、サードパーティによるカスタムDB実装(Redis、PostgreSQL 等)の差し替えを可能にする拡張ポイントである。インターフェース契約が不明確だと、カスタム実装が実行時に予期しないエラーを起こす。
TaskDBCore の必須メソッド(save, load, delete 等)を実装したクラスが isinstance チェックをパスすることTaskDBBase がメンテナンス系メソッド(prune, stats 等)のデフォルト実装を提供し、サブクラスが必須メソッドのみの実装で動作することbs.Spot(db=CustomDB()) でプロトコル準拠のカスタムDBが正常に動作することLocalStorage はBlobデータのファイルシステム永続化を担い、並行書き込み時のデータ破損やパストラバーサルによるセキュリティ脆弱性が発生しうる。アトミック書き込みとセキュリティバリデーションの正確性を保証する。
base_dir が存在しない場合に自動作成され、.gitignore が配置されることtempfile.mkstemp → fsync → os.replace のフローで、書き込み中のクラッシュや並行アクセスでファイルが破損しないこと.. や / を含む場合に ValidationError が送出され、base_dir 外へのアクセスが不可能であること。load() でも is_relative_to 検証が機能することclean_temp_files() が猶予期間超過の .spot_tmp ファイルを削除し、猶予期間内のファイルは保持することdelete() が存在しないキーに対してもエラーを送出しないことS3Storage はクラウド環境での大規模Blob保存を担い、ネットワーク障害・認証エラー・バケット不在などオンプレミスにはない障害モードが存在する。boto3 オプショナル依存のガード処理も含め、クラウドストレージ統合の信頼性を検証する。
save / load / delete / list_keys が正しく動作することimport 時にわかりやすいエラーメッセージが表示されることストレージポリシーは「データをDB直接保存するか、Blobストレージに分離するか」を決定する戦略レイヤーである。閾値判定の誤りはDBの肥大化(性能劣化)や不要なBlob分離(オーバーヘッド増加)を招く。3種のポリシー実装がそれぞれの契約を正しく満たすことを検証する。
False、閾値以上のデータで True を返すこと。境界値(ちょうど閾値サイズ)での挙動が明確であることFalse を返しDB直接保存を選択すること。ただし閾値超過時に WARNING レベルのログが出力されることTrue を返すこと@spot.mark(save_blob=True/False) の明示指定がポリシー判定より優先されること。save_blob=None(デフォルト)時にポリシーの should_save_as_blob() が呼ばれることMsgpackSerializer はキャッシュデータの永続化形式を決定する。シリアライズの不具合はデータ消失(デシリアライズ不能)やサイレントなデータ劣化(精度低下等)に直結する。スレッドセーフ性の欠如はマルチスレッド環境での競合状態を引き起こす。
dumps → loads で元の値と一致することカスタム型登録はユーザー定義クラスのキャッシュを可能にする拡張ポイントである。登録の不備は SerializationError によるキャッシュ保存失敗を引き起こし、ユーザーの既存コードとの統合を阻害する。デコレータ方式と命令方式の両方の登録パスを検証する。
@spot.register(code=N, encoder=..., decoder=...) でクラスが登録され、そのインスタンスのシリアライズ/デシリアライズが正常に動作することspot.register_type(MyClass, code=N, ...) でも同等の登録が行われることmodel_validate をデコーダとして使用する場合に、バリデーション付きの復元が正しく動作すること_BackgroundLoop はデーモンスレッドで asyncio イベントループを駆動し、非同期キャッシュ保存を処理する。スレッド間のコルーチン受け渡しは競合状態やデッドロックが発生しやすく、シャットダウン時のタスク消失はデータロスに直結する。
submit(coro) で投入したコルーチンが正しく実行され、結果がDBとストレージに反映されることdrain_timeout 秒以内に完了すること。タイムアウト後は強制終了されることsubmit() 呼び出しが受け付けられないことsave_sync パラメータと flush / drain メカニズムは、キャッシュ保存のレイテンシとデータ安全性のトレードオフをユーザーが制御する手段である。save_sync=False 使用時にデータが失われないことと、コンテキストマネージャによる確実なフラッシュを保証する。
spot.flush(timeout) 呼び出しで全ペンディング保存が完了するまで待機すること。タイムアウト時の挙動が明確であることwith spot: ブロック終了時に未完了の保存が自動的に drain されることwith ブロックで再利用でき、各ブロック終了時に適切に drain されることon_background_error コールバックが呼ばれ、メインスレッドには影響しないことLifecyclePolicy はキャッシュデータの保持期間を関数名パターンで制御する。パターンマッチングの不備は、重要なキャッシュの意図しない削除(データロス)や不要なキャッシュの蓄積(ディスク圧迫)を引き起こす。
"train_*", "*.preprocess" 等)がルールの func_pattern として正しく機能し、マッチした関数に対して指定の Retention が適用されることmodule.qualname)でルールが見つからない場合に、短縮名(qualname のみ)でフォールバック検索が行われることdefault_retention で指定された保持期間(デフォルト: 30日)が適用されることdefault_retention=None を指定した場合に無期限保持(None)が返されることRetention はキャッシュの有効期間をユーザーフレンドリーな形式で指定する値オブジェクトである。パースの不備は意図しない保持期間(例: "7d" を 7秒と誤解釈)を招き、重要なキャッシュの早期削除やディスクの際限ない肥大化に繋がる。
"7d" → 7日、"12h" → 12時間、"30m" → 30分、"10s" → 10秒として正しく解釈されることtimedelta(days=7) がそのまま保持期間として受け入れられることRetention.FOREVER が無期限保持を表し、シングルトンであること(is 比較が成立)"-1d")やゼロ("0s")が ValidationError を送出すること。不正な形式("abc", "" 等)も拒否されることTokenBucket はAPI呼び出しやリソースアクセスのレート制限を実現する。レートリミッターの不具合はAPIの過負荷(制限が甘い場合)やスループットの不必要な低下(制限が厳しすぎる場合)を招く。GCRA アルゴリズムのスムーズなレート制御を検証する。
Hook システムはキャッシュライフサイクルへのユーザー拡張ポイント(ロギング、メトリクス収集、監査等)であり、フック内の不具合がメインの関数実行に影響を与えないことが最重要の契約である。また ThreadSafeHookBase の自動ロック機構の正確性を保証する。
pre_execute → 関数実行 → on_cache_miss の順でフックが呼ばれること。キャッシュヒット時に pre_execute → on_cache_hit の順で呼ばれることPreExecuteContext, CacheHitContext, CacheMissContext)が正しい情報(func_name, args, result 等)を含むこと__init_subclass__ によりユーザー定義メソッドが自動的に RLock でラップされ、複数スレッドからの同時呼び出しで競合状態が発生しないことMaintenanceService は CLI やダッシュボードからのシステム管理操作を仲介するサービス層である。タスク詳細の表示ミスは運用者の誤判断を招き、メンテナンス操作の不備はデータの意図しない削除に繋がる。
None が返却され、例外が送出されないことCLI はユーザーがキャッシュの状況確認・管理を行う主要インターフェースであり、コマンドの不具合はユーザー体験と運用効率に直接影響する。CliRunner によるコマンド実行と出力の正確性を E2E で検証する。
--force フラグでプロンプトがスキップされること@mark でキャッシュ保存 → beautyspot list で確認 → beautyspot clear で削除、という一連のワークフローが正常に動作することbs.Spot() ファクトリ関数は全コンポーネントの DI 配線を担う唯一のパブリックエントリポイントであり、デフォルト構成の正確性とカスタム実装の差し替え可能性がシステム全体の柔軟性と正確性を決定する。
SQLiteTaskDB, LocalStorage, MsgpackSerializer, TokenBucket, WarningOnlyPolicy が自動注入されること。ワークスペースディレクトリ(.beautyspot/)が自動作成されることTaskDBBase / TaskDBCore プロトコル準拠のカスタムDBを db= で注入し、正常にキャッシュ操作が行えることBlobStorageBase 準拠のモック Storage を storage_backend= で注入し、Blob保存が委譲されることSerializerProtocol 準拠のカスタム Serializer を注入し、シリアライズ処理が差し替わることbs.Spot() の戻り値型が pyright / mypy で正しく推論されることThundering Herd Protection は、同一キーへの並行リクエストが一斉に関数を実行する「Thundering Herd」問題を防ぐ。この保護が不完全だと、重い計算やAPI呼び出しが不要に多重実行され、リソース浪費やレートリミット超過を招く。並行性バグはテスト困難であるため、複数の観点から網羅的に検証する。
asyncio.Task が同一キーで同時にキャッシュミスした場合も、関数が1回のみ実行されること_inflight エントリがクリーンアップされることHERD_TIMEOUT(300秒)を超過した場合にタイムアウト処理が発動し、無限待機にならないことHERD_MAX_RETRIES(3回)の範囲でリトライが行われ、上限超過で最終エラーが返ること_inflight 辞書の状態遷移(エントリ追加→結果セット→エントリ削除)が正しい順序で行われることBlobStorageBase は全ストレージバックエンド(LocalStorage, S3Storage, サードパーティ)が
準拠すべき抽象契約を定義する。この契約が正しく機能しないと、キャッシュデータの
保存・復元・削除・メンテナンス(GC)の全てが破綻する。
LocalStorage を具象実装として使い、インターフェース契約を網羅的に検証する。
bytes, bytearray, memoryview の全てを受け入れること(ゼロコピー書き込み)b"") を正常に保存・復元できることCacheCorruptedError を送出すること本ドキュメントは、『beautyspot』の実装記録(ジャーナル)を保持する。
ソースコードの実装構造、設計判断の根拠、および技術的負債について記録する。
Spot.mark() メソッドが本仕様の中核。二段階デコレータファクトリとして、
オプションを受け取る外側の mark() と、関数をラップする内側の decorator() で構成される。
sync/async の判定は inspect.iscoroutinefunction() でデコレーション時に行い、
同期関数は _execute_sync()、非同期関数は _execute_async() に委譲する。
functools.wraps(fn) により元関数のメタデータ(__name__, __doc__, __module__, __qualname__)を保持する。
@mark() (括弧あり)と @mark(括弧なし)の両方をサポートするため、
引数の型で分岐するパターンを採用した。第一引数が callable なら括弧なし、
そうでなければ括弧ありと判定する。
デコレーション時に判定する方式を採用。呼び出し時に毎回 inspect する方式と比較し、
ランタイムオーバーヘッドがゼロになる利点がある。ただしデコレーション後に
関数の性質が動的に変わるケースには対応できない(実用上問題なし)。
inspect.isgeneratorfunction() と inspect.isasyncgenfunction() の
両方をチェックし、ConfigurationError を送出する。
ジェネレータの戻り値はイテレータであり、キャッシュの意味論と矛盾するため。
inspect.signature の保持は functools.wraps が設定する __wrapped__ 属性に依存する_resolve_settings() で Spot レベルのデフォルト → 関数レベルのオーバーライドの順で行われるkeygen パラメータが指定された場合、KeyGenPolicy.bind(fn) でシグネチャにバインドされたキー生成関数に変換される@mark を2回付ける)時の検出・警告が未実装Spot.cached_run() はコンテキストマネージャとして実装。
渡された関数群を内部で mark() と同等のラッピングを行い、
コンテキスト内で呼び出すとキャッシュが効く一時的なラッパーを返す。
単一関数の場合は結果を直接返し、複数関数の場合はタプルで返す。
これにより wrapped_fn = spot.cached_run(fn) のように自然に書ける。
0個の場合は ValidationError で早期失敗させる。
@contextmanager デコレータを使用。__enter__ でラップ済み関数を返し、
__exit__ で後処理(drain 等)は with spot: に委譲する設計。
@overload を使用しているfunctools.wraps が元関数に適用されるSpot._execute_sync() と Spot._execute_async() が実行エンジンの中核。
両メソッドは同一のフロー(キー生成→キャッシュ検索→Herd待機→関数実行→保存)を
それぞれ同期/非同期のセマンティクスで実装する。
mark() がデコレーション時に inspect.iscoroutinefunction() で判定し、
適切な方を選択する。
_execute_sync と _execute_async は処理フローが同一だが、
await の有無やロック機構(threading.Event vs asyncio.Future)が異なるため、
共通化せず並行して実装している。DRY 原則よりも明瞭性・デバッグ容易性を優先した。
Herd protection の結果ボックスへの結果格納は、DB/Blob 保存の前に行う。 これにより、保存が失敗しても待機中のスレッド/タスクは結果を受け取れる。 「保存の失敗で実行結果が失われる」ことを防ぐ意図的な設計。
save_sync=False 時の保存エラーは on_background_error コールバックに
通知し、ERROR ログを出力するが、例外を再送出しない。
関数の実行自体は成功しているため、ユーザーに例外を見せるのは不適切と判断した。
pre_execute, on_cache_hit, on_cache_miss)は
try/except で囲み、フック内の例外が実行結果に影響しないようにしている_eviction_guard_lock で非ブロッキングにガードし、
並行エビクションの重複実行を防止している_execute_async 内の保存処理は _BackgroundLoop.submit() でコルーチンとして投入されるKeyGen クラスの静的メソッド群がキャッシュキー生成を担う。
_default(args, kwargs) が主要なエントリポイントで、引数を
canonicalize() で正規化 → msgpack でシリアライズ → SHA-256 でハッシュする。
最終キーは func_identifier:input_id[:version] 形式の文字列を SHA-256 した値。
MD5 より衝突耐性が高く、Python 標準ライブラリの hashlib で利用可能。
キャッシュキーとして64文字の hex 文字列は十分にコンパクトで、
DB のインデックスとしても効率的。
正規化結果を直接 str() でハッシュする方式と比較し、
msgpack はバイナリ表現が安定しており、Python バージョン間での
repr() の差異に影響されない。
from_file_content() はファイルを 65KB チャンクで読み取り、
拡張子をハッシュに含める。大きなファイルでもメモリ効率が良い。
from_path_stat() は mtime + size のみでハッシュし、
ファイル内容の読み取りを避ける高速版。ファイル不在時は
"MISSING_{filepath}" を返し、不在自体をキーに反映する。
hash_items() は汎用リストハッシュ。内部でも _default でも使われるcanonicalize() は functools.singledispatch で実装された再帰的正規化関数。
型ごとにディスパッチハンドラを登録し、ネストされたデータ構造を再帰的に正規化する。
各ハンドラは型タグ付きのタプルを返し、異なるコレクション型が同じ内容でも
区別されるようにする。
if/elif チェーンと比較して、新しい型のサポート追加が局所的で、 既存コードへの影響がない。numpy や Pydantic のようなオプショナル依存の 型ハンドラも、条件付きで登録できる。
[1, 2](list)と (1, 2)(tuple)を区別するため、
正規化結果に ("__list__", ...), ("__tuple__", ...) のように
型タグを含める。これにより、構造的に同一でも型が異なるデータが
異なるキャッシュキーを生成する。
Python では True == 1 かつ isinstance(True, int) だが、
キャッシュキーとしては区別が必要。singledispatch は MRO 順でマッチするため、
bool を int より先にチェックするインライン処理を base handler に配置した。
__dict__ と __slots__ の両方を MRO 走査で収集する。
__dict__ スロット自体はスキップするOrderedDict は順序が意味を持つため、キーのソートを行わない(dict とは異なる)defaultdict は default_factory を無視し、内容のみで正規化するEnum は 型名.メンバー名 + value で正規化し、モジュール・qualname も含めるstr() にフォールバックし、警告を出力する(非決定的)str() にフォールバックするが、安定性は保証されないStrategy enum(DEFAULT, IGNORE, FILE_CONTENT, PATH_STAT の4種)と
KeyGenPolicy クラスで構成。KeyGenPolicy は引数名→戦略のマッピングを保持し、
bind(func) で inspect.signature() を使って関数シグネチャにバインドされた
キー生成関数を返す。ファクトリメソッド KeyGen.ignore(), KeyGen.map(),
KeyGen.file_content(), KeyGen.path_stat() で宣言的にポリシーを構築できる。
デコレーション時に inspect.signature(func) を呼び、
位置引数・キーワード引数・デフォルト値を解決する。
これにより、呼び出し時にはシグネチャ解析のオーバーヘッドがなくなる。
引数ごとの戦略を enum で宣言する方式を採用。
関数を渡す方式(keygen=lambda ...)と比較して、
シリアライズ可能で、デバッグ時の可読性が高い。
bind() 内で RecursionError を catch し、警告を出力してフォールバックするFILE_CONTENT 戦略は KeyGen.from_file_content() に委譲し、65KB チャンク読み取りを行うPATH_STAT 戦略は KeyGen.from_path_stat() に委譲し、mtime + size でハッシュするIGNORE 戦略の引数はキー計算から完全に除外される(値に関係なくキャッシュヒット)SQLiteTaskDB クラスが中核。Writer Thread + Reader Threads パターンで実装。
単一の _writer_thread(デーモン)が _write_queue から書き込みタスクを取り出して
直列処理し、読み取りは threading.local() のスレッドローカル接続で並行実行する。
WAL モードにより読み取りと書き込みが並行可能。
SQLite は単一書き込み接続しかサポートしないため、全書き込みを1つのスレッドに 集約する Producer-Consumer パターンを採用。キュー経由でタスクを受け渡し、 ライタースレッドがコミット/ロールバックを管理する。 マルチスレッドからの並行書き込みによるロック競合を根本的に回避する。
threading.local() に _ReadConnWrapper を格納し、スレッドごとに
読み取り専用接続(PRAGMA query_only = ON)を保持する。
接続エラー時は自動的に再作成し、リーク防止のため WeakSet で全接続を追跡する。
書き込みタスクを PENDING → RUNNING → DONE(または CANCELLED)の
状態マシンで管理。タイムアウト時の try_cancel() により、
キュー内で待機中のタスクを安全にキャンセルできる。
既に RUNNING 状態のタスクはキャンセルせず完了を待つ。
PRAGMA table_info で既存カラムを確認し、
不足カラム(content_type, version, result_data, func_identifier, expires_at)を追加するget() 時に expires_at < NOW を確認し、期限切れなら None を返す。
物理削除は beautyspot gc コマンドに委譲するflush() は空の no-op タスクをキューに投入し、その完了を待つことで
先行する全書き込みの完了を保証するshutdown() は全読み取り接続のクローズ → _STOP センチネルの投入 →
ライタースレッドの join で安全に停止する_read_connect() のダブルチェックパターンで TOCTOU 競合を回避しているDB 層のプロトコル階層を定義。TaskDBCore(ランタイム必須の CRUD)、
Flushable(書き込み同期)、Shutdownable(安全な停止)、
Maintenable(GC・統計等の管理操作)の4つのプロトコルと、
それらを統合した TaskDBMaintenable を提供する。
TaskDBBase は ABC として Maintenable のデフォルト実装(安全な no-op)を提供する。
単一の巨大インターフェースではなく、責務ごとにプロトコルを分割した。
これにより、カスタム DB 実装は TaskDBCore のみ実装すればランタイムで動作し、
メンテナンス機能は段階的にオプトインできる。
TaskDBBase のメンテナンスメソッド(delete_expired(), prune() 等)は
デフォルトで空リスト返却や no-op を行い、サブクラスが未実装でも安全に動作する。
CLI の gc コマンドが未対応の DB バックエンドでもエラーにならない設計。
Flushable.flush(timeout) のデフォルトは no-op。SQLiteTaskDB のみがキュー同期を実装Shutdownable.shutdown(wait) のデフォルトは no-op。リソースを持たない軽量実装に配慮isinstance(db, Maintenable) で機能の有無を判定できるLocalStorage クラスが BlobStorageBase を実装。 ファイルは {base_dir}/{key}.bin 形式で保存される。
save() は tempfile.mkstemp() → fsync() → os.replace() の アトミック書き込みパターンで実装。
一時ファイルに書き込み → fsync でディスクに確実に反映 → os.replace で アトミックにリネーム。この3段階により、書き込み中のクラッシュや 並行アクセスでファイルが半壊状態になることを防ぐ。 一時ファイルには .spot_tmp 接尾辞を使用し、残存時のクリーンアップを容易にした。
save() / _validate_key(): キーに .., /, \ を含む場合に ValidationError
load() / delete(): 解決済みパスが base_dir.is_relative_to() を満たすか検証 二重のチェックにより、キー経由とロケーション経由の両方のパストラバーサルを防ぐ。
初期化時に base_dir を絶対パスに正規化し、ディレクトリと .gitignore を自動作成
delete() は冪等。ファイル不在時もエラーを送出しない。パーミッションエラーは警告のみ
list_keys() は rglob("*.bin") でレガシーサブディレクトリ構造にも対応
clean_temp_files() は猶予期間(デフォルト24時間)超過の .spot_tmp ファイルのみ削除。 ファイルロック(アンチウイルス等)のエラーは安全に無視する
prune_empty_dirs() はボトムアップで走査し、.DS_Store 等のシステムファイルのみの ディレクトリも空とみなして削除する。base_dir 自体は保持する
S3Storage クラスが BlobStorageBase を実装。
s3://bucket/prefix/ 形式の URI を解析し、boto3 の S3 クライアントを使用。
ファイルは {prefix}/{key}.bin のキーで S3 に保存される。
boto3 は import 時にガードし、未インストール時は
クラス定義は成功するがインスタンス化で明確なエラーメッセージを表示する。
beautyspot 全体が boto3 に依存しないよう、遅延インポートパターンを採用。
s3://bucket/prefix 形式の URI から bucket と prefix を自動解析する
_parse_s3_uri() ヘルパーにより、ファクトリ関数がスキームに基づいて
LocalStorage と S3Storage を透過的に切り替えられる。
save() は upload_fileobj() を使用し、5GB 超のデータにも
マルチパートアップロードで対応するs3://bucket/prefix/key.bin)を返すlist_keys() は paginator で全キーを列挙し、prefix を除いた相対キーを返すget_mtime() は head_object で LastModified を取得(GC の grace period 判定用)StoragePolicyProtocol が should_save_as_blob(data: bytes) -> bool を定義。
3つの実装を提供:
- ThresholdStoragePolicy: len(data) > threshold で判定
- WarningOnlyPolicy: 常に False を返し、閾値超過時に WARNING ログを出力
- AlwaysBlobPolicy: 常に True を返す
保存先判定ロジックを core.Spot から分離し、差し替え可能なポリシーオブジェクトに
委譲する。@mark(save_blob=True/False) の明示指定はポリシーより優先されるが、
save_blob=None(デフォルト)時にポリシーが判定を行う。
v2.0 との後方互換性を維持するため。v2.0 では全データが DB 直接保存だったため、
デフォルトを ThresholdStoragePolicy にすると既存ユーザーの挙動が変わる。
WARNING ログにより、ユーザーに Blob 分離の恩恵を段階的に周知する。
threshold パラメータを属性として保持WarningOnlyPolicy のログは warnings.warn ではなく logging.warning を使用。
ユーザーコードの警告フィルタに影響されないためMsgpackSerializer クラスが SerializerProtocol と TypeRegistryProtocol を実装。
dumps(obj) -> bytes / loads(data) -> Any が主要 API。
カスタム型は msgpack の ExtType(code, payload) で拡張する。
_default_packer() がエンコード時のカスタム型ディスパッチ、
_ext_hook() がデコード時の型復元を行う。
_encoders / _decoders 辞書は register() 時に新しい辞書を作成して
アトミックに差し替える(CoW)。読み取り側はスナップショットを参照するため、
ロックなしで安全に読める。GIL に依存せず、PEP 703(free-threading)にも
対応できる設計。世代カウンタ _registry_generation をレジストリ差し替えの前に
インクリメントすることで、読み取り側が古いキャッシュを使い続けることを防ぐ。
MRO スキャンの結果(サブクラス→登録済み親クラスの解決)を threading.local() の
OrderedDict にキャッシュする。_cache_generation と _registry_generation を比較し、
世代が変わったらキャッシュを破棄する。LRU のサイズ上限は max_cache_size で制御。
Pydantic モデルのサブクラスのように、親クラスが登録済みなら
サブクラスも自動的に同じエンコーダで処理できる。type(obj).__mro__ を走査し、
最初にマッチした登録型のエンコーダを使用する。
register() の code は 0-127(msgpack ExtType の制約)ConfigurationError で拒否_ext_hook 内のデシリアライズは再帰的に msgpack.unpackb(ext_hook=self._ext_hook) を
呼ぶことで、ネストされたカスタム型も復元できるSerializationError のメッセージには未登録型のヒント(spot.register() の使い方)を含める2つのレイヤーで構成:
- MsgpackSerializer.register(type, code, encoder, decoder): 低レベルの型登録 API
- Spot.register() / Spot.register_type(): ユーザー向けの高レベル API
Spot.register() はデコレータ形式、Spot.register_type() は命令形式で、
いずれも内部で serializer.register() に委譲する(TypeRegistryProtocol 準拠時のみ)。
デコレータ形式 @spot.register(code=1, ...) はクラス定義と同時に登録でき、
宣言的で読みやすい。命令形式 spot.register_type(MyClass, code=1, ...) は
外部ライブラリのクラスなどデコレートできない場合に使用する。
シリアライザが TypeRegistryProtocol を実装していない場合(カスタムシリアライザ)、
register() / register_type() は ConfigurationError を送出する。
これにより、型登録非対応のシリアライザを使用する場合のエラーメッセージが明確になる。
register() の内部で _write_lock を取得してから CoW で辞書を差し替えるmodel_validate(data) を使用し、
バリデーション付きの復元が可能_BackgroundLoop クラスが非同期タスクのバックグラウンド実行を管理する。
初期化時に asyncio.new_event_loop() で新しいイベントループを作成し、
_thread(デーモンスレッド)上で loop.run_forever() を実行する。
外部からは submit() を通じてコルーチンを投入でき、run_coroutine_threadsafe で
スレッドセーフにループへ渡される。
スレッドは daemon=True とし、メインスレッド終了時にプロセスがブロックされないようにしている。
しかし、安全なリソース解放のために drain() メソッドを提供し、
投入された全タスクの完了を drain_timeout の範囲で待機する。
メインスレッドのイベントループと競合しないよう、専用のイベントループを
作成してバックグラウンドスレッドで駆動する。これにより save_sync=False 時の
保存処理などが、呼び出し元の asyncio 環境から完全に隔離される。
submit() は投入されたタスクを _tasks 集合に登録し、add_done_callback で
完了時に自身を削除することで、未完了タスクの追跡を可能にしている_lock による排他制御を行っているatexit ハンドラとしてもシャットダウンが登録されるSpot クラスにおいて、バックグラウンド書き込みの同期と完了待機を制御する。
save_sync=False の場合、キャッシュへの保存処理は _bg_loop.submit() で
バックグラウンドに投入される。これら未完了のタスクやDBキューを同期するため、
flush() およびコンテキストマネージャによる drain が実装されている。
flush(): DBライタースレッドのキュー (self.db.flush()) を空になるまで待機する__exit__: コンテキストマネージャ終了時に flush() を呼び出した上で、
さらに _bg_loop.drain() を呼び出し、非同期タスクの完了も待つ
これにより、スクリプト終了時やバッチ処理の区切りで、データの完全な永続化を保証する。バックグラウンド保存時のエラーは呼び出し元のメインフローを妨げないよう、
on_background_error コールバックに通知され、ログ出力のみ行う。
例外の再送出は行わない。
flush() にはタイムアウトを設定でき、ハングアップを防止しているon_background_error は SaveErrorContext を引数に取り、失敗時の
キーや関数の詳細情報を提供するLifecyclePolicy クラスは Rule オブジェクトのリストを保持し、
関数名に基づいてキャッシュの保持期間を決定する。
resolve() メソッドが fnmatch.fnmatch() を使って関数名をパターンと照合し、
最初にマッチしたルールの保持期間を返す。
ルールのリストは順序が意味を持ち、最初にマッチしたものが採用される。
これにより、特定プレフィックスの関数には短い保持期間を設定し、
最後に *(ワイルドカード)でデフォルトポリシーを設定するような
フォールバック構造が簡単に記述できる。
後方互換性と柔軟性のため、まず func_identifier (モジュール名付きの完全修飾名) で
マッチングを試み、マッチしなかった場合は func_name (短い関数名) で再度マッチングを
試みる resolve_with_fallback() を提供している。
LifecyclePolicy コンストラクタに default_retention パラメータを追加。
どのルールにもマッチしない場合に返す保持期間を指定できる。
LifecyclePolicy.default() は default_retention="30d" で生成し、
デフォルトで30日の保持期間を設定する。
LifecyclePolicy.default() は空のルールリスト + default_retention="30d" を持つdefault_retention=None を指定すれば従来通り無期限保持となるRetention クラスを名前空間として使用し、保持期間のパースや
特殊な定数(INDEFINITE, FOREVER)を管理する。
parse_retention() 関数は文字列("7d", "12h"等)、timedelta、
または秒数(int/float)を受け取り、標準化された timedelta オブジェクトに変換する。
Retention.FOREVER はポリシーを強制的にバイパスするための特殊値。
PEP 703 (free-threading) 環境での安全性を考慮し、
_ForeverSentinel は threading.Lock を用いたダブルチェックロッキングで
厳密なシングルトンとして実装されている。
"7d", "12h", "30m", "10s" のような文字列フォーマットを
正規表現 _TIME_PATTERN でパースすることで、設定ファイルや
ハードコード時の可読性を高めている。
parse_retention() は無効なフォーマットや負の値に対して ValidationError を送出する_ForeverSentinel は __bool__() で True を返すようオーバーライドされているLimiterProtocol を実装する TokenBucket クラス。
GCRA (Generic Cell Rate Algorithm) に基づき、スレッドセーフおよび
非同期対応のスムーズなレートリミッタを提供する。
内部状態として Theoretical Arrival Time (TAT) を保持し、
consume() で同期的スリープ、consume_async() で非同期的スリープを行う。
伝統的なトークンバケットとは異なり、長時間アイドル後に 一気にバーストを許容しない「Strict Pacing」を実現できる。 TATの更新と現在時刻の比較だけで待機時間を計算できるため、 メモリ効率と計算効率に優れる。
時刻の取得に time.monotonic() を使用し、システム時刻の変更(NTP同期など)の
影響を受けない堅牢な設計としている。
_consume_reservation() メソッドが待機時間の計算と TAT 更新を
threading.Lock でアトミックに行うcost > max_cost(バケット容量超過)の場合は、どれだけ待っても
処理できないため即座に ValueError を送出するtime.sleep()、非同期パスは asyncio.sleep() で待機するタスク実行ライフサイクルに介入するインターフェース HookBase と、
そのスレッドセーフ版 ThreadSafeHookBase の実装。
pre_execute, on_cache_hit, on_cache_miss の各コールバックが提供される。
ThreadSafeHookBase は __init_subclass__ を利用し、サブクラスで定義された
フックメソッドを自動的にロックでラップする。
ユーザーが手動でロックを書く手間を省くため、メタクラス的なアプローチを採用。
クラス定義時にフックメソッドを抽出し、_wrap_with_lock デコレータで包むことで、
ユーザーコードを汚さずに完全な排他制御を実現している。
当初の Lock から RLock に変更された。
サブクラスが super().pre_execute(...) のように親のメソッドを呼び出した際、
同一スレッドが再度ロックを取得しようとしてデッドロックする問題を防ぐため。
_wrap_with_lock は functools.wraps を使い、元のメタデータを保持するThreadSafeHookBase は __getattr__ をオーバーライドし、
super().__init__() の呼び出し忘れに対して親切なエラーメッセージを返すMaintenanceService クラスがDBとBlobストレージ間の整合性チェック、
期限切れキャッシュの削除、孤立ファイルの検出といったガベージコレクション(GC)を担当。
主にCLIの beautyspot gc コマンドから呼び出される。
対象となる DB (TaskDBMaintenable) とストレージ (BlobStorageMaintenable) を受け取り、
複数のフェーズに分けてクリーンアップを実行する。
clean_garbage() は以下の順序で安全にGCを実行する:
1. 期限切れDBレコードの削除
2. Blobストレージの一時ファイル(.spot_tmp)のクリーンアップ
3. 孤立したBlobファイル(DBに存在しないファイル)の特定
4. 孤立ファイルの削除
5. 空ディレクトリの剪定
実行中のタスクがBlobを書き込んだ直後で、まだDBにメタデータが保存されていない
タイミングでGCが走ると、必要なファイルが孤立と誤認されるリスクがある。
これを防ぐため、orphan_grace_seconds(デフォルト60秒)より新しいファイルは
孤立判定から除外する設計とした。
scan_garbage() は、ストレージの全キーを列挙し、
DBの get_blob_keys() との差分を取ることで行うscan_orphan_projects() も提供されるTyper を利用したコマンドラインインターフェースの実装。
list, show, stats, clear, clean, gc, prune, version 等の
コマンドを提供し、rich ライブラリを用いてコンソール出力(テーブル、パネル、
プログレスバー等)をリッチにフォーマットしている。
Typer は型ヒントベースでコマンドやオプションを定義でき、コードの記述量と メンテナンスコストを大幅に削減できる。Rich による出力は、単なるテキストではなく 構造化された情報(JSONやMarkdown)の視認性を劇的に向上させ、DXを高める。
clear や clean など、データを大規模に削除するコマンドについては、
--force オプションが指定されない限り、rich.prompt.Confirm を用いて
ユーザーに明示的な確認を求めることで、誤操作によるデータ喪失を防いでいる。
MaintenanceService などの内部 API を利用して処理を行う--project / -p) に対して操作を行うが、
一部のコマンド(list 等)はワークスペース全体の走査もサポートするbeautyspot パッケージのメインエントリポイントとなる Spot ファクトリ関数の実装。
引数として渡された各コンポーネント(DB、Serializer、Storage等)を依存性注入(DI)で
解決し、デフォルトのコンポーネント(SQLiteTaskDB, MsgpackSerializer, LocalStorage等)を
インスタンス化して CacheManager と _Spot コアエンジンを組み立てる。
_Spot クラス自体のコンストラクタは複雑な依存関係を要求するが、
ユーザー向けに Spot() 関数を提供することで、通常は name を渡すだけで
「ゼロ設定」で動作するようにカプセル化している。
同時に、高度なユーザーは各コンポーネントを自由に差し替え可能な DI アーキテクチャを維持している。
Spot() 関数内でデフォルトのDB(SQLiteTaskDB)を自動生成した場合、
spot._owns_db = True フラグを立て、Spotエンジンのシャットダウン時に
DBも自動でクローズされるようにする。一方、ユーザーが明示的に db= を
渡した場合は、DBのライフサイクル管理は呼び出し元に委ねる(勝手に閉じない)。
save_blob フラグや引数によって WarningOnlyPolicy,
AlwaysBlobPolicy 等に解決される__all__ に各種プロトコルやデフォルト実装、例外クラスをエクスポートし、
ライブラリとしての公開API境界を明確に定義しているCacheManager クラス内で、Thundering Herd(キャッシュミス時に同一キーへの
大量アクセスが同時に発生する問題)を防止する直列化機構を実装。
_inflight 辞書で実行中のキーを管理し、最初の1スレッド/タスクだけが関数を実行し、
後続の呼び出しは wait_herd_sync / wait_herd_async でその完了を待機する。
_inflight 辞書の値として (threading.Event, list[asyncio.Future], list[result]) の
タプルを保持し、スレッドベースの待機 (Event.wait) と asyncio ベースの待機 (Future) の
両方を単一の管理下で混在できるようにしている。
関数の実行がハングアップした場合に待機側が永遠にブロックされるのを防ぐため、
HERD_TIMEOUT(300秒)と HERD_MAX_RETRIES(3回)を設定。
タイムアウト時には警告ログを出しつつ再試行し、上限を超えれば TimeoutError で
フェイルファストする堅牢な設計とした。
notify_and_cleanup_inflight() で待機タスクへの結果伝播と _inflight からの削除を
_inflight_lock によりアトミックに行うresult_box(要素数1のリスト)をリファレンス渡しで共有することで、
Event が set された際に安全に結果を取得できるようにしている_notify_future() は call_soon_threadsafe を使い、非同期Futureに対して
別スレッドから安全に結果(または例外)をセットするstorage.py 内の BlobStorageBase ABC が SPEC024 の中核実装。
5つの抽象メソッド(save, load, delete, list_keys, get_mtime)を定義し、
LocalStorage と S3Storage が具象実装として継承する。
加えて、ランタイム型チェックのために BlobStorageCore・Maintenable・
BlobStorageMaintenable の3つの Protocol クラスを定義し、
isinstance() での型判定を可能にしている(@runtime_checkable)。
BlobStorageBase は ABC として抽象メソッドを強制する一方、
BlobStorageCore / Maintenable は Protocol として構造的部分型を提供する。
これにより、BlobStorageBase を継承しないサードパーティ実装でも
isinstance(obj, BlobStorageCore) で利用可能になる柔軟性を確保した。
BlobStorageCore: 実行時に必要な最小限(save/load/delete)Maintenable: メンテナンス(GC)に必要な拡張(list_keys/get_mtime)BlobStorageMaintenable: 両方を兼備する完全版これにより、save/load/delete のみを実装した軽量バックエンドも受け入れ可能。
bytes | bytearray | memoryview を ReadableBuffer として定義。
memoryview を受け入れることでゼロコピー書き込みが可能になり、
大きなデータを保存する際のメモリ効率を改善している。
BlobStorageBase 自体はロジックを持たず、インターフェース定義のみ@abstractmethod にはdocstringで契約を明記(冪等性、エラー時の挙動など)delete は冪等であるべきことを docstring で規定list_keys は delete と同じフォーマットの識別子を yield すべきことを規定