Coverage for src / beautyspot / serializer.py: 86%
107 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
1# src/beautyspot/serializer.py
3import threading
4import msgpack
5from collections import OrderedDict
6from typing import (
7 Any,
8 Callable,
9 Dict,
10 Type,
11 TypeVar,
12 Tuple,
13 Protocol,
14 runtime_checkable,
15)
16from beautyspot.exceptions import SerializationError
18T = TypeVar("T")
@runtime_checkable
class SerializerProtocol(Protocol):
    """Structural interface for objects that convert values to and from bytes.

    Decorated with ``runtime_checkable``, so ``isinstance()`` checks verify
    only that ``dumps`` and ``loads`` attributes exist, not their signatures.
    """

    def dumps(self, obj: Any, /) -> bytes:
        """Serialize ``obj`` and return the encoded bytes."""
        ...

    def loads(self, data: bytes, /) -> Any:
        """Deserialize ``data`` and return the reconstructed object."""
        ...
@runtime_checkable
class TypeRegistryProtocol(Protocol):
    """Structural interface for objects that accept custom type registrations.

    Decorated with ``runtime_checkable``, so ``isinstance()`` checks verify
    only that a ``register`` attribute exists, not its signature.
    """

    def register(
        self,
        type_class: Type[Any],
        code: int,
        encoder: Callable[[Any], Any],
        decoder: Callable[[Any], Any],
    ) -> None:
        """Associate ``type_class`` with ``code`` and its encoder/decoder pair."""
        ...
class MsgpackSerializer(SerializerProtocol, TypeRegistryProtocol):
    """
    A secure and extensible serializer based on MessagePack.

    Allows registering custom types via `register()`.
    Automatically handles packing/unpacking of custom type payloads.

    Thread Safety (No-GIL Compatible):
        Serialization and deserialization never take a lock. This is achieved by:
        1. **Copy-on-Write (CoW)** for the shared type registry (`_encoders`,
           `_decoders`). `register()` builds new dicts and swaps the references,
           so readers always work on a snapshot that is never mutated in place.
        2. **Thread-Local Storage** for the LRU subclass cache
           (`_local.subclass_cache`), so threads never share cache state.
        3. **Publish-then-bump ordering** for cache invalidation: `register()`
           swaps in the new registry *before* incrementing the generation
           counter, and readers read the generation *before* the registry.
           A cache tagged with generation ``g`` is therefore only ever filled
           from a registry that is at least as new as ``g``.

    Note:
        To prevent memory leaks in environments where types are generated dynamically
        (e.g., namedtuples, dynamic Pydantic models), subclass resolution results
        are cached using an LRU strategy with a configurable maximum size per thread.
    """

    def __init__(self, max_cache_size: int = 1024):
        """Create an empty serializer.

        Args:
            max_cache_size: Maximum number of entries kept in each thread's
                subclass-resolution cache. Values <= 0 disable caching.
        """
        # Shared registry (Copy-on-Write): these dicts are replaced, never mutated.
        self._encoders: Dict[Type, Tuple[int, Callable[[Any], Any]]] = {}
        self._decoders: Dict[int, Callable[[Any], Any]] = {}

        self._max_cache_size = max_cache_size

        # Thread-local LRU cache of subclass -> encoder resolutions.
        self._local = threading.local()

        # Serializes writers (register()); readers never take it.
        self._write_lock = threading.Lock()

        # Registry generation counter: incremented by every register() call and
        # used to invalidate the thread-local caches.
        self._registry_generation = 0

    def _get_local_cache(
        self,
    ) -> OrderedDict[Type, Tuple[int, Callable[[Any], Any]] | None]:
        """Return the calling thread's LRU cache, creating it if necessary.

        If `register()` has advanced the registry generation since this
        thread's cache was created, the cache is replaced with an empty one so
        that stale entries are never consulted.
        """
        gen = self._registry_generation
        if (
            not hasattr(self._local, "subclass_cache")
            or getattr(self._local, "_cache_generation", -1) != gen
        ):
            self._local.subclass_cache = OrderedDict()
            self._local._cache_generation = gen
        return self._local.subclass_cache

    def _enforce_cache_size(self, cache: OrderedDict) -> None:
        """Evict least-recently-used entries until ``cache`` fits the limit.

        A negative ``max_cache_size`` is treated as 0; without the clamp the
        loop would call ``popitem`` on an empty cache and raise ``KeyError``.
        """
        limit = max(self._max_cache_size, 0)
        while len(cache) > limit:
            cache.popitem(last=False)

    def register(
        self,
        type_class: Type,
        code: int,
        encoder: Callable[[Any], Any],
        decoder: Callable[[Any], Any],
    ) -> None:
        """Register a custom type under a MessagePack extension code.

        Args:
            type_class: The type to handle; subclasses are matched via the MRO.
            code: Extension code in the range 0..127, unique per serializer.
            encoder: Converts an instance into a msgpack-serializable value.
            decoder: Rebuilds an instance from the decoded intermediate value.

        Raises:
            ValueError: If ``code`` is out of range or already registered, or
                if ``type_class`` is already registered.
        """
        if not (0 <= code <= 127):
            raise ValueError(f"ExtCode must be between 0 and 127, got {code}.")

        with self._write_lock:
            if code in self._decoders:
                raise ValueError(f"ExtCode {code} is already registered.")
            if type_class in self._encoders:
                existing_code = self._encoders[type_class][0]
                raise ValueError(
                    f"Type '{type_class.__name__}' is already registered "
                    f"(code={existing_code}). "
                    "Registering the same type twice would silently overwrite the "
                    "encoder while leaving the old decoder orphaned."
                )

            # Copy-on-Write: copy the current dicts and add the new entry to the copies.
            new_encoders = self._encoders.copy()
            new_decoders = self._decoders.copy()

            new_encoders[type_class] = (code, encoder)
            new_decoders[code] = decoder

            # Publish the new registry FIRST, bump the generation LAST.
            # Readers read the generation before the registry, so a reader that
            # observes the new generation is guaranteed to also observe the new
            # registry. With the opposite order a reader could rebuild its cache
            # under the new generation while still scanning the old registry,
            # storing e.g. a negative entry for a subclass of `type_class` that
            # would never be invalidated afterwards.
            self._encoders = new_encoders
            self._decoders = new_decoders
            self._registry_generation += 1

    def _resolve_encoder(
        self, obj_type: Type
    ) -> Tuple[int, Callable[[Any], Any]] | None:
        """Find the ``(code, encoder)`` pair for ``obj_type``, or ``None``.

        Lookup order: exact match in the registry, then the thread-local cache,
        then a scan of the MRO for a registered base class. MRO results,
        including misses, are stored in the thread-local cache.
        """
        # Order matters: fetch the cache (which reads the generation) BEFORE
        # taking the registry snapshot. See register() for the rationale.
        local_cache = self._get_local_cache()
        current_encoders = self._encoders

        exact = current_encoders.get(obj_type)
        if exact is not None:
            return exact

        if obj_type in local_cache:
            local_cache.move_to_end(obj_type)
            return local_cache[obj_type]

        # Scan the MRO for a registered base class.
        resolved = None
        for base in obj_type.__mro__:
            hit = current_encoders.get(base)
            if hit is not None:
                resolved = hit
                break

        # Cache hits and misses alike so repeated lookups skip the MRO scan.
        local_cache[obj_type] = resolved
        self._enforce_cache_size(local_cache)
        return resolved

    def _default_packer(self, obj: Any) -> Any:
        """``default`` hook for msgpack: wrap a custom object in an ``ExtType``.

        Raises:
            SerializationError: If no encoder is registered for the object's
                type (or any base class), if the encoder itself raises, or if
                the encoder's return value cannot be packed.
        """
        obj_type = type(obj)
        resolved = self._resolve_encoder(obj_type)

        if resolved is not None:
            target_code, target_encoder = resolved
            try:
                intermediate = target_encoder(obj)
            except Exception as e:
                raise SerializationError(
                    f"Error occurred within the custom encoder for type '{obj_type.__name__}'."
                ) from e

            try:
                # Pack recursively so nested custom types inside the
                # intermediate value are handled as well.
                payload = msgpack.packb(
                    intermediate, default=self._default_packer, use_bin_type=True
                )
                return msgpack.ExtType(target_code, payload)
            except (TypeError, SerializationError) as e:
                raise SerializationError(
                    f"Encoder for '{obj_type.__name__}' returned a value that msgpack cannot serialize.\n"
                    f"Hint: Ensure your encoder returns a primitive type (dict, list, str, int, bytes, etc.).\n"
                    f"      returned type: {type(intermediate).__name__}"
                ) from e

        try:
            obj_repr = str(obj)[:200]
        except Exception:
            obj_repr = f"<{obj_type.__name__} (str() failed)>"
        raise SerializationError(
            f"Object of type '{obj_type.__name__}' is not serializable.\n"
            f"Value: {obj_repr}...\n"
            "Hint: Use `spot.register(...)` to handle this custom type."
        )

    def _ext_hook(self, code: int, data: bytes) -> Any:
        """``ext_hook`` for msgpack: decode an ``ExtType`` payload.

        Raises:
            SerializationError: If ``code`` is not registered, or if unpacking
                the payload or running the decoder fails.
        """
        # Lock-free read of the current decoder snapshot.
        decoder = self._decoders.get(code)

        if decoder is None:
            raise SerializationError(
                f"Received ExtType with unregistered code={code}. "
                "The cache may have been created with a different serializer configuration."
            )

        try:
            intermediate = msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
            return decoder(intermediate)
        except SerializationError:
            # A SerializationError raised by a nested _ext_hook call already
            # describes the root cause; re-wrapping it would bury that message
            # under the generic "CRITICAL: ..." text, so re-raise it unchanged.
            raise
        except Exception as e:
            raise SerializationError(
                f"CRITICAL: Failed to decode custom type (ExtCode={code}).\n"
                "The cached data might be corrupted or incompatible with the current decoder."
            ) from e

    def dumps(self, obj: Any) -> bytes:
        """Serialize ``obj`` to MessagePack bytes.

        Raises:
            SerializationError: If any part of the object tree cannot be packed.
        """
        try:
            result = msgpack.packb(obj, default=self._default_packer, use_bin_type=True)
        except SerializationError:
            raise
        except Exception as e:
            raise SerializationError("Failed to serialize object tree.") from e
        if result is None:
            raise SerializationError("msgpack.packb returned None unexpectedly.")
        return result

    def loads(self, data: bytes) -> Any:
        """Deserialize MessagePack ``data`` back into Python objects.

        Raises:
            SerializationError: If the data cannot be unpacked or decoded.
        """
        try:
            return msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
        except SerializationError:
            raise
        except Exception as e:
            raise SerializationError("Failed to deserialize data.") from e