zanj.serializing
"""serialization handlers that move large arrays and lists out of the main json

numpy arrays and torch tensors are stored externally as `.npy`; lists, tuples
and pandas DataFrames are stored externally as `.jsonl`. The json tree keeps a
small reference dict (`_FORMAT_KEY`, `_REF_KEY`, plus metadata) in their place.
"""

from __future__ import annotations

import json
import sys
from dataclasses import dataclass
from typing import IO, Any, Callable, Iterable, Sequence

import numpy as np
from muutils.json_serialize.array import arr_metadata
from muutils.json_serialize.json_serialize import (  # JsonSerializer,
    DEFAULT_HANDLERS,
    ObjectPath,
    SerializerHandler,
)
from muutils.json_serialize.util import (
    JSONdict,
    JSONitem,
    MonoTuple,
    _FORMAT_KEY,
    _REF_KEY,
)

from zanj.externals import ExternalItem, ExternalItemType, _ZANJ_pre

# `kw_only` is only accepted by `dataclass` on python >= 3.10
KW_ONLY_KWARGS: dict = dict()
if sys.version_info >= (3, 10):
    KW_ONLY_KWARGS["kw_only"] = True

# pylint: disable=unused-argument, protected-access, unexpected-keyword-arg
# for some reason pylint complains about kwargs to ZANJSerializerHandler


def jsonl_metadata(data: list[JSONdict]) -> dict:
    """metadata about a jsonl object

    # Parameters:
     - `data: list[JSONdict]`
       rows of the jsonl object, one dict per row; may be empty

    # Returns:
     - `dict` with keys
       - `"data[0]"`: the first row, or `None` when `data` is empty
       - `"len(data)"`: number of rows
       - `"columns"`: for every key other than `_FORMAT_KEY`, the sorted names
         of the value types seen (`"types"`) and the number of rows that
         contain the key (`"len"`)
    """
    type_names: dict[str, set[str]] = {}
    row_counts: dict[str, int] = {}
    # single pass over the rows instead of two scans of `data` per column
    for item in data:
        for col, value in item.items():
            if col == _FORMAT_KEY:
                continue
            type_names.setdefault(col, set()).add(type(value).__name__)
            row_counts[col] = row_counts.get(col, 0) + 1

    return {
        # an empty jsonl has no first row; report `None` rather than raising
        "data[0]": data[0] if data else None,
        "len(data)": len(data),
        "columns": {
            # sorted so the stored metadata does not depend on set ordering
            col: {"types": sorted(names), "len": row_counts[col]}
            for col, names in type_names.items()
        },
    }


def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
    """write `data` to the open binary file `fp` in numpy's `.npy` format

    `data` is passed through `np.asanyarray` first, and the array is written
    with `allow_pickle=False`
    """
    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
    # info: rule `unresolved-attribute` is enabled by default
    as_array = np.asanyarray(data)
    np.lib.format.write_array(fp, as_array, allow_pickle=False)  # ty: ignore[unresolved-attribute]


def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
    """write every element of `data` to `fp` as one utf-8 encoded json line"""
    newline: bytes = "\n".encode("utf-8")
    for record in data:
        encoded: bytes = json.dumps(record).encode("utf-8")
        fp.write(encoded)
        fp.write(newline)


# maps each external item type to the function that writes it to a file
EXTERNAL_STORE_FUNCS: dict[
    ExternalItemType, Callable[[_ZANJ_pre, IO[bytes], Any], None]
] = {
    "npy": store_npy,
    "jsonl": store_jsonl,
}


@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    extends `SerializerHandler` with a `source_pckg` field; `check` and
    `serialize_func` are re-declared with `_ZANJ_pre` as the type of their
    first argument
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # desc: str = "(no description)"


def zanj_external_serialize(
    jser: _ZANJ_pre,
    data: Any,
    path: ObjectPath,
    item_type: ExternalItemType,
    _format: str,
) -> JSONitem:
    """stores a numpy array or jsonl externally in a ZANJ object

    # Parameters:
     - `jser: ZANJ`
       serializer whose `_externals` registry receives the item
     - `data: Any`
       numpy array or torch tensor for `"npy"`; pandas DataFrame or any
       iterable for `"jsonl"`
     - `path: ObjectPath`
       location of `data` in the object tree, joined with `/` to form the
       archive path
     - `item_type: ExternalItemType`
     - `_format: str`
       value stored under `_FORMAT_KEY` in the returned reference

    # Returns:
     - `JSONitem`
       json data with reference

    # Raises:
     - `ValueError` : the archive path is already registered, or an already
       registered external lives at or below `path`
     - `TypeError` : `data` is not of a type supported by `item_type`

    # Modifies:
     - modifies `jser._externals`
    """
    # get the path, make sure its unique
    assert isinstance(path, tuple), (
        f"path must be a tuple, got {type(path) = } {path = }"
    )
    joined_path: str = "/".join([str(p) for p in path])
    archive_path: str = f"{joined_path}.{item_type}"

    if archive_path in jser._externals:
        raise ValueError(f"external path {archive_path} already exists!")
    # a conflict is an external stored for this exact object path (under another
    # extension) or one nested below it. Comparing on path components avoids
    # rejecting siblings that merely share a string prefix ("a/b" vs "a/bc.npy")
    nested_prefix: str = f"{joined_path}/"
    if any(
        p.rsplit(".", 1)[0] == joined_path or p.startswith(nested_prefix)
        for p in jser._externals
    ):
        raise ValueError(f"external path {joined_path} is a prefix of another path!")

    # process the data if needed, assemble metadata
    data_new: Any = data
    output: dict = {
        _FORMAT_KEY: _format,
        _REF_KEY: archive_path,
    }
    if item_type == "npy":
        # compare type names so torch does not need to be imported
        data_type_str: str = str(type(data))
        if data_type_str == "<class 'torch.Tensor'>":
            # detach and convert
            data_new = data.detach().cpu().numpy()
        elif data_type_str != "<class 'numpy.ndarray'>":
            # if not a numpy array, except
            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
        # metadata is taken from the object as passed in, not the converted array
        output.update(arr_metadata(data))
    elif item_type.startswith("jsonl"):
        # check via mro to avoid importing pandas
        if any("pandas.core.frame.DataFrame" in str(t) for t in data.__class__.__mro__):
            output["columns"] = data.columns.tolist()
            data_new = data.to_dict(orient="records")
        elif isinstance(data, Iterable):
            # list, tuple and Sequence are all Iterable
            data_new = [
                jser.json_serialize(item, tuple(path) + (i,))
                for i, item in enumerate(data)
            ]
        else:
            raise TypeError(
                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
            )

        # `all()` is vacuously true for an empty list, which has no rows to describe
        if data_new and all(isinstance(item, dict) for item in data_new):
            output.update(jsonl_metadata(data_new))

    # store the item for external serialization
    jser._externals[archive_path] = ExternalItem(
        item_type=item_type,
        data=data_new,
        path=path,
    )

    return output


# handlers for external storage come first, followed by the muutils defaults
DEFAULT_SERIALIZER_HANDLERS_ZANJ: MonoTuple[ZANJSerializerHandler] = tuple(
    [
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                isinstance(obj, np.ndarray)
                and obj.size >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="numpy.ndarray:external"
            ),
            uid="numpy.ndarray:external",
            source_pckg="zanj",
            desc="external numpy array",
        ),
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                str(type(obj)) == "<class 'torch.Tensor'>"
                and int(obj.nelement()) >= self.external_array_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="npy", _format="torch.Tensor:external"
            ),
            uid="torch.Tensor:external",
            source_pckg="zanj",
            desc="external torch tensor",
        ),
        ZANJSerializerHandler(
            check=lambda self, obj, path: isinstance(obj, list)
            and len(obj) >= self.external_list_threshold,
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="list:external"
            ),
            uid="list:external",
            source_pckg="zanj",
            desc="external list",
        ),
        ZANJSerializerHandler(
            check=lambda self, obj, path: isinstance(obj, tuple)
            and len(obj) >= self.external_list_threshold,
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="tuple:external"
            ),
            uid="tuple:external",
            source_pckg="zanj",
            desc="external tuple",
        ),
        ZANJSerializerHandler(
            check=lambda self, obj, path: (
                any(
                    "pandas.core.frame.DataFrame" in str(t)
                    for t in obj.__class__.__mro__
                )
                and len(obj) >= self.external_list_threshold
            ),
            serialize_func=lambda self, obj, path: zanj_external_serialize(
                self, obj, path, item_type="jsonl", _format="pandas.DataFrame:external"
            ),
            uid="pandas.DataFrame:external",
            source_pckg="zanj",
            desc="external pandas DataFrame",
        ),
        # ZANJSerializerHandler(
        #     check=lambda self, obj, path: "<class 'torch.nn.modules.module.Module'>"
        #     in [str(t) for t in obj.__class__.__mro__],
        #     serialize_func=lambda self, obj, path: zanj_serialize_torchmodule(
        #         self, obj, path,
        #     ),
        #     uid="torch.nn.Module",
        #     source_pckg="zanj",
        #     desc="fallback torch serialization",
        # ),
    ]
) + tuple(
    DEFAULT_HANDLERS  # type: ignore[arg-type]
)

# the complaint above is:
# error: Argument 1 to "tuple" has incompatible type "Sequence[SerializerHandler]"; expected "Iterable[ZANJSerializerHandler]" [arg-type]
KW_ONLY_KWARGS: dict =
{'kw_only': True}
def
jsonl_metadata( data: list[typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.List[typing.Union[bool, int, float, str, NoneType, typing.List[typing.Any], typing.Dict[str, typing.Any]]], typing.Dict[str, typing.Union[bool, int, float, str, NoneType, typing.List[typing.Any], typing.Dict[str, typing.Any]]]]]]) -> dict:
def jsonl_metadata(data: list[JSONdict]) -> dict:
    """metadata about a jsonl object

    # Parameters:
     - `data: list[JSONdict]`
       rows of the jsonl object, one dict per row; may be empty

    # Returns:
     - `dict` with keys
       - `"data[0]"`: the first row, or `None` when `data` is empty
       - `"len(data)"`: number of rows
       - `"columns"`: for every key other than `_FORMAT_KEY`, the sorted names
         of the value types seen (`"types"`) and the number of rows that
         contain the key (`"len"`)
    """
    type_names: dict[str, set[str]] = {}
    row_counts: dict[str, int] = {}
    # single pass over the rows instead of two scans of `data` per column
    for item in data:
        for col, value in item.items():
            if col == _FORMAT_KEY:
                continue
            type_names.setdefault(col, set()).add(type(value).__name__)
            row_counts[col] = row_counts.get(col, 0) + 1

    return {
        # an empty jsonl has no first row; report `None` rather than raising
        "data[0]": data[0] if data else None,
        "len(data)": len(data),
        "columns": {
            # sorted so the stored metadata does not depend on set ordering
            col: {"types": sorted(names), "len": row_counts[col]}
            for col, names in type_names.items()
        },
    }
metadata about a jsonl object
def
store_npy(self: Any, fp: IO[bytes], data: numpy.ndarray) -> None:
def store_npy(self: _ZANJ_pre, fp: IO[bytes], data: np.ndarray) -> None:
    """write `data` to the open binary file `fp` in numpy's `.npy` format

    `data` is passed through `np.asanyarray` first, and the array is written
    with `allow_pickle=False`
    """
    # TODO: Type `<module 'numpy.lib'>` has no attribute `format` --> zanj/serializing.py:54:5
    # info: rule `unresolved-attribute` is enabled by default
    as_array = np.asanyarray(data)
    np.lib.format.write_array(fp, as_array, allow_pickle=False)  # ty: ignore[unresolved-attribute]
store numpy array to given file as .npy
def
store_jsonl( self: Any, fp: IO[bytes], data: Sequence[Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]]) -> None:
def store_jsonl(self: _ZANJ_pre, fp: IO[bytes], data: Sequence[JSONitem]) -> None:
    """write every element of `data` to `fp` as one utf-8 encoded json line"""
    newline: bytes = "\n".encode("utf-8")
    for record in data:
        encoded: bytes = json.dumps(record).encode("utf-8")
        fp.write(encoded)
        fp.write(newline)
store sequence to given file as .jsonl
EXTERNAL_STORE_FUNCS: dict[typing.Literal['jsonl', 'npy'], typing.Callable[[typing.Any, typing.IO[bytes], typing.Any], NoneType]] =
{'npy': <function store_npy>, 'jsonl': <function store_jsonl>}
@dataclass(**KW_ONLY_KWARGS)
class
@dataclass(**KW_ONLY_KWARGS)
class ZANJSerializerHandler(SerializerHandler):
    """a handler for ZANJ serialization

    extends `SerializerHandler` with a `source_pckg` field; `check` and
    `serialize_func` are re-declared with `_ZANJ_pre` as the type of their
    first argument
    """

    # unique identifier for the handler, saved in _FORMAT_KEY field
    # (not declared here; `uid` is inherited from `SerializerHandler`)
    # uid: str
    # source package of the handler -- note that this might be overridden by ZANJ
    source_pckg: str
    # (self_config, object, path) -> whether to use this handler
    check: Callable[[_ZANJ_pre, Any, ObjectPath], bool]
    # (self_config, object, path) -> serialized object
    serialize_func: Callable[[_ZANJ_pre, Any, ObjectPath], JSONitem]
    # optional description of how this serializer works
    # (not declared here; `desc` is inherited from `SerializerHandler`)
    # desc: str = "(no description)"
a handler for ZANJ serialization
ZANJSerializerHandler( uid: str, desc: str, *, check: Callable[[Any, Any, tuple[Union[str, int], ...]], bool], serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]], source_pckg: str)
serialize_func: Callable[[Any, Any, tuple[Union[str, int], ...]], Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]]
Inherited Members
- muutils.json_serialize.json_serialize.SerializerHandler
- uid
- desc
- serialize
def
zanj_external_serialize( jser: Any, data: Any, path: tuple[typing.Union[str, int], ...], item_type: Literal['jsonl', 'npy'], _format: str) -> Union[bool, int, float, str, NoneType, List[Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]], Dict[str, Union[bool, int, float, str, NoneType, List[Any], Dict[str, Any]]]]:
def zanj_external_serialize(
    jser: _ZANJ_pre,
    data: Any,
    path: ObjectPath,
    item_type: ExternalItemType,
    _format: str,
) -> JSONitem:
    """stores a numpy array or jsonl externally in a ZANJ object

    # Parameters:
     - `jser: ZANJ`
       serializer whose `_externals` registry receives the item
     - `data: Any`
       numpy array or torch tensor for `"npy"`; pandas DataFrame or any
       iterable for `"jsonl"`
     - `path: ObjectPath`
       location of `data` in the object tree, joined with `/` to form the
       archive path
     - `item_type: ExternalItemType`
     - `_format: str`
       value stored under `_FORMAT_KEY` in the returned reference

    # Returns:
     - `JSONitem`
       json data with reference

    # Raises:
     - `ValueError` : the archive path is already registered, or an already
       registered external lives at or below `path`
     - `TypeError` : `data` is not of a type supported by `item_type`

    # Modifies:
     - modifies `jser._externals`
    """
    # get the path, make sure its unique
    assert isinstance(path, tuple), (
        f"path must be a tuple, got {type(path) = } {path = }"
    )
    joined_path: str = "/".join([str(p) for p in path])
    archive_path: str = f"{joined_path}.{item_type}"

    if archive_path in jser._externals:
        raise ValueError(f"external path {archive_path} already exists!")
    # a conflict is an external stored for this exact object path (under another
    # extension) or one nested below it. Comparing on path components avoids
    # rejecting siblings that merely share a string prefix ("a/b" vs "a/bc.npy")
    nested_prefix: str = f"{joined_path}/"
    if any(
        p.rsplit(".", 1)[0] == joined_path or p.startswith(nested_prefix)
        for p in jser._externals
    ):
        raise ValueError(f"external path {joined_path} is a prefix of another path!")

    # process the data if needed, assemble metadata
    data_new: Any = data
    output: dict = {
        _FORMAT_KEY: _format,
        _REF_KEY: archive_path,
    }
    if item_type == "npy":
        # compare type names so torch does not need to be imported
        data_type_str: str = str(type(data))
        if data_type_str == "<class 'torch.Tensor'>":
            # detach and convert
            data_new = data.detach().cpu().numpy()
        elif data_type_str != "<class 'numpy.ndarray'>":
            # if not a numpy array, except
            raise TypeError(f"expected numpy.ndarray, got {data_type_str}")
        # metadata is taken from the object as passed in, not the converted array
        output.update(arr_metadata(data))
    elif item_type.startswith("jsonl"):
        # check via mro to avoid importing pandas
        if any("pandas.core.frame.DataFrame" in str(t) for t in data.__class__.__mro__):
            output["columns"] = data.columns.tolist()
            data_new = data.to_dict(orient="records")
        elif isinstance(data, Iterable):
            # list, tuple and Sequence are all Iterable
            data_new = [
                jser.json_serialize(item, tuple(path) + (i,))
                for i, item in enumerate(data)
            ]
        else:
            raise TypeError(
                f"expected list or pandas.DataFrame for jsonl, got {type(data)}"
            )

        # `all()` is vacuously true for an empty list, which has no rows to describe
        if data_new and all(isinstance(item, dict) for item in data_new):
            output.update(jsonl_metadata(data_new))

    # store the item for external serialization
    jser._externals[archive_path] = ExternalItem(
        item_type=item_type,
        data=data_new,
        path=path,
    )

    return output
stores a numpy array or jsonl externally in a ZANJ object
Parameters:
- jser: ZANJ
- data: Any
- path: ObjectPath
- item_type: ExternalItemType
Returns:
- JSONitem: json data with reference
Modifies:
- modifies jser._externals
DEFAULT_SERIALIZER_HANDLERS_ZANJ: MonoTuple[ZANJSerializerHandler] =
(ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray:external', desc='external numpy array', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor:external', desc='external torch tensor', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='list:external', desc='external list', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='tuple:external', desc='external tuple', source_pckg='zanj'), ZANJSerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame:external', desc='external pandas DataFrame', source_pckg='zanj'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='base types', desc='base types (bool, int, float, str, None)'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dictionaries', desc='dictionaries'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='(list, tuple) -> list', desc='lists and tuples as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function _serialize_override_serialize_func>, uid='.serialize override', desc='objects with .serialize method'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='namedtuple -> dict', desc='namedtuples as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='dataclass -> dict', desc='dataclasses as dicts'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='path -> str', desc='Path objects as posix strings'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='obj -> str(obj)', desc='directly serialize objects in `SERIALIZE_DIRECT_AS_STR` to strings'), 
SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='numpy.ndarray', desc='numpy arrays'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='torch.Tensor', desc='pytorch tensors'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='pandas.DataFrame', desc='pandas DataFrames'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='(set, list, tuple, Iterable) -> list', desc='sets, lists, tuples, and Iterables as lists'), SerializerHandler(check=<function <lambda>>, serialize_func=<function <lambda>>, uid='fallback', desc='fallback handler -- serialize object attributes and special functions as strings'))