Coverage for src / beautyspot / serializer.py: 86%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-18 18:20 +0900

1# src/beautyspot/serializer.py 

2 

3import threading 

4import msgpack 

5from collections import OrderedDict 

6from typing import ( 

7 Any, 

8 Callable, 

9 Dict, 

10 Type, 

11 TypeVar, 

12 Tuple, 

13 Protocol, 

14 runtime_checkable, 

15) 

16from beautyspot.exceptions import SerializationError 

17 

18T = TypeVar("T") 

19 

20 

@runtime_checkable
class SerializerProtocol(Protocol):
    """Structural interface for objects that convert values to and from bytes.

    Decorated with ``runtime_checkable``, so ``isinstance`` checks succeed for
    any object that exposes both ``dumps`` and ``loads`` attributes.
    """

    def dumps(self, obj: Any, /) -> bytes:
        """Serialize ``obj`` and return the encoded bytes."""
        ...

    def loads(self, data: bytes, /) -> Any:
        """Deserialize ``data`` and return the reconstructed object."""
        ...

25 

26 

@runtime_checkable
class TypeRegistryProtocol(Protocol):
    """Structural interface for objects that accept custom type registrations.

    Decorated with ``runtime_checkable``, so ``isinstance`` checks succeed for
    any object that exposes a ``register`` attribute.
    """

    def register(
        self,
        type_class: Type[Any],
        code: int,
        encoder: Callable[[Any], Any],
        decoder: Callable[[Any], Any],
    ) -> None:
        """Associate ``type_class`` with ``code`` and its encoder/decoder pair."""
        ...

36 

37 

class MsgpackSerializer(SerializerProtocol, TypeRegistryProtocol):
    """
    A secure and extensible serializer based on MessagePack.

    Allows registering custom types via `register()`.
    Automatically handles packing/unpacking of custom type payloads.

    Thread Safety (No-GIL Compatible):
        Serialization and deserialization never take a lock. This relies on:
        1. **Copy-on-Write (CoW)** for the shared type registry (`_encoders`, `_decoders`).
           `register()` builds new dictionaries and swaps the references; the
           dictionaries themselves are never mutated after publication, so a
           reader that grabbed a reference keeps a consistent snapshot.
        2. **Thread-Local Storage** for the LRU subclass cache (`_local.subclass_cache`),
           so concurrent traversals never share a cache.
        3. **A registry generation counter** used to invalidate the thread-local
           caches. The writer publishes the new registry *before* bumping the
           counter and readers read the counter *before* the registry, so a cache
           tagged with generation N only ever holds results computed from a
           registry at least as new as generation N.
           NOTE(review): this assumes attribute stores become visible to other
           threads in program order, which holds under the GIL; confirm for the
           free-threaded build.

    Note:
        To prevent memory leaks in environments where types are generated dynamically
        (e.g., namedtuples, dynamic Pydantic models), subclass resolution results
        are cached using an LRU strategy with a configurable maximum size per thread.
    """

    def __init__(self, max_cache_size: int = 1024):
        """Create an empty serializer.

        Args:
            max_cache_size: Maximum number of entries kept in each thread's
                subclass-resolution cache. Values <= 0 disable caching.
        """
        # Shared registry (copy-on-write): replaced wholesale by register().
        self._encoders: Dict[Type, Tuple[int, Callable[[Any], Any]]] = {}
        self._decoders: Dict[int, Callable[[Any], Any]] = {}

        self._max_cache_size = max_cache_size

        # Per-thread LRU cache of subclass -> registered base resolution.
        self._local = threading.local()

        # Serializes writers (register()); readers never take it.
        self._write_lock = threading.Lock()

        # Registry generation: incremented on every register() and used to
        # invalidate the thread-local caches.
        self._registry_generation = 0

    def _get_local_cache(
        self,
    ) -> OrderedDict[Type, Tuple[int, Callable[[Any], Any]] | None]:
        """Return the calling thread's LRU cache, creating it if needed.

        If the registry generation has advanced since the cache was created,
        the cache is replaced with an empty one so stale entries are not used.
        """
        gen = self._registry_generation
        local = self._local
        if (
            not hasattr(local, "subclass_cache")
            or getattr(local, "_cache_generation", -1) != gen
        ):
            local.subclass_cache = OrderedDict()
            local._cache_generation = gen
        return local.subclass_cache

    def _enforce_cache_size(self, cache: OrderedDict) -> None:
        """Evict least-recently-used entries until the cache fits the limit."""
        limit = self._max_cache_size
        # `cache and ...` keeps a negative limit from popping an empty cache,
        # which would raise KeyError.
        while cache and len(cache) > limit:
            cache.popitem(last=False)

    def register(
        self,
        type_class: Type,
        code: int,
        encoder: Callable[[Any], Any],
        decoder: Callable[[Any], Any],
    ) -> None:
        """Register a custom type under a MessagePack extension code.

        Args:
            type_class: The type to handle; subclasses are matched via the MRO.
            code: Extension type code in the range 0..127.
            encoder: Converts an instance into a msgpack-serializable value.
            decoder: Rebuilds an instance from the value produced by ``encoder``.

        Raises:
            ValueError: If ``code`` is out of range, or if ``code`` or
                ``type_class`` is already registered.
        """
        if not (0 <= code <= 127):
            raise ValueError(f"ExtCode must be between 0 and 127, got {code}.")

        with self._write_lock:
            if code in self._decoders:
                raise ValueError(f"ExtCode {code} is already registered.")
            if type_class in self._encoders:
                existing_code = self._encoders[type_class][0]
                raise ValueError(
                    f"Type '{type_class.__name__}' is already registered "
                    f"(code={existing_code}). "
                    "Registering the same type twice would silently overwrite the "
                    "encoder while leaving the old decoder orphaned."
                )

            # Copy-on-Write: build fresh dictionaries containing the new entry.
            new_encoders = {**self._encoders, type_class: (code, encoder)}
            new_decoders = {**self._decoders, code: decoder}

            # Publication order matters:
            #   1. decoders first, so anything a thread encodes with the new
            #      encoder can already be decoded;
            #   2. encoders next;
            #   3. the generation counter last.
            # Readers read the generation *before* the encoder snapshot. With
            # this pairing a reader that observes the new generation is
            # guaranteed to observe the new registry too, so a thread-local
            # cache tagged with the new generation can never contain results
            # (in particular negative "not registered" results) computed from
            # the old registry. Bumping the counter first would allow exactly
            # that, leaving a stale entry that is never invalidated.
            self._decoders = new_decoders
            self._encoders = new_encoders
            self._registry_generation += 1

    def _default_packer(self, obj: Any) -> Any:
        """``default`` hook for ``msgpack.packb``: encode a non-native object.

        Resolves the encoder for ``type(obj)`` (exact match first, then the
        per-thread cache, then an MRO scan), runs it, packs the result and
        wraps it in a ``msgpack.ExtType``.

        Raises:
            SerializationError: If no encoder is registered for the object's
                type or any of its bases, if the encoder raises, or if the
                encoder's result cannot be packed.
        """
        obj_type = type(obj)

        # Lock-free read. The cache (which reads the generation counter) must
        # be obtained BEFORE the encoder snapshot; see register() for why.
        local_cache = self._get_local_cache()
        current_encoders = self._encoders

        entry = current_encoders.get(obj_type)
        if entry is None:
            if obj_type in local_cache:
                entry = local_cache[obj_type]
                local_cache.move_to_end(obj_type)
            else:
                # Scan the MRO for the nearest registered base class. A miss is
                # cached as None so repeated failures stay cheap.
                entry = next(
                    (
                        current_encoders[base]
                        for base in obj_type.__mro__
                        if base in current_encoders
                    ),
                    None,
                )
                local_cache[obj_type] = entry
                self._enforce_cache_size(local_cache)

        # Execute & wrap. Test the resolved entry rather than the truthiness
        # of the encoder, so a callable that happens to be falsy is still used.
        if entry is not None:
            target_code, target_encoder = entry
            try:
                intermediate = target_encoder(obj)
            except Exception as e:
                raise SerializationError(
                    f"Error occurred within the custom encoder for type '{obj_type.__name__}'."
                ) from e

            try:
                payload = msgpack.packb(
                    intermediate, default=self._default_packer, use_bin_type=True
                )
                return msgpack.ExtType(target_code, payload)
            except (TypeError, SerializationError) as e:
                raise SerializationError(
                    f"Encoder for '{obj_type.__name__}' returned a value that msgpack cannot serialize.\n"
                    f"Hint: Ensure your encoder returns a primitive type (dict, list, str, int, bytes, etc.).\n"
                    f" returned type: {type(intermediate).__name__}"
                ) from e

        try:
            obj_repr = str(obj)[:200]
        except Exception:
            obj_repr = f"<{obj_type.__name__} (str() failed)>"
        raise SerializationError(
            f"Object of type '{obj_type.__name__}' is not serializable.\n"
            f"Value: {obj_repr}...\n"
            "Hint: Use `spot.register(...)` to handle this custom type."
        )

    def _ext_hook(self, code: int, data: bytes) -> Any:
        """``ext_hook`` for ``msgpack.unpackb``: decode a custom-type payload.

        Raises:
            SerializationError: If ``code`` has no registered decoder, or if
                unpacking the payload or running the decoder fails.
        """
        # Lock-free read of the current decoder snapshot.
        decoder = self._decoders.get(code)
        if decoder is None:
            raise SerializationError(
                f"Received ExtType with unregistered code={code}. "
                "The cache may have been created with a different serializer configuration."
            )

        try:
            intermediate = msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
            return decoder(intermediate)
        except SerializationError:
            # A SerializationError coming from a nested _ext_hook call already
            # describes the root cause; re-wrapping it would bury that message
            # under the generic "CRITICAL" one, so propagate it unchanged.
            raise
        except Exception as e:
            raise SerializationError(
                f"CRITICAL: Failed to decode custom type (ExtCode={code}).\n"
                "The cached data might be corrupted or incompatible with the current decoder."
            ) from e

    def dumps(self, obj: Any) -> bytes:
        """Serialize ``obj`` to MessagePack bytes.

        Raises:
            SerializationError: If any part of the object tree cannot be
                serialized.
        """
        try:
            result = msgpack.packb(obj, default=self._default_packer, use_bin_type=True)
        except SerializationError:
            # Already descriptive; propagate with its original traceback.
            raise
        except Exception as e:
            raise SerializationError("Failed to serialize object tree.") from e
        if result is None:
            raise SerializationError("msgpack.packb returned None unexpectedly.")
        return result

    def loads(self, data: bytes) -> Any:
        """Deserialize MessagePack bytes produced by ``dumps``.

        Raises:
            SerializationError: If the data cannot be unpacked or a custom
                type cannot be decoded.
        """
        try:
            return msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
        except SerializationError:
            # Already descriptive; propagate with its original traceback.
            raise
        except Exception as e:
            raise SerializationError("Failed to deserialize data.") from e