Coverage for src\derivepassphrase\_types.py: 100.000%
273 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""Types used by derivepassphrase."""
7from __future__ import annotations
9import enum
10import json
11import math
12import string
13import warnings
14from typing import TYPE_CHECKING, Generic, Protocol, TypeVar, cast
16from typing_extensions import (
17 Buffer,
18 NamedTuple,
19 NotRequired,
20 TypeAlias,
21 TypedDict,
22 deprecated,
23 get_overloads,
24 overload,
25 runtime_checkable,
26)
28if TYPE_CHECKING:
29 from collections.abc import Callable, Iterator, Sequence
30 from typing import Literal
32 from typing_extensions import (
33 Any,
34 Required,
35 TypeIs,
36 )
# Names exported by a star-import of this module.
__all__ = (
    'SSH_AGENT',
    'SSH_AGENTC',
    'SSHKeyCommentPair',
    'VaultConfig',
    'is_vault_config',
)
47# For type checking purposes, not actual use, so no coverage.
48class _Omitted: # pragma: no cover
49 def __bool__(self) -> bool:
50 return False
52 def __repr__(self) -> str:
53 return '...'
56class VaultConfigGlobalSettings(TypedDict, total=False):
57 r"""Configuration for vault: global settings.
59 Attributes:
60 key:
61 The base64-encoded ssh public key to use, overriding the
62 master passphrase. Optional.
63 phrase:
64 The master passphrase. Optional.
65 unicode_normalization_form:
66 The preferred Unicode normalization form; we warn the user
67 if textual passphrases do not match their normalized forms.
68 Optional, and a `derivepassphrase` extension.
69 length:
70 Desired passphrase length.
71 repeat:
72 The maximum number of immediate character repetitions
73 allowed in the passphrase. Disabled if set to 0.
74 lower:
75 Optional constraint on ASCII lowercase characters. If
76 positive, include this many lowercase characters
77 somewhere in the passphrase. If 0, avoid lowercase
78 characters altogether.
79 upper:
80 Same as `lower`, but for ASCII uppercase characters.
81 number:
82 Same as `lower`, but for ASCII digits.
83 space:
84 Same as `lower`, but for the space character.
85 dash:
86 Same as `lower`, but for the hyphen-minus and underscore
87 characters.
88 symbol:
89 Same as `lower`, but for all other hitherto unlisted
90 ASCII printable characters (except backquote).
92 """
94 key: NotRequired[str]
95 """"""
96 phrase: NotRequired[str]
97 """"""
98 unicode_normalization_form: NotRequired[
99 Literal['NFC', 'NFD', 'NFKC', 'NFKD']
100 ]
101 """"""
102 length: NotRequired[int]
103 """"""
104 repeat: NotRequired[int]
105 """"""
106 lower: NotRequired[int]
107 """"""
108 upper: NotRequired[int]
109 """"""
110 number: NotRequired[int]
111 """"""
112 space: NotRequired[int]
113 """"""
114 dash: NotRequired[int]
115 """"""
116 symbol: NotRequired[int]
117 """"""
class VaultConfigServicesSettings(VaultConfigGlobalSettings, total=False):
    r"""Per-service settings section of a vault configuration.

    Accepts every global setting, plus service-specific notes.

    Attributes:
        notes:
            Optional notes for this service, shown to the user when
            the passphrase is generated.
        key:
            Same meaning as in the global settings.
        phrase:
            Same meaning as in the global settings.
        unicode_normalization_form:
            Same meaning as in the global settings.
        length:
            Same meaning as in the global settings.
        repeat:
            Same meaning as in the global settings.
        lower:
            Same meaning as in the global settings.
        upper:
            Same meaning as in the global settings.
        number:
            Same meaning as in the global settings.
        space:
            Same meaning as in the global settings.
        dash:
            Same meaning as in the global settings.
        symbol:
            Same meaning as in the global settings.

    """

    notes: NotRequired[str]
    """"""
# `global` is a reserved word in Python, so this key cannot be declared
# with the class-based `TypedDict` syntax; the functional form is used.
_VaultConfig = TypedDict(
    '_VaultConfig',
    {'global': NotRequired[VaultConfigGlobalSettings]},
    total=False,
)
class VaultConfig(TypedDict, _VaultConfig, total=False):
    r"""A complete vault configuration.  For typing purposes.

    Such configurations are usually stored as JSON.

    Attributes:
        global (NotRequired[VaultConfigGlobalSettings]):
            Settings applying to all services.
        services (Required[dict[str, VaultConfigServicesSettings]]):
            Settings for individual services, keyed by service name.

    """

    services: Required[dict[str, VaultConfigServicesSettings]]
def json_path(path: Sequence[str | int], /) -> str:
    r"""Build a JSONPath selector from a series of keys and indices.

    The selector conforms to RFC 9535 and always starts at the JSON
    root node (`$`).  It consists solely of name and index selectors;
    names are written in shorthand dot notation whenever possible.

    Args:
        path:
            The object keys and array indices leading from the root
            node to the desired JSON value, in order.

    Returns:
        A JSONPath selector string identifying the desired JSON
        value.

    Examples:
        >>> json_path(['global', 'phrase'])
        '$.global.phrase'
        >>> json_path(['services', 'service name with spaces', 'length'])
        '$.services["service name with spaces"].length'
        >>> json_path(['services', 'special\u000acharacters', 'notes'])
        '$.services["special\\ncharacters"].notes'
        >>> print(json_path(['services', 'special\u000acharacters', 'notes']))
        $.services["special\ncharacters"].notes
        >>> json_path(['custom_array', 2, 0])
        '$.custom_array[2][0]'

    """
    # Shorthand names start with an ASCII letter or underscore, and
    # continue with ASCII letters, digits or underscores.
    leading_chars = frozenset(string.ascii_letters + '_')
    name_chars = leading_chars | frozenset(string.digits)

    def fits_shorthand(segment: str | int) -> bool:
        return bool(
            isinstance(segment, str)
            and segment
            and segment[:1] in leading_chars
            and name_chars.issuperset(segment)
        )

    pieces = ['$']
    for segment in path:
        if fits_shorthand(segment):
            pieces.append(f'.{segment}')
        else:
            # Indices and all other names use bracket notation, with
            # JSON encoding taking care of quoting and escaping.
            pieces.append(f'[{json.dumps(segment)}]')
    return ''.join(pieces)
231class _VaultConfigValidator:
232 INVALID_CONFIG_ERROR = 'vault config is invalid'
234 def __init__(self, maybe_config: Any) -> None: # noqa: ANN401
235 self.maybe_config = maybe_config 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
237 def traverse_path(self, path: tuple[str, ...]) -> Any: # noqa: ANN401
238 obj = self.maybe_config 1acjnkfgmoiphlebd
239 for key in path: 1acjnkfgmoiphlebd
240 obj = obj[key] 1acjnkfgmoiphlebd
241 return obj 1acjnkfgmoiphlebd
243 def walk_subconfigs(
244 self,
245 ) -> Iterator[tuple[tuple[str] | tuple[str, str], str, Any]]:
246 obj = cast('dict[str, dict[str, Any]]', self.maybe_config) 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
247 if isinstance(obj.get('global', False), dict): 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
248 for k, v in list(obj['global'].items()): 1acjzVABqkfrWCXYDEZ012345g6FGHtIJipuevKLMNwxysbd
249 yield ('global',), k, v 1acjzVABqkfrWCXYDEZ012345g6FGHtIJipuevKLMNwxysbd
250 for sv_name, sv_obj in list(obj['services'].items()): 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
251 for k, v in list(sv_obj.items()): 1acjnzOVABqkfrWCDEgmPFGHtQRSIJTUohluevKLMNwxysbd
252 yield ('services', sv_name), k, v 1acjnzOVABqkfrCDEgmPFGHtQRSIJTUohluevKLMNwxysbd
254 def validate( # noqa: C901,PLR0912
255 self,
256 *,
257 allow_unknown_settings: bool = False,
258 ) -> None:
259 err_obj_not_a_dict = 'vault config is not a dict' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
260 err_non_str_service_name = ( 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
261 'vault config contains non-string service name {sv_name!r}'
262 )
263 err_not_a_dict = 'vault config entry {json_path_str} is not a dict' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
264 err_not_a_string = 'vault config entry {json_path_str} is not a string' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
265 err_not_an_int = 'vault config entry {json_path_str} is not an integer' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
266 err_unknown_setting = ( 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
267 'vault config entry {json_path_str} uses unknown setting {key!r}'
268 )
269 err_bad_number0 = 'vault config entry {json_path_str} is negative' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
270 err_bad_number1 = 'vault config entry {json_path_str} is not positive' 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
272 kwargs: dict[str, Any] = { 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
273 'allow_unknown_settings': allow_unknown_settings,
274 }
275 if not isinstance(self.maybe_config, dict): 1acjnzOVABqkf9r8WCXYDEZ012345gmP6FGH7tQRSI!JTUoiphluevKLMNwxys#bd
276 raise TypeError(err_obj_not_a_dict.format(**kwargs)) 19!#bd
277 if 'global' in self.maybe_config: 1acjnzOVABqkfr8WCXYDEZ012345gmP6FGH7tQRSIJTUoiphluevKLMNwxysbd
278 o_global = self.maybe_config['global'] 1acjzVABqkfrWCXYDEZ012345g6FGH7tIJipuevKLMNwxysbd
279 if not isinstance(o_global, dict): 1acjzVABqkfrWCXYDEZ012345g6FGH7tIJipuevKLMNwxysbd
280 kwargs['json_path_str'] = json_path(['global']) 17bd
281 raise TypeError(err_not_a_dict.format(**kwargs)) 17bd
282 if not isinstance(self.maybe_config.get('services'), dict): 1acjnzOVABqkfr8WCXYDEZ012345gmP6FGH7tQRSIJTUoiphluevKLMNwxysbd
283 kwargs['json_path_str'] = json_path(['services']) 187bd
284 raise TypeError(err_not_a_dict.format(**kwargs)) 187bd
285 for sv_name, service in self.maybe_config['services'].items(): 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
286 if not isinstance(sv_name, str): 1acjnzOVABqkfrWCDEgmPFGHtQRSIJTUohluevKLMNwxysbd
287 kwargs['sv_name'] = sv_name 1bd
288 raise TypeError(err_non_str_service_name.format(**kwargs)) 1bd
289 if not isinstance(service, dict): 1acjnzOVABqkfrWCDEgmPFGHtQRSIJTUohluevKLMNwxysbd
290 kwargs['json_path_str'] = json_path(['services', sv_name]) 1bd
291 raise TypeError(err_not_a_dict.format(**kwargs)) 1bd
292 for path, key, value in self.walk_subconfigs(): 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
293 kwargs['path'] = path 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
294 kwargs['key'] = key 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
295 kwargs['value'] = value 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
296 kwargs['json_path_str'] = json_path([*path, key]) 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
297 # TODO(the-13th-letter): Rewrite using structural pattern
298 # matching.
299 # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
300 if key in {'key', 'phrase'}: 1acjnzOVABqkfrWCXYDEZ012345gmP6FGHtQRSIJTUoiphluevKLMNwxysbd
301 if not isinstance(value, str): 1czVABqfrWCXYDEZ012345gm6FGHtIJiphluevKLMNwxysbd
302 raise TypeError(err_not_a_string.format(**kwargs)) 1cbd
303 elif key == 'unicode_normalization_form' and path == ('global',): 1acjnzOABkfrCDEgmPFGHtQRSIJTUoihluevKLMNwxysbd
304 if not isinstance(value, str): 1cfrebd
305 raise TypeError(err_not_a_string.format(**kwargs)) 1bd
306 if not allow_unknown_settings: 1cfrebd
307 raise ValueError(err_unknown_setting.format(**kwargs)) 1d
308 elif key == 'notes' and path != ('global',): 1acjnzOABkfrCDEgmPFGHtQRSIJTUoihluevKLMNwxysbd
309 if not isinstance(value, str): 1cfrtJhluevKLMNwxysbd
310 raise TypeError(err_not_a_string.format(**kwargs)) 1bd
311 elif key in { 1acjnzOABkfrCDEgmPFGHtQRSITUoihuevwxysbd
312 'length',
313 'repeat',
314 'lower',
315 'upper',
316 'number',
317 'space',
318 'dash',
319 'symbol',
320 }:
321 if not isinstance(value, int): 1acjnzOABkfrCDEgmPFGHtQRSITUoihuevwxysbd
322 raise TypeError(err_not_an_int.format(**kwargs)) 1cbd
323 if key == 'length' and value < 1: 1acjnzOABkfrCDEgmPFGHtQRSITUoihuevwxysbd
324 raise ValueError(err_bad_number1.format(**kwargs)) 1bd
325 if key != 'length' and value < 0: 1acjnzOABkfrCDEgmPFGHtQRSITUoihuevwxysbd
326 raise ValueError(err_bad_number0.format(**kwargs)) 1cbd
327 elif not allow_unknown_settings: 1cfrebd
328 raise ValueError(err_unknown_setting.format(**kwargs)) 1d
330 def clean_up_falsy_values(self) -> Iterator[CleanupStep]: # noqa: C901
331 obj = self.maybe_config 1acjnqkf9gmoiphlebd
332 if ( 1acjnqkf9gmoiphlebd
333 not isinstance(obj, dict)
334 or 'services' not in obj
335 or not isinstance(obj['services'], dict)
336 ):
337 # Defensive programming, so no coverage.
338 raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover 19bd
339 if 'global' in obj and not isinstance(obj['global'], dict): 1acjnqkfgmoiphlebd
340 # Defensive programming, so no coverage.
341 raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover 1bd
342 if not all( 1acjnqkfgmoiphlebd
343 isinstance(service_obj, dict)
344 for service_obj in obj['services'].values()
345 ):
346 # Defensive programming, so no coverage.
347 raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover 1bd
349 def falsy(value: Any) -> bool: # noqa: ANN401 1acjnqkfgmoiphlebd
350 return not js_truthiness(value) 1acjnkfgmoiphlebd
352 def falsy_but_not_zero(value: Any) -> bool: # noqa: ANN401 1acjnqkfgmoiphlebd
353 return not js_truthiness(value) and not ( 1acjfgmihebd
354 isinstance(value, int) and value == 0
355 )
357 def falsy_but_not_string(value: Any) -> bool: # noqa: ANN401 1acjnqkfgmoiphlebd
358 return not js_truthiness(value) and value != '' # noqa: PLC1901 1cfgmiphlebd
360 for path, key, value in self.walk_subconfigs(): 1acjnqkfgmoiphlebd
361 service_obj = self.traverse_path(path) 1acjnkfgmoiphlebd
362 # TODO(the-13th-letter): Rewrite using structural pattern
363 # matching.
364 # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
365 if key == 'phrase' and falsy_but_not_string(value): 1acjnkfgmoiphlebd
366 yield CleanupStep( 1ce
367 (*path, key), service_obj[key], 'replace', ''
368 )
369 service_obj[key] = '' 1ce
370 elif key == 'notes' and falsy(value): 1acjnkfgmoiphlebd
371 yield CleanupStep( 1cle
372 (*path, key), service_obj[key], 'remove', None
373 )
374 service_obj.pop(key) 1cle
375 elif key == 'key' and falsy(value): 1acjnkfgmoiphlebd
376 if path == ('global',): 1ciphle
377 yield CleanupStep( 1cip
378 (*path, key), service_obj[key], 'remove', None
379 )
380 service_obj.pop(key) 1cip
381 else:
382 yield CleanupStep( 1chle
383 (*path, key), service_obj[key], 'replace', ''
384 )
385 service_obj[key] = '' 1chle
386 elif key == 'length' and falsy(value): 1acjnkfgmoiphlebd
387 yield CleanupStep( 1ce
388 (*path, key), service_obj[key], 'replace', 20
389 )
390 service_obj[key] = 20 1ce
391 elif key == 'repeat' and falsy_but_not_zero(value): 1acjnkfgmoiphlebd
392 yield CleanupStep((*path, key), service_obj[key], 'replace', 0) 1ce
393 service_obj[key] = 0 1ce
394 elif key in { 1acjnkfgmoiphlebd
395 'lower',
396 'upper',
397 'number',
398 'space',
399 'dash',
400 'symbol',
401 } and falsy_but_not_zero(value):
402 yield CleanupStep( 1ce
403 (*path, key), service_obj[key], 'remove', None
404 )
405 service_obj.pop(key) 1ce
@overload
@deprecated(
    'allow_derivepassphrase_extensions argument is deprecated since v0.4.0, '
    'to be removed in v1.0: no extensions are defined'
)
def validate_vault_config(
    obj: Any,  # noqa: ANN401
    /,
    *,
    allow_derivepassphrase_extensions: bool,
    allow_unknown_settings: bool = False,
) -> None: ...


@overload
def validate_vault_config(
    obj: Any,  # noqa: ANN401
    /,
    *,
    allow_unknown_settings: bool = False,
) -> None: ...


def validate_vault_config(
    obj: Any,
    /,
    *,
    allow_unknown_settings: bool = False,
    allow_derivepassphrase_extensions: bool = _Omitted(),  # type: ignore[assignment]
) -> None:
    """Validate `obj` as a vault config.

    Args:
        obj:
            The object to check.
        allow_unknown_settings:
            If false, unknown settings cause validation to fail.
        allow_derivepassphrase_extensions:
            (Deprecated.)  Ignored since v0.4.0.

    Raises:
        TypeError:
            The vault config itself, or one of its entries, has the
            wrong type.
        ValueError:
            An entry in the vault config is not allowed, or holds
            a disallowed value.

    Warning: Deprecated argument
        **v0.4.0**:
            The `allow_derivepassphrase_extensions` keyword argument
            is deprecated, and will be removed in v1.0.  There are
            no specified `derivepassphrase` extensions.

    """
    # TODO(the-13th-letter): Remove this block in v1.0.
    # https://the13thletter.info/derivepassphrase/latest/upgrade-notes/#v1.0-allow-derivepassphrase-extensions
    # TODO(the-13th-letter): Add tests that trigger the deprecation warning,
    # then include this in coverage.
    argument_was_passed = not isinstance(
        allow_derivepassphrase_extensions, _Omitted
    )
    if argument_was_passed:  # pragma: no cover
        # The deprecation message is stored on the first overload.
        deprecated_overload = get_overloads(validate_vault_config)[0]
        warnings.warn(
            deprecated_overload.__deprecated__,  # type: ignore[attr-defined]
            DeprecationWarning,
            stacklevel=2,
        )

    validator = _VaultConfigValidator(obj)
    return validator.validate(allow_unknown_settings=allow_unknown_settings)
def is_vault_config(obj: Any) -> TypeIs[VaultConfig]:  # noqa: ANN401
    """Tell whether `obj` is a valid vault config, for typing purposes.

    Unknown settings are tolerated.

    Args:
        obj: The object to check.

    Returns:
        True if `obj` is a vault config, false otherwise.

    """  # noqa: DOC501
    try:
        validate_vault_config(
            obj,
            allow_unknown_settings=True,
        )
    except (TypeError, ValueError) as exc:
        if 'vault config ' in str(exc):
            return False
        # Defensive programming, so no coverage: the error did not
        # come from the validation itself.
        raise  # pragma: no cover
    return True
def js_truthiness(value: Any, /) -> bool:  # noqa: ANN401
    """Return the truthiness of `value` as JavaScript/ECMAScript sees it.

    ECMAScript and Python both treat a fixed set of values as false in
    a boolean context and everything else as true, but the two sets
    differ: ECMAScript additionally treats [`math.nan`][] as false,
    and, unlike Python, treats empty arrays and empty objects/dicts
    as true.  [`bool`][] therefore cannot be used for ECMAScript
    truthiness checks, and this explicit predicate exists instead.

    (ECMAScript has further falsy values with no Python counterpart:
    `undefined`, and `document.all`.  These are not supported.)

    !!! note

        A plain `value not in falsy_values` check does not suffice,
        because [`math.nan`][] defeats it.  In general,
        `float('NaN') == float('NaN')` is false, and
        `float('NaN') != math.nan` and `math.nan != math.nan` are
        true.  CPython says `float('NaN') in [math.nan]` is false,
        PyPy3 says it is true.  The only reliable and portable NaN
        test appears to be calling [`math.isnan`][] directly.

    Args:
        value: The value to test.

    """
    try:
        listed_as_falsy = value in {None, False, 0, 0.0, ''}  # noqa: B033
    except TypeError:
        # All falsy values are hashable, so this can't be falsy.
        return True
    if listed_as_falsy:
        return False
    if isinstance(value, float):
        # NaN is the one falsy value the membership test cannot catch.
        return not math.isnan(value)
    return True
class CleanupStep(NamedTuple):
    """Record of one step carried out during vault config cleanup.

    Attributes:
        path:
            The object keys or array indices leading to the JSON
            value that was cleaned up.
        old_value:
            The value before cleanup.
        action:
            `'replace'` if `old_value` was replaced with
            `new_value`, or `'remove'` if `old_value` was removed.
        new_value:
            The value after cleanup.

    """

    path: Sequence[str | int]
    """"""
    old_value: Any
    """"""
    action: Literal['replace', 'remove']
    """"""
    new_value: Any
    """"""
def clean_up_falsy_vault_config_values(
    obj: Any,  # noqa: ANN401
) -> Sequence[CleanupStep] | None:
    """Replace wrongly typed falsy values in a vault config, in place.

    This is needed for compatibility with vault(1), which sometimes
    relies on truthiness checks alone.

    If vault(1) considered `obj` to be valid, then `obj` will be
    valid as per [`validate_vault_config`][] once cleanup is done.

    Args:
        obj:
            A vault configuration, presumed valid except for falsy
            values of the wrong type.

    Returns:
        A list of 4-tuples `(key_tup, old_value, action, new_value)`
        describing the cleanup actions that were performed.
        `key_tup` is a sequence of object keys and/or array indices
        giving the JSON path to the leaf value that was cleaned up.
        `old_value` and `new_value` are the values before and after
        cleanup.  `action` is `replace` if `old_value` was replaced
        with `new_value`, or `remove` if `old_value` was removed (in
        which case `new_value` is meaningless).

        If the vault configuration is so obviously invalid that no
        cleanup was attempted, `None` is returned directly.

    """
    validator = _VaultConfigValidator(obj)
    try:
        # The cleanup is lazy; building the list runs it to the end.
        steps = list(validator.clean_up_falsy_values())
    except ValueError:
        return None
    return steps
605# TODO(the-13th-letter): Use type variables local to each class.
606# https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.11
607T_Buffer = TypeVar('T_Buffer', bound=Buffer)
608"""
609A [`TypeVar`][] for classes implementing the [`Buffer`][] interface.
611Warning:
612 Non-public attribute, provided for didactical and educational
613 purposes only. Subject to change without notice, including
614 removal.
616"""
class SSHKeyCommentPair(NamedTuple, Generic[T_Buffer]):
    """An SSH key together with its comment.  For typing purposes.

    Attributes:
        key: The SSH key.
        comment: The comment belonging to the SSH key.

    """

    key: T_Buffer
    """"""
    comment: T_Buffer
    """"""

    def toreadonly(self) -> SSHKeyCommentPair[bytes]:
        """Return a copy whose entries are read-only `bytes` objects."""
        frozen_key = bytes(self.key)
        frozen_comment = bytes(self.comment)
        return SSHKeyCommentPair(frozen_key, frozen_comment)
class SSH_AGENTC(enum.Enum):  # noqa: N801
    """Message numbers of the SSH agent protocol: client requests.

    Attributes:
        REQUEST_IDENTITIES (int):
            Request the list of identities.  The expected reply is
            [`SSH_AGENT.IDENTITIES_ANSWER`][].
        SIGN_REQUEST (int):
            Request a signature over some data.  The expected reply
            is [`SSH_AGENT.SIGN_RESPONSE`][].
        ADD_IDENTITY (int):
            Add an (SSH2) identity.
        REMOVE_IDENTITY (int):
            Remove an (SSH2) identity.
        ADD_ID_CONSTRAINED (int):
            Add an (SSH2) identity along with key constraints.
        EXTENSION (int):
            Issue a named request outside the core agent protocol.
            The expected reply is
            [`SSH_AGENT.EXTENSION_RESPONSE`][] or
            [`SSH_AGENT.EXTENSION_FAILURE`][] if the named request
            is supported, and [`SSH_AGENT.FAILURE`][] otherwise.

    """

    REQUEST_IDENTITIES = 11
    """"""
    SIGN_REQUEST = 13
    """"""
    ADD_IDENTITY = 17
    """"""
    REMOVE_IDENTITY = 18
    """"""
    ADD_ID_CONSTRAINED = 25
    """"""
    EXTENSION = 27
    """"""
class SSH_AGENT(enum.Enum):  # noqa: N801
    """Message numbers of the SSH agent protocol: server replies.

    Attributes:
        FAILURE (int):
            Generic failure code.
        SUCCESS (int):
            Generic success code.
        IDENTITIES_ANSWER (int):
            Successful reply to
            [`SSH_AGENTC.REQUEST_IDENTITIES`][].
        SIGN_RESPONSE (int):
            Successful reply to [`SSH_AGENTC.SIGN_REQUEST`][].
        EXTENSION_FAILURE (int):
            Unsuccessful reply to [`SSH_AGENTC.EXTENSION`][].
        EXTENSION_RESPONSE (int):
            Successful reply to [`SSH_AGENTC.EXTENSION`][].

    """

    FAILURE = 5
    """"""
    SUCCESS = 6
    """"""
    IDENTITIES_ANSWER = 12
    """"""
    SIGN_RESPONSE = 14
    """"""
    EXTENSION_FAILURE = 28
    """"""
    EXTENSION_RESPONSE = 29
    """"""
class StoreroomKeyPair(NamedTuple, Generic[T_Buffer]):
    """Two AES256 keys: one for encryption, one for signing.

    Attributes:
        encryption_key:
            AES256 key for encryption with AES256-CBC (with PKCS#7
            padding).
        signing_key:
            AES256 key for signing with HMAC-SHA256.

    """

    encryption_key: T_Buffer
    """"""
    signing_key: T_Buffer
    """"""

    def toreadonly(self) -> StoreroomKeyPair[bytes]:
        """Return a copy whose entries are read-only `bytes` objects."""
        frozen_encryption_key = bytes(self.encryption_key)
        frozen_signing_key = bytes(self.signing_key)
        return StoreroomKeyPair(frozen_encryption_key, frozen_signing_key)
class StoreroomMasterKeys(NamedTuple, Generic[T_Buffer]):
    """Three AES256 keys: for hashing, encryption and signing.

    Attributes:
        hashing_key:
            AES256 key for hashing with HMAC-SHA256, to derive
            a hash table slot for an item.
        encryption_key:
            AES256 key for encryption with AES256-CBC (with PKCS#7
            padding).
        signing_key:
            AES256 key for signing with HMAC-SHA256.

    """

    hashing_key: T_Buffer
    """"""
    encryption_key: T_Buffer
    """"""
    signing_key: T_Buffer
    """"""

    def toreadonly(self) -> StoreroomMasterKeys[bytes]:
        """Return a copy whose entries are read-only `bytes` objects."""
        frozen_hashing_key = bytes(self.hashing_key)
        frozen_encryption_key = bytes(self.encryption_key)
        frozen_signing_key = bytes(self.signing_key)
        return StoreroomMasterKeys(
            frozen_hashing_key,
            frozen_encryption_key,
            frozen_signing_key,
        )
class PEP508Extra(str, enum.Enum):
    """The PEP 508 extras that `derivepassphrase` supports.

    Attributes:
        EXPORT:
            The dependencies needed for the `export` subcommand to
            handle as many foreign configuration formats as
            possible.

    """

    EXPORT = 'export'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
class Feature(str, enum.Enum):
    """The optional features that `derivepassphrase` supports.

    Attributes:
        SSH_KEY:
            The `vault` subcommand can use a master SSH key in
            place of a master passphrase, provided that an SSH
            agent is running and holds the master SSH key.

            This feature requires Python support for the SSH
            agent's chosen communication channel technology.

    """

    SSH_KEY = 'master SSH key'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
class DerivationScheme(str, enum.Enum):
    """The derivation schemes that `derivepassphrase` provides.

    Attributes:
        VAULT:
            The derivation scheme of James Coglan's `vault`.

    """

    VAULT = 'vault'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
class ForeignConfigurationFormat(str, enum.Enum):
    """The configuration formats that `derivepassphrase export` supports.

    Attributes:
        VAULT_STOREROOM:
            The vault "storeroom" format, for the `export vault`
            subcommand.
        VAULT_V02:
            The vault-native "v0.2" format, for the `export vault`
            subcommand.
        VAULT_V03:
            The vault-native "v0.3" format, for the `export vault`
            subcommand.

    """

    VAULT_STOREROOM = 'vault storeroom'
    """"""
    VAULT_V02 = 'vault v0.2'
    """"""
    VAULT_V03 = 'vault v0.3'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
class ExportSubcommand(str, enum.Enum):
    """The subcommands that `derivepassphrase export` provides.

    Attributes:
        VAULT:
            The `export vault` subcommand.

    """

    VAULT = 'vault'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
class Subcommand(str, enum.Enum):
    """The subcommands that `derivepassphrase` provides.

    Attributes:
        EXPORT:
            The `export` subcommand.
        VAULT:
            The `vault` subcommand.

    """

    EXPORT = 'export'
    """"""
    VAULT = 'vault'
    """"""

    # Render members as their plain string value.
    __str__ = str.__str__
    __format__ = str.__format__  # type: ignore[assignment]
884@runtime_checkable
885class SSHAgentSocket(Protocol):
886 """An abstract networking socket connected to an SSH agent.
888 The abstract socket supports the [`sendall`][socket.sendall] and
889 a [`recv`][socket.recv] operation, with the same signatures and
890 semantics as for "real" sockets. The abstract socket also supports
891 use as a context manager, for automatically closing the socket upon
892 exiting the context.
894 """
896 def __enter__(self) -> Any: ... # noqa: ANN401
898 # mypy/typeshed has a *very* lax annotation of
899 # socket.socket.__exit__, which we need to be compatible with.
900 # *sigh*
901 def __exit__(
902 self,
903 *args: object,
904 ) -> bool | None: ...
906 def sendall(self, data: Buffer, flags: int = 0, /) -> None: ...
908 def recv(self, data: int, flags: int = 0, /) -> bytes: ...
911SSHAgentSocketProvider: TypeAlias = 'Callable[[], SSHAgentSocket]'
912"""A callable that provides an SSH agent socket."""
915class SSHAgentSocketProviderEntry(NamedTuple):
916 """Registry information for the table of SSH agent socket providers.
918 Attributes:
919 provider: The callable that provides the socket.
920 name: The table key which this entry is registered as.
921 aliases: Other keys that shall point to this entry's key.
923 Note:
924 The table uses the key as the key, and the provider as the
925 value. It does not store this info object directly.
927 """
929 provider: SSHAgentSocketProvider
930 """"""
931 key: str
932 """"""
933 aliases: tuple[str, ...]