Coverage for src\derivepassphrase\vault.py: 98.454%
154 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""Python port of the vault(1) password generation scheme."""
7from __future__ import annotations
9import base64
10import collections
11import hashlib
12import hmac
13import math
14import types
15from typing import TYPE_CHECKING, Final
17from typing_extensions import TypeAlias, assert_type
19from derivepassphrase import _types, sequin, ssh_agent
21if TYPE_CHECKING:
22 from collections.abc import Callable, Sequence
24 from typing_extensions import Buffer
27class Vault:
28 """A work-alike of James Coglan's vault.
30 Store settings for generating (actually: deriving) passphrases for
31 named services, with various constraints, given only a master
32 passphrase. Also, actually generate the passphrase. The derivation
33 is deterministic and non-secret; only the master passphrase need be
34 kept secret. The implementation is compatible with [vault][].
36 [James Coglan explains the passphrase derivation algorithm in great
37 detail][ALGORITHM] in his blog post on said topic: A principally
38 infinite bit stream is obtained by running a key-derivation function
39 on the master passphrase and the service name, then this bit stream
40 is fed into a [sequin.Sequin][] to generate random numbers in the
41 correct range, and finally these random numbers select passphrase
42 characters until the desired length is reached.
44 [vault]: https://www.npmjs.com/package/vault
45 [ALGORITHM]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/
47 """
49 UUID: Final = b'e87eb0f4-34cb-46b9-93ad-766c5ab063e7'
50 """A tag used by vault in the bit stream generation."""
51 CHARSETS: Final = types.MappingProxyType(
52 collections.OrderedDict([
53 ('lower', b'abcdefghijklmnopqrstuvwxyz'),
54 ('upper', b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
55 (
56 'alpha',
57 (
58 # CHARSETS['lower']
59 b'abcdefghijklmnopqrstuvwxyz'
60 # CHARSETS['upper']
61 b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
62 ),
63 ),
64 ('number', b'0123456789'),
65 (
66 'alphanum',
67 (
68 # CHARSETS['lower']
69 b'abcdefghijklmnopqrstuvwxyz'
70 # CHARSETS['upper']
71 b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
72 # CHARSETS['number']
73 b'0123456789'
74 ),
75 ),
76 ('space', b' '),
77 ('dash', b'-_'),
78 ('symbol', b'!"#$%&\'()*+,./:;<=>?@[\\]^{|}~-_'),
79 (
80 'all',
81 (
82 # CHARSETS['lower']
83 b'abcdefghijklmnopqrstuvwxyz'
84 # CHARSETS['upper']
85 b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
86 # CHARSETS['number']
87 b'0123456789'
88 # CHARSETS['space']
89 b' '
90 # CHARSETS['symbol']
91 b'!"#$%&\'()*+,./:;<=>?@[\\]^{|}~-_'
92 ),
93 ),
94 ])
95 )
96 """
97 Known character sets from which to draw passphrase characters.
98 Relies on a certain, fixed order for their definition and their
99 contents.
101 """
103 def __init__( # noqa: PLR0913
104 self,
105 *,
106 phrase: Buffer | str = b'',
107 length: int = 20,
108 repeat: int = 0,
109 lower: int | None = None,
110 upper: int | None = None,
111 number: int | None = None,
112 space: int | None = None,
113 dash: int | None = None,
114 symbol: int | None = None,
115 ) -> None:
116 """Initialize the Vault object.
118 Args:
119 phrase:
120 The master passphrase from which to derive the service
121 passphrases. If a string, then the UTF-8 encoding of
122 the string is used.
123 length:
124 Desired passphrase length.
125 repeat:
126 The maximum number of immediate character repetitions
127 allowed in the passphrase. Disabled if set to 0.
128 lower:
129 Optional constraint on ASCII lowercase characters. If
130 positive, include this many lowercase characters
131 somewhere in the passphrase. If 0, avoid lowercase
132 characters altogether.
133 upper:
134 Same as `lower`, but for ASCII uppercase characters.
135 number:
136 Same as `lower`, but for ASCII digits.
137 space:
138 Same as `lower`, but for the space character.
139 dash:
140 Same as `lower`, but for the hyphen-minus and underscore
141 characters.
142 symbol:
143 Same as `lower`, but for all other ASCII printable
144 characters except lowercase characters, uppercase
145 characters, digits, space and backquote.
147 Raises:
148 ValueError:
149 Conflicting passphrase constraints. Permit more
150 characters, or increase the desired passphrase length.
152 Warning:
153 Because of repetition constraints, it is not always possible
154 to detect conflicting passphrase constraints at construction
155 time.
157 """
158 self._phrase = self._get_binary_string(phrase) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
159 self._length = length 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
160 self._repeat = repeat 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
161 self._allowed = bytearray(self.CHARSETS['all']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
162 self._required: list[bytes] = [] 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
164 def subtract_or_require( 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
165 count: int | None, characters: bytes | bytearray
166 ) -> None:
167 if not isinstance(count, int): 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
168 return 1ahziojABCJKDfklmwxdnMNLQPEFGHIg
169 if count <= 0: 1ahpqeorjstuvfklmwxydnMNQOcb
170 self._allowed = self._subtract(characters, self._allowed) 1ahjfklmdnMNOcb
171 else:
172 for _ in range(count): 1apqeorstuvwxyNQcb
173 self._required.append(characters) 1apqeorstuvwxyNQcb
175 subtract_or_require(lower, self.CHARSETS['lower']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
176 subtract_or_require(upper, self.CHARSETS['upper']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
177 subtract_or_require(number, self.CHARSETS['number']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
178 subtract_or_require(space, self.CHARSETS['space']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
179 subtract_or_require(dash, self.CHARSETS['dash']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
180 subtract_or_require(symbol, self.CHARSETS['symbol']) 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
181 if len(self._required) > self._length: 1ahzpqieorjstuvABCJKDfklmwxydnMNLQOPEFGHcIbg
182 msg = 'requested passphrase length too short' 1Q
183 raise ValueError(msg) 1Q
184 if not self._allowed: 1ahzpqieorjstuvABCJKDfklmwxydnMNLOPEFGHcIbg
185 msg = 'no allowed characters left' 1O
186 raise ValueError(msg) 1O
187 for _ in range(len(self._required), self._length): 1ahzpqieorjstuvABCJKDfklmwxydnMNLPEFGHcIbg
188 self._required.append(bytes(self._allowed)) 1ahzpqieorjstuvABCJKDfklmwxydnMLPEFGHcIbg
190 def _entropy(self) -> float:
191 """Estimate the passphrase entropy, given the current settings.
193 The entropy is the base 2 logarithm of the amount of
194 possibilities. We operate directly on the logarithms, and use
195 sorting and [`math.fsum`][] to keep high accuracy.
197 Note:
198 We actually overestimate the entropy here because of poor
199 handling of character repetitions. In the extreme, assuming
200 that only one character were allowed, then because there is
201 only one possible string of each given length, the entropy
202 of that string `s` is always be zero. However, we calculate
203 the entropy as `math.log2(math.factorial(len(s)))`, i.e. we
204 assume the characters at the respective string position are
205 distinguishable from each other.
207 Returns:
208 A valid (and somewhat close) upper bound to the entropy.
210 """
211 factors: list[int] = [] 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
212 if not self._required or any(not x for x in self._required): 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
213 return float('-inf') 1M
214 for i, charset in enumerate(self._required): 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
215 factors.extend([i + 1, len(charset)]) 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
216 factors.sort() 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
217 return math.fsum(math.log2(f) for f in factors) 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
219 def _estimate_sufficient_hash_length(
220 self,
221 safety_factor: float = 2.0,
222 ) -> int:
223 """Estimate the sufficient hash length, given the current settings.
225 Using the entropy (via [`_entropy`][]) and a safety factor, give
226 an initial estimate of the length to use for [`create_hash`][]
227 such that using a [`sequin.Sequin`][] with this hash will not
228 exhaust it during passphrase generation.
230 Args:
231 safety_factor: The safety factor. Must be at least 1.
233 Returns:
234 The estimated sufficient hash length.
236 Raises:
237 ValueError: The safety factor is less than 1, or not finite.
239 Warning:
240 This is a heuristic, not an exact computation; it may
241 underestimate the true necessary hash length. It is
242 intended as a starting point for searching for a sufficient
243 hash length, usually by doubling the hash length each time
244 it does not yet prove so.
246 """ # noqa: DOC501
247 try: 1ahzpqieorjstuvABCJKDfklmwxydnMNPEFGHcIbg
248 safety_factor = float(safety_factor) 1ahzpqieorjstuvABCJKDfklmwxydnMNPEFGHcIbg
249 except TypeError as e: 1P
250 msg = f'invalid safety factor: not a float: {safety_factor!r}' 1P
251 raise TypeError(msg) from e 1P
252 if not math.isfinite(safety_factor) or safety_factor < 1.0: 1ahzpqieorjstuvABCJKDfklmwxydnMNPEFGHcIbg
253 msg = f'invalid safety factor {safety_factor!r}' 1P
254 raise ValueError(msg) 1P
255 # Ensure the bound is strictly positive.
256 entropy_bound = max(1, self._entropy()) 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
257 return math.ceil(safety_factor * entropy_bound / 8) 1ahzpqieorjstuvABCJKDfklmwxydnMNEFGHcIbg
259 @staticmethod
260 def _get_binary_string(s: Buffer | str, /) -> bytes:
261 """Convert the input string to a read-only, binary string.
263 If it is a text string, return the string's UTF-8
264 representation.
266 Args:
267 s: The string to (check and) convert.
269 Returns:
270 A read-only, binary copy of the string.
272 """
273 if isinstance(s, str): 1ahzpqieorjstuvABCJKDfklmwxydnMNL4QOPZ0123SREFGHcIbg
274 return s.encode('UTF-8') 1ahzpqieorjstuvABCDfklmwxydnL4Z0123SREFGHcIbg
275 return bytes(s) 1ahzpqieorjstuvABCJKDfklmwxydnMNL4QOPZ0123SREFGHcIbg
277 @classmethod
278 def create_hash(
279 cls,
280 phrase: Buffer | str,
281 service: Buffer | str,
282 *,
283 length: int = 32,
284 ) -> bytes:
285 r"""Create a pseudorandom byte stream from phrase and service.
287 Create a pseudorandom byte stream from `phrase` and `service` by
288 feeding them into the key-derivation function PBKDF2
289 (8 iterations, using SHA-1).
291 Args:
292 phrase:
293 A master passphrase, or sometimes an SSH signature.
294 Used as the key for PBKDF2, the underlying cryptographic
295 primitive. If a string, then the UTF-8 encoding of the
296 string is used.
297 service:
298 A vault service name. Will be suffixed with the
299 [`UUID`][], and then used as the salt value for
300 PBKDF2. If a string, then the UTF-8 encoding of the
301 string is used.
302 length:
303 The length of the byte stream to generate.
305 Returns:
306 A pseudorandom byte string of length `length`.
308 Note:
309 Shorter values returned from this method (with the same key
310 and message) are prefixes of longer values returned from
311 this method. (This property is inherited from the
312 underlying PBKDF2 function.) It is thus safe (if slow) to
313 call this method with the same input with ever-increasing
314 target lengths.
316 Examples:
317 >>> # See also Vault.phrase_from_key examples.
318 >>> phrase = bytes.fromhex('''
319 ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39
320 ... 00 00 00 40
321 ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86
322 ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd
323 ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c
324 ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02
325 ... ''')
326 >>> Vault.create_hash(phrase, 'some_service', length=4)
327 b'M\xb1<S'
328 >>> Vault.create_hash(phrase, b'some_service', length=16)
329 b'M\xb1<S\x827E\xd1M\xaf\xf8~\xc8n\x10\xcc'
330 >>> Vault.create_hash(phrase, b'NOSUCHSERVICE', length=16)
331 b'\x1c\xc3\x9c\xd9\xb6\x1a\x99CS\x07\xc41\xf4\x85#s'
333 """
334 phrase = cls._get_binary_string(phrase) 1ahzpqieorjstuvABCJKDfklmwxydnLZ0123SREFGHcIbg
335 assert isinstance(phrase, bytes) 1ahzpqieorjstuvABCJKDfklmwxydnLZ0123SREFGHcIbg
336 salt = cls._get_binary_string(service) + cls.UUID 1ahzpqieorjstuvABCJKDfklmwxydnLZ0123SREFGHcIbg
337 return hashlib.pbkdf2_hmac( 1ahzpqieorjstuvABCJKDfklmwxydnLZ0123SREFGHcIbg
338 hash_name='sha1',
339 password=phrase,
340 salt=salt,
341 iterations=8,
342 dklen=length,
343 )
345 def generate(
346 self,
347 service_name: Buffer | str,
348 /,
349 *,
350 phrase: Buffer | str = b'',
351 ) -> bytes:
352 r"""Generate a service passphrase.
354 Args:
355 service_name:
356 The service name. If a string, then the UTF-8 encoding
357 of the string is used.
358 phrase:
359 If given, override the passphrase given during
360 construction. If a string, then the UTF-8 encoding of
361 the string is used.
363 Returns:
364 The service passphrase.
366 Raises:
367 ValueError:
368 Conflicting passphrase constraints. Permit more
369 characters, or increase the desired passphrase length.
371 Examples:
372 >>> phrase = b'She cells C shells bye the sea shoars'
373 >>> # Using default options in constructor.
374 >>> Vault(phrase=phrase).generate(b'google')
375 b': 4TVH#5:aZl8LueOT\\{'
376 >>> # Also possible:
377 >>> Vault().generate(b'google', phrase=phrase)
378 b': 4TVH#5:aZl8LueOT\\{'
380 Conflicting constraints are sometimes only found during
381 generation.
383 >>> # Note: no error here...
384 >>> v = Vault(
385 ... lower=0,
386 ... upper=0,
387 ... number=0,
388 ... space=2,
389 ... dash=0,
390 ... symbol=1,
391 ... repeat=2,
392 ... length=3,
393 ... )
394 >>> # ... but here.
395 >>> v.generate(
396 ... '0', phrase=b'\x00'
397 ... ) # doctest: +IGNORE_EXCEPTION_DETAIL
398 Traceback (most recent call last):
399 ...
400 ValueError: no allowed characters left
403 """
404 hash_length = self._estimate_sufficient_hash_length() 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
405 assert hash_length >= 1 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
406 # Ensure the phrase and the service name are bytes objects.
407 # This is needed later for safe concatenation.
408 service_name = self._get_binary_string(service_name) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
409 assert_type(service_name, bytes) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
410 if not phrase: 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
411 phrase = self._phrase 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
412 phrase = self._get_binary_string(phrase) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
413 assert_type(phrase, bytes) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
414 # Repeat the passphrase generation with ever-increasing hash
415 # lengths, until the passphrase can be formed without exhausting
416 # the sequin. See the guarantee in the create_hash method for
417 # why this works.
418 while True: 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
419 try: 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
420 required = self._required[:] 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
421 seq = sequin.Sequin( 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
422 self.create_hash(
423 phrase=phrase, service=service_name, length=hash_length
424 )
425 )
426 result = bytearray() 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
427 while len(result) < self._length: 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
428 pos = seq.generate(len(required)) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
429 charset = required.pop(pos) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
430 # Determine if an unlucky choice right now might
431 # violate the restriction on repeated characters.
432 # That is, check if the current partial passphrase
433 # ends with r - 1 copies of the same character
434 # (where r is the repeat limit that must not be
435 # reached), and if so, remove this same character
436 # from the current character's allowed set.
437 if self._repeat and result: 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
438 bad_suffix = bytes(result[-1:]) * (self._repeat - 1) 1apqeorstuvfdcbg
439 if result.endswith(bad_suffix): 1apqeorstuvfdcbg
440 charset = self._subtract( 1afdcbg
441 bytes(result[-1:]), charset
442 )
443 pos = seq.generate(len(charset)) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
444 result.extend(charset[pos : pos + 1]) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
445 except ValueError as exc: 1aLb
446 msg = 'no allowed characters left' 1ab
447 raise ValueError(msg) from exc 1ab
448 except sequin.SequinExhaustedError: 1L
449 hash_length *= 2 1L
450 else:
451 return bytes(result) 1ahzpqieorjstuvABCJKDfklmwxydnLEFGHcIbg
453 @staticmethod
454 def is_suitable_ssh_key(
455 key: Buffer,
456 /,
457 *,
458 client: ssh_agent.SSHAgentClient | None = None,
459 ) -> bool:
460 """Check whether the key is suitable for passphrase derivation.
462 Some key types are guaranteed to be deterministic. Other keys
463 types are only deterministic if the SSH agent supports this
464 feature.
466 Args:
467 key:
468 SSH public key to check.
469 client:
470 An optional SSH agent client to check for additional
471 deterministic key types. If not given, assume no such
472 types.
474 Returns:
475 True if and only if the key is guaranteed suitable for use
476 in deriving a passphrase deterministically (perhaps
477 restricted to the indicated SSH agent).
479 """
480 key = bytes(key) 1aieXVTY
481 TestFunc: TypeAlias = 'Callable[[bytes | bytearray], bool]' 1aieXVTY
482 deterministic_signature_types: dict[str, TestFunc]
483 deterministic_signature_types = { 1aieXVTY
484 'ssh-ed25519': lambda k: k.startswith(
485 b'\x00\x00\x00\x0bssh-ed25519'
486 ),
487 'ssh-ed448': lambda k: k.startswith(b'\x00\x00\x00\x09ssh-ed448'),
488 'ssh-rsa': lambda k: k.startswith(b'\x00\x00\x00\x07ssh-rsa'),
489 }
490 dsa_signature_types = { 1aieXVTY
491 'ssh-dss': lambda k: k.startswith(b'\x00\x00\x00\x07ssh-dss'),
492 'ecdsa-sha2-nistp256': lambda k: k.startswith(
493 b'\x00\x00\x00\x13ecdsa-sha2-nistp256'
494 ),
495 'ecdsa-sha2-nistp384': lambda k: k.startswith(
496 b'\x00\x00\x00\x13ecdsa-sha2-nistp384'
497 ),
498 'ecdsa-sha2-nistp521': lambda k: k.startswith(
499 b'\x00\x00\x00\x13ecdsa-sha2-nistp521'
500 ),
501 }
502 criteria = [ 1aieXVTY
503 lambda: any(
504 v(key) for v in deterministic_signature_types.values()
505 ),
506 ]
507 if client is not None: 1aieXVTY
508 criteria.append( 1ieXVTY
509 lambda: (
510 client.has_deterministic_dsa_signatures()
511 and any(v(key) for v in dsa_signature_types.values())
512 )
513 )
514 return any(crit() for crit in criteria) 1aieXVTY
516 @classmethod
517 def phrase_from_key(
518 cls,
519 key: Buffer,
520 /,
521 *,
522 conn: ssh_agent.SSHAgentClient
523 | _types.SSHAgentSocket
524 | Sequence[str]
525 | None = None,
526 ) -> bytes:
527 """Obtain the master passphrase from a configured SSH key.
529 vault allows the usage of certain SSH keys to derive a master
530 passphrase, by signing the vault [`UUID`][] with the SSH key.
531 The key type must ensure that signatures are deterministic
532 (perhaps only in conjunction with the given SSH agent).
534 Args:
535 key:
536 The (public) SSH key to use for signing.
537 conn:
538 An optional connection hint to the SSH agent. See
539 [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].
541 Returns:
542 The signature of the vault [`UUID`][] under this key,
543 unframed but encoded in base64.
545 Raises:
546 derivepassphrase.ssh_agent.socketprovider.NoSuchProviderError:
547 As per [`ssh_agent.SSHAgentClient.__init__`][]. Only
548 applicable if agent auto-discovery is used.
549 KeyError:
550 As per [`ssh_agent.SSHAgentClient.__init__`][]. Only
551 applicable if agent auto-discovery is used.
552 NotImplementedError:
553 As per [`ssh_agent.SSHAgentClient.__init__`][],
554 including the mulitple raise as an exception group.
555 Only applicable if agent auto-discovery is used.
556 OSError:
557 If the connection hint was a socket, then there was an
558 error setting up the socket connection to the agent.
560 Otherwise, as per
561 [`ssh_agent.SSHAgentClient.__init__`][]. Only
562 applicable if agent auto-discovery is used.
563 ValueError:
564 The SSH key is principally unsuitable for this use case.
565 Usually this means that the signature is not
566 deterministic.
568 Examples:
569 >>> import base64
570 >>> # Actual Ed25519 test public key.
571 >>> public_key = bytes.fromhex('''
572 ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39
573 ... 00 00 00 20
574 ... 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1
575 ... 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76
576 ... ''')
577 >>> expected_sig_raw = bytes.fromhex('''
578 ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39
579 ... 00 00 00 40
580 ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86
581 ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd
582 ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c
583 ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02
584 ... ''')
585 >>> # Raw Ed25519 signatures are 64 bytes long.
586 >>> signature_blob = expected_sig_raw[-64:]
587 >>> phrase = base64.standard_b64encode(signature_blob)
588 >>> Vault.phrase_from_key(phrase) == expected # doctest:+SKIP
589 True
591 """
592 with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client: 1ieVT
593 if not cls.is_suitable_ssh_key(key, client=client): 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true1ieVT
594 msg = (
595 'unsuitable SSH key: bad key, or '
596 'signature not deterministic under this agent'
597 )
598 raise ValueError(msg)
599 raw_sig = client.sign(key, cls.UUID) 1ieVT
600 _keytype, trailer = ssh_agent.SSHAgentClient.unstring_prefix(raw_sig) 1ieT
601 signature_blob = ssh_agent.SSHAgentClient.unstring(trailer) 1ieT
602 return bytes(base64.standard_b64encode(signature_blob)) 1ieT
604 @classmethod
605 def phrases_are_interchangable(
606 cls,
607 phrase1: Buffer,
608 phrase2: Buffer,
609 /,
610 ) -> bool:
611 """Return true if the passphrases are interchangable to Vault.
613 Vault internally passes the passphrase as the key to HMAC-SHA1.
614 HMAC requires keys to have a certain fixed length, and therefore
615 transforms keys of other lengths suitably. Because of this, in
616 general, there exist multiple passphrases that behave
617 identically under Vault.
619 Note: HMAC key transformation
620 Keys strictly larger than the SHA1 block size (64 bytes) are
621 first hashed with SHA1, then the digest is used in place of
622 the original key. Then, any keys/digests smaller than the
623 block size are padded with NUL bytes on the right, up to the
624 block size.
626 As a result, keys smaller than the block size are padded,
627 keys larger than the block size are hashed and then padded,
628 and keys exactly as large as the block size are used as-is.
630 Args:
631 phrase1:
632 A passphrase to compare. Must be a binary string to
633 mitigate timing attacks.
634 phrase2:
635 A passphrase to compare. Must be a binary string to
636 mitigate timing attacks.
638 Warning: Likely non-resistant to timing attacks
639 This method makes some effort to be resistant to timing
640 attacks, but cannot guarantee that Python
641 micro-optimizations, version or platform differences affect
642 the effectiveness of these efforts.
644 Callers can definitely observe timing differences due to the
645 length of the passphrase passed in.
647 """
648 to_key = cls._phrase_to_hmac_key 1SR
649 return hmac.compare_digest(to_key(phrase1), to_key(phrase2)) 1SR
651 @classmethod
652 def _phrase_to_hmac_key(
653 cls,
654 phrase: Buffer | str,
655 /,
656 ) -> bytes:
657 r"""Return the HMAC key belonging to a passphrase.
659 This is the actual HMAC key this passphrase would be transformed
660 into when used within Vault.
662 See [`phrases_are_interchangable`][] for further explanations
663 and warnings about timing attack resistance.
665 Args:
666 phrase:
667 A passphrase to compare. Must be a binary string to
668 mitigate timing attacks.
670 """
671 phrase = cls._get_binary_string(phrase) 1SR
672 h = hashlib.sha1(phrase, usedforsecurity=False) 1SR
673 try: 1SR
674 key = bytearray(h.block_size) 1SR
675 for i, byte in enumerate(phrase): 1SR
676 key[i] = byte 1SR
677 return bytes(key) 1SR
678 except IndexError: 1R
679 return h.digest() + b'\x00' * (h.block_size - h.digest_size) 1R
681 @staticmethod
682 def _subtract(
683 charset: Buffer,
684 allowed: Buffer,
685 ) -> bytearray:
686 """Remove the characters in charset from allowed.
688 This preserves the relative order of characters in `allowed`.
690 Args:
691 charset:
692 Characters to remove. Must not contain duplicate
693 characters.
694 allowed:
695 Character set to remove the other characters from. Must
696 not contain duplicate characters.
698 Returns:
699 The pruned "allowed" character set.
701 Raises:
702 ValueError:
703 `allowed` or `charset` contained duplicate characters.
705 """
706 allowed = ( 1ahjfklmdnUMNOWcbg
707 allowed if isinstance(allowed, bytearray) else bytearray(allowed)
708 )
709 assert_type(allowed, bytearray) 1ahjfklmdnUMNOWcbg
710 charset = memoryview(charset).toreadonly().cast('c') 1ahjfklmdnUMNOWcbg
711 assert_type(charset, 'memoryview[bytes]') 1ahjfklmdnUMNOWcbg
712 msg_dup_characters = 'duplicate characters in set' 1ahjfklmdnUMNOWcbg
713 if len(frozenset(allowed)) != len(allowed): 1ahjfklmdnUMNOWcbg
714 raise ValueError(msg_dup_characters) 1W
715 if len(frozenset(charset)) != len(charset): 1ahjfklmdnUMNOWcbg
716 raise ValueError(msg_dup_characters) 1W
717 for c in charset: 1ahjfklmdnUMNOcbg
718 try: 1ahjfklmdnUMNOcbg
719 pos = allowed.index(c) 1ahjfklmdnUMNOcbg
720 except ValueError: 1adOcb
721 pass 1adOcb
722 else:
723 allowed[pos : pos + 1] = [] 1ahjfklmdnUMNOcbg
724 return allowed 1ahjfklmdnUMNOcbg