Coverage for src\derivepassphrase\exporter\storeroom.py: 100.000%
207 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""Exporter for the vault "storeroom" configuration format.
7The "storeroom" format is the experimental format used in alpha and beta
8versions of vault beyond v0.3.0. The configuration is stored as
9a separate directory, which acts like a hash table (i.e. has named
10slots) and provides an impure quasi-filesystem interface. Each hash
11table entry is separately encrypted and authenticated. James Coglan
12designed this format to avoid concurrent write issues when updating or
13synchronizing the vault configuration with e.g. a cloud service.
15The public interface is the [`export_storeroom_data`][] function.
16Multiple *non-public* functions are additionally documented here for
17didactical and educational reasons, but they are not part of the module
18API, are subject to change without notice (including removal), and
19should *not* be used or relied on.
21"""
23# ruff: noqa: S303
25from __future__ import annotations
27import base64
28import importlib
29import json
30import logging
31import os
32import os.path
33import pathlib
34import struct
35from typing import TYPE_CHECKING, Any
37from derivepassphrase import _types, exporter
38from derivepassphrase._internals import cli_messages as _msg
40if TYPE_CHECKING:
41 from typing_extensions import Buffer
43if TYPE_CHECKING:
44 from collections.abc import Iterator
46 from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
47 from cryptography.hazmat.primitives.ciphers import algorithms, modes
48 from cryptography.hazmat.primitives.kdf import pbkdf2
50 STUBBED = False
51else:
52 try:
53 importlib.import_module('cryptography')
54 except ModuleNotFoundError as exc:
55 # For type checking only, will not actually be called, so no
56 # coverage.
57 class _DummyModule: # pragma: no cover
58 def __init__(self, exc: type[Exception]) -> None:
59 self.exc = exc
61 def __getattr__(self, name: str) -> Any: # noqa: ANN401
62 def func(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401,ARG001
63 raise self.exc
65 return func
67 ciphers = hashes = hmac = padding = _DummyModule(exc)
68 algorithms = modes = pbkdf2 = _DummyModule(exc)
69 STUBBED = True
70 else:
71 from cryptography.hazmat.primitives import (
72 ciphers,
73 hashes,
74 hmac,
75 padding,
76 )
77 from cryptography.hazmat.primitives.ciphers import algorithms, modes
78 from cryptography.hazmat.primitives.kdf import pbkdf2
80 STUBBED = False
82STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17'
83VAULT_CIPHER_UUID = b'73e69e8a-cb05-4b50-9f42-59d76a511299'
84IV_SIZE = 16
85KEY_SIZE = MAC_SIZE = 32
86ENCRYPTED_KEYPAIR_SIZE = 128
87VERSION_SIZE = 1
89__all__ = ('export_storeroom_data',)
91logger = logging.getLogger(__name__)
94@exporter.register_export_vault_config_data_handler('storeroom')
95def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915
96 path: str | bytes | os.PathLike | None = None,
97 key: str | Buffer | None = None,
98 *,
99 format: str = 'storeroom', # noqa: A002
100) -> dict[str, Any]:
101 """Export the full configuration stored in the storeroom.
103 See [`exporter.ExportVaultConfigDataFunction`][] for an explanation
104 of the call signature, and the exceptions to expect.
106 Other Args:
107 format:
108 The only supported format is `storeroom`.
110 """ # noqa: DOC201,DOC501
111 # Trigger import errors if necessary.
112 importlib.import_module('cryptography') 1ciafbk
113 if path is None: 1ciafb
114 path = exporter.get_vault_path() 1cafb
115 else:
116 path = pathlib.Path(os.fsdecode(path)) 1ia
117 if key is None: 1ciafb
118 key = exporter.get_vault_key() 1iafb
119 if format != 'storeroom': # pragma: no cover 1ciafb
120 # Defensive programming, will not actually be called like this
121 # by our code, so no coverage.
122 msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format(
123 fmt=format
124 )
125 raise ValueError(msg)
126 try: 1ciafb
127 master_keys_file = pathlib.Path(path, '.keys').open( # noqa: SIM115 1ciafb
128 encoding='utf-8',
129 )
130 except FileNotFoundError as exc: 1i
131 raise exporter.NotAVaultConfigError(path, format='storeroom') from exc 1i
132 with master_keys_file: 1cafb
133 header = json.loads(master_keys_file.readline()) 1cafb
134 if header != {'version': 1}: 1cafb
135 msg = 'bad or unsupported keys version header' 1f
136 raise RuntimeError(msg) 1f
137 raw_keys_data = base64.standard_b64decode(master_keys_file.readline()) 1cafb
138 encrypted_keys_params, encrypted_keys = struct.unpack( 1cafb
139 f'B {len(raw_keys_data) - 1}s', raw_keys_data
140 )
141 if master_keys_file.read(): 1cafb
142 msg = 'trailing data; cannot make sense of .keys file' 1f
143 raise RuntimeError(msg) 1f
144 encrypted_keys_version = encrypted_keys_params >> 4 1cafb
145 if encrypted_keys_version != 1: 1cafb
146 msg = f'cannot handle version {encrypted_keys_version} encrypted keys' 1f
147 raise RuntimeError(msg) 1f
148 logger.info( 1cab
149 _msg.TranslatedString(_msg.InfoMsgTemplate.PARSING_MASTER_KEYS_DATA)
150 )
151 encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F)) 1cab
152 master_keys_keys = _derive_master_keys_keys(key, encrypted_keys_iterations) 1cab
153 master_keys = _decrypt_master_keys_data(encrypted_keys, master_keys_keys) 1cab
155 config_structure: dict[str, Any] = {} 1cab
156 json_contents: dict[str, bytes] = {} 1cab
157 valid_hashdirs = list(path.glob('[01][0-9a-f]')) 1cab
158 for file in valid_hashdirs: 1cab
159 logger.info( 1cab
160 _msg.TranslatedString(
161 _msg.InfoMsgTemplate.DECRYPTING_BUCKET,
162 bucket_number=file,
163 )
164 )
165 bucket_contents = [ 1cab
166 bytes(item) for item in _decrypt_bucket_file(file, master_keys)
167 ]
168 bucket_index = json.loads(bucket_contents.pop(0)) 1cab
169 for pos, item in enumerate(bucket_index): 1cab
170 json_contents[item] = bucket_contents[pos] 1cab
171 logger.debug( 1cab
172 _msg.TranslatedString(
173 _msg.DebugMsgTemplate.BUCKET_ITEM_FOUND,
174 path=item,
175 value=bucket_contents[pos],
176 )
177 )
178 dirs_to_check: dict[str, list[str]] = {} 1cab
179 json_payload: Any
180 logger.info( 1cab
181 _msg.TranslatedString(_msg.InfoMsgTemplate.ASSEMBLING_CONFIG_STRUCTURE)
182 )
183 for item_path, json_content in sorted(json_contents.items()): 1cab
184 if item_path.endswith('/'): 1cab
185 logger.debug( 1cab
186 _msg.TranslatedString(
187 _msg.DebugMsgTemplate.POSTPONING_DIRECTORY_CONTENTS_CHECK,
188 path=item_path,
189 contents=json_content.decode('utf-8'),
190 )
191 )
192 json_payload = json.loads(json_content) 1cab
193 if not isinstance(json_payload, list) or any( 1cab
194 not isinstance(x, str) for x in json_payload
195 ):
196 msg = ( 1b
197 f'Directory index is not actually an index: '
198 f'{json_content!r}'
199 )
200 raise RuntimeError(msg) 1b
201 dirs_to_check[item_path] = json_payload 1cab
202 logger.debug( 1cab
203 _msg.TranslatedString(
204 _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS_EMPTY_DIRECTORY,
205 path=item_path,
206 ),
207 )
208 _store(config_structure, item_path, b'{}') 1cab
209 else:
210 logger.debug( 1ca
211 _msg.TranslatedString(
212 _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS,
213 path=item_path,
214 value=json_content.decode('utf-8'),
215 ),
216 )
217 _store(config_structure, item_path, json_content) 1ca
218 logger.info( 1cab
219 _msg.TranslatedString(
220 _msg.InfoMsgTemplate.CHECKING_CONFIG_STRUCTURE_CONSISTENCY,
221 )
222 )
223 # Sorted order is important; see `maybe_obj` below.
224 for dir_, namelist_ in sorted(dirs_to_check.items()): 1cab
225 namelist = [x.rstrip('/') for x in namelist_] 1cab
226 obj: dict[Any, Any] = config_structure 1cab
227 for part in dir_.split('/'): 1cab
228 if part: 1cab
229 # Because we iterate paths in sorted order, parent
230 # directories are encountered before child directories.
231 # So parent directories always exist (lest we would have
232 # aborted earlier).
233 #
234 # Of course, the type checker doesn't necessarily know
235 # this, so we need to use assertions anyway.
236 maybe_obj = obj.get(part) 1cab
237 assert isinstance(maybe_obj, dict), ( 1cab
238 f'Cannot traverse storage path {dir_!r}'
239 )
240 obj = maybe_obj 1cab
241 if set(obj.keys()) != set(namelist): 1cab
242 msg = f'Object key mismatch for path {dir_!r}' 1b
243 raise RuntimeError(msg) 1b
244 logger.debug( 1cab
245 _msg.TranslatedString(
246 _msg.DebugMsgTemplate.DIRECTORY_CONTENTS_CHECK_OK,
247 path=dir_,
248 contents=json.dumps(namelist_),
249 )
250 )
251 return config_structure 1ca
254def _h(bs: Buffer) -> str:
255 return '<{}>'.format(memoryview(bs).hex(' ')) 1cajbeg
258def _derive_master_keys_keys(
259 password: str | Buffer,
260 iterations: int,
261) -> _types.StoreroomKeyPair:
262 """Derive encryption and signing keys for the master keys data.
264 The master password is run through a key derivation function to
265 obtain a 64-byte string, which is then split to yield two 32-byte
266 keys. The key derivation function is PBKDF2, using HMAC-SHA1 and
267 salted with the storeroom master keys UUID.
269 Args:
270 password:
271 A master password for the storeroom instance. Usually read
272 from the `VAULT_KEY` environment variable, otherwise
273 defaults to the username.
274 iterations:
275 A count of rounds for the underlying key derivation
276 function. Usually stored as a setting next to the encrypted
277 master keys data.
279 Returns:
280 A 2-tuple of keys, the encryption key and the signing key, to
281 decrypt and verify the master keys data with.
283 Warning:
284 Non-public function, provided for didactical and educational
285 purposes only. Subject to change without notice, including
286 removal.
288 """
289 if isinstance(password, str): 1cab
290 password = password.encode('ASCII') 1a
291 master_keys_keys_blob = pbkdf2.PBKDF2HMAC( 1cab
292 algorithm=hashes.SHA1(),
293 length=2 * KEY_SIZE,
294 salt=STOREROOM_MASTER_KEYS_UUID,
295 iterations=iterations,
296 ).derive(bytes(password))
297 encryption_key, signing_key = struct.unpack( 1cab
298 f'{KEY_SIZE}s {KEY_SIZE}s', master_keys_keys_blob
299 )
300 logger.debug( 1cab
301 _msg.TranslatedString(
302 _msg.DebugMsgTemplate.DERIVED_MASTER_KEYS_KEYS,
303 enc_key=_h(encryption_key),
304 sign_key=_h(signing_key),
305 pw_bytes=_h(password),
306 algorithm='SHA256',
307 length=64,
308 salt=STOREROOM_MASTER_KEYS_UUID,
309 iterations=iterations,
310 ),
311 )
312 return _types.StoreroomKeyPair( 1cab
313 encryption_key=encryption_key,
314 signing_key=signing_key,
315 ).toreadonly()
318def _decrypt_master_keys_data(
319 data: Buffer,
320 keys: _types.StoreroomKeyPair,
321) -> _types.StoreroomMasterKeys:
322 r"""Decrypt the master keys data.
324 The master keys data contains:
326 - a 16-byte IV,
327 - a 96-byte AES256-CBC-encrypted payload, plus 16 further bytes of
328 PKCS7 padding, and
329 - a 32-byte MAC of the preceding 128 bytes.
331 The decrypted payload itself consists of three 32-byte keys: the
332 hashing, encryption and signing keys, in that order.
334 The encrypted payload is encrypted with the encryption key, and the
335 MAC is created based on the signing key. As per standard
336 cryptographic procedure, the MAC can be verified before attempting
337 to decrypt the payload.
339 Because the payload size is both fixed and a multiple of the cipher
340 blocksize, in this case, the PKCS7 padding always is `b'\x10' * 16`.
342 Args:
343 data:
344 The encrypted master keys data.
345 keys:
346 The encryption and signing keys for the master keys data.
347 These should have previously been derived via the
348 [`_derive_master_keys_keys`][] function.
350 Returns:
351 The master encryption, signing and hashing keys.
353 Raises:
354 cryptography.exceptions.InvalidSignature:
355 The data does not contain a valid signature under the given
356 key.
357 ValueError:
358 The format is invalid, in a non-cryptographic way. (For
359 example, it contains an unsupported version marker, or
360 unexpected extra contents, or invalid padding.)
362 Warning:
363 Non-public function, provided for didactical and educational
364 purposes only. Subject to change without notice, including
365 removal.
367 """
368 data = memoryview(data).toreadonly().cast('c') 1cabeg
369 keys = keys.toreadonly() 1cabeg
370 ciphertext, claimed_mac = struct.unpack( 1cabeg
371 f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
372 )
373 actual_mac = hmac.HMAC(keys.signing_key, hashes.SHA256()) 1cabeg
374 actual_mac.update(ciphertext) 1cabeg
375 logger.debug( 1cabeg
376 _msg.TranslatedString(
377 _msg.DebugMsgTemplate.MASTER_KEYS_DATA_MAC_INFO,
378 sign_key=_h(keys.signing_key),
379 ciphertext=_h(ciphertext),
380 claimed_mac=_h(claimed_mac),
381 actual_mac=_h(actual_mac.copy().finalize()),
382 ),
383 )
384 actual_mac.verify(claimed_mac) 1cabeg
386 try: 1cabe
387 iv, payload = struct.unpack( 1cabe
388 f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
389 )
390 decryptor = ciphers.Cipher( 1cabe
391 algorithms.AES256(keys.encryption_key), modes.CBC(iv)
392 ).decryptor()
393 padded_plaintext = bytearray() 1cabe
394 padded_plaintext.extend(decryptor.update(payload)) 1cabe
395 padded_plaintext.extend(decryptor.finalize()) 1cabe
396 unpadder = padding.PKCS7(IV_SIZE * 8).unpadder() 1cabe
397 plaintext = bytearray() 1cabe
398 plaintext.extend(unpadder.update(padded_plaintext)) 1cabe
399 plaintext.extend(unpadder.finalize()) 1cabe
400 hashing_key, encryption_key, signing_key = struct.unpack( 1cabe
401 f'{KEY_SIZE}s {KEY_SIZE}s {KEY_SIZE}s', plaintext
402 )
403 except (ValueError, struct.error) as exc: 1e
404 msg = 'Invalid encrypted master keys payload' 1e
405 raise ValueError(msg) from exc 1e
406 return _types.StoreroomMasterKeys( 1cab
407 hashing_key=hashing_key,
408 encryption_key=encryption_key,
409 signing_key=signing_key,
410 ).toreadonly()
413def _decrypt_session_keys(
414 data: Buffer,
415 master_keys: _types.StoreroomMasterKeys,
416) -> _types.StoreroomKeyPair:
417 r"""Decrypt the bucket item's session keys.
419 The bucket item's session keys are single-use keys for encrypting
420 and signing a single item in the storage bucket. The encrypted
421 session key data consists of:
423 - a 16-byte IV,
424 - a 64-byte AES256-CBC-encrypted payload, plus 16 further bytes of
425 PKCS7 padding, and
426 - a 32-byte MAC of the preceding 96 bytes.
428 The encrypted payload is encrypted with the master encryption key,
429 and the MAC is created with the master signing key. As per standard
430 cryptographic procedure, the MAC can be verified before attempting
431 to decrypt the payload.
433 Because the payload size is both fixed and a multiple of the cipher
434 blocksize, in this case, the PKCS7 padding always is `b'\x10' * 16`.
436 Args:
437 data:
438 The encrypted bucket item session key data.
439 master_keys:
440 The master keys. Presumably these have previously been
441 obtained via the [`_decrypt_master_keys_data`][] function.
443 Returns:
444 The bucket item's encryption and signing keys.
446 Raises:
447 cryptography.exceptions.InvalidSignature:
448 The data does not contain a valid signature under the given
449 key.
450 ValueError:
451 The format is invalid, in a non-cryptographic way. (For
452 example, it contains an unsupported version marker, or
453 unexpected extra contents, or invalid padding.)
455 Warning:
456 Non-public function, provided for didactical and educational
457 purposes only. Subject to change without notice, including
458 removal.
460 """
461 data = memoryview(data).toreadonly().cast('c') 1cabeg
462 master_keys = master_keys.toreadonly() 1cabeg
463 ciphertext, claimed_mac = struct.unpack( 1cabeg
464 f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
465 )
466 actual_mac = hmac.HMAC(master_keys.signing_key, hashes.SHA256()) 1cabeg
467 actual_mac.update(ciphertext) 1cabeg
468 logger.debug( 1cabeg
469 _msg.TranslatedString(
470 _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_MAC_INFO,
471 sign_key=_h(master_keys.signing_key),
472 ciphertext=_h(ciphertext),
473 claimed_mac=_h(claimed_mac),
474 actual_mac=_h(actual_mac.copy().finalize()),
475 ),
476 )
477 actual_mac.verify(claimed_mac) 1cabeg
479 try: 1cabe
480 iv, payload = struct.unpack( 1cabe
481 f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
482 )
483 decryptor = ciphers.Cipher( 1cabe
484 algorithms.AES256(master_keys.encryption_key), modes.CBC(iv)
485 ).decryptor()
486 padded_plaintext = bytearray() 1cabe
487 padded_plaintext.extend(decryptor.update(payload)) 1cabe
488 padded_plaintext.extend(decryptor.finalize()) 1cabe
489 unpadder = padding.PKCS7(IV_SIZE * 8).unpadder() 1cabe
490 plaintext = bytearray() 1cabe
491 plaintext.extend(unpadder.update(padded_plaintext)) 1cabe
492 plaintext.extend(unpadder.finalize()) 1cabe
493 session_encryption_key, session_signing_key = struct.unpack( 1cabe
494 f'{KEY_SIZE}s {KEY_SIZE}s', plaintext
495 )
496 except (ValueError, struct.error) as exc: 1e
497 msg = 'Invalid encrypted session keys payload' 1e
498 raise ValueError(msg) from exc 1e
500 session_keys = _types.StoreroomKeyPair( 1cab
501 encryption_key=session_encryption_key,
502 signing_key=session_signing_key,
503 ).toreadonly()
505 logger.debug( 1cab
506 _msg.TranslatedString(
507 _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_INFO,
508 enc_key=_h(master_keys.encryption_key),
509 iv=_h(iv),
510 ciphertext=_h(payload),
511 plaintext=_h(plaintext),
512 code=_msg.TranslatedString(
513 'StoreroomKeyPair(encryption_key=bytes.fromhex({enc_key!r}), '
514 'signing_key=bytes.fromhex({sign_key!r}))',
515 enc_key=session_keys.encryption_key.hex(' '),
516 sign_key=session_keys.signing_key.hex(' '),
517 ),
518 ),
519 )
521 return session_keys 1cab
524def _decrypt_contents(
525 data: Buffer,
526 session_keys: _types.StoreroomKeyPair,
527) -> Buffer:
528 """Decrypt the bucket item's contents.
530 The data consists of:
532 - a 16-byte IV,
533 - a variable-sized AES256-CBC-encrypted payload (using PKCS7 padding
534 on the inside), and
535 - a 32-byte MAC of the preceding bytes.
537 The encrypted payload is encrypted with the bucket item's session
538 encryption key, and the MAC is created with the bucket item's
539 session signing key. As per standard cryptographic procedure, the
540 MAC can be verified before attempting to decrypt the payload.
542 Args:
543 data:
544 The encrypted bucket item payload data.
545 session_keys:
546 The bucket item's session keys. Presumably these have
547 previously been obtained via the [`_decrypt_session_keys`][]
548 function.
550 Returns:
551 The bucket item's payload.
553 Raises:
554 cryptography.exceptions.InvalidSignature:
555 The data does not contain a valid signature under the given
556 key.
557 ValueError:
558 The format is invalid, in a non-cryptographic way. (For
559 example, it contains an unsupported version marker, or
560 unexpected extra contents, or invalid padding.)
562 Warning:
563 Non-public function, provided for didactical and educational
564 purposes only. Subject to change without notice, including
565 removal.
567 """
568 data = memoryview(data).toreadonly().cast('c') 1cab
569 session_keys = session_keys.toreadonly() 1cab
570 ciphertext, claimed_mac = struct.unpack( 1cab
571 f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
572 )
573 actual_mac = hmac.HMAC(session_keys.signing_key, hashes.SHA256()) 1cab
574 actual_mac.update(ciphertext) 1cab
575 logger.debug( 1cab
576 _msg.TranslatedString(
577 _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_MAC_INFO,
578 sign_key=_h(session_keys.signing_key),
579 ciphertext=_h(ciphertext),
580 claimed_mac=_h(claimed_mac),
581 actual_mac=_h(actual_mac.copy().finalize()),
582 ),
583 )
584 actual_mac.verify(claimed_mac) 1cab
586 iv, payload = struct.unpack( 1cab
587 f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
588 )
589 decryptor = ciphers.Cipher( 1cab
590 algorithms.AES256(session_keys.encryption_key), modes.CBC(iv)
591 ).decryptor()
592 padded_plaintext = bytearray() 1cab
593 padded_plaintext.extend(decryptor.update(payload)) 1cab
594 padded_plaintext.extend(decryptor.finalize()) 1cab
595 unpadder = padding.PKCS7(IV_SIZE * 8).unpadder() 1cab
596 plaintext = bytearray() 1cab
597 plaintext.extend(unpadder.update(padded_plaintext)) 1cab
598 plaintext.extend(unpadder.finalize()) 1cab
600 logger.debug( 1cab
601 _msg.TranslatedString(
602 _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_INFO,
603 enc_key=_h(session_keys.encryption_key),
604 iv=_h(iv),
605 ciphertext=_h(payload),
606 plaintext=_h(plaintext),
607 ),
608 )
610 return plaintext 1cab
613def _decrypt_bucket_item(
614 bucket_item: Buffer,
615 master_keys: _types.StoreroomMasterKeys,
616) -> Buffer:
617 """Decrypt a bucket item.
619 Args:
620 bucket_item:
621 The encrypted bucket item.
622 master_keys:
623 The master keys. Presumably these have previously been
624 obtained via the [`_decrypt_master_keys_data`][] function.
626 Returns:
627 The decrypted bucket item.
629 Raises:
630 cryptography.exceptions.InvalidSignature:
631 The data does not contain a valid signature under the given
632 key.
633 ValueError:
634 The format is invalid, in a non-cryptographic way. (For
635 example, it contains an unsupported version marker, or
636 unexpected extra contents, or invalid padding.)
638 Warning:
639 Non-public function, provided for didactical and educational
640 purposes only. Subject to change without notice, including
641 removal.
643 """
644 bucket_item = memoryview(bucket_item).toreadonly().cast('c') 1cajb
645 master_keys = master_keys.toreadonly() 1cajb
646 logger.debug( 1cajb
647 _msg.TranslatedString(
648 _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_KEY_INFO,
649 plaintext=_h(bucket_item),
650 enc_key=_h(master_keys.encryption_key),
651 sign_key=_h(master_keys.signing_key),
652 ),
653 )
654 data_version, encrypted_session_keys, data_contents = struct.unpack( 1cajb
655 (
656 f'B {ENCRYPTED_KEYPAIR_SIZE}s '
657 f'{len(bucket_item) - 1 - ENCRYPTED_KEYPAIR_SIZE}s'
658 ),
659 bucket_item,
660 )
661 if data_version != 1: 1cajb
662 msg = f'Cannot handle version {data_version} encrypted data' 1j
663 raise ValueError(msg) 1j
664 session_keys = _decrypt_session_keys(encrypted_session_keys, master_keys) 1cab
665 return _decrypt_contents(data_contents, session_keys) 1cab
668def _decrypt_bucket_file(
669 filename: str | bytes | os.PathLike,
670 master_keys: _types.StoreroomMasterKeys,
671 *,
672 root_dir: str | bytes | os.PathLike = '.',
673) -> Iterator[Buffer]:
674 """Decrypt a complete bucket.
676 Args:
677 filename:
678 The bucket file's filename.
679 master_keys:
680 The master keys. Presumably these have previously been
681 obtained via the [`_decrypt_master_keys_data`][] function.
682 root_dir:
683 The root directory of the data store. The filename is
684 interpreted relatively to this directory.
686 Yields:
687 A decrypted bucket item.
689 Raises:
690 cryptography.exceptions.InvalidSignature:
691 The data does not contain a valid signature under the given
692 key.
693 ValueError:
694 The format is invalid, in a non-cryptographic way. (For
695 example, it contains an unsupported version marker, or
696 unexpected extra contents, or invalid padding.)
698 Warning:
699 Non-public function, provided for didactical and educational
700 purposes only. Subject to change without notice, including
701 removal.
703 """
704 master_keys = master_keys.toreadonly() 1cahb
705 root_dir = pathlib.Path(os.fsdecode(root_dir)) 1cahb
706 filename = pathlib.Path(os.fsdecode(filename)) 1cahb
707 with (root_dir / filename).open('rb') as bucket_file: 1cahb
708 header_line = bucket_file.readline() 1cahb
709 try: 1cahb
710 header = json.loads(header_line) 1cahb
711 except ValueError as exc: 1h
712 msg = f'Invalid bucket file: {filename}' 1h
713 raise ValueError(msg) from exc 1h
714 if header != {'version': 1}: 1cahb
715 msg = f'Invalid bucket file: {filename}' 1h
716 raise ValueError(msg) from None 1h
717 for line in bucket_file: 1cab
718 yield _decrypt_bucket_item( 1cahb
719 base64.standard_b64decode(line), master_keys
720 )
723def _store(config: dict[str, Any], path: str, json_contents: bytes) -> None:
724 """Store the JSON contents at path in the config structure.
726 Traverse the config structure according to path, and set the value
727 of the leaf to the decoded JSON contents.
729 A path `/foo/bar/xyz` translates to the JSON structure
730 `{"foo": {"bar": {"xyz": ...}}}`.
732 Args:
733 config:
734 The (top-level) configuration structure to update.
735 path:
736 The path within the configuration structure to traverse.
737 json_contents:
738 The contents to set the item to, after JSON-decoding.
740 Raises:
741 json.JSONDecodeError:
742 There was an error parsing the JSON contents.
744 """
745 contents = json.loads(json_contents) 1cab
746 path_parts = [part for part in path.split('/') if part] 1cab
747 for part in path_parts[:-1]: 1cab
748 config = config.setdefault(part, {}) 1cab
749 if path_parts: 1cab
750 config[path_parts[-1]] = contents 1cab
753if __name__ == '__main__':
754 logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
755 config_structure = export_storeroom_data(format='storeroom')
756 print(json.dumps(config_structure, indent=2, sort_keys=True)) # noqa: T201