Coverage for src\derivepassphrase\_internals\cli_helpers.py: 97.312%

314 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5 

6"""Helper functions for the derivepassphrase command-line. 

7 

8Warning: 

9 Non-public module (implementation detail), provided for didactical and 

10 educational purposes only. Subject to change without notice, including 

11 removal. 

12 

13""" 

14 

15from __future__ import annotations 

16 

17import base64 

18import copy 

19import enum 

20import hashlib 

21import json 

22import logging 

23import os 

24import pathlib 

25import shlex 

26import sys 

27import threading 

28import unicodedata 

29from typing import TYPE_CHECKING, cast 

30 

31import click 

32import click.shell_completion 

33import exceptiongroup 

34from typing_extensions import Any 

35 

36from derivepassphrase import _types, ssh_agent, vault 

37from derivepassphrase._internals import cli_messages as _msg 

38from derivepassphrase.ssh_agent import socketprovider 

39 

40if sys.version_info >= (3, 11): 

41 import tomllib 

42else: 

43 import tomli as tomllib 

44 from exceptiongroup import BaseExceptionGroup 

45 

46if TYPE_CHECKING: 

47 import types 

48 from collections.abc import ( 

49 Iterator, 

50 Mapping, 

51 Sequence, 

52 ) 

53 from contextlib import AbstractContextManager 

54 from typing import ( 

55 BinaryIO, 

56 Callable, 

57 Literal, 

58 NoReturn, 

59 TextIO, 

60 ) 

61 

62 from typing_extensions import Buffer, Self 

63 

64PROG_NAME = _msg.PROG_NAME 

65KEY_DISPLAY_LENGTH = 50 

66 

67# Error messages 

68INVALID_VAULT_CONFIG = 'Invalid vault config' 

69AGENT_COMMUNICATION_ERROR = 'Error communicating with the SSH agent' 

70NO_SUITABLE_KEYS = 'No suitable SSH keys were found' 

71EMPTY_SELECTION = 'Empty selection' 

72 

73 

74# Shell completion 

75# ================ 

76 

77# Use naive filename completion for the `path` argument of 

78# `derivepassphrase vault`'s `--import` and `--export` options, as well 

79# as the `path` argument of `derivepassphrase export vault`. The latter 

80# treats the pseudo-filename `VAULT_PATH` specially, but this is awkward 

81# to combine with standard filename completion, particularly in bash, so 

82# we would probably have to implement *all* completion (`VAULT_PATH` and 

83# filename completion) ourselves, lacking some niceties of bash's 

84# built-in completion (e.g., adding spaces or slashes depending on 

85# whether the completion is a directory or a complete filename). 

86 

87 

def shell_complete_path(
    ctx: click.Context,
    parameter: click.Parameter,
    value: str,
) -> list[str | click.shell_completion.CompletionItem]:
    """Ask for standard path completion of the `path` argument.

    All three arguments are ignored.  The result is always a single
    completion item with an empty value and the type `file`.

    """  # noqa: DOC201
    del ctx, parameter, value
    file_item = click.shell_completion.CompletionItem('', type='file')
    return [file_item]

96 

97 

98# The standard `click` shell completion scripts serialize the completion 

99# items as newline-separated one-line entries, which get silently 

100# corrupted if the value contains newlines. Each shell imposes 

101# additional restrictions: Fish uses newlines in all internal completion 

102# helper scripts, so it is difficult, if not impossible, to register 

103# completion entries containing newlines if completion comes from within 

104# a Fish completion function (instead of a Fish builtin). Zsh's 

105# completion system supports descriptions for each completion item, and 

106# the completion helper functions parse every entry as a colon-separated 

107# 2-tuple of item and description, meaning any colon in the item value 

108# must be escaped. Finally, Bash requires the result array to be 

109# populated at the completion function's top-level scope, but for/while 

110# loops within pipelines do not run at top-level scope, and Bash *also* 

111# strips NUL characters from command substitution output, making it 

112# difficult to read in external data into an array in a cross-platform 

113# manner from entirely within Bash. 

114# 

115# We capitulate in front of these problems---most egregiously because of 

116# Fish---and ensure that completion items (in this case: service names) 

117# never contain ASCII control characters by refusing to offer such 

118# items as valid completions. On the other side, `derivepassphrase` 

119# will warn the user when configuring or importing a service with such 

120# a name that it will not be available for shell completion. 

121 

122 

def is_completable_item(obj: object) -> bool:
    """Tell whether the item can be offered as a shell completion.

    The item is first converted to a string.  It is completable if and
    only if that string is free of ASCII control characters, i.e. of
    U+0000 through U+001F and of U+007F.

    """
    text = str(obj)
    for char in text:
        # Everything below the space character (U+0020) is a C0 control
        # character; DEL (U+007F) is the only other forbidden one.
        if char < ' ' or char == '\x7f':
            return False
    return True

133 

134 

def shell_complete_service(
    ctx: click.Context,
    parameter: click.Parameter,
    value: str,
) -> list[str | click.shell_completion.CompletionItem]:
    """Return known vault service names as completion items.

    Service names are looked up in the vault configuration file.  If
    that file does not exist, the old settings file is migrated and
    loaded instead (see [`migrate_and_load_old_config`][]).  All errors
    will be suppressed, including errors while migrating or loading the
    old settings file; in that case, no completions are offered.
    Additionally, any service names deemed not completable as per
    [`is_completable_item`][] will be silently skipped.

    Args:
        ctx:
            The `click` context.  Ignored.
        parameter:
            The `click` parameter being completed.  Ignored.
        value:
            The incomplete service name; only service names starting
            with this prefix are returned.

    Returns:
        The sorted list of matching, completable service names.

    """
    del ctx, parameter
    # The outer `try` deliberately spans the fallback as well: an
    # exception raised inside an `except` handler is not seen by
    # sibling handlers, so the fallback must live inside the guarded
    # region to honour the "all errors are suppressed" contract.
    try:
        try:
            config = load_config()
        except FileNotFoundError:
            config, _exc = migrate_and_load_old_config()
        return sorted(
            sv
            for sv in config['services']
            if sv.startswith(value) and is_completable_item(sv)
        )
    except Exception:  # noqa: BLE001
        return []

168 

169 

170# Vault 

171# ===== 

172 

# Maps each configuration subsystem name to its file name.  The entry
# for `None` stands for the configuration directory itself.
config_filename_table: dict[str | None, str] = {
    None: '.',
    'write lock': '',
    'vault': 'vault.json',
    'user configuration': 'config.toml',
    # TODO(the-13th-letter): Remove the old settings.json file.
    # https://the13thletter.info/derivepassphrase/latest/upgrade-notes.html#v1.0-old-settings-file
    'old settings.json': 'settings.json',
    'notes backup': 'old-notes.txt',
}

LOCK_SIZE = 4096
"""
The number of bytes to lock at the start of the lock file, for locking
implementations that operate on byte ranges rather than on whole files.

POSIX specifies that [`fcntl`][] locks shall accept a size of zero,
meaning "any conceivable file size".  The locking system available in
[`msvcrt`][] has no such feature and needs an explicit size.
"""

193 

194 

class ConfigurationMutex:
    """A mutual exclusion context manager for configuration edits.

    See [`configuration_mutex`][].

    """

    lock: Callable[[], None]
    """A function to lock the mutex exclusively.

    This implementation uses a file descriptor of a well-known file,
    which is opened before locking and closed after unlocking (and on
    error when locking).  On Windows, we use [`msvcrt.locking`][], on
    other systems, we use [`fcntl.flock`][].

    Note:
        This is a normal Python function, not a method.

    Warning:
        You really should not have to change this.  *If you absolutely
        must*, then it is *your responsibility* to ensure that
        [`lock`][] and [`unlock`][] are still compatible.

    """
    unlock: Callable[[], None]
    """A function to unlock the mutex.

    This implementation uses a file descriptor of a well-known file,
    which is opened before locking and closed after unlocking (and on
    error when locking).  It will fail if the file descriptor is
    unavailable.  On Windows, we use [`msvcrt.locking`][], on other
    systems, we use [`fcntl.flock`][].

    Note:
        This is a normal Python function, not a method.

    Warning:
        You really should not have to change this.  *If you absolutely
        must*, then it is *your responsibility* to ensure that
        [`lock`][] and [`unlock`][] are still compatible.

    """
    write_lock_file: pathlib.Path
    """The filename to lock."""
    write_lock_fileobj: BinaryIO | None
    """The file object, if currently locked by this context manager."""
    write_lock_condition: threading.Condition
    """The lock protecting access to the file object."""

    def __init__(self) -> None:
        """Initialize self."""
        if sys.platform == 'win32':  # pragma: unless the-annoying-os no cover
            import msvcrt  # noqa: PLC0415

            locking = msvcrt.locking
            LK_LOCK = msvcrt.LK_LOCK  # noqa: N806
            LK_UNLCK = msvcrt.LK_UNLCK  # noqa: N806

            def lock_fd(fd: int, /) -> None:
                locking(fd, LK_LOCK, LOCK_SIZE)

            def unlock_fd(fd: int, /) -> None:
                locking(fd, LK_UNLCK, LOCK_SIZE)

        else:  # pragma: unless posix no cover
            import fcntl  # noqa: PLC0415

            flock = fcntl.flock
            LOCK_EX = fcntl.LOCK_EX  # noqa: N806
            LOCK_UN = fcntl.LOCK_UN  # noqa: N806

            def lock_fd(fd: int, /) -> None:
                flock(fd, LOCK_EX)

            def unlock_fd(fd: int, /) -> None:
                flock(fd, LOCK_UN)

        def lock_func() -> None:
            with self.write_lock_condition:
                # Wait until no other thread using this mutex object
                # holds the file lock.
                self.write_lock_condition.wait_for(
                    lambda: self.write_lock_fileobj is None
                )
                self.write_lock_file.touch()
                fileobj = self.write_lock_file.open('wb')
                try:
                    lock_fd(fileobj.fileno())
                except BaseException:
                    # Do not leak the file object, and do not leave the
                    # mutex marked as held, if locking fails.
                    fileobj.close()
                    raise
                # Only record the file object once the lock is held.
                self.write_lock_fileobj = fileobj

        def unlock_func() -> None:
            with self.write_lock_condition:
                fileobj = self.write_lock_fileobj
                assert fileobj is not None, (
                    'We lost track of the configuration write lock '
                    'file object, so we cannot unlock it anymore!'
                )
                try:
                    unlock_fd(fileobj.fileno())
                finally:
                    # Always release our resources and reset our state,
                    # even if the explicit unlock call failed, then
                    # wake up one thread waiting in `lock_func`.
                    fileobj.close()
                    self.write_lock_fileobj = None
                    self.write_lock_condition.notify()

        self.lock = lock_func
        self.unlock = unlock_func
        self.write_lock_fileobj = None
        self.write_lock_file = config_filename('write lock')
        self.write_lock_condition = threading.Condition(threading.Lock())

    def __enter__(self) -> Self:
        """Enter the context, locking the configuration file."""  # noqa: DOC201
        self.lock()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_tb: types.TracebackType | None,
        /,
    ) -> Literal[False]:
        """Exit the context, releasing the lock on the configuration file."""  # noqa: DOC201
        self.unlock()
        return False

313 

314 

def configuration_mutex() -> AbstractContextManager[AbstractContextManager]:
    """Return a context manager for mutually exclusive config writes.

    While the context is active, no other cooperating instance of
    `derivepassphrase` will attempt to write to its configuration
    directory.  This is achieved by holding a lock on a specific
    temporary file, whose name depends on the location of the
    configuration directory, for the duration of the context.

    Returns:
        A reusable but not reentrant context manager, ensuring mutual
        exclusion (while within its context) with all other
        `derivepassphrase` instances using the same configuration
        directory.

        Upon entering the context, the context manager returns itself.

    Note: Locking specifics
        The lock file lives in the directory returned by
        [`get_tempdir`][], and is named
        `derivepassphrase-lock-<hash>.txt`.  To compute `<hash>`,
        canonicalize the path to the configuration directory with
        [`pathlib.Path.resolve`][], encode it as per the filesystem
        encoding ([`os.fsencode`][]), hash the result with SHA256,
        convert the digest to standard base32, and take the first
        twelve characters in lowercase.

        On Windows platforms (`sys.platform == "win32"`) we lock with
        [`msvcrt.locking`][], on all others with [`fcntl.flock`][].
        All locks are exclusive locks.  If the locking system requires
        a byte range, we lock the first [`LOCK_SIZE`][] bytes.  For
        maximum portability between locking implementations, we first
        open the lock file for writing, which is sometimes necessary
        to lock a file exclusively.  Thus locking will fail if we lack
        permission to write to an already-existing lockfile.

    """
    mutex = ConfigurationMutex()
    return mutex

354 

355 

def get_tempdir() -> pathlib.Path:
    """Return a suitable temporary directory.

    The candidates are modelled on [`tempfile.gettempdir`][]: first the
    (non-empty) values of the `TMPDIR`, `TEMP` and `TMP` environment
    variables, then a list of platform-specific well-known locations.
    The first candidate that names an existing directory wins.  Unlike
    [`tempfile.gettempdir`][], we fall back to the `derivepassphrase`
    configuration directory instead of the current directory if no
    candidate is suitable, and we return [`pathlib.Path`][] objects
    directly.

    Candidates that cannot be examined -- because the home directory
    for a leading `~` cannot be determined, or because of an OS error
    while inspecting or resolving the path -- are skipped.

    Returns:
        The resolved path of the first suitable candidate, or the
        configuration directory (as per [`config_filename`][]) if there
        is none.

    """
    env_values = (os.getenv('TMPDIR'), os.getenv('TEMP'), os.getenv('TMP'))
    # Skip unset *and* empty variables: an empty string would otherwise
    # turn into the path `.`, i.e. the current directory.
    paths_to_try: list[pathlib.PurePath] = [
        pathlib.PurePath(p) for p in env_values if p
    ]
    if sys.platform == 'win32':
        paths_to_try.extend([
            pathlib.PureWindowsPath(r'~\AppData\Local\Temp'),
            pathlib.PureWindowsPath(os.path.expandvars(r'%SYSTEMROOT%\Temp')),
            pathlib.PureWindowsPath(r'C:\TEMP'),
            pathlib.PureWindowsPath(r'C:\TMP'),
            pathlib.PureWindowsPath(r'\TEMP'),
            pathlib.PureWindowsPath(r'\TMP'),
        ])
    else:
        paths_to_try.extend([
            pathlib.PurePosixPath('/tmp'),  # noqa: S108
            pathlib.PurePosixPath('/var/tmp'),  # noqa: S108
            pathlib.PurePosixPath('/usr/tmp'),
        ])
    for p in paths_to_try:
        try:
            # `expanduser` raises `RuntimeError` if the home directory
            # cannot be determined; `is_dir` and `resolve` may raise
            # `OSError`.  In all these cases, try the next candidate.
            path = pathlib.Path(p).expanduser()
            if path.is_dir():
                return path.resolve(strict=True)
        except (OSError, RuntimeError):
            continue
    return config_filename(subsystem=None)

400 

401 

def config_filename(
    subsystem: str | None = 'old settings.json',
) -> pathlib.Path:
    """Return the configuration filename for the given subsystem.

    The configuration directory is taken from the
    `DERIVEPASSPHRASE_PATH` environment variable if set and non-empty,
    otherwise from [`click.get_app_dir`][] in POSIX mode.  Most
    subsystems map to a fixed file name within that directory; the
    (implicit default) file is currently named `settings.json`.  The
    `write lock` subsystem is special: its file lives in the temporary
    directory (see [`get_tempdir`][]), and its name includes a hash of
    the resolved configuration directory.

    Args:
        subsystem:
            Name of the configuration subsystem whose configuration
            filename to return.  If not given, return the old filename
            from before the subcommand migration.  If `None`, return the
            configuration directory instead.

    Returns:
        The path for the requested subsystem.

    Raises:
        AssertionError:
            An unknown subsystem was passed.

    Deprecated:
        Since v0.2.0: The implicit default subsystem and the old
        configuration filename are deprecated, and will be removed in v1.0.
        The subsystem will be mandatory to specify.

    """
    config_dir = pathlib.Path(
        os.getenv(f'{PROG_NAME.upper()}_PATH')
        or click.get_app_dir(PROG_NAME, force_posix=True)
    )
    if subsystem != 'write lock':
        try:
            basename = config_filename_table[subsystem]
        except (KeyError, TypeError):  # pragma: no cover
            # Defensive programming, so not included in coverage.
            msg = f'Unknown configuration subsystem: {subsystem!r}'
            raise AssertionError(msg) from None
        return config_dir / basename
    # The lock file name is derived from the SHA256 digest of the
    # resolved configuration directory: base32, first twelve
    # characters, lowercase.
    digest = hashlib.sha256(os.fsencode(config_dir.resolve())).digest()
    tag = base64.b32encode(digest)[:12].lower().decode('ASCII')
    lock_dir = get_tempdir()
    return lock_dir / f'derivepassphrase-lock-{tag}.txt'

450 

451 

def load_config() -> _types.VaultConfig:
    """Read the vault(1)-compatible config from the application directory.

    The file to read is determined via [`config_filename`][], for the
    `vault` subsystem.  It must be an unencrypted JSON file.

    Returns:
        The vault settings.  See [`_types.VaultConfig`][] for details.

    Raises:
        OSError:
            There was an OS error accessing the file.
        ValueError:
            The data loaded from the file is not a vault(1)-compatible
            config.

    """
    vault_path = config_filename(subsystem='vault')
    with vault_path.open('rb') as stream:
        config = json.load(stream)
    if _types.is_vault_config(config):
        return config
    raise ValueError(INVALID_VAULT_CONFIG)

475 

476 

477# TODO(the-13th-letter): Remove this function. 

478# https://the13thletter.info/derivepassphrase/latest/upgrade-notes.html#v1.0-old-settings-file 

def migrate_and_load_old_config() -> tuple[_types.VaultConfig, OSError | None]:
    """Load a vault(1)-compatible config from the old file, and migrate it.

    Both the old and the new filename are determined via
    [`config_filename`][].  The old file must be an unencrypted JSON
    file.  Once its contents have been loaded and validated, the old
    file is renamed to the new standard filename.  A failure to rename
    is reported to the caller, not raised.

    Returns:
        The vault settings, and an optional exception encountered during
        migration.  See [`_types.VaultConfig`][] for details on the
        former.

    Raises:
        OSError:
            There was an OS error accessing the old file.
        ValueError:
            The data loaded from the file is not a vault(1)-compatible
            config.

    """
    target = config_filename(subsystem='vault')
    source = config_filename(subsystem='old settings.json')
    with source.open('rb') as stream:
        config = json.load(stream)
    if not _types.is_vault_config(config):
        raise ValueError(INVALID_VAULT_CONFIG)
    migration_error: OSError | None = None
    try:
        source.rename(target)
    except OSError as exc:
        # The config itself was loaded fine, so hand the error back
        # alongside it instead of failing.
        migration_error = exc
    return config, migration_error

511 

512 

def save_config(config: _types.VaultConfig, /) -> None:
    """Write a vault(1)-compatible config to the application directory.

    The file to write is determined via [`config_filename`][], for the
    `vault` subsystem.  Missing parent directories are created.  The
    config is stored as an unencrypted, UTF-8 encoded JSON file,
    indented and with sorted keys.

    Args:
        config:
            vault configuration to save.

    Raises:
        OSError:
            There was an OS error accessing or writing the file.
        ValueError:
            The data cannot be stored as a vault(1)-compatible config.

    """
    if not _types.is_vault_config(config):
        raise ValueError(INVALID_VAULT_CONFIG)
    target = config_filename(subsystem='vault')
    parent_dir = target.resolve().parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with target.open('w', encoding='UTF-8') as stream:
        json.dump(
            config,
            stream,
            ensure_ascii=False,
            indent=2,
            sort_keys=True,
        )

539 

540 

def load_user_config() -> dict[str, Any]:
    """Read the user config from the application directory.

    The file to read is determined via [`config_filename`][], for the
    `user configuration` subsystem, and is parsed as TOML.

    Returns:
        The user configuration, as a nested `dict`.

    Raises:
        OSError:
            There was an OS error accessing the file.
        ValueError:
            The data loaded from the file is not a valid configuration
            file.

    """
    config_path = config_filename(subsystem='user configuration')
    with config_path.open('rb') as stream:
        user_config = tomllib.load(stream)
    return user_config

560 

561 

def get_suitable_ssh_keys(
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
    /,
) -> Iterator[_types.SSHKeyCommentPair]:
    """Yield all SSH keys suitable for passphrase derivation.

    Suitable SSH keys are queried from the running SSH agent (see
    [`ssh_agent.SSHAgentClient.list_keys`][]).

    Args:
        conn:
            An optional connection hint to the SSH agent.  See
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].

    Yields:
        Every SSH key from the SSH agent that is suitable for passphrase
        derivation.

    Raises:
        derivepassphrase.ssh_agent.socketprovider.NoSuchProviderError:
            As per [`ssh_agent.SSHAgentClient.__init__`][].  Only
            applicable if agent auto-discovery is used.
        KeyError:
            As per [`ssh_agent.SSHAgentClient.__init__`][].  Only
            applicable if agent auto-discovery is used.
        NotImplementedError:
            As per [`ssh_agent.SSHAgentClient.__init__`][], including
            the multiple raise as an exception group.  Only applicable
            if agent auto-discovery is used.
        OSError:
            If the connection hint was a socket, then there was an error
            setting up the socket connection to the agent.

            Otherwise, as per [`ssh_agent.SSHAgentClient.__init__`][].
            Only applicable if agent auto-discovery is used.
        LookupError:
            No keys usable for passphrase derivation are loaded into the
            SSH agent.  Because this is a generator, the error is raised
            only once all keys have been examined.
        RuntimeError:
            There was an error communicating with the SSH agent.
        ssh_agent.SSHAgentFailedError:
            The agent failed to supply a list of loaded keys.

    """
    # Track whether any key actually passed the suitability check, so
    # that an agent holding only unsuitable keys is reported just like
    # an agent holding no keys at all.
    found_suitable_key = False
    with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client:
        try:
            all_key_comment_pairs = list(client.list_keys())
        except EOFError as exc:  # pragma: no cover
            # Defensive programming (all our code is well-behaved), and
            # an external source of nondeterminism (faulty SSH agents),
            # so no coverage.
            raise RuntimeError(AGENT_COMMUNICATION_ERROR) from exc
        for pair in all_key_comment_pairs:
            key, _comment = pair
            if vault.Vault.is_suitable_ssh_key(key, client=client):
                found_suitable_key = True
                yield pair
    if not found_suitable_key:
        raise LookupError(NO_SUITABLE_KEYS)

624 

625 

def prompt_for_selection(
    items: Sequence[str | bytes],
    heading: str = 'Possible choices:',
    single_choice_prompt: str = 'Confirm this choice?',
    ctx: click.Context | None = None,
) -> int:
    """Prompt user for a choice among the given items.

    Print the heading, if any, then present the items to the user.  If
    there are multiple items, prompt the user for a selection, validate
    the choice, then return the list index of the selected item.  If
    there is only a single item, request confirmation for that item
    instead, and return the correct index.  If there are no items at
    all, nothing is printed and no selection is possible.

    Args:
        items:
            The list of items to choose from.
        heading:
            A heading for the list of items, to print immediately
            before.  Defaults to a reasonable standard heading.  If
            explicitly empty, print no heading.
        single_choice_prompt:
            The confirmation prompt if there is only a single possible
            choice.  Defaults to a reasonable standard prompt.
        ctx:
            An optional `click` context, from which output device
            properties and color preferences will be queried.

    Returns:
        An index into the items sequence, indicating the user's
        selection.

    Raises:
        IndexError:
            The user made an invalid or empty selection, or requested an
            abort, or there were no items to select from.

    """
    n = len(items)
    if n == 0:
        # There is nothing to confirm or select, and any index we could
        # return would be out of range for the caller.
        raise IndexError(EMPTY_SELECTION)
    color = ctx.color if ctx is not None else None
    if heading:
        click.echo(click.style(heading, bold=True), err=True, color=color)
    for i, x in enumerate(items, start=1):
        click.echo(
            click.style(f'[{i}]', bold=True), nl=False, err=True, color=color
        )
        click.echo(' ', nl=False, err=True, color=color)
        click.echo(x, err=True, color=color)
    if n > 1:
        # The empty string is a valid choice, and means "abort".
        choices = click.Choice([''] + [str(i) for i in range(1, n + 1)])
        try:
            choice = click.prompt(
                f'Your selection? (1-{n}, leave empty to abort)',
                err=True,
                type=choices,
                show_choices=False,
                show_default=False,
                default='',
            )
        except click.Abort:  # pragma: no cover
            # This branch will not be triggered during testing on
            # `click` versions < 8.2.1, due to (non-monkeypatch-able)
            # deficiencies in `click.testing.CliRunner`.  Therefore, as
            # an external source of nondeterminism, exclude it from
            # coverage.
            #
            # https://github.com/pallets/click/issues/2934
            choice = ''
        if not choice:
            raise IndexError(EMPTY_SELECTION)
        return int(choice) - 1
    # Prompts that already end in punctuation only need a separating
    # space; all others get a colon as well.
    prompt_suffix = (
        ' ' if single_choice_prompt.endswith(tuple('?.!')) else ': '
    )
    try:
        click.confirm(
            single_choice_prompt,
            prompt_suffix=prompt_suffix,
            err=True,
            abort=True,
            default=False,
            show_default=False,
        )
    except click.Abort:
        raise IndexError(EMPTY_SELECTION) from None
    return 0

712 

713 

def handle_keyerror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Build the handler for `KeyError` groups in try-except*.

    Args:
        error_callback:
            Callback receiving the translated
            `NO_SSH_AGENT_FOUND` error message.  It is expected
            not to return.
        warning_callback:
            Unused.  Accepted for signature parity with the sibling
            handler factories.

    Returns:
        A function taking an exception group and reporting the
        standard user-facing message through `error_callback`.  The
        contents of the exception group are ignored.

    """
    # This kind of failure never produces warnings.
    del warning_callback

    def handle_keyerror(_excgroup: BaseExceptionGroup) -> NoReturn:
        message = _msg.TranslatedString(
            _msg.ErrMsgTemplate.NO_SSH_AGENT_FOUND
        )
        error_callback(message)

    return handle_keyerror

731 

732 

def handle_notimplementederror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Build the handler for `NotImplementedError` groups in try-except*.

    Args:
        error_callback:
            Callback receiving the translated `NO_AGENT_SUPPORT`
            error message.  It is expected not to return.
        warning_callback:
            Callback receiving one translated warning per kind of
            unavailable transport found in the exception group.

    Returns:
        A function taking an exception group, emitting the applicable
        warnings, and then reporting the standard user-facing error
        message through `error_callback`.

    """

    def handle_notimplementederror(excgroup: BaseExceptionGroup) -> NoReturn:
        # Exception type -> warning to emit if the group contains it.
        # The order here is the order in which warnings are issued.
        warnings_by_cause = (
            (
                socketprovider.AfUnixNotAvailableError,
                _msg.WarnMsgTemplate.NO_AF_UNIX,
            ),
            (
                socketprovider.TheAnnoyingOsNamedPipesNotAvailableError,
                _msg.WarnMsgTemplate.NO_ANNOYING_OS_NAMED_PIPES,
            ),
        )
        for exc_type, template in warnings_by_cause:
            if excgroup.subgroup(exc_type):
                warning_callback(_msg.TranslatedString(template))
        error_callback(
            _msg.TranslatedString(_msg.ErrMsgTemplate.NO_AGENT_SUPPORT)
        )

    return handle_notimplementederror

761 

762 

def handle_oserror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Build the handler for `OSError` groups in try-except*.

    Args:
        error_callback:
            Callback receiving the translated
            `CANNOT_CONNECT_TO_AGENT` error message, filled in with
            the error text and filename of the `OSError`.  It is
            expected not to return.
        warning_callback:
            Unused.  Accepted for signature parity with the sibling
            handler factories.

    Returns:
        A function taking an exception group and reporting the
        standard user-facing message through `error_callback`.

    """
    # This kind of failure never produces warnings.
    del warning_callback

    def handle_oserror(excgroup: BaseExceptionGroup) -> NoReturn:
        for exc in excgroup.exceptions:
            # Narrow the type for the attribute accesses below.
            assert isinstance(exc, OSError)
            message = _msg.TranslatedString(
                _msg.ErrMsgTemplate.CANNOT_CONNECT_TO_AGENT,
                error=exc.strerror,
                filename=exc.filename,
            )
            error_callback(message.maybe_without_filename())
        # Only reachable if the error callback returned after all, or
        # if the exception group was empty.
        raise AssertionError()

    return handle_oserror

787 

788 

def handle_runtimeerror(
    error_callback: Callable[..., NoReturn],
    warning_callback: Callable[..., None],
) -> Callable[[BaseExceptionGroup], NoReturn]:
    """Build the handler for `RuntimeError` groups in try-except*.

    Args:
        error_callback:
            Callback receiving the translated
            `CANNOT_UNDERSTAND_AGENT` error message, with the
            offending exception passed as `exc_info`.  It is expected
            not to return.
        warning_callback:
            Unused.  Accepted for signature parity with the sibling
            handler factories.

    Returns:
        A function taking an exception group and reporting the
        standard user-facing message through `error_callback`.

    """
    # This kind of failure never produces warnings.
    del warning_callback

    def handle_runtimeerror(excgroup: BaseExceptionGroup) -> NoReturn:
        for exc in excgroup.exceptions:
            message = _msg.TranslatedString(
                _msg.ErrMsgTemplate.CANNOT_UNDERSTAND_AGENT
            )
            error_callback(message, exc_info=exc)
        # Only reachable if the error callback returned after all, or
        # if the exception group was empty.
        raise AssertionError()

    return handle_runtimeerror

811 

812 

# This fallback exists purely as defensive programming, hence it is
# excluded from coverage measurement.
def default_error_callback(
    message: Any,  # noqa: ANN401
    /,
    *_args: Any,  # noqa: ANN401
    **_kwargs: Any,  # noqa: ANN401
) -> NoReturn:  # pragma: no cover
    """Exit via [`sys.exit`][] with `message`; ignore everything else.

    Args:
        message:
            The value handed to [`sys.exit`][] unchanged.
        _args:
            Ignored.
        _kwargs:
            Ignored.

    """
    sys.exit(message)

822 

823 

def select_ssh_key(
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
    /,
    *,
    ctx: click.Context | None = None,
    error_callback: Callable[..., NoReturn] = default_error_callback,
    warning_callback: Callable[..., None] = lambda *_args: None,
) -> bytes | bytearray:
    """Let the user pick an SSH key for passphrase derivation.

    The suitable SSH keys are obtained via `get_suitable_ssh_keys`,
    rendered as one line each (key type, a possibly shortened base64
    form of the key, and the key comment), and offered for selection
    via `prompt_for_selection`.  Any failure is reported through
    `error_callback`.

    Args:
        conn:
            An optional connection hint to the SSH agent.  See
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].
        ctx:
            A `click` context, queried for output device properties
            and color preferences when issuing the prompt.
        error_callback:
            A callback function for an error message, if any.  The
            callback is responsible for aborting this function call
            after acknowledging, formatting and/or forwarding the
            message; typically it calls [`sys.exit`][] or raises an
            exception of its own.
        warning_callback:
            A callback function for a warning message, if any.  The
            callback is responsible for formatting the warning and
            dispatching it into the warning system, if so desired.

    Returns:
        The selected SSH key.

    """

    def handle_lookuperror(_excgroup: BaseExceptionGroup) -> NoReturn:
        message = _msg.TranslatedString(
            _msg.ErrMsgTemplate.NO_SUITABLE_SSH_KEYS,
            PROG_NAME=PROG_NAME,
        )
        error_callback(message)

    def handle_sshagentfailederror(excgroup: BaseExceptionGroup) -> NoReturn:
        for exc in excgroup.exceptions:
            message = _msg.TranslatedString(
                _msg.ErrMsgTemplate.AGENT_REFUSED_LIST_KEYS
            )
            error_callback(message, exc_info=exc)
        # Only reachable if the error callback returned after all, or
        # if the exception group was empty.
        raise AssertionError()

    def describe(key: bytes | bytearray, comment: bytes | bytearray) -> str:
        """Render one line of the key listing."""
        keytype = ssh_agent.SSHAgentClient.unstring_prefix(key)[0].decode(
            'ASCII'
        )
        encoded = base64.standard_b64encode(key).decode('ASCII')
        tail_length = KEY_DISPLAY_LENGTH - 1 - len(keytype)
        shortened = '...' + encoded[-tail_length:]
        # Prefer the full encoding unless the shortened form is
        # strictly shorter.
        shown = encoded if len(encoded) <= len(shortened) else shortened
        comment_str = comment.decode('UTF-8', errors='replace')
        return f'{keytype} {shown} {comment_str}'

    # Subclasses must come before their base classes (`KeyError`
    # before `LookupError`, `SSHAgentFailedError` before
    # `RuntimeError`), so the order of this table matters.
    handlers = {
        KeyError: handle_keyerror(error_callback, warning_callback),
        LookupError: handle_lookuperror,
        NotImplementedError: handle_notimplementederror(
            error_callback, warning_callback
        ),
        OSError: handle_oserror(error_callback, warning_callback),
        ssh_agent.SSHAgentFailedError: handle_sshagentfailederror,
        RuntimeError: handle_runtimeerror(error_callback, warning_callback),
    }
    with exceptiongroup.catch(handlers):
        suitable_keys = list(get_suitable_ssh_keys(conn))
    key_listing: list[str] = [
        describe(key, comment) for key, comment in suitable_keys
    ]
    try:
        choice = prompt_for_selection(
            key_listing,
            heading='Suitable SSH keys:',
            single_choice_prompt='Use this key?',
            ctx=ctx,
        )
    except IndexError:
        error_callback(
            _msg.TranslatedString(
                _msg.ErrMsgTemplate.USER_ABORTED_SSH_KEY_SELECTION
            )
        )
    return suitable_keys[choice].key

923 

924 

def prompt_for_passphrase() -> str:
    """Ask the user for the passphrase on the terminal.

    Delegates to [`click.prompt`][], with hidden input and the prompt
    written to standard error.  Kept as a separate function mainly so
    that tests can replace it.

    Returns:
        The user input.  An empty string if the prompt was aborted.

    """
    try:
        entered = click.prompt(
            'Passphrase',
            default='',
            hide_input=True,
            show_default=False,
            err=True,
        )
    except click.Abort:  # pragma: no cover
        # This branch will not be triggered during testing on `click`
        # versions < 8.2.1, due to (non-monkeypatch-able) deficiencies
        # in `click.testing.CliRunner`.  Therefore, as an external source
        # of nondeterminism, exclude it from coverage.
        #
        # https://github.com/pallets/click/issues/2934
        return ''
    return cast('str', entered)

954 

955 

def toml_key(*parts: str) -> str:
    """Return a formatted (dotted) TOML key, given its parts.

    Each part that is a valid TOML bare key (non-empty, consisting
    only of ASCII letters, ASCII digits, underscores and dashes) is
    emitted as is.  Every other part is emitted as a quoted key, i.e.
    as a TOML basic string, with quotation marks, backslashes and all
    control characters (U+0000 through U+001F, and U+007F) escaped.

    Args:
        parts:
            The individual key parts, outermost first.

    Returns:
        The key parts, quoted where necessary, joined by dots.

    """
    # Characters with a dedicated short escape sequence in TOML basic
    # strings.
    escapes = {
        ord('\b'): r'\b',
        ord('\t'): r'\t',
        ord('\n'): r'\n',
        ord('\f'): r'\f',
        ord('\r'): r'\r',
        ord('"'): r'\"',
        ord('\\'): r'\\',
    }
    # All remaining control characters need a `\uXXXX` escape.  This
    # includes U+0010 through U+001F, not just U+0000 through U+000F.
    for codepoint in (*range(0x20), 0x7F):
        escapes.setdefault(codepoint, f'\\u{codepoint:04X}')

    def is_bare_key(string: str) -> bool:
        return bool(string) and all(
            char.isascii() and (char.isalnum() or char in '_-')
            for char in string
        )

    def escape(string: str) -> str:
        # Quote everything that is not a bare key; otherwise e.g.
        # a part containing a dot would read as two separate parts.
        if is_bare_key(string):
            return string
        return '"' + string.translate(escapes) + '"'

    return '.'.join(map(escape, parts))

984 

985 

class ORIGIN(enum.Enum):
    """Where a setting came from, if not the user configuration file.

    Each member's value is the message label used when the origin is
    shown to the user.

    Attributes:
        INTERACTIVE (_msg.Label): interactive input

    """

    INTERACTIVE = _msg.Label.SETTINGS_ORIGIN_INTERACTIVE
    """"""

996 

997 

def check_for_misleading_passphrase(
    key: tuple[str, ...] | ORIGIN,
    value: Mapping[str, Any],
    *,
    main_config: dict[str, Any],
    ctx: click.Context | None = None,
) -> None:
    """Check for a misleading passphrase according to user configuration.

    Look up the desired Unicode normalization form in the user
    configuration, and if the passphrase is not normalized according to
    this form, issue a warning to the user.

    The form is taken from the `vault.unicode-normalization-form`
    table if `key` names a service with an entry there, and from
    `vault.default-unicode-normalization-form` (default: `NFC`)
    otherwise.

    Args:
        key:
            A vault configuration key or an origin of the
            value/configuration section, e.g. [`ORIGIN.INTERACTIVE`][],
            or `("global",)`, or `("services", "foo")`.
        value:
            The vault configuration section maybe containing
            a passphrase to vet.
        main_config:
            The parsed main user configuration.
        ctx:
            The click context.  This is necessary to pass output options
            set on the context to the logging machinery.

    Raises:
        AssertionError:
            The main user configuration is invalid: a section that
            should be a table is not a table, or the selected
            normalization form is not one of `NFC`, `NFD`, `NFKC` or
            `NFKD`.

    """
    form_key = 'unicode-normalization-form'
    default_form_key = f'default-{form_key}'
    # The configuration is user-supplied, so do not assume that the
    # sections actually are tables.
    vault_section: Any = main_config.get('vault', {})
    if not isinstance(vault_section, dict):
        msg = f'Invalid value {vault_section!r} for config key vault'
        raise AssertionError(msg)
    form_dict: Any = vault_section.get(form_key, {})
    if not isinstance(form_dict, dict):
        msg = f'Invalid value {form_dict!r} for config key vault.{form_key}'
        raise AssertionError(msg)
    # Only keys with at least two parts can name a service; origins and
    # `("global",)` always use the default form.
    service = key[1] if isinstance(key, tuple) and len(key) > 1 else None
    form: Any
    if service is not None and service in form_dict:
        form = form_dict[service]
        config_key = toml_key('vault', service, form_key)
    else:
        form = vault_section.get(default_form_key, 'NFC')
        config_key = f'vault.{default_form_key}'
    # Test the type first: an unhashable value (e.g. a TOML array)
    # would otherwise make the membership test raise `TypeError`.
    if not isinstance(form, str) or form not in {
        'NFC',
        'NFD',
        'NFKC',
        'NFKD',
    }:
        msg = f'Invalid value {form!r} for config key {config_key}'
        raise AssertionError(msg)
    if 'phrase' not in value:
        return
    phrase = value['phrase']
    if unicodedata.is_normalized(form, phrase):
        return
    formatted_key = (
        str(_msg.TranslatedString(key.value))
        if isinstance(key, ORIGIN)
        else _types.json_path(key)
    )
    # `stacklevel=2` attributes the warning to our caller, so this call
    # must stay directly inside this function.
    logging.getLogger(PROG_NAME).warning(
        _msg.TranslatedString(
            _msg.WarnMsgTemplate.PASSPHRASE_NOT_NORMALIZED,
            key=formatted_key,
            form=form,
        ),
        stacklevel=2,
        extra={'color': ctx.color if ctx is not None else None},
    )

1066 

1067 

def key_to_phrase(
    key: str | Buffer,
    /,
    *,
    error_callback: Callable[..., NoReturn] = default_error_callback,
    warning_callback: Callable[..., None] = lambda *_args: None,
    conn: ssh_agent.SSHAgentClient
    | _types.SSHAgentSocket
    | Sequence[str]
    | None = None,
) -> bytes:
    """Return the equivalent master passphrase, or abort.

    This wrapper around [`vault.Vault.phrase_from_key`][] emits
    user-facing error messages if no equivalent master passphrase can be
    obtained from the key, because this is the first point of contact
    with the SSH agent.

    Args:
        key:
            The SSH key, in base64 encoding.
        error_callback:
            A callback function for an error message, if any.  It is
            expected not to return.
        warning_callback:
            A callback function for a warning message, if any.
        conn:
            An optional connection hint to the SSH agent, passed to
            [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].

    Returns:
        The result of [`vault.Vault.phrase_from_key`][] for the decoded
        key.

    """
    key = base64.standard_b64decode(key)
    handlers = {
        KeyError: handle_keyerror(error_callback, warning_callback),
        NotImplementedError: handle_notimplementederror(
            error_callback, warning_callback
        ),
        OSError: handle_oserror(error_callback, warning_callback),
        RuntimeError: handle_runtimeerror(error_callback, warning_callback),
    }
    with exceptiongroup.catch(handlers):  # noqa: SIM117
        with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client:
            try:
                return vault.Vault.phrase_from_key(key, conn=client)
            except ssh_agent.SSHAgentFailedError as refusal:
                # Find out whether the agent refused because the key
                # is not loaded, so that we can give a more specific
                # error message in that case.
                try:
                    loaded_keys = client.list_keys()
                except ssh_agent.SSHAgentFailedError:
                    # The agent refuses to list keys as well; fall
                    # through to the generic message.
                    pass
                except Exception as listing_error:  # noqa: BLE001
                    # Keep the secondary failure attached to the
                    # original one instead of losing it.
                    refusal.__context__ = listing_error
                else:
                    key_is_loaded = any(k == key for k, _ in loaded_keys)
                    if not key_is_loaded:  # pragma: no branch
                        error_callback(
                            _msg.TranslatedString(
                                _msg.ErrMsgTemplate.SSH_KEY_NOT_LOADED
                            )
                        )
                error_callback(
                    _msg.TranslatedString(
                        _msg.ErrMsgTemplate.AGENT_REFUSED_SIGNATURE
                    ),
                    exc_info=refusal,
                )

1121 

1122 

def print_config_as_sh_script(
    config: _types.VaultConfig,
    /,
    *,
    outfile: TextIO,
    prog_name_list: Sequence[str],
) -> None:
    """Serialize the given vault configuration as a sh(1) script.

    This implements the `--export-as=sh` option of `derivepassphrase vault`.

    The script first clears the configuration.  Then, for the global
    section (if non-empty) and for each service, settings that have
    a command-line option of their own are emitted as a `--config`
    call, and all other settings are emitted as a `--import` call
    reading a JSON snippet from a here-document.

    Args:
        config:
            The configuration to serialize.
        outfile:
            A file object to write the output to.
        prog_name_list:
            A list of (subcommand) names for the command emitting this
            output, e.g. `["derivepassphrase", "vault"]`.

    """
    # Settings that are expressible as `--config` command-line options.
    service_keys = (
        'length',
        'repeat',
        'lower',
        'upper',
        'number',
        'space',
        'dash',
        'symbol',
    )

    def emit(*lines: str) -> None:
        print(*lines, sep='\n', file=outfile)

    def emit_import(snippet: dict[str, object]) -> None:
        # The here-document delimiter is quoted, so the shell passes
        # the (single-line) JSON payload through verbatim.
        emit(
            shlex.join([*prog_name_list, '--import', '-']) + " <<'HERE'",
            json.dumps(snippet, ensure_ascii=False, indent=None),
            'HERE',
        )

    emit('#!/bin/sh -e', '', shlex.join([*prog_name_list, '--clear']))
    sections: list[
        tuple[
            str | None,
            _types.VaultConfigGlobalSettings
            | _types.VaultConfigServicesSettings,
        ],
    ] = list(config['services'].items())
    # The global section, if any, goes first, under the name `None`.
    if config.get('global', {}):
        sections.insert(0, (None, config['global']))
    for name, settings in sections:
        option_keys = [k for k in service_keys if k in settings]
        other_settings = {
            k: v for k, v in settings.items() if k not in service_keys
        }
        if other_settings:
            if name is None:
                emit_import({'global': other_settings, 'services': {}})
            else:
                emit_import({'services': {name: other_settings}})
        if option_keys:
            tokens = [*prog_name_list, '--config']
            for option in option_keys:
                tokens += [f'--{option}', str(settings[option])]  # type: ignore[literal-required]
            if name is not None:
                tokens += ['--', name]
            emit(shlex.join(tokens))
        elif not other_settings and name:
            # A service without any settings still needs to exist
            # after the script has run.
            emit_import({'services': {name: {}}})