Coverage for src / beautyspot / cachekey.py: 66%
214 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
1# src/beautyspot/cachekey.py
3import hashlib
4import logging
5import os
6import msgpack
7import inspect
8from collections import deque, OrderedDict, defaultdict
9from enum import Enum, auto
10from functools import singledispatch
11from typing import Any, Union, Callable, Dict, ParamSpec
13logger = logging.getLogger(__name__)
14logger.addHandler(logging.NullHandler())
16ReadableBuffer = Union[bytes, bytearray, memoryview]
18P = ParamSpec("P")
21def _safe_sort_key(obj: Any):
22 """
23 Helper for sorting mixed types.
24 Returns a tuple (priority, type_name, str_repr) to ensure consistent ordering
25 even across different types that are not natively comparable in Python 3.
26 """
27 if obj is None:
28 return (0, "", "")
29 return (1, str(type(obj)), str(obj))
32# ---------------------------------------------------------------------------
33# Canonicalization helpers (extracted to reduce CC of the default handler)
34# ---------------------------------------------------------------------------
37def _canonicalize_ndarray(obj: Any) -> tuple:
38 """Numpy-like array → tagged tuple with raw bytes (efficient & exact)."""
39 return ("__numpy__", obj.shape, str(obj.dtype), obj.tobytes())
42def _canonicalize_instance(obj: Any) -> Any:
43 """Custom object instance → canonical form via __dict__ and/or __slots__.
45 型名 (module + qualname) を含めることで、同じ属性構造を持つ
46 異なる型のインスタンス同士のキャッシュ衝突を防ぐ。
47 """
48 obj_type = type(obj)
49 type_tag = ("__instance__", obj_type.__module__, obj_type.__qualname__)
51 attrs = {}
52 if hasattr(obj, "__dict__"):
53 attrs.update(obj.__dict__)
55 # __slots__ path: MRO を辿って全階層の __slots__ を収集する
56 all_slots: list[str] = []
57 for klass in obj_type.__mro__:
58 cls_slots = getattr(klass, "__slots__", [])
59 if isinstance(cls_slots, str): 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 cls_slots = [cls_slots]
61 else:
62 try:
63 cls_slots = list(cls_slots)
64 except TypeError:
65 cls_slots = []
66 all_slots.extend(cls_slots)
68 # __slots__ の値を収集(__dict__ スロット自体は既に上で処理済み)
69 for s in all_slots:
70 if s == "__dict__": 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 continue
72 if hasattr(obj, s): 72 ↛ 69line 72 didn't jump to line 69 because the condition on line 72 was always true
73 attrs[s] = getattr(obj, s)
75 return (
76 *type_tag,
77 [
78 [k, canonicalize(v)]
79 for k, v in sorted(attrs.items(), key=lambda i: _safe_sort_key(i[0]))
80 ],
81 )
84def _is_ndarray_like(obj: Any) -> bool:
85 """Duck-type check for numpy-like arrays (avoids hard dependency)."""
86 return hasattr(obj, "shape") and hasattr(obj, "dtype") and hasattr(obj, "tobytes")
89# ---------------------------------------------------------------------------
90# singledispatch canonicalize
91# ---------------------------------------------------------------------------
@singledispatch
def canonicalize(obj: Any) -> Any:
    """Recursively convert ``obj`` into a canonical form for stable msgpack
    serialization.

    Handling for types without a registered implementation, in order:
      1. ``None``, ``int``, ``float``, ``str`` and ``bytes`` are returned
         unchanged; ``bool`` becomes ``("__bool__", value)``.
      2. Numpy-like arrays (detected by duck typing) become a tagged tuple.
      3. Objects with ``__dict__`` or ``__slots__`` go through
         ``_canonicalize_instance``.
      4. Anything else falls back to ``str(obj)`` and logs a warning.
    """
    if obj is None:
        return obj
    # bool is a subclass of int, so it is checked first and given a type tag.
    # This makes f(True) and f(1) produce different cache keys.
    if isinstance(obj, bool):
        return ("__bool__", obj)
    if isinstance(obj, (int, float, str, bytes)):
        return obj

    if _is_ndarray_like(obj):
        try:
            return _canonicalize_ndarray(obj)
        except Exception:
            # Best effort: the object only looked like an array. Record the
            # failure instead of hiding it, then try the remaining handlers.
            logger.debug(
                "Array-style canonicalization failed for %s; "
                "trying instance/str handling instead.",
                type(obj),
                exc_info=True,
            )

    if hasattr(obj, "__dict__") or hasattr(obj, "__slots__"):
        return _canonicalize_instance(obj)

    logger.warning(
        "Using str() fallback for unhandled type %s. "
        "This may cause unstable cache keys across processes. "
        "Consider explicit type registration.",
        type(obj),
    )
    return str(obj)
@canonicalize.register(dict)
def _canonicalize_dict(obj: dict) -> list:
    """Convert a dict into a list of ``[key, value]`` pairs sorted by key.

    Keys and values are canonicalized first; the pairs are then ordered by
    ``_safe_sort_key`` applied to the canonical key.
    """
    pairs = [[canonicalize(key), canonicalize(value)] for key, value in obj.items()]
    pairs.sort(key=lambda pair: _safe_sort_key(pair[0]))
    return pairs
@canonicalize.register(list)
def _canonicalize_list(obj: list) -> tuple:
    """Canonicalize a list element by element and tag the result.

    Note:
        The ``"__list__"`` type tag prevents collisions with ``tuple``.
        Compatibility with existing caches is broken on purpose, because
        confusing a list with a tuple is a bug.
    """
    elements = list(map(canonicalize, obj))
    return ("__list__", elements)
@canonicalize.register(tuple)
def _canonicalize_tuple(obj: tuple) -> tuple:
    """Canonicalize a tuple element by element and tag the result.

    Note:
        The ``"__tuple__"`` type tag prevents collisions with ``list``.
    """
    elements = list(map(canonicalize, obj))
    return ("__tuple__", elements)
@canonicalize.register(set)
def _canonicalize_set(obj: set) -> tuple:
    """Canonicalize a set into a type-tagged, sorted list of its members.

    Note:
        The ``"__set__"`` type tag prevents collisions with ``frozenset``:
        ``{1,2,3}`` and ``frozenset({1,2,3})`` produce different cache keys.

    .. warning::
        Not compatible with caches from v2.7.x and earlier (those had no
        type tag).
    """
    members = list(map(canonicalize, obj))
    members.sort(key=_safe_sort_key)
    return ("__set__", members)
@canonicalize.register(frozenset)
def _canonicalize_frozenset(obj: frozenset) -> tuple:
    """Canonicalize a frozenset into a type-tagged, sorted list of its members.

    Note:
        The ``"__frozenset__"`` type tag prevents collisions with ``set``.

    .. warning::
        Not compatible with caches from v2.7.x and earlier (those had no
        type tag).
    """
    members = list(map(canonicalize, obj))
    members.sort(key=_safe_sort_key)
    return ("__frozenset__", members)
@canonicalize.register(deque)
def _canonicalize_deque(obj: deque) -> tuple:
    """Canonicalize a deque element by element and tag the result.

    Note:
        The ``"__deque__"`` type tag prevents collisions with ``list`` and
        ``tuple``.
    """
    elements = list(map(canonicalize, obj))
    return ("__deque__", elements)
@canonicalize.register(defaultdict)
def _canonicalize_defaultdict(obj: defaultdict) -> tuple:
    """Canonicalize a defaultdict as a type-tagged canonical dict.

    Note:
        The ``"__defaultdict__"`` type tag prevents collisions with a plain
        ``dict``. ``default_factory`` can be non-deterministic (a lambda,
        for example), so it is left out of the hash.
    """
    sorted_pairs = _canonicalize_dict(obj)
    return ("__defaultdict__", sorted_pairs)
@canonicalize.register(OrderedDict)
def _canonicalize_ordereddict(obj: OrderedDict) -> tuple:
    """Canonicalize an OrderedDict, keeping its order, with a type tag.

    Note:
        Insertion order is the semantic essence of ``OrderedDict``, so the
        keys are not sorted and the pairs stay in insertion order.
        The ``"__ordered_dict__"`` type tag distinguishes it from a plain
        ``dict``.
    """
    ordered_pairs = []
    for key, value in obj.items():
        ordered_pairs.append([canonicalize(key), canonicalize(value)])
    return ("__ordered_dict__", ordered_pairs)
@canonicalize.register(Enum)
def _canonicalize_enum(obj: Enum) -> Any:
    """Canonicalize an Enum member.

    The result is ``("__enum__", module, qualname, value)`` where ``module``
    and ``qualname`` come from the member's class and ``value`` is the
    canonical form of ``obj.value``.
    """
    enum_cls = type(obj)
    canonical_value = canonicalize(obj.value)
    return ("__enum__", enum_cls.__module__, enum_cls.__qualname__, canonical_value)
@canonicalize.register(type)
def _canonicalize_type(obj: type) -> Any:
    """Canonicalize a class object by its structure.

    A class exposing ``model_json_schema`` is represented as
    ``("__pydantic_v2__", schema)``. A class exposing both ``schema`` and
    ``__fields__`` is represented as ``("__pydantic_v1__", schema)``. If the
    schema call fails, or for any other class, the result is
    ``("__class__", module, qualname, attrs)`` where ``attrs`` is the
    canonical form of the class's own attributes, excluding dunder names
    (other than ``__annotations__``), callables and properties.
    """
    # Pydantic v2
    if hasattr(obj, "model_json_schema"):
        try:
            return ("__pydantic_v2__", canonicalize(obj.model_json_schema()))
        except Exception:
            # Best effort: fall through to the next representation, but
            # leave a trace of why the schema could not be used.
            logger.debug(
                "model_json_schema() failed for %s; trying other representations.",
                obj,
                exc_info=True,
            )
    # Pydantic v1: require both ``schema`` and ``__fields__`` to avoid
    # false positives.
    if hasattr(obj, "schema") and hasattr(obj, "__fields__"):
        try:
            return ("__pydantic_v1__", canonicalize(obj.schema()))
        except Exception:
            logger.debug(
                "schema() failed for %s; using the generic class representation.",
                obj,
                exc_info=True,
            )

    # Generic class (structure-based)
    class_attrs = {}
    try:
        for k, v in obj.__dict__.items():
            if k.startswith("__") and k != "__annotations__":
                continue
            # Properties are behaviour, like the methods skipped by the
            # callable() test. A property has neither __dict__ nor
            # __slots__, so canonicalizing it would end in the str()
            # fallback, whose default repr embeds a memory address.
            if callable(v) or isinstance(v, property):
                continue
            class_attrs[k] = v
    except AttributeError:
        pass

    return (
        "__class__",
        obj.__module__,
        obj.__qualname__,
        canonicalize(class_attrs),
    )
274# ---------------------------------------------------------------------------
275# Optional: register numpy.ndarray directly when numpy is available
276# ---------------------------------------------------------------------------
try:
    import numpy as _np
except ImportError:
    # numpy is optional; without it only the duck-typed path in the default
    # handler is available.
    pass
else:

    @canonicalize.register(_np.ndarray)
    def _canonicalize_np_ndarray(obj: _np.ndarray) -> tuple:
        """Dispatch real ``numpy.ndarray`` values to ``_canonicalize_ndarray``."""
        return _canonicalize_ndarray(obj)
289# ---------------------------------------------------------------------------
290# Strategy & Policy
291# ---------------------------------------------------------------------------
class Strategy(Enum):
    """How a single argument contributes to the cache key."""

    # Recursively canonicalize the value and hash it (default behaviour).
    DEFAULT = auto()
    # Leave the argument out of the hash calculation completely.
    IGNORE = auto()
    # Treat the value as a file path and hash the file's content (strict).
    FILE_CONTENT = auto()
    # Treat the value as a file path and hash its metadata
    # (fast: path + size + mtime).
    PATH_STAT = auto()
class KeyGenPolicy:
    """
    A policy object that binds to a function signature to generate cache keys
    based on argument-specific strategies.
    """

    def __init__(
        self,
        strategies: Dict[str, Strategy],
        default_strategy: Strategy = Strategy.DEFAULT,
    ):
        """Store the per-argument strategies and the fallback strategy.

        Args:
            strategies: Maps a parameter name to the strategy used for it.
            default_strategy: Strategy for parameters not named in
                ``strategies``.
        """
        self.strategies = strategies
        self.default_strategy = default_strategy

    def bind(self, func: Callable[P, Any]) -> Callable[P, str]:
        """
        Creates a key generation function bound to the specific signature of `func`.

        The returned callable accepts the same arguments as ``func``, binds
        them to parameter names (applying defaults), applies each
        parameter's strategy and returns ``KeyGen.hash_items`` of the
        collected items.
        """
        sig = inspect.signature(func)

        # A strategy keyed by a name that is not a parameter of ``func`` can
        # never match in the loop below. Report it so that a misspelled name
        # does not silently leave an argument on the default strategy.
        unknown = [name for name in self.strategies if name not in sig.parameters]
        if unknown:
            logger.warning(
                "KeyGenPolicy names argument(s) %s that are not parameters of %s; "
                "their strategies will never be applied.",
                ", ".join(map(repr, unknown)),
                getattr(func, "__qualname__", repr(func)),
            )

        def _bound_keygen(*args: P.args, **kwargs: P.kwargs) -> str:
            # Bind arguments to names, applying defaults
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            items_to_hash = []

            # Iterate over arguments in definition order
            for name, val in bound.arguments.items():
                strategy = self.strategies.get(name, self.default_strategy)

                if strategy == Strategy.IGNORE:
                    continue

                elif strategy == Strategy.FILE_CONTENT:
                    # Expecting val to be a path-like string
                    items_to_hash.append(KeyGen.from_file_content(str(val)))

                elif strategy == Strategy.PATH_STAT:
                    items_to_hash.append(KeyGen.from_path_stat(str(val)))

                else:  # DEFAULT
                    try:
                        items_to_hash.append(canonicalize(val))
                    except RecursionError:
                        logger.warning(
                            "Circular reference detected in argument '%s'; "
                            "falling back to str-based representation for this argument.",
                            name,
                        )
                        items_to_hash.append(str(val))

            # Hash the accumulated list of canonical items
            return KeyGen.hash_items(items_to_hash)

        return _bound_keygen
class KeyGen:
    """
    Generates stable cache keys (SHA-256) for function inputs (Identity Layer).
    """

    # Constants for convenience usage in KeyGen.map()
    HASH = Strategy.DEFAULT
    IGNORE = Strategy.IGNORE
    FILE_CONTENT = Strategy.FILE_CONTENT
    PATH_STAT = Strategy.PATH_STAT

    @staticmethod
    def from_path_stat(filepath: str) -> str:
        """Fast: path + size + mtime (SHA-256)

        Returns the SHA-256 hex digest of ``"{filepath}_{size}_{mtime}"``,
        or the marker string ``"MISSING_{filepath}"`` when the path cannot
        be stat'ed.
        """
        # Stat once instead of checking existence first: a file removed
        # between the check and the stat would otherwise raise here.
        # OSError and ValueError are the failures os.path.exists() treats
        # as "does not exist".
        try:
            stat = os.stat(filepath)
        except (OSError, ValueError):
            return f"MISSING_{filepath}"
        identifier = f"{filepath}_{stat.st_size}_{stat.st_mtime}"
        return hashlib.sha256(identifier.encode()).hexdigest()

    @staticmethod
    def from_file_content(filepath: str) -> str:
        """Strict: file content hash (SHA-256)

        Returns the SHA-256 hex digest of the lower-cased file extension
        followed by the file's bytes. Returns ``"MISSING_{filepath}"`` when
        the path does not exist and ``"ERROR_{filepath}"`` when reading it
        raises ``OSError``.
        """
        if not os.path.exists(filepath):
            return f"MISSING_{filepath}"

        hasher = hashlib.sha256()
        # Include extension to distinguish format changes
        hasher.update(os.path.splitext(filepath)[1].lower().encode())

        try:
            with open(filepath, "rb") as f:
                # Read in 64 KiB chunks so large files are not loaded whole.
                while chunk := f.read(65536):
                    hasher.update(chunk)
        except OSError:
            return f"ERROR_{filepath}"
        return hasher.hexdigest()

    @staticmethod
    def _default(args: tuple, kwargs: dict) -> str:
        """
        Generates a stable SHA-256 hash from function arguments using recursive canonicalization.
        This is the default legacy behavior sensitive to args/kwargs structure.

        If canonicalization or packing fails, a warning is logged and the
        digest of ``str((args, kwargs))`` is returned instead.
        """
        try:
            # 1. Normalize structure
            normalized = [canonicalize(args), canonicalize(kwargs)]

            # 2. Serialize to bytes
            packed = msgpack.packb(normalized)

            if packed is None:
                raise ValueError("msgpack.packb returned None")

            # 3. Hash (SHA-256)
            return hashlib.sha256(packed).hexdigest()

        except RecursionError:
            logger.warning(
                "Circular reference detected in arguments; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str((args, kwargs)).encode()).hexdigest()
        except Exception:
            logger.warning(
                "Failed to canonicalize or pack arguments; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str((args, kwargs)).encode()).hexdigest()

    @staticmethod
    def hash_items(items: list) -> str:
        """Helper to hash a list of canonicalized items.

        Packs ``items`` with msgpack and returns the SHA-256 hex digest. If
        packing fails, a warning is logged and the digest of ``str(items)``
        is returned instead.
        """
        try:
            packed = msgpack.packb(items)
            if packed is None:
                raise ValueError("msgpack.packb returned None")
            return hashlib.sha256(packed).hexdigest()
        except Exception:
            logger.warning(
                "Failed to pack canonicalized items; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str(items).encode()).hexdigest()

    # --- Factory Methods for Policies ---

    @classmethod
    def ignore(cls, *arg_names: str) -> KeyGenPolicy:
        """
        Creates a policy that ignores specific arguments (e.g., 'verbose', 'logger').
        """
        strategies = {name: Strategy.IGNORE for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def map(cls, **arg_strategies: Strategy) -> KeyGenPolicy:
        """
        Creates a policy with explicit strategies for specific arguments.
        """
        return KeyGenPolicy(arg_strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def file_content(cls, *arg_names: str) -> KeyGenPolicy:
        """
        Creates a policy that treats specified arguments as file paths and hashes their content.
        """
        strategies = {name: Strategy.FILE_CONTENT for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def path_stat(cls, *arg_names: str) -> KeyGenPolicy:
        """
        Creates a policy that treats specified arguments as file paths and hashes their metadata (stat).
        """
        strategies = {name: Strategy.PATH_STAT for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)