Coverage for src / beautyspot / cachekey.py: 66%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-18 18:20 +0900

1# src/beautyspot/cachekey.py 

2 

3import hashlib 

4import logging 

5import os 

6import msgpack 

7import inspect 

8from collections import deque, OrderedDict, defaultdict 

9from enum import Enum, auto 

10from functools import singledispatch 

11from typing import Any, Union, Callable, Dict, ParamSpec 

12 

13logger = logging.getLogger(__name__) 

14logger.addHandler(logging.NullHandler()) 

15 

16ReadableBuffer = Union[bytes, bytearray, memoryview] 

17 

18P = ParamSpec("P") 

19 

20 

21def _safe_sort_key(obj: Any): 

22 """ 

23 Helper for sorting mixed types. 

24 Returns a tuple (priority, type_name, str_repr) to ensure consistent ordering 

25 even across different types that are not natively comparable in Python 3. 

26 """ 

27 if obj is None: 

28 return (0, "", "") 

29 return (1, str(type(obj)), str(obj)) 

30 

31 

32# --------------------------------------------------------------------------- 

33# Canonicalization helpers (extracted to reduce CC of the default handler) 

34# --------------------------------------------------------------------------- 

35 

36 

37def _canonicalize_ndarray(obj: Any) -> tuple: 

38 """Numpy-like array → tagged tuple with raw bytes (efficient & exact).""" 

39 return ("__numpy__", obj.shape, str(obj.dtype), obj.tobytes()) 

40 

41 

42def _canonicalize_instance(obj: Any) -> Any: 

43 """Custom object instance → canonical form via __dict__ and/or __slots__. 

44 

45 型名 (module + qualname) を含めることで、同じ属性構造を持つ 

46 異なる型のインスタンス同士のキャッシュ衝突を防ぐ。 

47 """ 

48 obj_type = type(obj) 

49 type_tag = ("__instance__", obj_type.__module__, obj_type.__qualname__) 

50 

51 attrs = {} 

52 if hasattr(obj, "__dict__"): 

53 attrs.update(obj.__dict__) 

54 

55 # __slots__ path: MRO を辿って全階層の __slots__ を収集する 

56 all_slots: list[str] = [] 

57 for klass in obj_type.__mro__: 

58 cls_slots = getattr(klass, "__slots__", []) 

59 if isinstance(cls_slots, str): 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 cls_slots = [cls_slots] 

61 else: 

62 try: 

63 cls_slots = list(cls_slots) 

64 except TypeError: 

65 cls_slots = [] 

66 all_slots.extend(cls_slots) 

67 

68 # __slots__ の値を収集(__dict__ スロット自体は既に上で処理済み) 

69 for s in all_slots: 

70 if s == "__dict__": 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true

71 continue 

72 if hasattr(obj, s): 72 ↛ 69line 72 didn't jump to line 69 because the condition on line 72 was always true

73 attrs[s] = getattr(obj, s) 

74 

75 return ( 

76 *type_tag, 

77 [ 

78 [k, canonicalize(v)] 

79 for k, v in sorted(attrs.items(), key=lambda i: _safe_sort_key(i[0])) 

80 ], 

81 ) 

82 

83 

84def _is_ndarray_like(obj: Any) -> bool: 

85 """Duck-type check for numpy-like arrays (avoids hard dependency).""" 

86 return hasattr(obj, "shape") and hasattr(obj, "dtype") and hasattr(obj, "tobytes") 

87 

88 

89# --------------------------------------------------------------------------- 

90# singledispatch canonicalize 

91# --------------------------------------------------------------------------- 

92 

93 

@singledispatch
def canonicalize(obj: Any) -> Any:
    """Recursively convert ``obj`` into a canonical, msgpack-friendly form.

    This is the fallback used for every type that has no handler registered
    through ``canonicalize.register``. Resolution order:

    1. ``None`` and primitives (``int``, ``float``, ``str``, ``bytes``) are
       returned unchanged; ``bool`` is tagged.
    2. Python functions and bound methods are identified by module and
       qualified name.
    3. Numpy-like arrays (duck-typed) become a tagged tuple.
    4. Other instances are canonicalized via ``__dict__`` / ``__slots__``.
    5. Anything else falls back to ``str(obj)`` with a warning.

    Args:
        obj: The value to canonicalize.

    Returns:
        A structure made of primitives, lists and tuples.
    """
    if obj is None:
        return obj
    # bool is a subclass of int, so it must be checked first and tagged;
    # otherwise f(True) and f(1) would share a cache key.
    if isinstance(obj, bool):
        return ("__bool__", obj)
    if isinstance(obj, (int, float, str, bytes)):
        return obj

    # Plain functions all have type ``builtins.function`` and usually an empty
    # ``__dict__``; going through the generic instance path would make every
    # function produce the same key. Identify them by name instead, keeping
    # any custom function attributes as part of the key.
    if inspect.isfunction(obj):
        func_attrs = vars(obj)
        return (
            "__function__",
            obj.__module__,
            obj.__qualname__,
            canonicalize(dict(func_attrs)) if func_attrs else [],
        )
    # Bound methods are keyed by their underlying function. ``__self__`` is
    # deliberately not followed: objects commonly store their own bound
    # methods, and following the reference would recurse without end.
    if inspect.ismethod(obj):
        return ("__method__", canonicalize(obj.__func__))

    if _is_ndarray_like(obj):
        try:
            return _canonicalize_ndarray(obj)
        except Exception:
            # Best effort: the object only looked like an array. Fall through
            # to the generic strategies below, but leave a trace.
            logger.debug(
                "ndarray-style canonicalization failed for %s; "
                "falling back to generic handling.",
                type(obj),
                exc_info=True,
            )

    if hasattr(obj, "__dict__") or hasattr(obj, "__slots__"):
        return _canonicalize_instance(obj)

    logger.warning(
        "Using str() fallback for unhandled type %s. "
        "This may cause unstable cache keys across processes. "
        "Consider explicit type registration.",
        type(obj),
    )
    return str(obj)

130 

131 

@canonicalize.register(dict)
def _canonicalize_dict(obj: dict) -> list:
    """Canonicalize a dict as a list of ``[key, value]`` pairs.

    Keys and values are canonicalized first; the pairs are then ordered by
    the canonical key so that insertion order does not affect the result.
    """
    pairs = [[canonicalize(key), canonicalize(value)] for key, value in obj.items()]
    pairs.sort(key=lambda pair: _safe_sort_key(pair[0]))
    return pairs

139 

140 

@canonicalize.register(list)
def _canonicalize_list(obj: list) -> tuple:
    """Canonicalize a list element-wise and tag it with ``"__list__"``.

    Note:
        The type tag keeps lists from colliding with tuples. Compatibility
        with previously stored cache entries is intentionally broken,
        because treating a list and a tuple as the same value is a bug.
    """
    members = list(map(canonicalize, obj))
    return ("__list__", members)

150 

151 

@canonicalize.register(tuple)
def _canonicalize_tuple(obj: tuple) -> tuple:
    """Canonicalize a tuple element-wise and tag it with ``"__tuple__"``.

    Note:
        The type tag keeps tuples from colliding with lists.
    """
    members = []
    for element in obj:
        members.append(canonicalize(element))
    return ("__tuple__", members)

160 

161 

@canonicalize.register(set)
def _canonicalize_set(obj: set) -> tuple:
    """Canonicalize a set as a sorted list tagged with ``"__set__"``.

    Note:
        The type tag keeps sets from colliding with frozensets, so
        ``{1, 2, 3}`` and ``frozenset({1, 2, 3})`` yield different cache keys.

    .. warning::
        Incompatible with caches written by v2.7.x or earlier, which did not
        use a type tag.
    """
    ordered = sorted(map(canonicalize, obj), key=_safe_sort_key)
    return ("__set__", ordered)

175 

176 

@canonicalize.register(frozenset)
def _canonicalize_frozenset(obj: frozenset) -> tuple:
    """Canonicalize a frozenset as a sorted list tagged with ``"__frozenset__"``.

    Note:
        The type tag keeps frozensets from colliding with sets.

    .. warning::
        Incompatible with caches written by v2.7.x or earlier, which did not
        use a type tag.
    """
    members = [canonicalize(member) for member in obj]
    members.sort(key=_safe_sort_key)
    return ("__frozenset__", members)

189 

190 

@canonicalize.register(deque)
def _canonicalize_deque(obj: deque) -> tuple:
    """Canonicalize a deque element-wise and tag it with ``"__deque__"``.

    Note:
        The type tag keeps deques from colliding with lists and tuples.
    """
    members = list(map(canonicalize, obj))
    return ("__deque__", members)

199 

200 

@canonicalize.register(defaultdict)
def _canonicalize_defaultdict(obj: defaultdict) -> tuple:
    """Canonicalize a defaultdict as a tagged canonical dict.

    Note:
        The ``"__defaultdict__"`` tag keeps it from colliding with a plain
        ``dict``. ``default_factory`` can be non-deterministic (for example
        a lambda), so it is not part of the result.
    """
    entries = _canonicalize_dict(obj)
    return ("__defaultdict__", entries)

210 

211 

@canonicalize.register(OrderedDict)
def _canonicalize_ordereddict(obj: OrderedDict) -> tuple:
    """Canonicalize an OrderedDict while preserving insertion order.

    Note:
        Insertion order is the defining property of an ``OrderedDict``, so
        the pairs are kept in that order rather than sorted by key. The
        ``"__ordered_dict__"`` tag distinguishes it from a plain ``dict``.
    """
    pairs = []
    for key, value in obj.items():
        pairs.append([canonicalize(key), canonicalize(value)])
    return ("__ordered_dict__", pairs)

225 

226 

@canonicalize.register(Enum)
def _canonicalize_enum(obj: Enum) -> Any:
    """Canonicalize an Enum member in a way that is stable across sessions.

    Returns:
        ``("__enum__", module, qualname, canonical_value)`` where module and
        qualname identify the enum class and ``canonical_value`` is the
        canonical form of the member's ``value``.
    """
    enum_cls = type(obj)
    member_value = canonicalize(obj.value)
    return ("__enum__", enum_cls.__module__, enum_cls.__qualname__, member_value)

236 

237 

@canonicalize.register(type)
def _canonicalize_type(obj: type) -> Any:
    """Canonicalize a class object based on its structure.

    Resolution order:

    1. Pydantic v2 models (detected via ``model_json_schema``) use their
       JSON schema.
    2. Pydantic v1 models (detected via ``schema`` together with
       ``__fields__`` to avoid false positives) use their schema.
    3. Any other class is described by its module, qualified name and the
       data attributes found in its own ``__dict__``.

    Args:
        obj: The class to canonicalize.

    Returns:
        A tagged tuple: ``("__pydantic_v2__", schema)``,
        ``("__pydantic_v1__", schema)`` or
        ``("__class__", module, qualname, canonical_attrs)``.
    """
    # Pydantic v2
    if hasattr(obj, "model_json_schema"):
        try:
            return ("__pydantic_v2__", canonicalize(obj.model_json_schema()))
        except Exception:
            # Best effort: fall through to the next strategy, but leave a trace.
            logger.debug(
                "model_json_schema() failed for %r; trying other strategies.",
                obj,
                exc_info=True,
            )
    # Pydantic v1 (requires both ``schema`` and ``__fields__``)
    if hasattr(obj, "schema") and hasattr(obj, "__fields__"):
        try:
            return ("__pydantic_v1__", canonicalize(obj.schema()))
        except Exception:
            logger.debug(
                "schema() failed for %r; using generic class handling.",
                obj,
                exc_info=True,
            )

    # Generic class (structure-based)
    class_attrs = {}
    try:
        for attr_name, attr_value in obj.__dict__.items():
            # Skip dunder entries, except annotations which describe structure.
            if attr_name.startswith("__") and attr_name != "__annotations__":
                continue
            if callable(attr_value):
                continue
            # Skip non-callable descriptors (property, classmethod, slot
            # member descriptors, ...). They carry behavior rather than data,
            # and objects such as ``property`` would otherwise end up in the
            # str() fallback, whose text contains a memory address.
            if hasattr(attr_value, "__get__"):
                continue
            class_attrs[attr_name] = attr_value
    except AttributeError:
        pass

    return (
        "__class__",
        obj.__module__,
        obj.__qualname__,
        canonicalize(class_attrs),
    )

272 

273 

274# --------------------------------------------------------------------------- 

275# Optional: register numpy.ndarray directly when numpy is available 

276# --------------------------------------------------------------------------- 

277 

# numpy is optional: when it is importable, register ``numpy.ndarray``
# explicitly so that dispatch reaches the array handler directly.
try:
    import numpy as _np
except ImportError:
    pass
else:

    @canonicalize.register(_np.ndarray)
    def _canonicalize_np_ndarray(obj: _np.ndarray) -> tuple:
        """Delegate real numpy arrays to the shared array canonicalizer."""
        return _canonicalize_ndarray(obj)

287 

288 

289# --------------------------------------------------------------------------- 

290# Strategy & Policy 

291# --------------------------------------------------------------------------- 

292 

293 

class Strategy(Enum):
    """How a single function argument contributes to the cache key."""

    # Recursively canonicalize and hash the value (default behavior).
    DEFAULT = auto()
    # Exclude the argument from the hash calculation completely.
    IGNORE = auto()
    # Treat the value as a file path and hash the file's content (strict).
    FILE_CONTENT = auto()
    # Treat the value as a file path and hash its metadata
    # (fast: path + size + mtime).
    PATH_STAT = auto()

305 

306 

class KeyGenPolicy:
    """Per-argument key generation policy.

    A policy maps parameter names to a ``Strategy``. Calling ``bind`` with a
    function produces a key generator that follows that function's signature.
    """

    def __init__(
        self,
        strategies: Dict[str, Strategy],
        default_strategy: Strategy = Strategy.DEFAULT,
    ):
        """Store the strategy table.

        Args:
            strategies: Mapping from parameter name to the strategy to use
                for that parameter.
            default_strategy: Strategy for parameters not listed in
                ``strategies``.
        """
        self.strategies = strategies
        self.default_strategy = default_strategy

    def bind(self, func: Callable[P, Any]) -> Callable[P, str]:
        """Create a key generation function bound to the signature of ``func``.

        A warning is logged for every strategy whose name is not a parameter
        of ``func``: such an entry can never match and usually indicates a
        typo (for example, an argument meant to be ignored is then hashed).

        Args:
            func: The function whose arguments will be turned into keys.

        Returns:
            A callable taking the same arguments as ``func`` and returning
            the cache key as a hex digest string.
        """
        sig = inspect.signature(func)

        unknown = [name for name in self.strategies if name not in sig.parameters]
        if unknown:
            logger.warning(
                "KeyGenPolicy bound to %s names parameters that are not in its "
                "signature: %s. These strategies will have no effect.",
                getattr(func, "__qualname__", repr(func)),
                ", ".join(map(repr, unknown)),
            )

        def _bound_keygen(*args: P.args, **kwargs: P.kwargs) -> str:
            # Bind arguments to parameter names, applying defaults, so that
            # positional and keyword call styles produce the same key.
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            items_to_hash = []

            # Iterate over arguments in definition order.
            for name, val in bound.arguments.items():
                strategy = self.strategies.get(name, self.default_strategy)

                if strategy == Strategy.IGNORE:
                    continue

                elif strategy == Strategy.FILE_CONTENT:
                    # The value is expected to be a path-like string.
                    items_to_hash.append(KeyGen.from_file_content(str(val)))

                elif strategy == Strategy.PATH_STAT:
                    items_to_hash.append(KeyGen.from_path_stat(str(val)))

                else:  # DEFAULT
                    try:
                        items_to_hash.append(canonicalize(val))
                    except RecursionError:
                        logger.warning(
                            "Circular reference detected in argument '%s'; "
                            "falling back to str-based representation for this argument.",
                            name,
                        )
                        items_to_hash.append(str(val))

            # Hash the accumulated list of canonical items.
            return KeyGen.hash_items(items_to_hash)

        return _bound_keygen

362 

363 

class KeyGen:
    """Generates stable SHA-256 cache keys for function inputs (identity layer)."""

    # Constants for convenient use with KeyGen.map()
    HASH = Strategy.DEFAULT
    IGNORE = Strategy.IGNORE
    FILE_CONTENT = Strategy.FILE_CONTENT
    PATH_STAT = Strategy.PATH_STAT

    @staticmethod
    def from_path_stat(filepath: str) -> str:
        """Fast file identity: SHA-256 of path + size + mtime.

        Args:
            filepath: Path of the file to identify.

        Returns:
            The hex digest, or ``"MISSING_<filepath>"`` when the file cannot
            be stat-ed.
        """
        # A single os.stat() call replaces the exists()-then-stat() pair, so
        # a file deleted in between can no longer raise. The exception types
        # are the ones os.path.exists() treats as "does not exist".
        try:
            stat = os.stat(filepath)
        except (OSError, ValueError):
            return f"MISSING_{filepath}"
        identifier = f"{filepath}_{stat.st_size}_{stat.st_mtime}"
        return hashlib.sha256(identifier.encode()).hexdigest()

    @staticmethod
    def from_file_content(filepath: str) -> str:
        """Strict file identity: SHA-256 of the extension plus file content.

        Args:
            filepath: Path of the file to hash.

        Returns:
            The hex digest, ``"MISSING_<filepath>"`` when the file does not
            exist, or ``"ERROR_<filepath>"`` when it cannot be read.
        """
        if not os.path.exists(filepath):
            return f"MISSING_{filepath}"

        hasher = hashlib.sha256()
        # Include the extension to distinguish format changes.
        hasher.update(os.path.splitext(filepath)[1].lower().encode())

        try:
            with open(filepath, "rb") as f:
                while chunk := f.read(65536):
                    hasher.update(chunk)
        except FileNotFoundError:
            # The file vanished between the existence check and open().
            return f"MISSING_{filepath}"
        except OSError:
            return f"ERROR_{filepath}"
        return hasher.hexdigest()

    @staticmethod
    def _default(args: tuple, kwargs: dict) -> str:
        """Hash positional and keyword arguments via recursive canonicalization.

        This is the default legacy behavior; it is sensitive to how the
        values are split between ``args`` and ``kwargs``.

        Args:
            args: Positional arguments of the call.
            kwargs: Keyword arguments of the call.

        Returns:
            The SHA-256 hex digest. If canonicalization or packing fails, the
            digest of ``str((args, kwargs))`` is returned instead and a
            warning is logged.
        """
        try:
            # 1. Normalize structure
            normalized = [canonicalize(args), canonicalize(kwargs)]

            # 2. Serialize to bytes
            packed = msgpack.packb(normalized)

            if packed is None:
                raise ValueError("msgpack.packb returned None")

            # 3. Hash (SHA-256)
            return hashlib.sha256(packed).hexdigest()

        except RecursionError:
            logger.warning(
                "Circular reference detected in arguments; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str((args, kwargs)).encode()).hexdigest()
        except Exception:
            logger.warning(
                "Failed to canonicalize or pack arguments; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str((args, kwargs)).encode()).hexdigest()

    @staticmethod
    def hash_items(items: list) -> str:
        """Hash a list of already canonicalized items.

        Args:
            items: Canonical items to pack and hash.

        Returns:
            The SHA-256 hex digest of the msgpack encoding of ``items``; if
            packing fails, the digest of ``str(items)`` (a warning is logged).
        """
        try:
            packed = msgpack.packb(items)
            if packed is None:
                raise ValueError("msgpack.packb returned None")
            return hashlib.sha256(packed).hexdigest()
        except Exception:
            logger.warning(
                "Failed to pack canonicalized items; falling back to str-based hash. "
                "This may cause unexpected cache misses if argument repr is not stable."
            )
            return hashlib.sha256(str(items).encode()).hexdigest()

    # --- Factory Methods for Policies ---

    @classmethod
    def ignore(cls, *arg_names: str) -> KeyGenPolicy:
        """Create a policy that ignores the named arguments (e.g. 'verbose', 'logger')."""
        strategies = {name: Strategy.IGNORE for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def map(cls, **arg_strategies: Strategy) -> KeyGenPolicy:
        """Create a policy with an explicit strategy for each named argument."""
        return KeyGenPolicy(arg_strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def file_content(cls, *arg_names: str) -> KeyGenPolicy:
        """Create a policy that treats the named arguments as file paths and hashes their content."""
        strategies = {name: Strategy.FILE_CONTENT for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)

    @classmethod
    def path_stat(cls, *arg_names: str) -> KeyGenPolicy:
        """Create a policy that treats the named arguments as file paths and hashes their metadata (stat)."""
        strategies = {name: Strategy.PATH_STAT for name in arg_names}
        return KeyGenPolicy(strategies, default_strategy=Strategy.DEFAULT)