Coverage for src/configuraptor/helpers.py: 100%
92 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-02-11 11:45 +0100
« prev ^ index » next coverage.py v7.2.7, created at 2026-02-11 11:45 +0100
1"""
2Contains stand-alone helper functions.
3"""
5import contextlib
6import dataclasses as dc
7import io
8import math
9import types
10import typing
11from collections import ChainMap
12from pathlib import Path
14from expandvars import expand
15from typeguard import TypeCheckError
16from typeguard import check_type as _check_type
18try:
19 import annotationlib
20except ImportError: # pragma: no cover
21 annotationlib = None
def camel_to_snake(s: str) -> str:
    """
    Turn a CamelCase identifier into its snake_case form.

    Every uppercase character is replaced by an underscore followed by its lowercase variant;
    all other characters are copied unchanged. Leading underscores are stripped from the result.

    Source:
        https://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-snake-case
    """
    pieces: list[str] = []
    for char in s:
        if char.isupper():
            pieces.append(f"_{char.lower()}")
        else:
            pieces.append(char)

    # the very first capital would otherwise leave a stray leading underscore
    return "".join(pieces).lstrip("_")
34# def find_pyproject_toml() -> typing.Optional[str]:
35# """
36# Find the project's config toml, looks up until it finds the project root (black's logic).
37# """
38# return black.files.find_pyproject_toml((os.getcwd(),))
41def find_pyproject_toml(start_dir: typing.Optional[Path | str] = None) -> Path | None:
42 """
43 Search for pyproject.toml starting from the current working directory \
44 and moving upwards in the directory tree.
46 Args:
47 start_dir: Starting directory to begin the search.
48 If not provided, uses the current working directory.
50 Returns:
51 Path or None: Path object to the found pyproject.toml file, or None if not found.
52 """
53 start_dir = Path.cwd() if start_dir is None else Path(start_dir).resolve()
55 current_dir = start_dir
57 while str(current_dir) != str(current_dir.root):
58 pyproject_toml = current_dir / "pyproject.toml"
59 if pyproject_toml.is_file():
60 return pyproject_toml
61 current_dir = current_dir.parent
63 # If not found anywhere
64 return None
# Shorthand for "a class object of any kind"; used as parameter annotation by the helpers in this module.
Type = typing.Type[typing.Any]
70def _cls_annotations(c: type) -> dict[str, type]: # pragma: no cover
71 """
72 Functions to get the annotations of a class (excl inherited, use _all_annotations for that).
74 Uses `annotationlib` if available (since 3.14) and if so, resolves forward references immediately.
75 """
76 if annotationlib:
77 return typing.cast(
78 dict[str, type],
79 annotationlib.get_annotations(c, format=annotationlib.Format.VALUE, eval_str=True),
80 )
81 else:
82 # note: idk why but this is not equivalent (the first doesn't work well):
83 # return getattr(c, "__annotations__", {})
84 return c.__dict__.get("__annotations__") or {}
def _all_annotations(cls: type) -> ChainMap[str, type]:
    """
    Build a dictionary-like ChainMap holding the annotations of `cls` and of every class in its MRO.

    Objects without an `__mro__` attribute yield an empty ChainMap.
    """
    # The mappings are passed in MRO order (cls first). A ChainMap looks keys up in its first
    # mapping first, so an annotation on a subclass shadows the inherited one, while iteration
    # scans the mappings last to first, so keys of base classes come before those of subclasses.
    mro = getattr(cls, "__mro__", [])
    per_class = [_cls_annotations(klass) for klass in mro]
    return ChainMap(*per_class)
def all_annotations(
    cls: Type, _except: typing.Optional[typing.Iterable[str]] = None
) -> dict[str, type[object]]:
    """
    Wrapper around `_all_annotations` that filters away any keys in _except.

    It also flattens the ChainMap to a regular dict.

    Args:
        cls: the class whose (own and inherited) annotations are collected.
        _except: names to leave out of the result; None (the default) excludes nothing.
                 One-shot iterables such as generators are supported as well.

    Returns:
        A plain dict mapping each remaining annotated name to its annotation.
    """
    from collections.abc import Container

    excluded: typing.Any
    if _except is None:
        excluded = frozenset()
    elif isinstance(_except, Container):
        # sets, lists, tuples, ... can be queried repeatedly, so use them as they are
        excluded = _except
    else:
        # a plain iterator would be consumed by the first `in` test below,
        # after which every later key would wrongly pass the filter
        excluded = frozenset(_except)

    return {k: v for k, v in _all_annotations(cls).items() if k not in excluded}
# Generic type variable; check_type uses it to tie its TypeGuard result to the 'expected_type' argument.
T = typing.TypeVar("T")
def check_type(value: typing.Any, expected_type: typing.Type[T]) -> typing.TypeGuard[T]:
    """
    Tell whether `value` matches `expected_type` (which can be a Union, parameterized generic etc.).

    Delegates to typeguard's `check_type`, but reports the outcome as a boolean
    instead of returning the value or raising a TypeCheckError.
    """
    try:
        _check_type(value, expected_type)
    except TypeCheckError:
        return False

    # typeguard did not complain, so the value conforms
    return True
def is_builtin_type(_type: Type) -> bool:
    """
    Tell whether _type is defined in the builtins module.

    Both the Python 3 name ("builtins") and the legacy name ("__builtin__") are recognised.
    """
    builtin_modules = ("__builtin__", "builtins")
    module_name = _type.__module__
    return module_name in builtin_modules
133# def is_builtin_class_instance(obj: typing.Any) -> bool:
134# return is_builtin_type(obj.__class__)
def is_from_types_or_typing(_type: Type) -> bool:
    """
    Tell whether _type is defined in the stdlib `types` or `typing` module.

    e.g. types.UnionType or typing.Union
    """
    typing_modules = ("types", "typing")
    module_name = _type.__module__
    return module_name in typing_modules
def is_from_other_toml_supported_module(_type: Type) -> bool:
    """
    Tell whether _type is defined in the stdlib `datetime` or `math` module.

    Besides builtins, toml also supports 'datetime' and 'math' types, which is why these two are singled out.
    """
    toml_modules = ("datetime", "math")
    module_name = _type.__module__
    return module_name in toml_modules
def is_parameterized(_type: Type) -> bool:
    """
    Tell whether _type is a parameterized type, i.e. whether `typing.get_origin` yields an origin for it.

    Examples:
        list[str] -> True
        str -> False
    """
    origin = typing.get_origin(_type)
    return origin is not None
def is_custom_class(_type: Type) -> bool:
    """
    Guess whether _type is a custom (user-defined) class rather than a builtin or stdlib one.

    Other logic in this module depends on knowing that.
    """
    if type(_type) is not type:
        # not a plain class object (an instance, or a class with a different metaclass)
        return False

    comes_from_known_module = (
        is_builtin_type(_type) or is_from_other_toml_supported_module(_type) or is_from_types_or_typing(_type)
    )
    return not comes_from_known_module
def instance_of_custom_class(var: typing.Any) -> bool:
    """
    Apply `is_custom_class` to the class of `var`, which may be an instance of a custom class.
    """
    klass = var.__class__
    return is_custom_class(klass)
186def is_union(sometype: typing.Type[typing.Any] | typing.Any) -> bool:
187 """
188 Determines if a given type is a Union type.
190 A Union type in Python is used to represent a type that can be one of multiple
191 types. This function checks whether the provided type object corresponds to a
192 Union type as defined in Python's type hints or annotations.
194 Returns:
195 bool
196 True if the provided type is a Union type, False otherwise.
197 """
198 origin = typing.get_origin(sometype)
199 return origin in (typing.Union, types.UnionType)
202def is_optional(_type: Type | typing.Any) -> bool:
203 """
204 Tries to guess if _type could be optional.
206 Examples:
207 None -> True
208 NoneType -> True
209 typing.Union[str, None] -> True
210 str | None -> True
211 list[str | None] -> False
212 list[str] -> False
213 """
214 if _type and (is_parameterized(_type) and typing.get_origin(_type) in (dict, list)) or (_type is math.nan):
215 # e.g. list[str]
216 # will crash issubclass to test it first here
217 return False
219 try:
220 return (
221 _type is None
222 or types.NoneType in typing.get_args(_type) # union with Nonetype
223 or issubclass(types.NoneType, _type)
224 or issubclass(types.NoneType, type(_type)) # no type # Nonetype
225 )
226 except TypeError:
227 # probably some weird input that's not a type
228 return False
def dataclass_field(cls: Type, key: str) -> typing.Optional[dc.Field[typing.Any]]:
    """
    Look up the dataclass Field named `key` on `cls`.

    Returns None when `cls` has no `__dataclass_fields__` attribute or has no field with that name.
    """
    return getattr(cls, "__dataclass_fields__", {}).get(key)
@contextlib.contextmanager
def uncloseable(fd: typing.BinaryIO) -> typing.Generator[typing.BinaryIO, typing.Any, None]:
    """
    Context manager which turns the fd's close operation to no-op for the duration of the context.

    The original `close` is put back when the context ends, also when the body raises.

    Args:
        fd: the file-like object whose `close` attribute is temporarily replaced.

    Yields:
        The same `fd` object.
    """
    close = fd.close
    fd.close = lambda: None  # type: ignore
    try:
        yield fd
    finally:
        # without the finally, an exception in the with-body would leave fd impossible to close
        fd.close = close  # type: ignore
def as_binaryio(file: str | Path | typing.BinaryIO | None, mode: typing.Literal["rb", "wb"] = "rb") -> typing.BinaryIO:
    """
    Normalise the different ways a 'file' can be described into one binary stream interface.

    - a str is treated as a path and opened with `mode`
    - a Path is opened with `mode`
    - None becomes a fresh in-memory `io.BytesIO`
    - anything else is taken to be a stream already and used as is

    An `io.BytesIO` (passed in or freshly created) is rewound to position 0 and handed back
    wrapped in `uncloseable`.
    """
    stream: typing.Any
    if file is None:
        stream = io.BytesIO()
    elif isinstance(file, str):
        stream = Path(file).open(mode)
    elif isinstance(file, Path):
        stream = file.open(mode)
    else:
        stream = file

    if not isinstance(stream, io.BytesIO):
        return stream

    # so .read() works after .write():
    stream.seek(0)
    # so the with-statement doesn't close the in-memory file:
    return uncloseable(stream)  # type: ignore
def expand_posix_vars(posix_expr: str, context: dict[str, str]) -> str:
    """
    Substitute POSIX / Docker Compose-like variable references in a string with their values.

    The actual work is done by `expandvars.expand`, with `context` supplied as its environment.

    Args:
        posix_expr (str): The input string possibly containing variable references.
        context (dict): A dictionary containing variable names and their respective values.

    Returns:
        str: The string with the variable references replaced.
    """
    expanded = expand(posix_expr, environ=context)
    return typing.cast(str, expanded)
def _expand_toml_value(value: typing.Any, env: dict[str, typing.Any]) -> typing.Any:
    """
    Expand the variables in a single TOML value and return what should be stored in its place.
    """
    if isinstance(value, str):
        return expand_posix_vars(value, env)
    if isinstance(value, dict):
        # tables are modified in place, so the very same object is handed back
        expand_env_vars_into_toml_values(value, env)
        return value
    if isinstance(value, list):
        # arrays may hold strings, tables (arrays of tables) or further arrays
        return [_expand_toml_value(item, env) for item in value]
    # numbers, booleans, dates, ...: nothing to substitute
    return value


def expand_env_vars_into_toml_values(toml: dict[str, typing.Any], env: dict[str, typing.Any]) -> None:
    """
    Recursively expands POSIX/Docker Compose-like environment variables in a TOML dictionary.

    This function traverses a TOML dictionary and expands POSIX/Docker Compose-like
    environment variables (${VAR:default}) using values provided in the 'env' dictionary.
    It performs in-place modification of the 'toml' dictionary.

    Args:
        toml (dict): A TOML dictionary with string values possibly containing environment variables.
        env (dict): A dictionary containing environment variable names and their respective values.

    Returns:
        None: The function modifies the 'toml' dictionary in place.

    Notes:
        Strings are expanded wherever they occur as a value: directly in a table, in a nested table,
        or inside (possibly nested) lists, including tables that are elements of a list.
        Lists are replaced by a new list holding the expanded elements; tables are updated in place.
        If 'toml' or 'env' is empty, nothing is changed at all.

    Example:
        toml_data = {
            'key1': 'This has ${ENV_VAR:default}',
            'key2': ['String with ${ANOTHER_VAR}', 'Another ${YET_ANOTHER_VAR}'],
            'key3': [{'nested': '${ENV_VAR}'}]
        }
        environment = {
            'ENV_VAR': 'replaced_value',
            'ANOTHER_VAR': 'value_1',
            'YET_ANOTHER_VAR': 'value_2'
        }

        expand_env_vars_into_toml_values(toml_data, environment)
        # 'toml_data' will be modified in place:
        # {
        #     'key1': 'This has replaced_value',
        #     'key2': ['String with value_1', 'Another value_2'],
        #     'key3': [{'nested': 'replaced_value'}]
        # }
    """
    if not toml or not env:  # pragma: no cover
        return

    for key, var in toml.items():
        # only existing keys are re-assigned, so the dict does not change size while iterating
        if isinstance(var, (dict, list, str)):
            toml[key] = _expand_toml_value(var, env)