Coverage for src\derivepassphrase\_internals\cli_helpers.py: 97.312%
314 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
6"""Helper functions for the derivepassphrase command-line.
8Warning:
9 Non-public module (implementation detail), provided for didactical and
10 educational purposes only. Subject to change without notice, including
11 removal.
13"""
15from __future__ import annotations
17import base64
18import copy
19import enum
20import hashlib
21import json
22import logging
23import os
24import pathlib
25import shlex
26import sys
27import threading
28import unicodedata
29from typing import TYPE_CHECKING, cast
31import click
32import click.shell_completion
33import exceptiongroup
34from typing_extensions import Any
36from derivepassphrase import _types, ssh_agent, vault
37from derivepassphrase._internals import cli_messages as _msg
38from derivepassphrase.ssh_agent import socketprovider
40if sys.version_info >= (3, 11):
41 import tomllib
42else:
43 import tomli as tomllib
44 from exceptiongroup import BaseExceptionGroup
46if TYPE_CHECKING:
47 import types
48 from collections.abc import (
49 Iterator,
50 Mapping,
51 Sequence,
52 )
53 from contextlib import AbstractContextManager
54 from typing import (
55 BinaryIO,
56 Callable,
57 Literal,
58 NoReturn,
59 TextIO,
60 )
62 from typing_extensions import Buffer, Self
# Program name, re-exported from the CLI messages module; also used by
# `config_filename` to derive the `<PROG_NAME>_PATH` environment variable
# name and the application directory.
PROG_NAME = _msg.PROG_NAME
# NOTE(review): presumably the maximum display width of an SSH key entry
# in interactive key selection -- confirm against the users of this name.
KEY_DISPLAY_LENGTH = 50

# Error messages
# These are used as exception messages by the helpers in this module.
INVALID_VAULT_CONFIG = 'Invalid vault config'
AGENT_COMMUNICATION_ERROR = 'Error communicating with the SSH agent'
NO_SUITABLE_KEYS = 'No suitable SSH keys were found'
EMPTY_SELECTION = 'Empty selection'
74# Shell completion
75# ================
77# Use naive filename completion for the `path` argument of
78# `derivepassphrase vault`'s `--import` and `--export` options, as well
79# as the `path` argument of `derivepassphrase export vault`. The latter
80# treats the pseudo-filename `VAULT_PATH` specially, but this is awkward
81# to combine with standard filename completion, particularly in bash, so
82# we would probably have to implement *all* completion (`VAULT_PATH` and
83# filename completion) ourselves, lacking some niceties of bash's
84# built-in completion (e.g., adding spaces or slashes depending on
85# whether the completion is a directory or a complete filename).
def shell_complete_path(
    ctx: click.Context,
    parameter: click.Parameter,
    value: str,
) -> list[str | click.shell_completion.CompletionItem]:
    """Request standard path completion for the `path` argument.

    All three arguments are ignored.  The result is always a one-element
    list holding a completion item with an empty value and the type
    `'file'`.

    """  # noqa: DOC201
    del ctx, parameter, value
    file_completion = click.shell_completion.CompletionItem('', type='file')
    return [file_completion]
98# The standard `click` shell completion scripts serialize the completion
99# items as newline-separated one-line entries, which get silently
100# corrupted if the value contains newlines. Each shell imposes
101# additional restrictions: Fish uses newlines in all internal completion
102# helper scripts, so it is difficult, if not impossible, to register
103# completion entries containing newlines if completion comes from within
104# a Fish completion function (instead of a Fish builtin). Zsh's
105# completion system supports descriptions for each completion item, and
106# the completion helper functions parse every entry as a colon-separated
107# 2-tuple of item and description, meaning any colon in the item value
108# must be escaped. Finally, Bash requires the result array to be
109# populated at the completion function's top-level scope, but for/while
110# loops within pipelines do not run at top-level scope, and Bash *also*
111# strips NUL characters from command substitution output, making it
112# difficult to read in external data into an array in a cross-platform
113# manner from entirely within Bash.
114#
115# We capitulate in front of these problems---most egregiously because of
116# Fish---and ensure that completion items (in this case: service names)
117# never contain ASCII control characters by refusing to offer such
118# items as valid completions. On the other side, `derivepassphrase`
119# will warn the user when configuring or importing a service with such
120# a name that it will not be available for shell completion.
def is_completable_item(obj: object) -> bool:
    """Return whether the item is completable on the command-line.

    The item is first converted to a string via [`str`][].  It is
    completable if and only if that string contains no ASCII control
    characters (U+0000 through U+001F, and U+007F).  In particular, the
    empty string is completable.

    """
    text = str(obj)
    # A single pass over the text: reject C0 controls and DEL.
    return all(ord(ch) >= 0x20 and ch != '\x7f' for ch in text)
def shell_complete_service(
    ctx: click.Context,
    parameter: click.Parameter,
    value: str,
) -> list[str | click.shell_completion.CompletionItem]:
    """Return known vault service names as completion items.

    Service names are looked up in the vault configuration file.  If
    that file does not exist, the old configuration file is migrated and
    loaded instead (see [`migrate_and_load_old_config`][]).  All errors
    will be suppressed, in which case no completion items are offered.
    Additionally, any service names deemed not completable as per
    [`is_completable_item`][] will be silently skipped.

    Args:
        ctx:
            The `click` context.  Ignored.
        parameter:
            The `click` parameter being completed.  Ignored.
        value:
            The incomplete value; only service names starting with this
            prefix are returned.

    Returns:
        The sorted list of matching, completable service names.

    """
    del ctx, parameter
    # A single outer handler covers *both* loading attempts.  An
    # `except` clause does not guard its sibling handlers, so the
    # fallback must live inside the guarded region for its errors
    # (invalid JSON, permission problems, ...) to be suppressed as well.
    try:
        try:
            config = load_config()
        except FileNotFoundError:
            config, _exc = migrate_and_load_old_config()
        return sorted(
            sv
            for sv in config['services']
            if sv.startswith(value) and is_completable_item(sv)
        )
    except Exception:  # noqa: BLE001
        # Shell completion is best-effort: never let an error escape.
        return []
170# Vault
171# =====
# Map each configuration subsystem name accepted by `config_filename`
# to a file name relative to the configuration directory.  `None`
# denotes the configuration directory itself.  The 'write lock' entry is
# a placeholder: `config_filename` handles that subsystem before it
# consults this table.
config_filename_table = {
    None: '.',
    'write lock': '',
    'vault': 'vault.json',
    'user configuration': 'config.toml',
    # TODO(the-13th-letter): Remove the old settings.json file.
    # https://the13thletter.info/derivepassphrase/latest/upgrade-notes.html#v1.0-old-settings-file
    'old settings.json': 'settings.json',
    'notes backup': 'old-notes.txt',
}

LOCK_SIZE = 4096
"""
The size of the record to lock at the beginning of the file, for locking
implementations that lock byte ranges instead of whole files.

While POSIX specifies that [`fcntl`][] locks shall support a size of zero to
denote "any conceivable file size", the locking system available in
[`msvcrt`][] does not support this, and requires an explicit size.
"""
class ConfigurationMutex:
    """A mutual exclusion context manager for configuration edits.

    See [`configuration_mutex`][].

    """

    lock: Callable[[], None]
    """A function to lock the mutex exclusively.

    This implementation uses a file descriptor of a well-known file,
    which is opened before locking and closed after unlocking (and on
    error when locking).  On Windows, we use [`msvcrt.locking`][], on
    other systems, we use [`fcntl.flock`][].

    Note:
        This is a normal Python function, not a method.

    Warning:
        You really should not have to change this.  *If you absolutely
        must*, then it is *your responsibility* to ensure that
        [`lock`][] and [`unlock`][] are still compatible.

    """
    unlock: Callable[[], None]
    """A function to unlock the mutex.

    This implementation uses a file descriptor of a well-known file,
    which is opened before locking and closed after unlocking (and on
    error when locking).  It will fail if the file descriptor is
    unavailable.  On Windows, we use [`msvcrt.locking`][], on other
    systems, we use [`fcntl.flock`][].

    Note:
        This is a normal Python function, not a method.

    Warning:
        You really should not have to change this.  *If you absolutely
        must*, then it is *your responsibility* to ensure that
        [`lock`][] and [`unlock`][] are still compatible.

    """
    write_lock_file: pathlib.Path
    """The filename to lock."""
    write_lock_fileobj: BinaryIO | None
    """The file object, if currently locked by this context manager."""
    write_lock_condition: threading.Condition
    """The lock protecting access to the file object."""

    def __init__(self) -> None:
        """Initialize self."""
        if sys.platform == 'win32':  # pragma: unless the-annoying-os no cover
            import msvcrt  # noqa: PLC0415

            locking = msvcrt.locking
            LK_LOCK = msvcrt.LK_LOCK  # noqa: N806
            LK_UNLCK = msvcrt.LK_UNLCK  # noqa: N806

            def lock_fd(fd: int, /) -> None:
                locking(fd, LK_LOCK, LOCK_SIZE)

            def unlock_fd(fd: int, /) -> None:
                locking(fd, LK_UNLCK, LOCK_SIZE)

        else:  # pragma: unless posix no cover
            import fcntl  # noqa: PLC0415

            flock = fcntl.flock
            LOCK_EX = fcntl.LOCK_EX  # noqa: N806
            LOCK_UN = fcntl.LOCK_UN  # noqa: N806

            def lock_fd(fd: int, /) -> None:
                flock(fd, LOCK_EX)

            def unlock_fd(fd: int, /) -> None:
                flock(fd, LOCK_UN)

        def lock_func() -> None:
            with self.write_lock_condition:
                # Wait until no other thread sharing this object holds
                # the file lock.
                self.write_lock_condition.wait_for(
                    lambda: self.write_lock_fileobj is None
                )
                try:
                    self.write_lock_file.touch()
                    fileobj = self.write_lock_file.open('wb')
                    try:
                        lock_fd(fileobj.fileno())
                    except BaseException:
                        # Do not leak the file object if locking fails.
                        fileobj.close()
                        raise
                except BaseException:
                    # We consumed a wake-up without taking ownership, so
                    # pass it on to the next waiting thread, if any.
                    self.write_lock_condition.notify()
                    raise
                # Only record the file object once the lock is actually
                # held, so a failed attempt never blocks later attempts.
                self.write_lock_fileobj = fileobj

        def unlock_func() -> None:
            with self.write_lock_condition:
                assert self.write_lock_fileobj is not None, (
                    'We lost track of the configuration write lock '
                    'file object, so we cannot unlock it anymore!'
                )
                fileobj = self.write_lock_fileobj
                try:
                    unlock_fd(fileobj.fileno())
                finally:
                    # Always close the file and reset our state, even
                    # if the explicit unlock call failed, then wake up
                    # one thread waiting in `lock_func`.
                    fileobj.close()
                    self.write_lock_fileobj = None
                    self.write_lock_condition.notify()

        self.lock = lock_func
        self.unlock = unlock_func
        self.write_lock_fileobj = None
        self.write_lock_file = config_filename('write lock')
        self.write_lock_condition = threading.Condition(threading.Lock())

    def __enter__(self) -> Self:
        """Enter the context, locking the configuration file."""  # noqa: DOC201
        self.lock()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_tb: types.TracebackType | None,
        /,
    ) -> Literal[False]:
        """Exit the context, releasing the lock on the configuration file."""  # noqa: DOC201
        self.unlock()
        return False
def configuration_mutex() -> AbstractContextManager[AbstractContextManager]:
    """Enter a mutually exclusive context for configuration writes.

    Within this context, no other cooperating instance of
    `derivepassphrase` will attempt to write to its configuration
    directory.  We achieve this by locking a specific temporary file
    (whose name depends on the location of the configuration directory)
    for the duration of the context.

    Returns:
        A reusable but not reentrant context manager (a fresh
        [`ConfigurationMutex`][]), ensuring mutual exclusion (while
        within its context) with all other `derivepassphrase` instances
        using the same configuration directory.

        Upon entering the context, the context manager returns itself.

    Note: Locking specifics
        The directory for the lock file is determined via
        [`get_tempdir`][].  The lock filename is
        `derivepassphrase-lock-<hash>.txt`, where `<hash>` is computed
        as follows.  First, canonicalize the path to the configuration
        directory with [`pathlib.Path.resolve`][].  Then encode the
        result as per the filesystem encoding ([`os.fsencode`][]), and
        hash it with SHA256.  Finally, convert the result to standard
        base32 and use the first twelve characters, in lowercase, as
        `<hash>`.

        We use [`msvcrt.locking`][] on Windows platforms (`sys.platform
        == "win32"`) and [`fcntl.flock`][] on all others.  All locks are
        exclusive locks.  If the locking system requires a byte range,
        we lock the first [`LOCK_SIZE`][] bytes.  For maximum
        portability between locking implementations, we first open the
        lock file for writing, which is sometimes necessary to lock
        a file exclusively.  Thus locking will fail if we lack
        permission to write to an already-existing lockfile.

    """
    mutex = ConfigurationMutex()
    return mutex
def get_tempdir() -> pathlib.Path:
    """Return a suitable temporary directory.

    We consult the same candidates as [`tempfile.gettempdir`][] (the
    `TMPDIR`, `TEMP` and `TMP` environment variables, then a list of
    platform-specific well-known directories), except that we default
    to the `derivepassphrase` configuration directory instead of the
    current directory if no other choice is suitable, and that we
    return [`pathlib.Path`][] objects directly.

    Unset *and empty* environment variables are skipped, just as
    [`tempfile.gettempdir`][] does; otherwise an empty value would be
    interpreted as the current directory.  Candidates that cannot be
    expanded, inspected or resolved are skipped as well.

    Returns:
        The resolved path of the first candidate that is an existing
        directory, or the configuration directory (as per
        [`config_filename`][]) if there is none.

    """
    candidates: list[pathlib.PurePath] = [
        pathlib.PurePath(value)
        for value in (
            os.getenv('TMPDIR'),
            os.getenv('TEMP'),
            os.getenv('TMP'),
        )
        if value
    ]
    if sys.platform == 'win32':
        candidates.extend([
            pathlib.PureWindowsPath(r'~\AppData\Local\Temp'),
            pathlib.PureWindowsPath(os.path.expandvars(r'%SYSTEMROOT%\Temp')),
            pathlib.PureWindowsPath(r'C:\TEMP'),
            pathlib.PureWindowsPath(r'C:\TMP'),
            pathlib.PureWindowsPath(r'\TEMP'),
            pathlib.PureWindowsPath(r'\TMP'),
        ])
    else:
        candidates.extend([
            pathlib.PurePosixPath('/tmp'),  # noqa: S108
            pathlib.PurePosixPath('/var/tmp'),  # noqa: S108
            pathlib.PurePosixPath('/usr/tmp'),
        ])
    for candidate in candidates:
        try:
            # `expanduser` raises RuntimeError if the home directory
            # cannot be determined; `is_dir` and `resolve` may raise
            # OSError (e.g. if the directory vanishes in between).
            path = pathlib.Path(candidate).expanduser()
            if path.is_dir():
                return path.resolve(strict=True)
        except (OSError, RuntimeError):
            continue
    return config_filename(subsystem=None)
def config_filename(
    subsystem: str | None = 'old settings.json',
) -> pathlib.Path:
    """Return the filename of the configuration file for the subsystem.

    The (implicit default) file is currently named `settings.json`,
    located within the configuration directory as determined by the
    `DERIVEPASSPHRASE_PATH` environment variable, or by
    [`click.get_app_dir`][] in POSIX mode.  Depending on the requested
    subsystem, this will usually be a different file within that
    directory.  The `'write lock'` subsystem is special: its file lives
    in the directory returned by [`get_tempdir`][], and its name
    contains a hash of the resolved configuration directory.

    Args:
        subsystem:
            Name of the configuration subsystem whose configuration
            filename to return.  If not given, return the old filename
            from before the subcommand migration.  If `None`, return the
            configuration directory instead.

    Returns:
        The path of the requested configuration file.

    Raises:
        AssertionError:
            An unknown subsystem was passed.

    Deprecated:
        Since v0.2.0: The implicit default subsystem and the old
        configuration filename are deprecated, and will be removed in v1.0.
        The subsystem will be mandatory to specify.

    """
    env_override = os.getenv(PROG_NAME.upper() + '_PATH')
    config_dir = pathlib.Path(
        env_override or click.get_app_dir(PROG_NAME, force_posix=True)
    )
    if subsystem == 'write lock':
        # Tag the lock file with a short, filesystem-safe hash of the
        # canonical configuration directory.
        encoded_dir = os.fsencode(config_dir.resolve())
        digest = hashlib.sha256(encoded_dir).digest()
        tag = base64.b32encode(digest)[:12].lower().decode('ASCII')
        lock_dir = get_tempdir()
        return lock_dir / f'derivepassphrase-lock-{tag}.txt'
    try:
        basename = config_filename_table[subsystem]
    except (KeyError, TypeError):  # pragma: no cover
        # Defensive programming, so not included in coverage.
        msg = f'Unknown configuration subsystem: {subsystem!r}'
        raise AssertionError(msg) from None
    return config_dir / basename
def load_config() -> _types.VaultConfig:
    """Load a vault(1)-compatible config from the application directory.

    The filename is obtained via [`config_filename`][] for the `vault`
    subsystem.  This must be an unencrypted JSON file.

    Returns:
        The vault settings.  See [`_types.VaultConfig`][] for details.

    Raises:
        OSError:
            There was an OS error accessing the file.
        ValueError:
            The data loaded from the file is not a vault(1)-compatible
            config.

    """
    with config_filename(subsystem='vault').open('rb') as fileobj:
        data = json.load(fileobj)
    if _types.is_vault_config(data):
        return data
    raise ValueError(INVALID_VAULT_CONFIG)
477# TODO(the-13th-letter): Remove this function.
478# https://the13thletter.info/derivepassphrase/latest/upgrade-notes.html#v1.0-old-settings-file
def migrate_and_load_old_config() -> tuple[_types.VaultConfig, OSError | None]:
    """Load and migrate a vault(1)-compatible config.

    The (old) filename is obtained via [`config_filename`][].  This
    must be an unencrypted JSON file.  After loading and validating,
    the file is renamed to the new standard filename.  A failure to
    rename is not raised, but reported in the return value.

    Returns:
        The vault settings, and an optional exception encountered during
        migration.  See [`_types.VaultConfig`][] for details on the
        former.

    Raises:
        OSError:
            There was an OS error accessing the old file.
        ValueError:
            The data loaded from the file is not a vault(1)-compatible
            config.

    """
    new_filename = config_filename(subsystem='vault')
    old_filename = config_filename(subsystem='old settings.json')
    with old_filename.open('rb') as fileobj:
        data = json.load(fileobj)
    if not _types.is_vault_config(data):
        raise ValueError(INVALID_VAULT_CONFIG)
    migration_error: OSError | None = None
    try:
        old_filename.rename(new_filename)
    except OSError as exc:
        migration_error = exc
    return data, migration_error
def save_config(config: _types.VaultConfig, /) -> None:
    """Save a vault(1)-compatible config to the application directory.

    The filename is obtained via [`config_filename`][].  The config
    will be stored as an unencrypted, UTF-8 encoded JSON file, indented
    and with sorted keys.  Missing parent directories are created.

    Args:
        config:
            vault configuration to save.

    Raises:
        OSError:
            There was an OS error accessing or writing the file.
        ValueError:
            The data cannot be stored as a vault(1)-compatible config.

    """
    if not _types.is_vault_config(config):
        raise ValueError(INVALID_VAULT_CONFIG)
    target = config_filename(subsystem='vault')
    target.resolve().parent.mkdir(parents=True, exist_ok=True)
    dump_options = {'ensure_ascii': False, 'indent': 2, 'sort_keys': True}
    with target.open('w', encoding='UTF-8') as fileobj:
        json.dump(config, fileobj, **dump_options)
def load_user_config() -> dict[str, Any]:
    """Load the user config from the application directory.

    The filename is obtained via [`config_filename`][] for the `user
    configuration` subsystem.  The file is parsed as TOML.

    Returns:
        The user configuration, as a nested `dict`.

    Raises:
        OSError:
            There was an OS error accessing the file.
        ValueError:
            The data loaded from the file is not a valid configuration
            file.

    """
    user_config_path = config_filename(subsystem='user configuration')
    with user_config_path.open('rb') as fileobj:
        user_config = tomllib.load(fileobj)
    return user_config
def get_suitable_ssh_keys(
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
    /,
) -> Iterator[_types.SSHKeyCommentPair]:
    """Yield all SSH keys suitable for passphrase derivation.

    Suitable SSH keys are queried from the running SSH agent (see
    [`ssh_agent.SSHAgentClient.list_keys`][]).

    Args:
        conn:
            An optional connection hint to the SSH agent.  See
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].

    Yields:
        Every SSH key from the SSH agent that is suitable for passphrase
        derivation.

    Raises:
        derivepassphrase.ssh_agent.socketprovider.NoSuchProviderError:
            As per [`ssh_agent.SSHAgentClient.__init__`][].  Only
            applicable if agent auto-discovery is used.
        KeyError:
            As per [`ssh_agent.SSHAgentClient.__init__`][].  Only
            applicable if agent auto-discovery is used.
        NotImplementedError:
            As per [`ssh_agent.SSHAgentClient.__init__`][], including
            the multiple raise as an exception group.  Only applicable
            if agent auto-discovery is used.
        OSError:
            If the connection hint was a socket, then there was an error
            setting up the socket connection to the agent.

            Otherwise, as per [`ssh_agent.SSHAgentClient.__init__`][].
            Only applicable if agent auto-discovery is used.
        LookupError:
            No keys usable for passphrase derivation are loaded into the
            SSH agent.  Because this is a generator, the error is raised
            once the generator is exhausted.
        RuntimeError:
            There was an error communicating with the SSH agent.
        ssh_agent.SSHAgentFailedError:
            The agent failed to supply a list of loaded keys.

    """
    # Track whether at least one key was actually deemed suitable, so
    # that an agent holding only unsuitable keys is reported the same
    # way as an agent holding no keys at all.
    found_suitable_key = False
    with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client:
        try:
            all_key_comment_pairs = list(client.list_keys())
        except EOFError as exc:  # pragma: no cover
            # Defensive programming (all our code is well-behaved), and
            # an external source of nondeterminism (faulty SSH agents),
            # so no coverage.
            raise RuntimeError(AGENT_COMMUNICATION_ERROR) from exc
        for pair in all_key_comment_pairs:
            key, _comment = pair
            if vault.Vault.is_suitable_ssh_key(key, client=client):
                found_suitable_key = True
                yield pair
    if not found_suitable_key:
        raise LookupError(NO_SUITABLE_KEYS)
def prompt_for_selection(
    items: Sequence[str | bytes],
    heading: str = 'Possible choices:',
    single_choice_prompt: str = 'Confirm this choice?',
    ctx: click.Context | None = None,
) -> int:
    """Prompt user for a choice among the given items.

    Print the heading, if any, then present the items to the user,
    numbered starting at 1.  If there are multiple items, prompt the
    user for a selection, validate the choice, then return the list
    index of the selected item.  Otherwise, request confirmation
    instead, and return index 0.  All output goes to standard error.

    Args:
        items:
            The list of items to choose from.
        heading:
            A heading for the list of items, to print immediately
            before.  Defaults to a reasonable standard heading.  If
            explicitly empty, print no heading.
        single_choice_prompt:
            The confirmation prompt if there is only a single possible
            choice.  Defaults to a reasonable standard prompt.
        ctx:
            An optional `click` context, from which output device
            properties and color preferences will be queried.

    Returns:
        An index into the items sequence, indicating the user's
        selection.

    Raises:
        IndexError:
            The user made an invalid or empty selection, or requested an
            abort.

    """
    count = len(items)
    color = None if ctx is None else ctx.color
    if heading:
        click.echo(click.style(heading, bold=True), err=True, color=color)
    for number, item in enumerate(items, start=1):
        label = click.style(f'[{number}]', bold=True)
        click.echo(label, nl=False, err=True, color=color)
        click.echo(' ', nl=False, err=True, color=color)
        click.echo(item, err=True, color=color)
    if count <= 1:
        # Nothing to select from: merely ask for confirmation.  A
        # prompt that already ends in punctuation gets no extra colon.
        ends_in_punctuation = single_choice_prompt.endswith(('?', '.', '!'))
        prompt_suffix = ' ' if ends_in_punctuation else ': '
        try:
            click.confirm(
                single_choice_prompt,
                prompt_suffix=prompt_suffix,
                err=True,
                abort=True,
                default=False,
                show_default=False,
            )
        except click.Abort:
            raise IndexError(EMPTY_SELECTION) from None
        return 0
    # The empty answer is a valid choice, and means "abort".
    valid_answers = click.Choice(['', *(str(k) for k in range(1, count + 1))])
    try:
        answer = click.prompt(
            f'Your selection? (1-{count}, leave empty to abort)',
            err=True,
            type=valid_answers,
            show_choices=False,
            show_default=False,
            default='',
        )
    except click.Abort:  # pragma: no cover
        # This branch will not be triggered during testing on
        # `click` versions < 8.2.1, due to (non-monkeypatch-able)
        # deficiencies in `click.testing.CliRunner`.  Therefore, as
        # an external source of nondeterminism, exclude it from
        # coverage.
        #
        # https://github.com/pallets/click/issues/2934
        answer = ''
    if not answer:
        raise IndexError(EMPTY_SELECTION)
    return int(answer) - 1
def handle_keyerror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Generate a handler for KeyError in try-except*.

    Returns a function that ignores the exception group it is given and
    calls the error callback with the standard "no SSH agent found"
    message.  The warning callback is unused.

    """  # noqa: DOC201
    del warning_callback

    def handle_keyerror(_excgroup: BaseExceptionGroup) -> NoReturn:
        message = _msg.TranslatedString(_msg.ErrMsgTemplate.NO_SSH_AGENT_FOUND)
        error_callback(message)

    return handle_keyerror
def handle_notimplementederror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Generate a handler for NotImplementedError in try-except*.

    Returns a function that first calls the warning callback once for
    each known kind of "transport not available" error contained in the
    exception group, and then calls the error callback with the
    standard "no agent support" message.

    """  # noqa: DOC201

    def handle_notimplementederror(excgroup: BaseExceptionGroup) -> NoReturn:
        # Warnings are emitted in this fixed order.
        known_warnings = (
            (
                socketprovider.AfUnixNotAvailableError,
                _msg.WarnMsgTemplate.NO_AF_UNIX,
            ),
            (
                socketprovider.TheAnnoyingOsNamedPipesNotAvailableError,
                _msg.WarnMsgTemplate.NO_ANNOYING_OS_NAMED_PIPES,
            ),
        )
        for exc_type, template in known_warnings:
            if excgroup.subgroup(exc_type):
                warning_callback(_msg.TranslatedString(template))
        error_callback(
            _msg.TranslatedString(_msg.ErrMsgTemplate.NO_AGENT_SUPPORT)
        )

    return handle_notimplementederror
763def handle_oserror(
764 error_callback: Callable[..., NoReturn],
765 warning_callback: Callable[..., None],
766) -> Callable[[BaseExceptionGroup], NoReturn]:
767 """Generate a handler for OSError in try-except*.
769 Returns a function emitting a standard user-facing message.
771 """ # noqa: DOC201
772 del warning_callback 1PAjQbcxinotVO
774 def handle_oserror(excgroup: BaseExceptionGroup) -> NoReturn: 1PAjQbcxinotVO
775 for exc in excgroup.exceptions: 1V
776 assert isinstance(exc, OSError) 1V
777 error_callback( 1V
778 _msg.TranslatedString(
779 _msg.ErrMsgTemplate.CANNOT_CONNECT_TO_AGENT,
780 error=exc.strerror,
781 filename=exc.filename,
782 ).maybe_without_filename()
783 )
784 raise AssertionError()
786 return handle_oserror 1PAjQbcxinotVO
789def handle_runtimeerror(
790 error_callback: Callable[..., NoReturn],
791 warning_callback: Callable[..., None],
792) -> Callable[[BaseExceptionGroup], NoReturn]:
793 """Generate a handler for RuntimeError in try-except*.
795 Returns a function emitting a standard user-facing message.
797 """ # noqa: DOC201
798 del warning_callback 1PAjQbcxinotVO
800 def handle_runtimeerror(excgroup: BaseExceptionGroup) -> NoReturn: 1PAjQbcxinotVO
801 for exc in excgroup.exceptions: 1nV
802 error_callback( 1nV
803 _msg.TranslatedString(
804 _msg.ErrMsgTemplate.CANNOT_UNDERSTAND_AGENT
805 ),
806 exc_info=exc,
807 )
808 raise AssertionError()
810 return handle_runtimeerror 1PAjQbcxinotVO
# Defensive programming, so no coverage.
def default_error_callback(
    message: Any,  # noqa: ANN401
    /,
    *_args: Any,  # noqa: ANN401
    **_kwargs: Any,  # noqa: ANN401
) -> NoReturn:  # pragma: no cover
    """Calls [`sys.exit`][] on its first argument, ignoring the rest.

    This is the default value of the `error_callback` parameter of
    `select_ssh_key` and `key_to_phrase`.

    Args:
        message:
            The value handed to [`sys.exit`][].
        _args:
            Ignored.
        _kwargs:
            Ignored.

    """
    sys.exit(message)
def select_ssh_key(
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
    /,
    *,
    ctx: click.Context | None = None,
    error_callback: Callable[..., NoReturn] = default_error_callback,
    warning_callback: Callable[..., None] = lambda *_args: None,
) -> bytes | bytearray:
    """Interactively select an SSH key for passphrase derivation.

    The suitable SSH keys are obtained via `get_suitable_ssh_keys`,
    rendered as one line each (key type, possibly abbreviated
    base64-encoded key, comment), and offered to the user through
    `prompt_for_selection`.  Every failure along the way is turned
    into a user-facing message and handed to `error_callback`.

    Args:
        conn:
            An optional connection hint to the SSH agent. See
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].
        ctx:
            A `click` context, queried for output device properties and
            color preferences when issuing the prompt.
        error_callback:
            A callback function for an error message, if any. The
            callback function is responsible for aborting this function
            call after acknowledging, formatting and/or forwarding the
            error message; it would typically call [`sys.exit`][] or
            raise an exception of its own, based on the provided error
            message.
        warning_callback:
            A callback function for a warning message, if any. The
            callback function is responsible for formatting the warning
            message and dispatching it into the warning system, if so
            desired.

    Returns:
        The selected SSH key.

    """

    def handle_lookuperror(_excgroup: BaseExceptionGroup) -> NoReturn:
        message = _msg.TranslatedString(
            _msg.ErrMsgTemplate.NO_SUITABLE_SSH_KEYS,
            PROG_NAME=PROG_NAME,
        )
        error_callback(message)

    def handle_sshagentfailederror(excgroup: BaseExceptionGroup) -> NoReturn:
        for exc in excgroup.exceptions:
            message = _msg.TranslatedString(
                _msg.ErrMsgTemplate.AGENT_REFUSED_LIST_KEYS
            )
            error_callback(message, exc_info=exc)
        raise AssertionError

    # The insertion order of this mapping is kept exactly as is: more
    # specific exception types are listed before their base classes
    # (`KeyError` before `LookupError`, etc.).
    handlers = {
        KeyError: handle_keyerror(error_callback, warning_callback),
        LookupError: handle_lookuperror,
        NotImplementedError: handle_notimplementederror(
            error_callback, warning_callback
        ),
        OSError: handle_oserror(error_callback, warning_callback),
        ssh_agent.SSHAgentFailedError: handle_sshagentfailederror,
        RuntimeError: handle_runtimeerror(error_callback, warning_callback),
    }
    with exceptiongroup.catch(handlers):
        suitable_keys = list(get_suitable_ssh_keys(conn))

    unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix

    def describe(key: bytes | bytearray, comment: bytes | bytearray) -> str:
        """Format one key as "<type> <key or its tail> <comment>"."""
        keytype = unstring_prefix(key)[0].decode('ASCII')
        encoded = base64.standard_b64encode(key).decode('ASCII')
        tail_length = KEY_DISPLAY_LENGTH - 1 - len(keytype)
        abbreviated = '...' + encoded[-tail_length:]
        # Prefer the full key unless the abbreviation is strictly shorter.
        shown = encoded if len(encoded) <= len(abbreviated) else abbreviated
        comment_str = comment.decode('UTF-8', errors='replace')
        return f'{keytype} {shown} {comment_str}'

    key_listing: list[str] = [
        describe(key, comment) for key, comment in suitable_keys
    ]
    try:
        choice = prompt_for_selection(
            key_listing,
            heading='Suitable SSH keys:',
            single_choice_prompt='Use this key?',
            ctx=ctx,
        )
    except IndexError:
        # The selection helper signals an empty/aborted selection this way.
        error_callback(
            _msg.TranslatedString(
                _msg.ErrMsgTemplate.USER_ABORTED_SSH_KEY_SELECTION
            )
        )
    return suitable_keys[choice].key
def prompt_for_passphrase() -> str:
    """Interactively prompt for the passphrase.

    A thin wrapper around [`click.prompt`][] (hidden input, empty
    default, prompt written to standard error), kept as a separate
    function mainly so that tests can mock it.

    Returns:
        The user input, or the empty string if the prompt was aborted.

    """
    try:
        answer = click.prompt(
            'Passphrase',
            default='',
            hide_input=True,
            show_default=False,
            err=True,
        )
    except click.Abort:  # pragma: no cover
        # This branch will not be triggered during testing on `click`
        # versions < 8.2.1, due to (non-monkeypatch-able) deficiencies
        # in `click.testing.CliRunner`.  Therefore, as an external source
        # of nondeterminism, exclude it from coverage.
        #
        # https://github.com/pallets/click/issues/2934
        return ''
    return cast('str', answer)
def toml_key(*parts: str) -> str:
    """Return a formatted TOML key, given its parts.

    Each part is emitted as a TOML bare key if it is non-empty and
    consists solely of ASCII letters, ASCII digits, underscores and
    dashes.  Every other part (including the empty string, and parts
    containing dots, spaces or non-ASCII characters) is emitted as
    a quoted TOML basic string, with quotation marks, backslashes and
    all control characters (U+0000 through U+001F, and U+007F) escaped.
    The formatted parts are then joined with dots.

    Args:
        parts:
            The components of the (dotted) key, outermost first.

    Returns:
        The dotted TOML key.  For zero parts, the empty string.

    """
    # Start with generic `\uXXXX` escapes for every C0 control
    # character, then override those which have a short escape
    # sequence in TOML, and add the remaining mandatory escapes.
    escapes: dict[int, str] = {
        codepoint: f'\\u{codepoint:04X}' for codepoint in range(0x20)
    }
    escapes.update({
        0x08: r'\b',
        0x09: r'\t',
        0x0A: r'\n',
        0x0C: r'\f',
        0x0D: r'\r',
        ord('"'): r'\"',
        ord('\\'): r'\\',
        0x7F: r'\u007F',
    })

    def is_bare_key(string: str) -> bool:
        # `str.isalnum` alone would also accept non-ASCII letters and
        # digits, which TOML bare keys do not allow.
        return bool(string) and all(
            char.isascii() and (char.isalnum() or char in '_-')
            for char in string
        )

    def escape(string: str) -> str:
        if is_bare_key(string):
            return string
        return f'"{string.translate(escapes)}"'

    return '.'.join(map(escape, parts))
class ORIGIN(enum.Enum):
    """The origin of a setting, if not from the user configuration file.

    Members of this enumeration are accepted in place of
    a configuration key tuple by `check_for_misleading_passphrase`,
    which renders the member's value as a translated string.

    Attributes:
        INTERACTIVE (_msg.Label): interactive input

    """

    # NOTE(review): the attribute docstring below is empty, presumably
    # because the member is already described in the class docstring;
    # confirm before filling it in.
    INTERACTIVE = _msg.Label.SETTINGS_ORIGIN_INTERACTIVE
    """"""
def check_for_misleading_passphrase(
    key: tuple[str, ...] | ORIGIN,
    value: Mapping[str, Any],
    *,
    main_config: dict[str, Any],
    ctx: click.Context | None = None,
) -> None:
    """Check for a misleading passphrase according to user configuration.

    Determine the Unicode normalization form that applies to `key`
    from the user configuration (falling back to `NFC`), and warn the
    user if `value` contains a passphrase that is not normalized
    according to that form.

    Args:
        key:
            A vault configuration key or an origin of the
            value/configuration section, e.g. [`ORIGIN.INTERACTIVE`][],
            or `("global",)`, or `("services", "foo")`.
        value:
            The vault configuration section maybe containing
            a passphrase to vet.
        main_config:
            The parsed main user configuration.
        ctx:
            The click context. This is necessary to pass output options
            set on the context to the logging machinery.

    Raises:
        AssertionError:
            The main user configuration is invalid.

    """
    form_key = 'unicode-normalization-form'
    vault_section = main_config.get('vault', {})
    default_form: str = vault_section.get(f'default-{form_key}', 'NFC')
    form_dict: dict[str, dict] = vault_section.get(form_key, {})

    # Origins and the global section always use the default form; any
    # other key may have a per-service override.
    form: Any
    if isinstance(key, ORIGIN) or key == ('global',):
        form = default_form
    else:
        form = form_dict.get(key[1], default_form)

    if form not in {'NFC', 'NFD', 'NFKC', 'NFKD'}:
        has_override = (
            isinstance(key, tuple) and len(key) > 1 and key[1] in form_dict
        )
        # NOTE(review): the override is looked up as
        # vault -> <form_key> -> <service>, but reported here as
        # vault.<service>.<form_key>.  Confirm which order is intended.
        if has_override:
            config_key = toml_key('vault', key[1], form_key)
        else:
            config_key = f'vault.default-{form_key}'
        msg = f'Invalid value {form!r} for config key {config_key}'
        raise AssertionError(msg)

    logger = logging.getLogger(PROG_NAME)
    if isinstance(key, ORIGIN):
        formatted_key = str(_msg.TranslatedString(key.value))
    else:
        formatted_key = _types.json_path(key)

    if 'phrase' not in value:
        return
    phrase = value['phrase']
    if unicodedata.is_normalized(form, phrase):
        return
    # The warning is issued directly from this function (not from
    # a helper) so that `stacklevel=2` keeps pointing at our caller.
    logger.warning(
        _msg.TranslatedString(
            _msg.WarnMsgTemplate.PASSPHRASE_NOT_NORMALIZED,
            key=formatted_key,
            form=form,
        ),
        stacklevel=2,
        extra={'color': ctx.color if ctx is not None else None},
    )
def key_to_phrase(
    key: str | Buffer,
    /,
    *,
    error_callback: Callable[..., NoReturn] = default_error_callback,
    warning_callback: Callable[..., None] = lambda *_args: None,
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
) -> bytes:
    """Return the equivalent master passphrase, or abort.

    This wrapper around [`vault.Vault.phrase_from_key`][] emits
    user-facing error messages if no equivalent master passphrase can be
    obtained from the key, because this is the first point of contact
    with the SSH agent.

    Args:
        key:
            The SSH key, in base64 encoding.
        error_callback:
            A callback function for an error message, if any.  Expected
            not to return.
        warning_callback:
            A callback function for a warning message, if any.
        conn:
            An optional connection hint to the SSH agent, passed on to
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].

    Returns:
        The equivalent master passphrase.

    """
    key = base64.standard_b64decode(key)
    handlers = {
        KeyError: handle_keyerror(error_callback, warning_callback),
        NotImplementedError: handle_notimplementederror(
            error_callback, warning_callback
        ),
        OSError: handle_oserror(error_callback, warning_callback),
        RuntimeError: handle_runtimeerror(error_callback, warning_callback),
    }
    with exceptiongroup.catch(handlers):  # noqa: SIM117
        with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client:
            try:
                return vault.Vault.phrase_from_key(key, conn=client)
            except ssh_agent.SSHAgentFailedError as sign_error:
                # The agent refused to sign.  Try to find out whether
                # that is because the key is not loaded at all, which
                # warrants a more specific error message.
                try:
                    loaded_keys = client.list_keys()
                except ssh_agent.SSHAgentFailedError:
                    # The agent refused listing as well; fall through
                    # to the generic "refused signature" message.
                    pass
                except Exception as listing_error:  # noqa: BLE001
                    # Keep the secondary failure attached to the
                    # original one for diagnostic output.
                    sign_error.__context__ = listing_error
                else:
                    key_is_loaded = any(
                        loaded == key for loaded, _ in loaded_keys
                    )
                    if not key_is_loaded:  # pragma: no branch
                        error_callback(
                            _msg.TranslatedString(
                                _msg.ErrMsgTemplate.SSH_KEY_NOT_LOADED
                            )
                        )
                error_callback(
                    _msg.TranslatedString(
                        _msg.ErrMsgTemplate.AGENT_REFUSED_SIGNATURE
                    ),
                    exc_info=sign_error,
                )
def print_config_as_sh_script(
    config: _types.VaultConfig,
    /,
    *,
    outfile: TextIO,
    prog_name_list: Sequence[str],
) -> None:
    """Print the given vault configuration as a sh(1) script.

    This implements the `--export-as=sh` option of `derivepassphrase vault`.

    The script first clears the existing configuration (`--clear`).
    Then, for the global section (if non-empty) and for each service,
    it emits up to two commands: an `--import` of a JSON here-document
    for all settings without a dedicated command-line option, and
    a `--config` call for the settings that do have one.  A service
    with no settings at all is emitted as an `--import` of an empty
    settings object.

    Args:
        config:
            The configuration to serialize.
        outfile:
            A file object to write the output to.
        prog_name_list:
            A list of (subcommand) names for the command emitting this
            output, e.g. `["derivepassphrase", "vault"]`.

    """
    # Settings expressible as `--<name> <value>` options of `--config`.
    service_keys = (
        'length',
        'repeat',
        'lower',
        'upper',
        'number',
        'space',
        'dash',
        'symbol',
    )

    def emit_import(payload: dict[str, Any]) -> None:
        """Write an `--import -` command fed by a JSON here-document."""
        dumped_config = json.dumps(payload, ensure_ascii=False, indent=None)
        import_command = shlex.join([*prog_name_list, '--import', '-'])
        print(
            import_command + " <<'HERE'",
            dumped_config,
            'HERE',
            sep='\n',
            file=outfile,
        )

    print('#!/bin/sh -e', file=outfile)
    print(file=outfile)
    print(shlex.join([*prog_name_list, '--clear']), file=outfile)

    # The global section, if any, is handled first, under the
    # pseudo-service name `None`.
    sections: list[
        tuple[
            str | None,
            _types.VaultConfigGlobalSettings
            | _types.VaultConfigServicesSettings,
        ],
    ] = list(config['services'].items())
    if config.get('global', {}):
        sections.insert(0, (None, config['global']))

    for sv, sv_obj in sections:
        option_keys = [k for k in service_keys if k in sv_obj]
        other_settings = {
            k: v for k, v in sv_obj.items() if k not in service_keys
        }
        if other_settings:
            if sv is None:
                emit_import({'global': other_settings, 'services': {}})
            else:
                emit_import({'services': {sv: other_settings}})
        if sv and not (option_keys or other_settings):
            emit_import({'services': {sv: {}}})
        elif option_keys:
            tokens = [*prog_name_list, '--config']
            for key in option_keys:
                tokens += [f'--{key}', str(sv_obj[key])]  # type: ignore[literal-required]
            if sv is not None:
                tokens += ['--', sv]
            print(shlex.join(tokens), file=outfile)