Coverage for tests\test_derivepassphrase_ssh_agent.py: 97.137%

420 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5"""Test OpenSSH key loading and signing.""" 

6 

7from __future__ import annotations 

8 

9import base64 

10import contextlib 

11import errno 

12import importlib.metadata 

13import io 

14import os 

15import pathlib 

16import re 

17import socket 

18import sys 

19import types 

20from typing import TYPE_CHECKING 

21 

22import click 

23import click.testing 

24import hypothesis 

25import pytest 

26from hypothesis import strategies 

27 

28import tests 

29from derivepassphrase import _types, ssh_agent, vault 

30from derivepassphrase._internals import cli_helpers 

31from derivepassphrase.ssh_agent import socketprovider 

32 

33if TYPE_CHECKING: 

34 from collections.abc import Iterable 

35 

36 from typing_extensions import Any, Buffer, Literal 

37 

38if sys.version_info < (3, 11): 

39 from exceptiongroup import ExceptionGroup 

40 

41 

class Parametrize(types.SimpleNamespace):
    """Namespace of shared `pytest.mark.parametrize` decorators.

    Every attribute is a ready-made parametrization mark.  The test
    classes in this module apply them as decorators, e.g.
    `@Parametrize.UINT32_INPUT`.

    """

    # Entry point lists for which provider discovery is expected to
    # fail (see `test_231_find_all_socket_providers_errors`).  The ids
    # name the kind of fault.
    BAD_ENTRY_POINTS = pytest.mark.parametrize(
        'additional_entry_points',
        [
            pytest.param(
                [
                    importlib.metadata.EntryPoint(
                        name=tests.faulty_entry_callable.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: faulty_entry_callable',
                    ),
                ],
                id='not-callable',
            ),
            pytest.param(
                [
                    importlib.metadata.EntryPoint(
                        name=tests.faulty_entry_name_exists.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: faulty_entry_name_exists',
                    ),
                ],
                id='name-already-exists',
            ),
            pytest.param(
                [
                    importlib.metadata.EntryPoint(
                        name=tests.faulty_entry_alias_exists.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: faulty_entry_alias_exists',
                    ),
                ],
                id='alias-already-exists',
            ),
        ],
    )
    # Entry point lists for which provider discovery is expected to
    # succeed (see `test_230_find_all_socket_providers`).
    GOOD_ENTRY_POINTS = pytest.mark.parametrize(
        'additional_entry_points',
        [
            pytest.param(
                [
                    importlib.metadata.EntryPoint(
                        name=tests.posix_entry.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: posix_entry',
                    ),
                    importlib.metadata.EntryPoint(
                        name=tests.the_annoying_os_entry.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: the_annoying_os_entry',
                    ),
                ],
                id='existing-entries',
            ),
            pytest.param(
                [
                    importlib.metadata.EntryPoint(
                        name=tests.provider_entry1.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: provider_entry1',
                    ),
                    importlib.metadata.EntryPoint(
                        name=tests.provider_entry2.key,
                        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
                        value='tests: provider_entry2',
                    ),
                ],
                id='new-entries',
            ),
        ],
    )
    # Inputs for which `SSHAgentClient.string` is expected to raise:
    # (input, exception type, message pattern).
    SSH_STRING_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                'some string', TypeError, 'invalid payload type', id='str'
            ),
        ],
    )
    # Inputs for which `SSHAgentClient.uint32` is expected to raise:
    # one value above and one value below the unsigned 32-bit range.
    UINT32_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                10000000000000000,
                OverflowError,
                'int too big to convert',
                id='10000000000000000',
            ),
            pytest.param(
                -1,
                OverflowError,
                "can't convert negative int to unsigned",
                id='-1',
            ),
        ],
    )
    # Inputs for which `SSHAgentClient.unstring` is expected to raise.
    # `has_trailer` tells `test_312_unstring_exceptions` whether
    # `unstring_prefix` should instead succeed and return `parts`.
    SSH_UNSTRING_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern', 'has_trailer', 'parts'],
        [
            pytest.param(
                b'ssh',
                ValueError,
                'malformed SSH byte string',
                False,
                None,
                id='unencoded',
            ),
            pytest.param(
                # Declares 8 payload bytes, but only 7 follow.
                b'\x00\x00\x00\x08ssh-rsa',
                ValueError,
                'malformed SSH byte string',
                False,
                None,
                id='truncated',
            ),
            pytest.param(
                # Declares 4 payload bytes; the rest is trailing data.
                b'\x00\x00\x00\x04XXX trailing text',
                ValueError,
                'malformed SSH byte string',
                True,
                (b'XXX ', b'trailing text'),
                id='trailing-data',
            ),
        ],
    )
    # (payload, expected encoding) pairs for `SSHAgentClient.string`.
    SSH_STRING_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(
                b'ssh-rsa',
                b'\x00\x00\x00\x07ssh-rsa',
                id='ssh-rsa',
            ),
            pytest.param(
                b'ssh-ed25519',
                b'\x00\x00\x00\x0bssh-ed25519',
                id='ssh-ed25519',
            ),
            pytest.param(
                # An already encoded string gets encoded a second time.
                ssh_agent.SSHAgentClient.string(b'ssh-ed25519'),
                b'\x00\x00\x00\x0f\x00\x00\x00\x0bssh-ed25519',
                id='string(ssh-ed25519)',
            ),
        ],
    )
    # (encoding, expected payload) pairs for `SSHAgentClient.unstring`
    # and `SSHAgentClient.unstring_prefix`.
    SSH_UNSTRING_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(
                b'\x00\x00\x00\x07ssh-rsa',
                b'ssh-rsa',
                id='ssh-rsa',
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(b'ssh-ed25519'),
                b'ssh-ed25519',
                id='ssh-ed25519',
            ),
        ],
    )
    # (number, expected encoding) pairs for `SSHAgentClient.uint32`.
    UINT32_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(16777216, b'\x01\x00\x00\x00', id='16777216'),
        ],
    )
    # Columns: key, check, response_code, response, exc_type,
    # exc_pattern.
    SIGN_ERROR_RESPONSES = pytest.mark.parametrize(
        [
            'key',
            'check',
            'response_code',
            'response',
            'exc_type',
            'exc_pattern',
        ],
        [
            pytest.param(
                b'invalid-key',
                True,
                _types.SSH_AGENT.FAILURE,
                b'',
                KeyError,
                'target SSH key not loaded into agent',
                id='key-not-loaded',
            ),
            pytest.param(
                tests.SUPPORTED_KEYS['ed25519'].public_key_data,
                True,
                _types.SSH_AGENT.FAILURE,
                b'',
                ssh_agent.SSHAgentFailedError,
                'failed to complete the request',
                id='failed-to-complete',
            ),
        ],
    )
    # One `(public key data, False)` case per supported key, plus one
    # `(key, True)` case built from the first entry returned by
    # `tests.list_keys_singleton()`.  That function is called when this
    # class body is executed.
    SSH_KEY_SELECTION = pytest.mark.parametrize(
        ['key', 'single'],
        [
            (value.public_key_data, False)
            for value in tests.SUPPORTED_KEYS.values()
        ]
        + [(tests.list_keys_singleton()[0].key, True)],
        ids=[*tests.SUPPORTED_KEYS.keys(), 'singleton'],
    )
    # (line, variable name, expected value) triples for
    # `tests.parse_sh_export_line`.  A `value` of `None` marks a line
    # that `test_190_sh_export_line_parsing` expects to be rejected.
    SH_EXPORT_LINES = pytest.mark.parametrize(
        ['line', 'env_name', 'value'],
        [
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK;',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='value-export-semicolon-pageant',
            ),
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270; export SSH_AUTH_SOCK;',
                'SSH_AUTH_SOCK',
                '/tmp/ssh-3CSTC1W5M22A/agent.27270',
                id='value-export-semicolon-openssh',
            ),
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='value-export-pageant',
            ),
            pytest.param(
                'export SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270;',
                'SSH_AUTH_SOCK',
                '/tmp/ssh-3CSTC1W5M22A/agent.27270',
                id='export-value-semicolon-openssh',
            ),
            pytest.param(
                'export SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='export-value-pageant',
            ),
            pytest.param(
                'SSH_AGENT_PID=27170; export SSH_AGENT_PID;',
                'SSH_AGENT_PID',
                '27170',
                id='pid-export-semicolon',
            ),
            pytest.param(
                'SSH_AGENT_PID=27170; export SSH_AGENT_PID',
                'SSH_AGENT_PID',
                '27170',
                id='pid-export',
            ),
            pytest.param(
                'export SSH_AGENT_PID=27170;',
                'SSH_AGENT_PID',
                '27170',
                id='export-pid-semicolon',
            ),
            pytest.param(
                'export SSH_AGENT_PID=27170',
                'SSH_AGENT_PID',
                '27170',
                id='export-pid',
            ),
            pytest.param(
                'export VARIABLE=value; export OTHER_VARIABLE=other_value;',
                'VARIABLE',
                None,
                id='export-too-much',
            ),
            pytest.param(
                'VARIABLE=value',
                'VARIABLE',
                None,
                id='no-export',
            ),
        ],
    )
    # One case per entry of `tests.SUPPORTED_KEYS`, passing only the
    # key structure (see `test_100_key_decoding`).
    PUBLIC_KEY_DATA = pytest.mark.parametrize(
        'public_key_struct',
        list(tests.SUPPORTED_KEYS.values()),
        ids=list(tests.SUPPORTED_KEYS.keys()),
    )
    # Columns: request_code, response_code, exc_type, exc_pattern.  The
    # pattern is the regex-escaped text `[Code N]`, where N is the
    # numeric value of `SSH_AGENT.IDENTITIES_ANSWER`.
    REQUEST_ERROR_RESPONSES = pytest.mark.parametrize(
        ['request_code', 'response_code', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                _types.SSH_AGENTC.REQUEST_IDENTITIES,
                _types.SSH_AGENT.SUCCESS,
                ssh_agent.SSHAgentFailedError,
                re.escape(
                    f'[Code {_types.SSH_AGENT.IDENTITIES_ANSWER.value}]'
                ),
                id='REQUEST_IDENTITIES-expect-SUCCESS',
            ),
        ],
    )
    # Raw responses that end early: the first is shorter than four
    # bytes, the second declares 0x1f body bytes but carries fewer.
    TRUNCATED_AGENT_RESPONSES = pytest.mark.parametrize(
        'response',
        [
            b'\x00\x00',
            b'\x00\x00\x00\x1f some bytes missing',
        ],
        ids=['in-header', 'in-body'],
    )
    # Columns: response_code, response, exc_type, exc_pattern.
    LIST_KEYS_ERROR_RESPONSES = pytest.mark.parametrize(
        ['response_code', 'response', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                _types.SSH_AGENT.FAILURE,
                b'',
                ssh_agent.SSHAgentFailedError,
                'failed to complete the request',
                id='failed-to-complete',
            ),
            pytest.param(
                _types.SSH_AGENT.IDENTITIES_ANSWER,
                b'\x00\x00\x00\x01',
                EOFError,
                'truncated response',
                id='truncated-response',
            ),
            pytest.param(
                _types.SSH_AGENT.IDENTITIES_ANSWER,
                b'\x00\x00\x00\x00abc',
                ssh_agent.TrailingDataError,
                'Overlong response',
                id='overlong-response',
            ),
        ],
    )
    # Response payloads labelled by the way in which they are malformed.
    QUERY_EXTENSIONS_MALFORMED_RESPONSES = pytest.mark.parametrize(
        'response_data',
        [
            pytest.param(b'\xde\xad\xbe\xef', id='truncated'),
            pytest.param(
                b'\x00\x00\x00\x0fwrong extension', id='wrong-extension'
            ),
            pytest.param(
                b'\x00\x00\x00\x05query\xde\xad\xbe\xef', id='with-trailer'
            ),
            pytest.param(
                b'\x00\x00\x00\x05query\x00\x00\x00\x04ext1\x00\x00',
                id='with-extra-fields',
            ),
        ],
    )
    # One (key type name, key structure) case per entry of
    # `tests.SUPPORTED_KEYS`.
    SUPPORTED_SSH_TEST_KEYS = pytest.mark.parametrize(
        ['ssh_test_key_type', 'ssh_test_key'],
        list(tests.SUPPORTED_KEYS.items()),
        ids=tests.SUPPORTED_KEYS.keys(),
    )
    # One (key type name, key structure) case per entry of
    # `tests.UNSUITABLE_KEYS`.
    UNSUITABLE_SSH_TEST_KEYS = pytest.mark.parametrize(
        ['ssh_test_key_type', 'ssh_test_key'],
        list(tests.UNSUITABLE_KEYS.items()),
        ids=tests.UNSUITABLE_KEYS.keys(),
    )
    # Raw messages the fake agent is expected to answer with an error
    # (see `test_123_invalid_ssh_agent_messages`).
    INVALID_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
        'message',
        [
            # Declared body length is zero.
            pytest.param(b'\x00\x00\x00\x00', id='empty-message'),
            # Declared body length is 15, but only one byte follows.
            pytest.param(b'\x00\x00\x00\x0f\x0d', id='truncated-message'),
            pytest.param(
                b'\x00\x00\x00\x06\x1b\x00\x00\x00\x01\xff',
                id='invalid-extension-name',
            ),
            pytest.param(
                b'\x00\x00\x00\x11\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
                id='sign-with-trailing-data',
            ),
        ],
    )
    # Well-formed messages the fake agent is nevertheless expected to
    # answer with an error (see
    # `test_124_unsupported_ssh_agent_messages`).  Each is built as
    # code byte 0x0d, an SSH string, a second string, and four
    # trailing bytes, all wrapped in an outer SSH string.
    UNSUPPORTED_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
        'message',
        [
            pytest.param(
                ssh_agent.SSHAgentClient.string(
                    b''.join([
                        b'\x0d',
                        ssh_agent.SSHAgentClient.string(
                            tests.ALL_KEYS['rsa'].public_key_data
                        ),
                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                        b'\x00\x00\x00\x02',
                    ])
                ),
                id='sign-with-flags',
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(
                    b''.join([
                        b'\x0d',
                        ssh_agent.SSHAgentClient.string(
                            tests.ALL_KEYS['ed25519'].public_key_data
                        ),
                        b'\x00\x00\x00\x08\x00\x01\x02\x03\x04\x05\x06\x07',
                        b'\x00\x00\x00\x00',
                    ])
                ),
                id='sign-with-nonstandard-passphrase',
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(
                    b''.join([
                        b'\x0d',
                        ssh_agent.SSHAgentClient.string(
                            tests.ALL_KEYS['dsa1024'].public_key_data
                        ),
                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                        b'\x00\x00\x00\x00',
                    ])
                ),
                id='sign-key-no-expected-signature',
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(
                    b''.join([
                        b'\x0d',
                        b'\x00\x00\x00\x00',
                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                        b'\x00\x00\x00\x00',
                    ])
                ),
                id='sign-key-unregistered-test-key',
            ),
        ],
    )
    # Values for the `SSH_AUTH_SOCK` variable (or `None` to unset it),
    # each with the exception type and message expected when the
    # address-aware fake agent is constructed (see
    # `test_125_addresses`).  An `exception` of `None` means that no
    # error is expected.
    FAKE_AGENT_ADDRESSES = pytest.mark.parametrize(
        ['address', 'exception', 'match'],
        [
            pytest.param(None, KeyError, 'SSH_AUTH_SOCK', id='unset'),
            pytest.param('fake-ssh-agent:', None, '', id='standard'),
            pytest.param(
                # The expanded home directory path, computed when this
                # class body is executed.
                str(pathlib.Path('~').expanduser()),
                FileNotFoundError,
                os.strerror(errno.ENOENT),
                id='invalid-url',
            ),
            pytest.param(
                'fake-ssh-agent:EPROTONOSUPPORT',
                OSError,
                os.strerror(errno.EPROTONOSUPPORT),
                id='protocol-not-supported',
            ),
            pytest.param(
                'fake-ssh-agent:ABCDEFGHIJKLMNOPQRSTUVWXYZ',
                OSError,
                os.strerror(errno.EINVAL),
                id='invalid-error-code',
            ),
        ],
    )

492 

493 

494class TestTestingMachineryFakeSSHAgentSocket: 

495 """Test the fake SSH agent socket for the `ssh_agent` module tests.""" 

496 

497 def test_100_query_extensions(self) -> None: 

498 """The agent implements a known list of extensions.""" 

499 with tests.FakeSSHAgentSocket() as agent: 1C

500 agent.sendall(b'\x00\x00\x00\x0a\x1b\x00\x00\x00\x05query') 1C

501 assert ( 1C

502 agent.recv(1000) 

503 == b'\x00\x00\x00\x40\x1d\x00\x00\x00\x05query\x00\x00\x00\x05query\x00\x00\x00\x29list-extended@putty.projects.tartarus.org' 

504 ) 

505 

506 def test_101_request_identities(self) -> None: 

507 """The agent implements a known list of identities.""" 

508 unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix 1g

509 with tests.FakeSSHAgentSocket() as agent: 1g

510 agent.sendall(b'\x00\x00\x00\x01\x0b') 1g

511 message_length = int.from_bytes(agent.recv(4), 'big') 1g

512 orig_message: bytes | bytearray = bytearray( 1g

513 agent.recv(message_length) 

514 ) 

515 assert ( 1g

516 _types.SSH_AGENT(orig_message[0]) 

517 == _types.SSH_AGENT.IDENTITIES_ANSWER 

518 ) 

519 identity_count = int.from_bytes(orig_message[1:5], 'big') 1g

520 message = bytes(orig_message[5:]) 1g

521 for _ in range(identity_count): 1g

522 key, message = unstring_prefix(message) 1g

523 _comment, message = unstring_prefix(message) 1g

524 assert key 1g

525 assert key in { 1g

526 k.public_key_data for k in tests.ALL_KEYS.values() 

527 } 

528 assert not message 1g

529 

530 @Parametrize.SUPPORTED_SSH_TEST_KEYS 

531 def test_102_sign( 

532 self, 

533 ssh_test_key_type: str, 

534 ssh_test_key: tests.SSHTestKey, 

535 ) -> None: 

536 """The agent signs known key/message pairs.""" 

537 del ssh_test_key_type 1q

538 assert ssh_test_key.expected_signature is not None 1q

539 string = ssh_agent.SSHAgentClient.string 1q

540 with tests.FakeSSHAgentSocket() as agent: 1q

541 agent.sendall( 1q

542 string( 

543 b'\x0d' 

544 + string(ssh_test_key.public_key_data) 

545 + string(vault.Vault.UUID) 

546 + b'\x00\x00\x00\x00' 

547 ) 

548 ) 

549 assert agent.recv(1000) == string( 1q

550 b'\x0e' + string(ssh_test_key.expected_signature) 

551 ) 

552 

    def test_120_close_multiple(self) -> None:
        """The agent can be closed repeatedly."""
        # NOTE(review): each `with` statement below constructs a fresh
        # `FakeSSHAgentSocket`, so this code never exits one and the
        # same agent object twice -- confirm that this matches the
        # intent stated in the docstring.
        with tests.FakeSSHAgentSocket() as agent:
            pass
        with tests.FakeSSHAgentSocket() as agent:
            pass
        # Unbind the name, which at this point refers to the second,
        # already exited agent.
        del agent

560 

561 def test_121_closed_agents_cannot_be_interacted_with(self) -> None: 

562 """The agent can be closed repeatedly.""" 

563 with tests.FakeSSHAgentSocket() as agent: 1t

564 pass 1t

565 with pytest.raises( 1t

566 ValueError, 

567 match=re.escape(tests.FakeSSHAgentSocket._SOCKET_IS_CLOSED), 

568 ): 

569 agent.sendall(b'\x00\x00\x00\x0a\x23\x00\x00\x00\x05query') 1t

570 assert agent.recv(100) == b'' 1t

571 

572 def test_122_no_recv_without_sendall(self) -> None: 

573 """The agent requires a message before sending a response.""" 

574 with tests.FakeSSHAgentSocket() as agent: # noqa: SIM117 1D

575 with pytest.raises( 1D

576 AssertionError, 

577 match=re.escape(tests.FakeSSHAgentSocket._PROTOCOL_VIOLATION), 

578 ): 

579 agent.recv(100) 1D

580 

581 @Parametrize.INVALID_SSH_AGENT_MESSAGES 

582 def test_123_invalid_ssh_agent_messages( 

583 self, 

584 message: Buffer, 

585 ) -> None: 

586 """The agent responds with errors on invalid messages.""" 

587 with tests.FakeSSHAgentSocket() as agent: 1E

588 agent.sendall(message) 1E

589 assert agent.recv(100) == b'\x00\x00\x00\x01\x05' 1E

590 

591 @Parametrize.UNSUPPORTED_SSH_AGENT_MESSAGES 

592 def test_124_unsupported_ssh_agent_messages( 

593 self, 

594 message: Buffer, 

595 ) -> None: 

596 """The agent responds with errors on unsupported messages.""" 

597 with tests.FakeSSHAgentSocket() as agent: 1F

598 agent.sendall(message) 1F

599 assert agent.recv(100) == b'\x00\x00\x00\x01\x05' 1F

600 

601 @Parametrize.FAKE_AGENT_ADDRESSES 

602 def test_125_addresses( 

603 self, 

604 address: str | None, 

605 exception: type[Exception] | None, 

606 match: str, 

607 ) -> None: 

608 """The agent accepts addresses.""" 

609 with contextlib.ExitStack() as stack: 1m

610 monkeypatch = stack.enter_context(pytest.MonkeyPatch.context()) 1m

611 if address: 1m

612 monkeypatch.setenv('SSH_AUTH_SOCK', address) 1m

613 else: 

614 monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) 1m

615 if exception: 1m

616 stack.enter_context( 1m

617 pytest.raises(exception, match=re.escape(match)) 

618 ) 

619 tests.FakeSSHAgentSocketWithAddress() 1m

620 

621 

622class TestStaticFunctionality: 

623 """Test the static functionality of the `ssh_agent` module.""" 

624 

625 @staticmethod 

626 def as_ssh_string(bytestring: bytes) -> bytes: 

627 """Return an encoded SSH string from a bytestring. 

628 

629 This is a helper function for hypothesis data generation. 

630 

631 """ 

632 return int.to_bytes(len(bytestring), 4, 'big') + bytestring 1k

633 

634 @staticmethod 

635 def canonicalize1(data: bytes) -> bytes: 

636 """Return an encoded SSH string from a bytestring. 

637 

638 This is a helper function for hypothesis testing. 

639 

640 References: 

641 

642 * [David R. MacIver: Another invariant to test for 

643 encoders][DECODE_ENCODE] 

644 

645 [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/ 

646 

647 """ 

648 return ssh_agent.SSHAgentClient.string( 1k

649 ssh_agent.SSHAgentClient.unstring(data) 

650 ) 

651 

652 @staticmethod 

653 def canonicalize2(data: bytes) -> bytes: 

654 """Return an encoded SSH string from a bytestring. 

655 

656 This is a helper function for hypothesis testing. 

657 

658 References: 

659 

660 * [David R. MacIver: Another invariant to test for 

661 encoders][DECODE_ENCODE] 

662 

663 [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/ 

664 

665 """ 

666 unstringed, trailer = ssh_agent.SSHAgentClient.unstring_prefix(data) 1k

667 assert not trailer 1k

668 return ssh_agent.SSHAgentClient.string(unstringed) 1k

669 

670 # TODO(the-13th-letter): Re-evaluate if this check is worth keeping. 

671 # It cannot provide true tamper-resistence, but probably appears to. 

672 @Parametrize.PUBLIC_KEY_DATA 

673 def test_100_key_decoding( 

674 self, 

675 public_key_struct: tests.SSHTestKey, 

676 ) -> None: 

677 """The [`tests.ALL_KEYS`][] public key data looks sane.""" 

678 keydata = base64.b64decode( 1K

679 public_key_struct.public_key.split(None, 2)[1] 

680 ) 

681 assert keydata == public_key_struct.public_key_data, ( 1K

682 "recorded public key data doesn't match" 

683 ) 

684 

685 @Parametrize.SH_EXPORT_LINES 

686 def test_190_sh_export_line_parsing( 

687 self, line: str, env_name: str, value: str | None 

688 ) -> None: 

689 """[`tests.parse_sh_export_line`][] works.""" 

690 if value is not None: 1z

691 assert tests.parse_sh_export_line(line, env_name=env_name) == value 1z

692 else: 

693 with pytest.raises(ValueError, match='Cannot parse sh line:'): 1z

694 tests.parse_sh_export_line(line, env_name=env_name) 1z

695 

696 def test_200_constructor_posix_no_ssh_auth_sock( 

697 self, 

698 skip_if_no_af_unix_support: None, 

699 ) -> None: 

700 """Abort if the running agent cannot be located.""" 

701 del skip_if_no_af_unix_support 

702 posix_handler = socketprovider.SocketProvider.resolve('posix') 

703 with pytest.MonkeyPatch.context() as monkeypatch: 

704 monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) 

705 with pytest.raises( 

706 KeyError, match='SSH_AUTH_SOCK environment variable' 

707 ): 

708 posix_handler() 

709 

710 @Parametrize.UINT32_INPUT 

711 def test_210_uint32(self, input: int, expected: bytes | bytearray) -> None: 

712 """`uint32` encoding works.""" 

713 uint32 = ssh_agent.SSHAgentClient.uint32 1L

714 assert uint32(input) == expected 1L

715 

716 @hypothesis.given(strategies.integers(min_value=0, max_value=0xFFFFFFFF)) 

717 @hypothesis.example(0xDEADBEEF).via('manual, pre-hypothesis example') 1aG

718 def test_210a_uint32_from_number(self, num: int) -> None: 

719 """`uint32` encoding works, starting from numbers.""" 

720 uint32 = ssh_agent.SSHAgentClient.uint32 1G

721 assert int.from_bytes(uint32(num), 'big', signed=False) == num 1G

722 

723 @hypothesis.given(strategies.binary(min_size=4, max_size=4)) 

724 @hypothesis.example(b'\xde\xad\xbe\xef').via( 1aH

725 'manual, pre-hypothesis example' 

726 ) 

727 def test_210b_uint32_from_bytestring(self, bytestring: bytes) -> None: 

728 """`uint32` encoding works, starting from length four byte strings.""" 

729 uint32 = ssh_agent.SSHAgentClient.uint32 1H

730 assert ( 1H

731 uint32(int.from_bytes(bytestring, 'big', signed=False)) 

732 == bytestring 

733 ) 

734 

735 @Parametrize.SSH_STRING_INPUT 

736 def test_211_string( 

737 self, input: bytes | bytearray, expected: bytes | bytearray 

738 ) -> None: 

739 """SSH string encoding works.""" 

740 string = ssh_agent.SSHAgentClient.string 1M

741 assert bytes(string(input)) == expected 1M

742 

743 @hypothesis.given(strategies.binary(max_size=0x0001FFFF)) 

744 @hypothesis.example(b'DEADBEEF' * 10000).via( 1au

745 'manual, pre-hypothesis example with highest order bit set' 

746 ) 

747 def test_211a_string_from_bytestring(self, bytestring: bytes) -> None: 

748 """SSH string encoding works, starting from a byte string.""" 

749 res = ssh_agent.SSHAgentClient.string(bytestring) 1u

750 assert res.startswith((b'\x00\x00', b'\x00\x01')) 1u

751 assert int.from_bytes(res[:4], 'big', signed=False) == len(bytestring) 1u

752 assert res[4:] == bytestring 1u

753 

754 @Parametrize.SSH_UNSTRING_INPUT 

755 def test_212_unstring( 

756 self, input: bytes | bytearray, expected: bytes | bytearray 

757 ) -> None: 

758 """SSH string decoding works.""" 

759 unstring = ssh_agent.SSHAgentClient.unstring 1A

760 unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix 1A

761 assert bytes(unstring(input)) == expected 1A

762 assert tuple(bytes(x) for x in unstring_prefix(input)) == ( 1A

763 expected, 

764 b'', 

765 ) 

766 

767 @hypothesis.given(strategies.binary(max_size=0x00FFFFFF)) 

768 @hypothesis.example(b'\x00\x00\x00\x07ssh-rsa').via( 1al

769 'manual, pre-hypothesis example to attempt to detect double-decoding' 

770 ) 

771 @hypothesis.example(b'\x00\x00\x00\x01').via( 

772 'detect no-op encoding via ill-formed SSH string' 

773 ) 

774 def test_212a_unstring_of_string_of_data(self, bytestring: bytes) -> None: 

775 """SSH string decoding of encoded SSH strings works. 

776 

777 References: 

778 

779 * [David R. MacIver: The Encode/Decode invariant][ENCODE_DECODE] 

780 

781 [ENCODE_DECODE]: https://hypothesis.works/articles/encode-decode-invariant/ 

782 

783 """ 

784 string = ssh_agent.SSHAgentClient.string 1l

785 unstring = ssh_agent.SSHAgentClient.unstring 1l

786 unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix 1l

787 encoded = string(bytestring) 1l

788 assert unstring(encoded) == bytestring 1l

789 assert unstring_prefix(encoded) == (bytestring, b'') 1l

790 trailing_data = b' trailing data' 1l

791 encoded2 = string(bytestring) + trailing_data 1l

792 assert unstring_prefix(encoded2) == (bytestring, trailing_data) 1l

793 

794 @hypothesis.given( 

795 strategies.binary(max_size=0x00FFFFFF).map( 

796 # Scoping issues, and the fact that staticmethod objects 

797 # (before class finalization) are not callable, necessitate 

798 # wrapping this staticmethod call in a lambda. 

799 lambda x: TestStaticFunctionality.as_ssh_string(x) # noqa: PLW0108 

800 ), 

801 ) 

802 def test_212b_string_of_unstring_of_data(self, encoded: bytes) -> None: 

803 """SSH string decoding of encoded SSH strings works. 

804 

805 References: 

806 

807 * [David R. MacIver: Another invariant to test for 

808 encoders][DECODE_ENCODE] 

809 

810 [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/ 

811 

812 """ 

813 canonical_functions = [self.canonicalize1, self.canonicalize2] 1k

814 for canon1 in canonical_functions: 1k

815 for canon2 in canonical_functions: 1k

816 assert canon1(encoded) == canon2(encoded) 1k

817 assert canon1(canon2(encoded)) == canon1(encoded) 1k

818 

819 def test_220_registry_resolve( 

820 self, 

821 ) -> None: 

822 """Resolving entries in the socket provider registry works.""" 

823 registry = socketprovider.SocketProvider.registry 1o

824 resolve = socketprovider.SocketProvider.resolve 1o

825 with pytest.MonkeyPatch.context() as monkeypatch: 1o

826 monkeypatch.setitem(registry, 'fake', None) 1o

827 assert callable(resolve('native')) 1o

828 with pytest.raises(NotImplementedError): 1o

829 resolve('fake') 1o

830 

831 @hypothesis.given( 

832 terminal=strategies.sampled_from([ 

833 'unimplemented', 

834 'alias', 

835 'callable', 

836 ]), 

837 chain=strategies.lists( 

838 strategies.sampled_from([ 

839 'c1', 

840 'c2', 

841 'c3', 

842 'c4', 

843 'c5', 

844 'c6', 

845 'c7', 

846 'c8', 

847 'c9', 

848 'c10', 

849 ]), 

850 min_size=1, 

851 unique=True, 

852 ), 

853 ) 

854 def test_221_registry_resolve_chains( 

855 self, 

856 terminal: Literal['unimplemented', 'alias', 'callable'], 

857 chain: list[str], 

858 ) -> None: 

859 """Resolving a chain of providers works.""" 

860 registry = socketprovider.SocketProvider.registry 1h

861 resolve = socketprovider.SocketProvider.resolve 1h

862 try: 1h

863 implementation = resolve('native') 1h

864 except NotImplementedError: # pragma: no cover 

865 hypothesis.note(f'{registry = }') 

866 pytest.fail('Native SSH agent socket provider is unavailable?!') 

867 # TODO(the-13th-letter): Rewrite using structural pattern matching. 

868 # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9 

869 target = ( 1h

870 None 

871 if terminal == 'unimplemented' 

872 else 'native' 

873 if terminal == 'alias' 

874 else implementation 

875 ) 

876 with pytest.MonkeyPatch.context() as monkeypatch: 1h

877 for link in chain: 1h

878 monkeypatch.setitem(registry, link, target) 1h

879 target = link 1h

880 if terminal == 'unimplemented': 1h

881 with pytest.raises(NotImplementedError): 1h

882 resolve(chain[-1]) 1h

883 else: 

884 assert resolve(chain[-1]) == implementation 1h

885 

886 @Parametrize.GOOD_ENTRY_POINTS 

887 def test_230_find_all_socket_providers( 

888 self, 

889 additional_entry_points: list[importlib.metadata.EntryPoint], 

890 ) -> None: 

891 """Finding all SSH agent socket providers works.""" 

892 resolve = socketprovider.SocketProvider.resolve 1p

893 old_registry = socketprovider.SocketProvider.registry 1p

894 with tests.faked_entry_point_list( 1p

895 additional_entry_points, remove_conflicting_entries=False 

896 ) as names: 

897 socketprovider.SocketProvider._find_all_ssh_agent_socket_providers() 1p

898 for name in names: 1p

899 assert name in socketprovider.SocketProvider.registry 1p

900 assert resolve(name) in { 1p

901 tests.provider_entry_provider, 

902 *old_registry.values(), 

903 } 

904 

905 @Parametrize.BAD_ENTRY_POINTS 

906 def test_231_find_all_socket_providers_errors( 

907 self, 

908 additional_entry_points: list[importlib.metadata.EntryPoint], 

909 ) -> None: 

910 """Finding faulty SSH agent socket providers raises errors.""" 

911 with contextlib.ExitStack() as stack: 1B

912 stack.enter_context( 1B

913 tests.faked_entry_point_list( 

914 additional_entry_points, remove_conflicting_entries=False 

915 ) 

916 ) 

917 stack.enter_context(pytest.raises(AssertionError)) 1B

918 socketprovider.SocketProvider._find_all_ssh_agent_socket_providers() 1B

919 

920 @Parametrize.UINT32_EXCEPTIONS 

921 def test_310_uint32_exceptions( 

922 self, input: int, exc_type: type[Exception], exc_pattern: str 

923 ) -> None: 

924 """`uint32` encoding fails for out-of-bound values.""" 

925 uint32 = ssh_agent.SSHAgentClient.uint32 1I

926 with pytest.raises(exc_type, match=exc_pattern): 1I

927 uint32(input) 1I

928 

929 @Parametrize.SSH_STRING_EXCEPTIONS 

930 def test_311_string_exceptions( 

931 self, input: Any, exc_type: type[Exception], exc_pattern: str 

932 ) -> None: 

933 """SSH string encoding fails for non-strings.""" 

934 string = ssh_agent.SSHAgentClient.string 1J

935 with pytest.raises(exc_type, match=exc_pattern): 1J

936 string(input) 1J

937 

938 @Parametrize.SSH_UNSTRING_EXCEPTIONS 

939 def test_312_unstring_exceptions( 

940 self, 

941 input: bytes | bytearray, 

942 exc_type: type[Exception], 

943 exc_pattern: str, 

944 has_trailer: bool, 

945 parts: tuple[bytes | bytearray, bytes | bytearray] | None, 

946 ) -> None: 

947 """SSH string decoding fails for invalid values.""" 

948 unstring = ssh_agent.SSHAgentClient.unstring 1n

949 unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix 1n

950 with pytest.raises(exc_type, match=exc_pattern): 1n

951 unstring(input) 1n

952 if has_trailer: 1n

953 assert tuple(bytes(x) for x in unstring_prefix(input)) == parts 1n

954 else: 

955 with pytest.raises(exc_type, match=exc_pattern): 1n

956 unstring_prefix(input) 1n

957 

958 def test_320_registry_already_registered( 

959 self, 

960 ) -> None: 

961 """The registry forbids overwriting entries.""" 

962 registry = socketprovider.SocketProvider.registry.copy() 1e

963 resolve = socketprovider.SocketProvider.resolve 1e

964 register = socketprovider.SocketProvider.register 1e

965 the_annoying_os = resolve('the_annoying_os') 1e

966 posix = resolve('posix') 1e

967 with pytest.MonkeyPatch.context() as monkeypatch: 1e

968 monkeypatch.setattr( 1e

969 socketprovider.SocketProvider, 'registry', registry 

970 ) 

971 register('posix')(posix) 1e

972 register('the_annoying_os')(the_annoying_os) 1e

973 with pytest.raises(ValueError, match='already registered'): 1e

974 register('posix')(the_annoying_os) 1e

975 with pytest.raises(ValueError, match='already registered'): 1e

976 register('the_annoying_os')(posix) 1e

977 with pytest.raises(ValueError, match='already registered'): 1e

978 register('posix', 'the_annoying_os_named_pipe')(posix) 1e

979 with pytest.raises(ValueError, match='already registered'): 1e

980 register('the_annoying_os', 'unix_domain')(the_annoying_os) 1e

981 

982 @hypothesis.given( 

983 entry=strategies.text('abcdefghijklmnopqrstuvwxyz0123456789_').filter( 

984 lambda s: s not in socketprovider.SocketProvider.registry 

985 ), 

986 ) 

987 def test_321_registry_resolve_non_existant_entries( 

988 self, 

989 entry: str, 

990 ) -> None: 

991 """Resolving a non-existant entry fails.""" 

992 with pytest.raises(socketprovider.NoSuchProviderError): 1N

993 socketprovider.SocketProvider.resolve(entry) 1N

994 

995 

996class TestAgentInteraction: 

997 """Test actually talking to the SSH agent.""" 

998 

@Parametrize.SUPPORTED_SSH_TEST_KEYS
def test_200_sign_data_via_agent(
    self,
    ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
    ssh_test_key_type: str,
    ssh_test_key: tests.SSHTestKey,
) -> None:
    """Signing data with specific SSH keys works.

    The vault UUID is signed twice with the test key, and both
    signatures must equal the expected signature recorded for that
    key.  The passphrase derived from the key via
    [`vault.Vault.phrase_from_key`][] must equal the recorded
    derived passphrase as well.

    Single tests may abort early (skip) if the indicated key is not
    loaded in the agent.  Presumably this means the key type is
    unsupported.

    """
    client = ssh_agent_client_with_test_keys_loaded
    loaded_keys = {
        bytes(loaded_key): bytes(comment)
        for loaded_key, comment in client.list_keys()
    }
    public_key_data = ssh_test_key.public_key_data
    expected_signature = ssh_test_key.expected_signature
    derived_passphrase = ssh_test_key.derived_passphrase
    # Supported test keys must carry both reference values.
    assert expected_signature is not None
    assert derived_passphrase is not None
    if public_key_data not in loaded_keys:  # pragma: no cover
        pytest.skip(f'prerequisite {ssh_test_key_type} SSH key not loaded')
    mismatch = f'SSH signature mismatch ({ssh_test_key_type})'
    # Sign the same payload twice; each signature must match.
    for _ in range(2):
        signature = bytes(
            client.sign(payload=vault.Vault.UUID, key=public_key_data)
        )
        assert signature == expected_signature, mismatch
    phrase = vault.Vault.phrase_from_key(public_key_data, conn=client)
    assert phrase == derived_passphrase, mismatch

1038 

@Parametrize.UNSUITABLE_SSH_TEST_KEYS
def test_201_sign_data_via_agent_unsupported(
    self,
    ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
    ssh_test_key_type: str,
    ssh_test_key: tests.SSHTestKey,
) -> None:
    """Using an unsuitable key with [`vault.Vault`][] fails.

    The key must be reported as unsuitable when no client is given,
    and deriving a passphrase from it must raise [`ValueError`][].

    Single tests may abort early (skip) if the indicated key is not
    loaded in the agent.  Presumably this means the key type is
    unsupported.  Single tests may also abort early if the agent
    ensures that the generally unsuitable key is actually suitable
    under this agent.

    """
    client = ssh_agent_client_with_test_keys_loaded
    loaded_keys = {
        bytes(loaded_key): bytes(comment)
        for loaded_key, comment in client.list_keys()
    }
    public_key_data = ssh_test_key.public_key_data
    if public_key_data not in loaded_keys:  # pragma: no cover
        pytest.skip(f'prerequisite {ssh_test_key_type} SSH key not loaded')
    suitable_in_general = vault.Vault.is_suitable_ssh_key(
        public_key_data, client=None
    )
    assert not suitable_in_general, (
        f'Expected {ssh_test_key_type} key to be unsuitable in general'
    )
    suitable_for_agent = vault.Vault.is_suitable_ssh_key(
        public_key_data, client=client
    )
    if suitable_for_agent:
        pytest.skip(
            f'agent automatically ensures {ssh_test_key_type} key is suitable'
        )
    with pytest.raises(ValueError, match='unsuitable SSH key'):
        vault.Vault.phrase_from_key(public_key_data, conn=client)

1069 

@Parametrize.SSH_KEY_SELECTION
def test_210_ssh_key_selector(
    self,
    monkeypatch: pytest.MonkeyPatch,
    ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
    key: bytes,
    single: bool,
) -> None:
    """The key selector presents exactly the suitable keys.

    "Suitable" here means suitability for this SSH agent
    specifically.

    The agent's key listing is replaced by a canned listing (a
    single key if `single` is true, several keys otherwise), and
    [`cli_helpers.select_ssh_key`][] is driven through a small
    `click` command.  The output must contain the key list header,
    the expected prompt with the given reply, and the base64
    encoding of `key`.

    """
    client = ssh_agent_client_with_test_keys_loaded

    def key_is_suitable(candidate: bytes) -> bool:
        """Tell whether the selector is expected to offer `candidate`.

        Supported test keys always count.  The DSA and ECDSA keys
        from the unsuitable set count only if the agent reports
        deterministic DSA signatures.

        """
        supported = {v.public_key_data for v in tests.SUPPORTED_KEYS.values()}
        if candidate in supported:
            return True
        dsa_like = {
            v.public_key_data
            for k, v in tests.UNSUITABLE_KEYS.items()
            if k.startswith(('dsa', 'ecdsa'))
        }
        return (
            client.has_deterministic_dsa_signatures()
            and candidate in dsa_like
        )

    # TODO(the-13th-letter): Handle the unlikely(?) case that only
    # one test key is loaded, but `single` is False.  The expected
    # prompt `text` in the multi-key branch should then depend on
    # `n` being singular or non-singular.
    list_keys_stub = tests.list_keys_singleton if single else tests.list_keys
    monkeypatch.setattr(ssh_agent.SSHAgentClient, 'list_keys', list_keys_stub)
    keys = [
        pair.key for pair in list_keys_stub() if key_is_suitable(pair.key)
    ]
    if single:
        # A lone key is confirmed with a yes/no question.
        reply = 'yes\n'
        text = 'Use this key? yes\n'
    else:
        # Several keys are picked by their 1-based list position.
        index = str(1 + keys.index(key))
        n = len(keys)
        reply = f'{index}\n'
        text = f'Your selection? (1-{n}, leave empty to abort): {index}\n'
    b64_key = base64.standard_b64encode(key).decode('ASCII')

    @click.command()
    def driver() -> None:
        """Call [`cli_helpers.select_ssh_key`][] directly, as a command."""
        selected = cli_helpers.select_ssh_key(client)
        click.echo(base64.standard_b64encode(selected).decode('ASCII'))

    runner = tests.CliRunner(mix_stderr=True)
    result = runner.invoke(
        driver,
        [],
        input=reply,
        catch_exceptions=True,
    )
    expected_snippets = ('Suitable SSH keys:\n', text, f'\n{b64_key}\n')
    for snippet in expected_snippets:
        assert result.clean_exit(output=snippet), 'expected clean exit'

1147 

def test_300_constructor_bad_running_agent(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
) -> None:
    """Fail if the agent address is invalid.

    `SSH_AUTH_SOCK` is pointed at a bogus address: the running
    agent's socket name with a `~` appended if that name is a
    string, or a fixed invalid address otherwise.  Constructing the
    client must then raise [`OSError`][].

    """
    agent_socket = running_ssh_agent.socket
    if isinstance(agent_socket, str):
        bogus_address = agent_socket + '~'
    else:
        bogus_address = '<invalid//address>'
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setenv('SSH_AUTH_SOCK', bogus_address)
        with pytest.raises(OSError):  # noqa: PT011
            ssh_agent.SSHAgentClient()

1162 

def test_301_constructor_no_af_unix_support(self) -> None:
    """Fail without [`socket.AF_UNIX`][] support.

    With `AF_UNIX` removed from the [`socket`][] module, requesting
    the `posix` socket provider must raise
    [`NotImplementedError`][] mentioning UNIX domain sockets.

    """
    assert 'posix' in socketprovider.SocketProvider.registry
    expect_unsupported = pytest.raises(
        NotImplementedError,
        match='UNIX domain sockets',
    )
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setenv('SSH_AUTH_SOCK', "the value doesn't matter")
        # `raising=False`: the attribute may already be absent here.
        monkeypatch.delattr(socket, 'AF_UNIX', raising=False)
        with expect_unsupported:
            ssh_agent.SSHAgentClient(socket='posix')

1174 

def test_302_no_ssh_agent_socket_provider_available(
    self,
) -> None:
    """Fail if no SSH agent socket provider is available.

    A `fake` registry entry mapped to `None` is requested three
    times.  The constructor must raise an exception group whose
    members are all [`NotImplementedError`][] instances.

    """
    provider_registry = socketprovider.SocketProvider.registry
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setitem(provider_registry, 'fake', None)
        with pytest.raises(ExceptionGroup) as excinfo:
            ssh_agent.SSHAgentClient(socket=['fake', 'fake', 'fake'])
        collected = excinfo.value.exceptions
        assert all(isinstance(exc, NotImplementedError) for exc in collected)

1189 

def test_303_explicit_socket(
    self,
    spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
) -> None:
    """Constructing a client from an explicit connection works.

    The connection object is taken from the spawned agent's own
    client.  The test only checks that the constructor accepts it
    without raising.

    """
    existing_connection = spawn_ssh_agent.client._connection
    ssh_agent.SSHAgentClient(socket=existing_connection)

1196 

@Parametrize.TRUNCATED_AGENT_RESPONSES
def test_310_truncated_server_response(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response: bytes,
) -> None:
    """Fail on truncated responses from the SSH agent.

    The client's connection is swapped for a pseudo socket that
    discards everything sent and replays `response` on reads.
    Issuing a request must then raise [`EOFError`][].

    Args:
        running_ssh_agent:
            Fixture requested only for its setup; the value itself
            is not used.
        response:
            The (truncated) raw bytes the pseudo socket hands out.

    """
    del running_ssh_agent
    response_stream = io.BytesIO(response)

    class PseudoSocket:
        """Socket stand-in that replays the canned response."""

        def sendall(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ARG002
            """Discard the outgoing data."""
            return None

        def recv(self, *args: Any, **kwargs: Any) -> Any:
            """Return the next bytes of the canned response."""
            return response_stream.read(*args, **kwargs)

    pseudo_socket = PseudoSocket()
    with contextlib.ExitStack() as stack:
        # Enter the client as a context manager, as other tests in
        # this class do, so the connection it opened is released on
        # exit instead of being left for garbage collection.
        client = stack.enter_context(ssh_agent.SSHAgentClient())
        # Entered after the client, hence undone before it: the
        # real connection is back in place when the client exits.
        monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
        monkeypatch.setattr(client, '_connection', pseudo_socket)
        with pytest.raises(EOFError):
            client.request(255, b'')

1220 

@Parametrize.LIST_KEYS_ERROR_RESPONSES
def test_320_list_keys_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response_code: _types.SSH_AGENT,
    response: bytes | bytearray,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on problems during key listing.

    Known problems:

      - The agent refuses, or otherwise indicates the operation
        failed.
      - The agent response is truncated.
      - The agent response is overlong.

    Args:
        running_ssh_agent:
            Fixture requested only for its setup; the value itself
            is not used.
        response_code:
            The response code the mocked agent answers with.
        response:
            The response payload the mocked agent answers with.
        exc_type:
            The exception type `list_keys` is expected to raise.
        exc_pattern:
            A pattern the exception message must match.

    """
    del running_ssh_agent

    # The mock below has its own `response_code` parameter, so keep
    # the test parameter reachable under a different name.
    passed_response_code = response_code

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function.
    def request(
        request_code: int | _types.SSH_AGENTC,
        payload: bytes | bytearray,
        /,
        *,
        response_code: Iterable[int | _types.SSH_AGENT]
        | int
        | _types.SSH_AGENT
        | None = None,
    ) -> tuple[int, bytes | bytearray] | bytes | bytearray:
        """Answer any request with the parametrized code and payload."""
        del request_code
        del payload
        # Normalize the acceptable response codes to a set of ints.
        if isinstance(  # pragma: no branch
            response_code, (int, _types.SSH_AGENT)
        ):
            response_code = frozenset({response_code})
        if response_code is not None:  # pragma: no branch
            response_code = frozenset({
                c if isinstance(c, int) else c.value for c in response_code
            })

        # No acceptable codes given: hand back code and payload.
        if not response_code:  # pragma: no cover
            return (passed_response_code.value, response)
        if passed_response_code.value not in response_code:
            raise ssh_agent.SSHAgentFailedError(
                passed_response_code.value, response
            )
        return response

    with contextlib.ExitStack() as stack:
        monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
        # Enter the client as a context manager, as other tests in
        # this class do, so the connection it opened is released on
        # exit instead of being left for garbage collection.
        client = stack.enter_context(ssh_agent.SSHAgentClient())
        monkeypatch.setattr(client, 'request', request)
        with pytest.raises(exc_type, match=exc_pattern):
            client.list_keys()

1280 

@Parametrize.SIGN_ERROR_RESPONSES
def test_330_sign_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    key: bytes | bytearray,
    check: bool,
    response_code: _types.SSH_AGENT,
    response: bytes | bytearray,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on problems during signing.

    Known problems:

      - The key is not loaded into the agent.
      - The agent refuses, or otherwise indicates the operation
        failed.

    Args:
        running_ssh_agent:
            Fixture requested only for its setup; the value itself
            is not used.
        key:
            The key to request a signature for.
        check:
            Passed to `sign` as `check_if_key_loaded`.
        response_code:
            The response code the mocked agent answers with.
        response:
            The response payload the mocked agent answers with.
        exc_type:
            The exception type `sign` is expected to raise.
        exc_pattern:
            A pattern the exception message must match.

    """
    del running_ssh_agent
    # The mock below has its own `response_code` parameter, so keep
    # the test parameter reachable under a different name.
    passed_response_code = response_code

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function.
    def request(
        request_code: int | _types.SSH_AGENTC,
        payload: bytes | bytearray,
        /,
        *,
        response_code: Iterable[int | _types.SSH_AGENT]
        | int
        | _types.SSH_AGENT
        | None = None,
    ) -> tuple[int, bytes | bytearray] | bytes | bytearray:
        """Answer any request with the parametrized code and payload."""
        del request_code
        del payload
        # Normalize the acceptable response codes to a set of ints.
        if isinstance(  # pragma: no branch
            response_code, (int, _types.SSH_AGENT)
        ):
            response_code = frozenset({response_code})
        if response_code is not None:  # pragma: no branch
            response_code = frozenset({
                c if isinstance(c, int) else c.value for c in response_code
            })

        # No acceptable codes given: hand back code and payload.
        if not response_code:  # pragma: no cover
            return (passed_response_code.value, response)
        if (
            passed_response_code.value not in response_code
        ):  # pragma: no branch
            raise ssh_agent.SSHAgentFailedError(
                passed_response_code.value, response
            )
        return response  # pragma: no cover

    with contextlib.ExitStack() as stack:
        monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
        # Enter the client as a context manager, as other tests in
        # this class do, so the connection it opened is released on
        # exit instead of being left for garbage collection.
        client = stack.enter_context(ssh_agent.SSHAgentClient())
        monkeypatch.setattr(client, 'request', request)
        Pair = _types.SSHKeyCommentPair  # noqa: N806
        com = b'no comment'
        # Make the client believe the supported test keys are loaded.
        loaded_keys = [
            Pair(v.public_key_data, com).toreadonly()
            for v in tests.SUPPORTED_KEYS.values()
        ]
        monkeypatch.setattr(client, 'list_keys', lambda: loaded_keys)
        with pytest.raises(exc_type, match=exc_pattern):
            client.sign(key, b'abc', check_if_key_loaded=check)

1349 

@Parametrize.REQUEST_ERROR_RESPONSES
def test_340_request_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    request_code: _types.SSH_AGENTC,
    response_code: _types.SSH_AGENT,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on problems during raw agent requests.

    A request with the given request code and an empty payload is
    sent to the agent, stating `response_code` as the acceptable
    response code.  The call must raise `exc_type` with a message
    matching `exc_pattern`.

    """
    del running_ssh_agent

    expect_failure = pytest.raises(exc_type, match=exc_pattern)
    # The expectation is entered first, so it also covers errors
    # raised while the client is constructed, entered or exited.
    with expect_failure, ssh_agent.SSHAgentClient() as client:
        client.request(request_code, b'', response_code=response_code)

1377 

@Parametrize.QUERY_EXTENSIONS_MALFORMED_RESPONSES
def test_350_query_extensions_malformed_responses(
    self,
    monkeypatch: pytest.MonkeyPatch,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response_data: bytes,
) -> None:
    """Fail on malformed responses while querying extensions.

    The client's `request` method is replaced by a mock that checks
    the outgoing extension query and answers with `response_data`.
    Querying the extensions must then raise [`RuntimeError`][].

    """
    del running_ssh_agent

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function after removing the
    # payload-specific parts.
    def request(
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: (
            Iterable[_types.SSH_AGENT | int]
            | _types.SSH_AGENT
            | int
            | None
        ) = None,
    ) -> tuple[int, bytes] | bytes:
        """Validate the extension query, then return `response_data`."""
        extension_request = _types.SSH_AGENTC.EXTENSION
        assert code in {extension_request, extension_request.value}
        extension_response = _types.SSH_AGENT.EXTENSION_RESPONSE
        success = _types.SSH_AGENT.SUCCESS
        acceptable_responses = {
            extension_response,
            extension_response.value,
            success,
            success.value,
        }
        assert payload == b'\x00\x00\x00\x05query'
        if response_code is None:  # pragma: no cover
            return (extension_response.value, response_data)
        # Treat a single response code like a one-element iterable.
        requested_codes = (
            (response_code,)
            if isinstance(response_code, (_types.SSH_AGENT, int))
            else response_code
        )
        for single_code in requested_codes:
            assert single_code in acceptable_responses
        return response_data

    scoped_patch = monkeypatch.context()
    # The patch context is entered first and therefore undone last,
    # after the client has been exited.
    with scoped_patch as monkeypatch2, ssh_agent.SSHAgentClient() as client:
        monkeypatch2.setattr(client, 'request', request)
        with pytest.raises(
            RuntimeError,
            match=r'Malformed response|does not match request',
        ):
            client.query_extensions()