Coverage for src/configuraptor/helpers.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-02-11 11:45 +0100

1""" 

2Contains stand-alone helper functions. 

3""" 

4 

5import contextlib 

6import dataclasses as dc 

7import io 

8import math 

9import types 

10import typing 

11from collections import ChainMap 

12from pathlib import Path 

13 

14from expandvars import expand 

15from typeguard import TypeCheckError 

16from typeguard import check_type as _check_type 

17 

18try: 

19 import annotationlib 

20except ImportError: # pragma: no cover 

21 annotationlib = None 

22 

23 

24def camel_to_snake(s: str) -> str: 

25 """ 

26 Convert CamelCase to snake_case. 

27 

28 Source: 

29 https://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-snake-case 

30 """ 

31 return "".join([f"_{c.lower()}" if c.isupper() else c for c in s]).lstrip("_") 

32 

33 

34# def find_pyproject_toml() -> typing.Optional[str]: 

35# """ 

36# Find the project's config toml, looks up until it finds the project root (black's logic). 

37# """ 

38# return black.files.find_pyproject_toml((os.getcwd(),)) 

39 

40 

41def find_pyproject_toml(start_dir: typing.Optional[Path | str] = None) -> Path | None: 

42 """ 

43 Search for pyproject.toml starting from the current working directory \ 

44 and moving upwards in the directory tree. 

45 

46 Args: 

47 start_dir: Starting directory to begin the search. 

48 If not provided, uses the current working directory. 

49 

50 Returns: 

51 Path or None: Path object to the found pyproject.toml file, or None if not found. 

52 """ 

53 start_dir = Path.cwd() if start_dir is None else Path(start_dir).resolve() 

54 

55 current_dir = start_dir 

56 

57 while str(current_dir) != str(current_dir.root): 

58 pyproject_toml = current_dir / "pyproject.toml" 

59 if pyproject_toml.is_file(): 

60 return pyproject_toml 

61 current_dir = current_dir.parent 

62 

63 # If not found anywhere 

64 return None 

65 

66 

67Type = typing.Type[typing.Any] 

68 

69 

70def _cls_annotations(c: type) -> dict[str, type]: # pragma: no cover 

71 """ 

72 Functions to get the annotations of a class (excl inherited, use _all_annotations for that). 

73 

74 Uses `annotationlib` if available (since 3.14) and if so, resolves forward references immediately. 

75 """ 

76 if annotationlib: 

77 return typing.cast( 

78 dict[str, type], 

79 annotationlib.get_annotations(c, format=annotationlib.Format.VALUE, eval_str=True), 

80 ) 

81 else: 

82 # note: idk why but this is not equivalent (the first doesn't work well): 

83 # return getattr(c, "__annotations__", {}) 

84 return c.__dict__.get("__annotations__") or {} 

85 

86 

87def _all_annotations(cls: type) -> ChainMap[str, type]: 

88 """ 

89 Returns a dictionary-like ChainMap that includes annotations for all \ 

90 attributes defined in cls or inherited from superclasses. 

91 """ 

92 # chainmap reverses the iterable, so reverse again beforehand to keep order normally: 

93 

94 return ChainMap(*(_cls_annotations(c) for c in getattr(cls, "__mro__", []))) 

95 

96 

97def all_annotations(cls: Type, _except: typing.Iterable[str] = None) -> dict[str, type[object]]: 

98 """ 

99 Wrapper around `_all_annotations` that filters away any keys in _except. 

100 

101 It also flattens the ChainMap to a regular dict. 

102 """ 

103 if _except is None: 

104 _except = set() 

105 

106 _all = _all_annotations(cls) 

107 return {k: v for k, v in _all.items() if k not in _except} 

108 

109 

110T = typing.TypeVar("T") 

111 

112 

113def check_type(value: typing.Any, expected_type: typing.Type[T]) -> typing.TypeGuard[T]: 

114 """ 

115 Given a variable, check if it matches 'expected_type' (which can be a Union, parameterized generic etc.). 

116 

117 Based on typeguard but this returns a boolean instead of returning the value or throwing a TypeCheckError 

118 """ 

119 try: 

120 _check_type(value, expected_type) 

121 return True 

122 except TypeCheckError: 

123 return False 

124 

125 

126def is_builtin_type(_type: Type) -> bool: 

127 """ 

128 Returns whether _type is one of the builtin types. 

129 """ 

130 return _type.__module__ in ("__builtin__", "builtins") 

131 

132 

133# def is_builtin_class_instance(obj: typing.Any) -> bool: 

134# return is_builtin_type(obj.__class__) 

135 

136 

137def is_from_types_or_typing(_type: Type) -> bool: 

138 """ 

139 Returns whether _type is one of the stlib typing/types types. 

140 

141 e.g. types.UnionType or typing.Union 

142 """ 

143 return _type.__module__ in ("types", "typing") 

144 

145 

146def is_from_other_toml_supported_module(_type: Type) -> bool: 

147 """ 

148 Besides builtins, toml also supports 'datetime' and 'math' types, \ 

149 so this returns whether _type is a type from these stdlib modules. 

150 """ 

151 return _type.__module__ in ("datetime", "math") 

152 

153 

154def is_parameterized(_type: Type) -> bool: 

155 """ 

156 Returns whether _type is a parameterized type. 

157 

158 Examples: 

159 list[str] -> True 

160 str -> False 

161 """ 

162 return typing.get_origin(_type) is not None 

163 

164 

165def is_custom_class(_type: Type) -> bool: 

166 """ 

167 Tries to guess if _type is a builtin or a custom (user-defined) class. 

168 

169 Other logic in this module depends on knowing that. 

170 """ 

171 return ( 

172 type(_type) is type 

173 and not is_builtin_type(_type) 

174 and not is_from_other_toml_supported_module(_type) 

175 and not is_from_types_or_typing(_type) 

176 ) 

177 

178 

179def instance_of_custom_class(var: typing.Any) -> bool: 

180 """ 

181 Calls `is_custom_class` on an instance of a (possibly custom) class. 

182 """ 

183 return is_custom_class(var.__class__) 

184 

185 

186def is_union(sometype: typing.Type[typing.Any] | typing.Any) -> bool: 

187 """ 

188 Determines if a given type is a Union type. 

189 

190 A Union type in Python is used to represent a type that can be one of multiple 

191 types. This function checks whether the provided type object corresponds to a 

192 Union type as defined in Python's type hints or annotations. 

193 

194 Returns: 

195 bool 

196 True if the provided type is a Union type, False otherwise. 

197 """ 

198 origin = typing.get_origin(sometype) 

199 return origin in (typing.Union, types.UnionType) 

200 

201 

202def is_optional(_type: Type | typing.Any) -> bool: 

203 """ 

204 Tries to guess if _type could be optional. 

205 

206 Examples: 

207 None -> True 

208 NoneType -> True 

209 typing.Union[str, None] -> True 

210 str | None -> True 

211 list[str | None] -> False 

212 list[str] -> False 

213 """ 

214 if _type and (is_parameterized(_type) and typing.get_origin(_type) in (dict, list)) or (_type is math.nan): 

215 # e.g. list[str] 

216 # will crash issubclass to test it first here 

217 return False 

218 

219 try: 

220 return ( 

221 _type is None 

222 or types.NoneType in typing.get_args(_type) # union with Nonetype 

223 or issubclass(types.NoneType, _type) 

224 or issubclass(types.NoneType, type(_type)) # no type # Nonetype 

225 ) 

226 except TypeError: 

227 # probably some weird input that's not a type 

228 return False 

229 

230 

231def dataclass_field(cls: Type, key: str) -> typing.Optional[dc.Field[typing.Any]]: 

232 """ 

233 Get Field info for a dataclass cls. 

234 """ 

235 fields = getattr(cls, "__dataclass_fields__", {}) 

236 return fields.get(key) 

237 

238 

239@contextlib.contextmanager 

240def uncloseable(fd: typing.BinaryIO) -> typing.Generator[typing.BinaryIO, typing.Any, None]: 

241 """ 

242 Context manager which turns the fd's close operation to no-op for the duration of the context. 

243 """ 

244 close = fd.close 

245 fd.close = lambda: None # type: ignore 

246 yield fd 

247 fd.close = close # type: ignore 

248 

249 

250def as_binaryio(file: str | Path | typing.BinaryIO | None, mode: typing.Literal["rb", "wb"] = "rb") -> typing.BinaryIO: 

251 """ 

252 Convert a number of possible 'file' descriptions into a single BinaryIO interface. 

253 """ 

254 if isinstance(file, str): 

255 file = Path(file) 

256 if isinstance(file, Path): 

257 file = file.open(mode) 

258 if file is None: 

259 file = io.BytesIO() 

260 if isinstance(file, io.BytesIO): 

261 # so .read() works after .write(): 

262 file.seek(0) 

263 # so the with-statement doesn't close the in-memory file: 

264 file = uncloseable(file) # type: ignore 

265 

266 return file 

267 

268 

269def expand_posix_vars(posix_expr: str, context: dict[str, str]) -> str: 

270 """ 

271 Replace case-insensitive POSIX and Docker Compose-like environment variables in a string with their values. 

272 

273 Args: 

274 posix_expr (str): The input string containing case-insensitive POSIX or Docker Compose-like variables. 

275 context (dict): A dictionary containing variable names and their respective values. 

276 

277 Returns: 

278 str: The string with replaced variable values. 

279 """ 

280 return typing.cast(str, expand(posix_expr, environ=context)) 

281 

282 

283def expand_env_vars_into_toml_values(toml: dict[str, typing.Any], env: dict[str, typing.Any]) -> None: 

284 """ 

285 Recursively expands POSIX/Docker Compose-like environment variables in a TOML dictionary. 

286 

287 This function traverses a TOML dictionary and expands POSIX/Docker Compose-like 

288 environment variables (${VAR:default}) using values provided in the 'env' dictionary. 

289 It performs in-place modification of the 'toml' dictionary. 

290 

291 Args: 

292 toml (dict): A TOML dictionary with string values possibly containing environment variables. 

293 env (dict): A dictionary containing environment variable names and their respective values. 

294 

295 Returns: 

296 None: The function modifies the 'toml' dictionary in place. 

297 

298 Notes: 

299 The function recursively traverses the 'toml' dictionary. If a value is a string or a list of strings, 

300 it attempts to substitute any environment variables found within those strings using the 'env' dictionary. 

301 

302 Example: 

303 toml_data = { 

304 'key1': 'This has ${ENV_VAR:default}', 

305 'key2': ['String with ${ANOTHER_VAR}', 'Another ${YET_ANOTHER_VAR}'] 

306 } 

307 environment = { 

308 'ENV_VAR': 'replaced_value', 

309 'ANOTHER_VAR': 'value_1', 

310 'YET_ANOTHER_VAR': 'value_2' 

311 } 

312 

313 expand_env_vars_into_toml_values(toml_data, environment) 

314 # 'toml_data' will be modified in place: 

315 # { 

316 # 'key1': 'This has replaced_value', 

317 # 'key2': ['String with value_1', 'Another value_2'] 

318 # } 

319 """ 

320 if not toml or not env: # pragma: no cover 

321 return 

322 

323 for key, var in toml.items(): 

324 if isinstance(var, dict): 

325 expand_env_vars_into_toml_values(var, env) 

326 elif isinstance(var, list): 

327 toml[key] = [expand_posix_vars(value, env) if isinstance(value, str) else value for value in var] 

328 elif isinstance(var, str): 

329 toml[key] = expand_posix_vars(var, env) 

330 else: 

331 # nothing to substitute 

332 continue