Coverage for src\derivepassphrase\exporter\storeroom.py: 100.000%

207 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5"""Exporter for the vault "storeroom" configuration format. 

6 

7The "storeroom" format is the experimental format used in alpha and beta 

8versions of vault beyond v0.3.0. The configuration is stored as 

9a separate directory, which acts like a hash table (i.e. has named 

10slots) and provides an impure quasi-filesystem interface. Each hash 

11table entry is separately encrypted and authenticated. James Coglan 

12designed this format to avoid concurrent write issues when updating or 

13synchronizing the vault configuration with e.g. a cloud service. 

14 

15The public interface is the [`export_storeroom_data`][] function. 

16Multiple *non-public* functions are additionally documented here for 

17didactical and educational reasons, but they are not part of the module 

18API, are subject to change without notice (including removal), and 

19should *not* be used or relied on. 

20 

21""" 

22 

23# ruff: noqa: S303 

24 

25from __future__ import annotations 

26 

27import base64 

28import importlib 

29import json 

30import logging 

31import os 

32import os.path 

33import pathlib 

34import struct 

35from typing import TYPE_CHECKING, Any 

36 

37from derivepassphrase import _types, exporter 

38from derivepassphrase._internals import cli_messages as _msg 

39 

40if TYPE_CHECKING: 

41 from typing_extensions import Buffer 

42 

if TYPE_CHECKING:
    from collections.abc import Iterator

    from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
    from cryptography.hazmat.primitives.ciphers import algorithms, modes
    from cryptography.hazmat.primitives.kdf import pbkdf2

    STUBBED = False
else:
    try:
        importlib.import_module('cryptography')
    except ModuleNotFoundError as exc:
        # `cryptography` is unavailable.  Bind the names used below to
        # stand-ins so that this module can still be imported; any
        # attempt to actually use them re-raises the import error.
        #
        # For type checking only, will not actually be called, so no
        # coverage.
        class _DummyModule:  # pragma: no cover
            """Stand-in whose every attribute is a callable that raises."""

            def __init__(self, exc: Exception) -> None:
                # The exception *instance* caught at import time; it is
                # re-raised verbatim by every stubbed callable.
                self.exc = exc

            def __getattr__(self, name: str) -> Any:  # noqa: ANN401
                def func(*args: Any, **kwargs: Any) -> Any:  # noqa: ANN401,ARG001
                    raise self.exc

                return func

        ciphers = hashes = hmac = padding = _DummyModule(exc)
        algorithms = modes = pbkdf2 = _DummyModule(exc)
        STUBBED = True
    else:
        from cryptography.hazmat.primitives import (
            ciphers,
            hashes,
            hmac,
            padding,
        )
        from cryptography.hazmat.primitives.ciphers import algorithms, modes
        from cryptography.hazmat.primitives.kdf import pbkdf2

        STUBBED = False

81 

# Salt for the PBKDF2 derivation of the keys that protect the master
# keys data (see `_derive_master_keys_keys`).
STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17'
VAULT_CIPHER_UUID = b'73e69e8a-cb05-4b50-9f42-59d76a511299'
# All sizes below are in bytes.
IV_SIZE = 16
KEY_SIZE = 32
MAC_SIZE = 32
# 16-byte IV + 64-byte payload + 16 bytes of PKCS7 padding + 32-byte MAC.
ENCRYPTED_KEYPAIR_SIZE = 128
VERSION_SIZE = 1

88 

89__all__ = ('export_storeroom_data',) 

90 

91logger = logging.getLogger(__name__) 

92 

93 

@exporter.register_export_vault_config_data_handler('storeroom')
def export_storeroom_data(  # noqa: C901,D417,PLR0912,PLR0914,PLR0915
    path: str | bytes | os.PathLike | None = None,
    key: str | Buffer | None = None,
    *,
    format: str = 'storeroom',  # noqa: A002
) -> dict[str, Any]:
    """Export the full configuration stored in the storeroom.

    The master keys are read from the `.keys` file and decrypted, every
    bucket file matching `[01][0-9a-f]` is decrypted, the items are
    assembled into a nested dictionary, and finally each directory
    index is checked against the entries actually stored for that
    directory.

    See [`exporter.ExportVaultConfigDataFunction`][] for an explanation
    of the call signature, and the exceptions to expect.

    Other Args:
        format:
            The only supported format is `storeroom`.

    """  # noqa: DOC201,DOC501
    # Trigger import errors if necessary.
    importlib.import_module('cryptography')
    if path is None:
        path = exporter.get_vault_path()
    else:
        path = pathlib.Path(os.fsdecode(path))
    if key is None:
        key = exporter.get_vault_key()
    if format != 'storeroom':  # pragma: no cover
        # Defensive programming, will not actually be called like this
        # by our code, so no coverage.
        msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format(
            fmt=format
        )
        raise ValueError(msg)
    try:
        master_keys_file = pathlib.Path(path, '.keys').open(  # noqa: SIM115
            encoding='utf-8',
        )
    except FileNotFoundError as exc:
        raise exporter.NotAVaultConfigError(path, format='storeroom') from exc
    with master_keys_file:
        # Line 1: JSON version header.  Line 2: base64 of one parameter
        # byte followed by the encrypted master keys.  Nothing else.
        header = json.loads(master_keys_file.readline())
        if header != {'version': 1}:
            msg = 'bad or unsupported keys version header'
            raise RuntimeError(msg)
        raw_keys_data = base64.standard_b64decode(master_keys_file.readline())
        encrypted_keys_params, encrypted_keys = struct.unpack(
            f'B {len(raw_keys_data) - 1}s', raw_keys_data
        )
        if master_keys_file.read():
            msg = 'trailing data; cannot make sense of .keys file'
            raise RuntimeError(msg)
    # High nibble of the parameter byte: format version.
    encrypted_keys_version = encrypted_keys_params >> 4
    if encrypted_keys_version != 1:
        msg = f'cannot handle version {encrypted_keys_version} encrypted keys'
        raise RuntimeError(msg)
    logger.info(
        _msg.TranslatedString(_msg.InfoMsgTemplate.PARSING_MASTER_KEYS_DATA)
    )
    # Low nibble of the parameter byte: iteration count exponent.
    encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F))
    master_keys_keys = _derive_master_keys_keys(key, encrypted_keys_iterations)
    master_keys = _decrypt_master_keys_data(encrypted_keys, master_keys_keys)

    config_structure: dict[str, Any] = {}
    json_contents: dict[str, bytes] = {}
    valid_hashdirs = list(path.glob('[01][0-9a-f]'))
    for file in valid_hashdirs:
        logger.info(
            _msg.TranslatedString(
                _msg.InfoMsgTemplate.DECRYPTING_BUCKET,
                bucket_number=file,
            )
        )
        bucket_contents = [
            bytes(item) for item in _decrypt_bucket_file(file, master_keys)
        ]
        # The first decrypted item is the bucket index (a JSON list of
        # item paths); the remaining items are matched to it by position.
        bucket_index = json.loads(bucket_contents.pop(0))
        for pos, item in enumerate(bucket_index):
            json_contents[item] = bucket_contents[pos]
            logger.debug(
                _msg.TranslatedString(
                    _msg.DebugMsgTemplate.BUCKET_ITEM_FOUND,
                    path=item,
                    value=bucket_contents[pos],
                )
            )
    dirs_to_check: dict[str, list[str]] = {}
    json_payload: Any
    logger.info(
        _msg.TranslatedString(_msg.InfoMsgTemplate.ASSEMBLING_CONFIG_STRUCTURE)
    )
    for item_path, json_content in sorted(json_contents.items()):
        if item_path.endswith('/'):
            # A directory: its payload is an index of child names.
            # Remember the index for the consistency check below, and
            # store an empty object that the children will populate.
            logger.debug(
                _msg.TranslatedString(
                    _msg.DebugMsgTemplate.POSTPONING_DIRECTORY_CONTENTS_CHECK,
                    path=item_path,
                    contents=json_content.decode('utf-8'),
                )
            )
            json_payload = json.loads(json_content)
            if not isinstance(json_payload, list) or any(
                not isinstance(x, str) for x in json_payload
            ):
                msg = (
                    f'Directory index is not actually an index: '
                    f'{json_content!r}'
                )
                raise RuntimeError(msg)
            dirs_to_check[item_path] = json_payload
            logger.debug(
                _msg.TranslatedString(
                    _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS_EMPTY_DIRECTORY,
                    path=item_path,
                ),
            )
            _store(config_structure, item_path, b'{}')
        else:
            logger.debug(
                _msg.TranslatedString(
                    _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS,
                    path=item_path,
                    value=json_content.decode('utf-8'),
                ),
            )
            _store(config_structure, item_path, json_content)
    logger.info(
        _msg.TranslatedString(
            _msg.InfoMsgTemplate.CHECKING_CONFIG_STRUCTURE_CONSISTENCY,
        )
    )
    # Sorted order is important; see `maybe_obj` below.
    for dir_, namelist_ in sorted(dirs_to_check.items()):
        namelist = [x.rstrip('/') for x in namelist_]
        obj: dict[Any, Any] = config_structure
        for part in dir_.split('/'):
            if part:
                # Because we iterate paths in sorted order, parent
                # directories are encountered before child directories,
                # so for well-formed data every path component resolves
                # to a dict.
                #
                # That is not guaranteed for arbitrary data, though: an
                # entry that sorts later (e.g. one whose path lacks the
                # leading slash) can replace a directory with
                # a non-directory value.  So check explicitly instead of
                # using `assert`, which is stripped under `python -O`.
                maybe_obj = obj.get(part)
                if not isinstance(maybe_obj, dict):
                    msg = f'Cannot traverse storage path {dir_!r}'
                    raise RuntimeError(msg)
                obj = maybe_obj
        if set(obj.keys()) != set(namelist):
            msg = f'Object key mismatch for path {dir_!r}'
            raise RuntimeError(msg)
        logger.debug(
            _msg.TranslatedString(
                _msg.DebugMsgTemplate.DIRECTORY_CONTENTS_CHECK_OK,
                path=dir_,
                contents=json.dumps(namelist_),
            )
        )
    return config_structure

252 

253 

254def _h(bs: Buffer) -> str: 

255 return '<{}>'.format(memoryview(bs).hex(' ')) 1cajbeg

256 

257 

def _derive_master_keys_keys(
    password: str | Buffer,
    iterations: int,
) -> _types.StoreroomKeyPair:
    """Derive encryption and signing keys for the master keys data.

    The master password is run through a key derivation function to
    obtain a 64-byte string, which is then split to yield two 32-byte
    keys.  The key derivation function is PBKDF2, using HMAC-SHA1 and
    salted with the storeroom master keys UUID.

    Args:
        password:
            A master password for the storeroom instance.  Usually read
            from the `VAULT_KEY` environment variable, otherwise
            defaults to the username.  If given as a string, it is
            encoded as ASCII.
        iterations:
            A count of rounds for the underlying key derivation
            function.  Usually stored as a setting next to the encrypted
            master keys data.

    Returns:
        A 2-tuple of keys, the encryption key and the signing key, to
        decrypt and verify the master keys data with.

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    if isinstance(password, str):
        password = password.encode('ASCII')
    # One encryption key plus one signing key.
    derived_length = 2 * KEY_SIZE
    master_keys_keys_blob = pbkdf2.PBKDF2HMAC(
        algorithm=hashes.SHA1(),
        length=derived_length,
        salt=STOREROOM_MASTER_KEYS_UUID,
        iterations=iterations,
    ).derive(bytes(password))
    encryption_key, signing_key = struct.unpack(
        f'{KEY_SIZE}s {KEY_SIZE}s', master_keys_keys_blob
    )
    # Report the parameters actually used above: the hash is SHA-1, not
    # SHA-256, and the length follows from `KEY_SIZE`.
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DERIVED_MASTER_KEYS_KEYS,
            enc_key=_h(encryption_key),
            sign_key=_h(signing_key),
            pw_bytes=_h(password),
            algorithm='SHA1',
            length=derived_length,
            salt=STOREROOM_MASTER_KEYS_UUID,
            iterations=iterations,
        ),
    )
    return _types.StoreroomKeyPair(
        encryption_key=encryption_key,
        signing_key=signing_key,
    ).toreadonly()

316 

317 

def _decrypt_master_keys_data(
    data: Buffer,
    keys: _types.StoreroomKeyPair,
) -> _types.StoreroomMasterKeys:
    r"""Verify and decrypt the master keys data.

    Layout of the master keys data:

      - a 16-byte IV,
      - a 96-byte AES256-CBC-encrypted payload, plus 16 further bytes of
        PKCS7 padding, and
      - a 32-byte MAC of the preceding 128 bytes.

    The decrypted payload is the concatenation of three 32-byte keys:
    the hashing, encryption and signing keys, in that order.

    The payload is encrypted under the given encryption key, and the
    MAC is computed with the given signing key.  The MAC is verified
    first; decryption is only attempted afterwards.

    Since the payload size is fixed and a multiple of the cipher
    blocksize, the PKCS7 padding always is `b'\x10' * 16`.

    Args:
        data:
            The encrypted master keys data.
        keys:
            The encryption and signing keys for the master keys data.
            These should have previously been derived via the
            [`_derive_master_keys_keys`][] function.

    Returns:
        The master encryption, signing and hashing keys.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The data does not contain a valid signature under the given
            key.
        ValueError:
            The format is invalid, in a non-cryptographic way.  (For
            example, it contains an unsupported version marker, or
            unexpected extra contents, or invalid padding.)

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    blob = memoryview(data).toreadonly().cast('c')
    keypair = keys.toreadonly()

    # Split off the trailing MAC and authenticate before decrypting.
    mac_layout = f'{len(blob) - MAC_SIZE}s {MAC_SIZE}s'
    ciphertext, claimed_mac = struct.unpack(mac_layout, blob)
    mac = hmac.HMAC(keypair.signing_key, hashes.SHA256())
    mac.update(ciphertext)
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.MASTER_KEYS_DATA_MAC_INFO,
            sign_key=_h(keypair.signing_key),
            ciphertext=_h(ciphertext),
            claimed_mac=_h(claimed_mac),
            # Finalize a copy, so that `mac` can still verify below.
            actual_mac=_h(mac.copy().finalize()),
        ),
    )
    mac.verify(claimed_mac)

    try:
        iv_layout = f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s'
        iv, payload = struct.unpack(iv_layout, ciphertext)
        cipher = ciphers.Cipher(
            algorithms.AES256(keypair.encryption_key), modes.CBC(iv)
        )
        decryptor = cipher.decryptor()
        padded = bytearray(decryptor.update(payload))
        padded += decryptor.finalize()
        unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
        plaintext = bytearray(unpadder.update(padded))
        plaintext += unpadder.finalize()
        keys_layout = f'{KEY_SIZE}s {KEY_SIZE}s {KEY_SIZE}s'
        hashing_key, encryption_key, signing_key = struct.unpack(
            keys_layout, plaintext
        )
    except (ValueError, struct.error) as exc:
        msg = 'Invalid encrypted master keys payload'
        raise ValueError(msg) from exc

    master_keys = _types.StoreroomMasterKeys(
        hashing_key=hashing_key,
        encryption_key=encryption_key,
        signing_key=signing_key,
    )
    return master_keys.toreadonly()

411 

412 

def _decrypt_session_keys(
    data: Buffer,
    master_keys: _types.StoreroomMasterKeys,
) -> _types.StoreroomKeyPair:
    r"""Verify and decrypt the session keys of a bucket item.

    Session keys are single-use keys for encrypting and signing
    a single item in the storage bucket.  Layout of the encrypted
    session key data:

      - a 16-byte IV,
      - a 64-byte AES256-CBC-encrypted payload, plus 16 further bytes of
        PKCS7 padding, and
      - a 32-byte MAC of the preceding 96 bytes.

    The payload is encrypted under the master encryption key, and the
    MAC is computed with the master signing key.  The MAC is verified
    first; decryption is only attempted afterwards.

    Since the payload size is fixed and a multiple of the cipher
    blocksize, the PKCS7 padding always is `b'\x10' * 16`.

    Args:
        data:
            The encrypted bucket item session key data.
        master_keys:
            The master keys.  Presumably these have previously been
            obtained via the [`_decrypt_master_keys_data`][] function.

    Returns:
        The bucket item's encryption and signing keys.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The data does not contain a valid signature under the given
            key.
        ValueError:
            The format is invalid, in a non-cryptographic way.  (For
            example, it contains an unsupported version marker, or
            unexpected extra contents, or invalid padding.)

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    blob = memoryview(data).toreadonly().cast('c')
    master = master_keys.toreadonly()

    # Split off the trailing MAC and authenticate before decrypting.
    mac_layout = f'{len(blob) - MAC_SIZE}s {MAC_SIZE}s'
    ciphertext, claimed_mac = struct.unpack(mac_layout, blob)
    mac = hmac.HMAC(master.signing_key, hashes.SHA256())
    mac.update(ciphertext)
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_MAC_INFO,
            sign_key=_h(master.signing_key),
            ciphertext=_h(ciphertext),
            claimed_mac=_h(claimed_mac),
            # Finalize a copy, so that `mac` can still verify below.
            actual_mac=_h(mac.copy().finalize()),
        ),
    )
    mac.verify(claimed_mac)

    try:
        iv_layout = f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s'
        iv, payload = struct.unpack(iv_layout, ciphertext)
        cipher = ciphers.Cipher(
            algorithms.AES256(master.encryption_key), modes.CBC(iv)
        )
        decryptor = cipher.decryptor()
        padded = bytearray(decryptor.update(payload))
        padded += decryptor.finalize()
        unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
        plaintext = bytearray(unpadder.update(padded))
        plaintext += unpadder.finalize()
        session_encryption_key, session_signing_key = struct.unpack(
            f'{KEY_SIZE}s {KEY_SIZE}s', plaintext
        )
    except (ValueError, struct.error) as exc:
        msg = 'Invalid encrypted session keys payload'
        raise ValueError(msg) from exc

    session_keys = _types.StoreroomKeyPair(
        encryption_key=session_encryption_key,
        signing_key=session_signing_key,
    ).toreadonly()

    # Python expression reconstructing the key pair, for the debug log.
    key_pair_code = _msg.TranslatedString(
        'StoreroomKeyPair(encryption_key=bytes.fromhex({enc_key!r}), '
        'signing_key=bytes.fromhex({sign_key!r}))',
        enc_key=session_keys.encryption_key.hex(' '),
        sign_key=session_keys.signing_key.hex(' '),
    )
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_INFO,
            enc_key=_h(master.encryption_key),
            iv=_h(iv),
            ciphertext=_h(payload),
            plaintext=_h(plaintext),
            code=key_pair_code,
        ),
    )

    return session_keys

522 

523 

def _decrypt_contents(
    data: Buffer,
    session_keys: _types.StoreroomKeyPair,
) -> Buffer:
    """Verify and decrypt the contents of a bucket item.

    Layout of the data:

      - a 16-byte IV,
      - a variable-sized AES256-CBC-encrypted payload (using PKCS7
        padding on the inside), and
      - a 32-byte MAC of the preceding bytes.

    The payload is encrypted under the bucket item's session encryption
    key, and the MAC is computed with the bucket item's session signing
    key.  The MAC is verified first; decryption is only attempted
    afterwards.

    Args:
        data:
            The encrypted bucket item payload data.
        session_keys:
            The bucket item's session keys.  Presumably these have
            previously been obtained via the [`_decrypt_session_keys`][]
            function.

    Returns:
        The bucket item's payload.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The data does not contain a valid signature under the given
            key.
        ValueError:
            The format is invalid, in a non-cryptographic way.  (For
            example, it contains an unsupported version marker, or
            unexpected extra contents, or invalid padding.)

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    blob = memoryview(data).toreadonly().cast('c')
    item_keys = session_keys.toreadonly()

    # Split off the trailing MAC and authenticate before decrypting.
    mac_layout = f'{len(blob) - MAC_SIZE}s {MAC_SIZE}s'
    ciphertext, claimed_mac = struct.unpack(mac_layout, blob)
    mac = hmac.HMAC(item_keys.signing_key, hashes.SHA256())
    mac.update(ciphertext)
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_MAC_INFO,
            sign_key=_h(item_keys.signing_key),
            ciphertext=_h(ciphertext),
            claimed_mac=_h(claimed_mac),
            # Finalize a copy, so that `mac` can still verify below.
            actual_mac=_h(mac.copy().finalize()),
        ),
    )
    mac.verify(claimed_mac)

    iv_layout = f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s'
    iv, payload = struct.unpack(iv_layout, ciphertext)
    cipher = ciphers.Cipher(
        algorithms.AES256(item_keys.encryption_key), modes.CBC(iv)
    )
    decryptor = cipher.decryptor()
    padded = bytearray(decryptor.update(payload))
    padded += decryptor.finalize()
    unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
    plaintext = bytearray(unpadder.update(padded))
    plaintext += unpadder.finalize()

    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_INFO,
            enc_key=_h(item_keys.encryption_key),
            iv=_h(iv),
            ciphertext=_h(payload),
            plaintext=_h(plaintext),
        ),
    )

    return plaintext

611 

612 

def _decrypt_bucket_item(
    bucket_item: Buffer,
    master_keys: _types.StoreroomMasterKeys,
) -> Buffer:
    """Decrypt a single bucket item.

    The item consists of a one-byte version marker, the encrypted
    session keys, and the encrypted contents.  The session keys are
    decrypted with the master keys, and then used to decrypt the
    contents.

    Args:
        bucket_item:
            The encrypted bucket item.
        master_keys:
            The master keys.  Presumably these have previously been
            obtained via the [`_decrypt_master_keys_data`][] function.

    Returns:
        The decrypted bucket item.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The data does not contain a valid signature under the given
            key.
        ValueError:
            The format is invalid, in a non-cryptographic way.  (For
            example, it contains an unsupported version marker, or
            unexpected extra contents, or invalid padding.)

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    item = memoryview(bucket_item).toreadonly().cast('c')
    master = master_keys.toreadonly()
    logger.debug(
        _msg.TranslatedString(
            _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_KEY_INFO,
            plaintext=_h(item),
            enc_key=_h(master.encryption_key),
            sign_key=_h(master.signing_key),
        ),
    )
    # Version byte, then the fixed-size encrypted session keys, then
    # whatever remains is the encrypted contents.
    contents_size = len(item) - 1 - ENCRYPTED_KEYPAIR_SIZE
    item_layout = f'B {ENCRYPTED_KEYPAIR_SIZE}s {contents_size}s'
    data_version, encrypted_session_keys, data_contents = struct.unpack(
        item_layout, item
    )
    if data_version != 1:
        msg = f'Cannot handle version {data_version} encrypted data'
        raise ValueError(msg)
    return _decrypt_contents(
        data_contents,
        _decrypt_session_keys(encrypted_session_keys, master),
    )

666 

667 

def _decrypt_bucket_file(
    filename: str | bytes | os.PathLike,
    master_keys: _types.StoreroomMasterKeys,
    *,
    root_dir: str | bytes | os.PathLike = '.',
) -> Iterator[Buffer]:
    """Decrypt all items of a bucket file.

    The first line of the file must be the JSON version header
    `{"version": 1}`; every further line is one base64-encoded
    encrypted bucket item.

    Args:
        filename:
            The bucket file's filename.
        master_keys:
            The master keys.  Presumably these have previously been
            obtained via the [`_decrypt_master_keys_data`][] function.
        root_dir:
            The root directory of the data store.  The filename is
            interpreted relatively to this directory.

    Yields:
        A decrypted bucket item.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The data does not contain a valid signature under the given
            key.
        ValueError:
            The format is invalid, in a non-cryptographic way.  (For
            example, it contains an unsupported version marker, or
            unexpected extra contents, or invalid padding.)

    Warning:
        Non-public function, provided for didactical and educational
        purposes only.  Subject to change without notice, including
        removal.

    """
    master = master_keys.toreadonly()
    store_root = pathlib.Path(os.fsdecode(root_dir))
    bucket_name = pathlib.Path(os.fsdecode(filename))
    # Both header failures below report the same message.
    msg = f'Invalid bucket file: {bucket_name}'
    with (store_root / bucket_name).open('rb') as bucket_file:
        header_line = bucket_file.readline()
        try:
            header = json.loads(header_line)
        except ValueError as exc:
            raise ValueError(msg) from exc
        if header != {'version': 1}:
            raise ValueError(msg) from None
        for encoded_item in bucket_file:
            encrypted_item = base64.standard_b64decode(encoded_item)
            yield _decrypt_bucket_item(encrypted_item, master)

721 

722 

723def _store(config: dict[str, Any], path: str, json_contents: bytes) -> None: 

724 """Store the JSON contents at path in the config structure. 

725 

726 Traverse the config structure according to path, and set the value 

727 of the leaf to the decoded JSON contents. 

728 

729 A path `/foo/bar/xyz` translates to the JSON structure 

730 `{"foo": {"bar": {"xyz": ...}}}`. 

731 

732 Args: 

733 config: 

734 The (top-level) configuration structure to update. 

735 path: 

736 The path within the configuration structure to traverse. 

737 json_contents: 

738 The contents to set the item to, after JSON-decoding. 

739 

740 Raises: 

741 json.JSONDecodeError: 

742 There was an error parsing the JSON contents. 

743 

744 """ 

745 contents = json.loads(json_contents) 1cab

746 path_parts = [part for part in path.split('/') if part] 1cab

747 for part in path_parts[:-1]: 1cab

748 config = config.setdefault(part, {}) 1cab

749 if path_parts: 1cab

750 config[path_parts[-1]] = contents 1cab

751 

752 

if __name__ == '__main__':
    # Script mode: dump the default storeroom configuration as JSON,
    # with debug logging if the `DEBUG` environment variable is set to
    # a non-empty value.
    log_level = 'DEBUG' if os.getenv('DEBUG') else 'WARNING'
    logging.basicConfig(level=log_level)
    config_structure = export_storeroom_data(format='storeroom')
    print(json.dumps(config_structure, indent=2, sort_keys=True))  # noqa: T201

756 print(json.dumps(config_structure, indent=2, sort_keys=True)) # noqa: T201