Coverage for tests\test_derivepassphrase_ssh_agent.py: 97.137%
420 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""Test OpenSSH key loading and signing."""
7from __future__ import annotations
9import base64
10import contextlib
11import errno
12import importlib.metadata
13import io
14import os
15import pathlib
16import re
17import socket
18import sys
19import types
20from typing import TYPE_CHECKING
22import click
23import click.testing
24import hypothesis
25import pytest
26from hypothesis import strategies
28import tests
29from derivepassphrase import _types, ssh_agent, vault
30from derivepassphrase._internals import cli_helpers
31from derivepassphrase.ssh_agent import socketprovider
33if TYPE_CHECKING:
34 from collections.abc import Iterable
36 from typing_extensions import Any, Buffer, Literal
38if sys.version_info < (3, 11):
39 from exceptiongroup import ExceptionGroup
def _provider_entry_point(attribute: str) -> importlib.metadata.EntryPoint:
    """Return the socket provider entry point for a `tests` attribute.

    The entry point is named after the `key` of the object stored as
    `tests.<attribute>`, and its value points back at that attribute.

    """
    return importlib.metadata.EntryPoint(
        name=getattr(tests, attribute).key,
        group=socketprovider.SocketProvider.ENTRY_POINT_GROUP_NAME,
        value=f'tests: {attribute}',
    )


def _sign_request(*fields: bytes | bytearray) -> bytes | bytearray:
    """Return a framed agent message: request code `0x0d` plus fields.

    The fields are concatenated as given (they must already be
    encoded), and the whole message is wrapped as an SSH string.

    """
    return ssh_agent.SSHAgentClient.string(b''.join([b'\x0d', *fields]))


class Parametrize(types.SimpleNamespace):
    """Shared `pytest.mark.parametrize` decorators for this module."""

    # Entry points that the socket provider discovery must reject.
    BAD_ENTRY_POINTS = pytest.mark.parametrize(
        'additional_entry_points',
        [
            pytest.param(
                [_provider_entry_point('faulty_entry_callable')],
                id='not-callable',
            ),
            pytest.param(
                [_provider_entry_point('faulty_entry_name_exists')],
                id='name-already-exists',
            ),
            pytest.param(
                [_provider_entry_point('faulty_entry_alias_exists')],
                id='alias-already-exists',
            ),
        ],
    )
    # Entry points that the socket provider discovery must accept.
    GOOD_ENTRY_POINTS = pytest.mark.parametrize(
        'additional_entry_points',
        [
            pytest.param(
                [
                    _provider_entry_point('posix_entry'),
                    _provider_entry_point('the_annoying_os_entry'),
                ],
                id='existing-entries',
            ),
            pytest.param(
                [
                    _provider_entry_point('provider_entry1'),
                    _provider_entry_point('provider_entry2'),
                ],
                id='new-entries',
            ),
        ],
    )
    # Inputs that SSH string encoding must refuse.
    SSH_STRING_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                'some string',
                TypeError,
                'invalid payload type',
                id='str',
            ),
        ],
    )
    # Inputs that `uint32` encoding must refuse.
    UINT32_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                10000000000000000,
                OverflowError,
                'int too big to convert',
                id='10000000000000000',
            ),
            pytest.param(
                -1,
                OverflowError,
                "can't convert negative int to unsigned",
                id='-1',
            ),
        ],
    )
    # Inputs that SSH string decoding must refuse.  If `has_trailer` is
    # set, then prefix decoding still succeeds and yields `parts`.
    SSH_UNSTRING_EXCEPTIONS = pytest.mark.parametrize(
        ['input', 'exc_type', 'exc_pattern', 'has_trailer', 'parts'],
        [
            pytest.param(
                b'ssh',
                ValueError,
                'malformed SSH byte string',
                False,
                None,
                id='unencoded',
            ),
            pytest.param(
                b'\x00\x00\x00\x08ssh-rsa',
                ValueError,
                'malformed SSH byte string',
                False,
                None,
                id='truncated',
            ),
            pytest.param(
                b'\x00\x00\x00\x04XXX trailing text',
                ValueError,
                'malformed SSH byte string',
                True,
                (b'XXX ', b'trailing text'),
                id='trailing-data',
            ),
        ],
    )
    # Payloads and their expected SSH string encodings.
    SSH_STRING_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(
                b'ssh-rsa', b'\x00\x00\x00\x07ssh-rsa', id='ssh-rsa'
            ),
            pytest.param(
                b'ssh-ed25519',
                b'\x00\x00\x00\x0bssh-ed25519',
                id='ssh-ed25519',
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(b'ssh-ed25519'),
                b'\x00\x00\x00\x0f\x00\x00\x00\x0bssh-ed25519',
                id='string(ssh-ed25519)',
            ),
        ],
    )
    # SSH strings and their expected decoded payloads.
    SSH_UNSTRING_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(
                b'\x00\x00\x00\x07ssh-rsa', b'ssh-rsa', id='ssh-rsa'
            ),
            pytest.param(
                ssh_agent.SSHAgentClient.string(b'ssh-ed25519'),
                b'ssh-ed25519',
                id='ssh-ed25519',
            ),
        ],
    )
    # Numbers and their expected `uint32` encodings.
    UINT32_INPUT = pytest.mark.parametrize(
        ['input', 'expected'],
        [
            pytest.param(16777216, b'\x01\x00\x00\x00', id='16777216'),
        ],
    )
    # Agent responses to signing requests, and the resulting errors.
    SIGN_ERROR_RESPONSES = pytest.mark.parametrize(
        [
            'key',
            'check',
            'response_code',
            'response',
            'exc_type',
            'exc_pattern',
        ],
        [
            pytest.param(
                b'invalid-key',
                True,
                _types.SSH_AGENT.FAILURE,
                b'',
                KeyError,
                'target SSH key not loaded into agent',
                id='key-not-loaded',
            ),
            pytest.param(
                tests.SUPPORTED_KEYS['ed25519'].public_key_data,
                True,
                _types.SSH_AGENT.FAILURE,
                b'',
                ssh_agent.SSHAgentFailedError,
                'failed to complete the request',
                id='failed-to-complete',
            ),
        ],
    )
    # Every supported key (multiple choice), plus the singleton key.
    SSH_KEY_SELECTION = pytest.mark.parametrize(
        ['key', 'single'],
        [
            *(
                (test_key.public_key_data, False)
                for test_key in tests.SUPPORTED_KEYS.values()
            ),
            (tests.list_keys_singleton()[0].key, True),
        ],
        ids=[*tests.SUPPORTED_KEYS.keys(), 'singleton'],
    )
    # `sh` lines and the value they export for the named variable; a
    # value of `None` marks lines that must be rejected.
    SH_EXPORT_LINES = pytest.mark.parametrize(
        ['line', 'env_name', 'value'],
        [
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK;',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='value-export-semicolon-pageant',
            ),
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270; export SSH_AUTH_SOCK;',
                'SSH_AUTH_SOCK',
                '/tmp/ssh-3CSTC1W5M22A/agent.27270',
                id='value-export-semicolon-openssh',
            ),
            pytest.param(
                'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='value-export-pageant',
            ),
            pytest.param(
                'export SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270;',
                'SSH_AUTH_SOCK',
                '/tmp/ssh-3CSTC1W5M22A/agent.27270',
                id='export-value-semicolon-openssh',
            ),
            pytest.param(
                'export SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170',
                'SSH_AUTH_SOCK',
                '/tmp/pageant.user/pageant.27170',
                id='export-value-pageant',
            ),
            pytest.param(
                'SSH_AGENT_PID=27170; export SSH_AGENT_PID;',
                'SSH_AGENT_PID',
                '27170',
                id='pid-export-semicolon',
            ),
            pytest.param(
                'SSH_AGENT_PID=27170; export SSH_AGENT_PID',
                'SSH_AGENT_PID',
                '27170',
                id='pid-export',
            ),
            pytest.param(
                'export SSH_AGENT_PID=27170;',
                'SSH_AGENT_PID',
                '27170',
                id='export-pid-semicolon',
            ),
            pytest.param(
                'export SSH_AGENT_PID=27170',
                'SSH_AGENT_PID',
                '27170',
                id='export-pid',
            ),
            pytest.param(
                'export VARIABLE=value; export OTHER_VARIABLE=other_value;',
                'VARIABLE',
                None,
                id='export-too-much',
            ),
            pytest.param(
                'VARIABLE=value',
                'VARIABLE',
                None,
                id='no-export',
            ),
        ],
    )
    # Every supported test key, identified by its key type.
    PUBLIC_KEY_DATA = pytest.mark.parametrize(
        'public_key_struct',
        list(tests.SUPPORTED_KEYS.values()),
        ids=list(tests.SUPPORTED_KEYS.keys()),
    )
    # Requests answered with an unexpected response code.
    REQUEST_ERROR_RESPONSES = pytest.mark.parametrize(
        ['request_code', 'response_code', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                _types.SSH_AGENTC.REQUEST_IDENTITIES,
                _types.SSH_AGENT.SUCCESS,
                ssh_agent.SSHAgentFailedError,
                re.escape(
                    f'[Code {_types.SSH_AGENT.IDENTITIES_ANSWER.value}]'
                ),
                id='REQUEST_IDENTITIES-expect-SUCCESS',
            ),
        ],
    )
    # Agent responses that end prematurely.
    TRUNCATED_AGENT_RESPONSES = pytest.mark.parametrize(
        'response',
        [
            b'\x00\x00',
            b'\x00\x00\x00\x1f some bytes missing',
        ],
        ids=['in-header', 'in-body'],
    )
    # Agent responses to key listing requests, and the resulting errors.
    LIST_KEYS_ERROR_RESPONSES = pytest.mark.parametrize(
        ['response_code', 'response', 'exc_type', 'exc_pattern'],
        [
            pytest.param(
                _types.SSH_AGENT.FAILURE,
                b'',
                ssh_agent.SSHAgentFailedError,
                'failed to complete the request',
                id='failed-to-complete',
            ),
            pytest.param(
                _types.SSH_AGENT.IDENTITIES_ANSWER,
                b'\x00\x00\x00\x01',
                EOFError,
                'truncated response',
                id='truncated-response',
            ),
            pytest.param(
                _types.SSH_AGENT.IDENTITIES_ANSWER,
                b'\x00\x00\x00\x00abc',
                ssh_agent.TrailingDataError,
                'Overlong response',
                id='overlong-response',
            ),
        ],
    )
    # Malformed payloads of responses to the "query" extension request.
    QUERY_EXTENSIONS_MALFORMED_RESPONSES = pytest.mark.parametrize(
        'response_data',
        [
            pytest.param(b'\xde\xad\xbe\xef', id='truncated'),
            pytest.param(
                b'\x00\x00\x00\x0fwrong extension',
                id='wrong-extension',
            ),
            pytest.param(
                b'\x00\x00\x00\x05query\xde\xad\xbe\xef',
                id='with-trailer',
            ),
            pytest.param(
                b'\x00\x00\x00\x05query\x00\x00\x00\x04ext1\x00\x00',
                id='with-extra-fields',
            ),
        ],
    )
    # Key type and test key, for every supported test key.
    SUPPORTED_SSH_TEST_KEYS = pytest.mark.parametrize(
        ['ssh_test_key_type', 'ssh_test_key'],
        list(tests.SUPPORTED_KEYS.items()),
        ids=tests.SUPPORTED_KEYS.keys(),
    )
    # Key type and test key, for every unsuitable test key.
    UNSUITABLE_SSH_TEST_KEYS = pytest.mark.parametrize(
        ['ssh_test_key_type', 'ssh_test_key'],
        list(tests.UNSUITABLE_KEYS.items()),
        ids=tests.UNSUITABLE_KEYS.keys(),
    )
    # Framed messages that the fake agent must answer with a failure.
    INVALID_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
        'message',
        [
            pytest.param(b'\x00\x00\x00\x00', id='empty-message'),
            pytest.param(b'\x00\x00\x00\x0f\x0d', id='truncated-message'),
            pytest.param(
                b'\x00\x00\x00\x06\x1b\x00\x00\x00\x01\xff',
                id='invalid-extension-name',
            ),
            pytest.param(
                b'\x00\x00\x00\x11\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
                id='sign-with-trailing-data',
            ),
        ],
    )
    # Well-formed signing requests that the fake agent does not support.
    UNSUPPORTED_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
        'message',
        [
            pytest.param(
                _sign_request(
                    ssh_agent.SSHAgentClient.string(
                        tests.ALL_KEYS['rsa'].public_key_data
                    ),
                    ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                    b'\x00\x00\x00\x02',
                ),
                id='sign-with-flags',
            ),
            pytest.param(
                _sign_request(
                    ssh_agent.SSHAgentClient.string(
                        tests.ALL_KEYS['ed25519'].public_key_data
                    ),
                    b'\x00\x00\x00\x08\x00\x01\x02\x03\x04\x05\x06\x07',
                    b'\x00\x00\x00\x00',
                ),
                id='sign-with-nonstandard-passphrase',
            ),
            pytest.param(
                _sign_request(
                    ssh_agent.SSHAgentClient.string(
                        tests.ALL_KEYS['dsa1024'].public_key_data
                    ),
                    ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                    b'\x00\x00\x00\x00',
                ),
                id='sign-key-no-expected-signature',
            ),
            pytest.param(
                _sign_request(
                    b'\x00\x00\x00\x00',
                    ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
                    b'\x00\x00\x00\x00',
                ),
                id='sign-key-unregistered-test-key',
            ),
        ],
    )
    # Values for `SSH_AUTH_SOCK` (`None` meaning unset), and the error
    # expected when constructing the fake agent socket, if any.
    FAKE_AGENT_ADDRESSES = pytest.mark.parametrize(
        ['address', 'exception', 'match'],
        [
            pytest.param(None, KeyError, 'SSH_AUTH_SOCK', id='unset'),
            pytest.param('fake-ssh-agent:', None, '', id='standard'),
            pytest.param(
                str(pathlib.Path('~').expanduser()),
                FileNotFoundError,
                os.strerror(errno.ENOENT),
                id='invalid-url',
            ),
            pytest.param(
                'fake-ssh-agent:EPROTONOSUPPORT',
                OSError,
                os.strerror(errno.EPROTONOSUPPORT),
                id='protocol-not-supported',
            ),
            pytest.param(
                'fake-ssh-agent:ABCDEFGHIJKLMNOPQRSTUVWXYZ',
                OSError,
                os.strerror(errno.EINVAL),
                id='invalid-error-code',
            ),
        ],
    )
class TestTestingMachineryFakeSSHAgentSocket:
    """Test the fake SSH agent socket for the `ssh_agent` module tests."""

    def test_100_query_extensions(self) -> None:
        """The agent implements a known list of extensions."""
        request = b'\x00\x00\x00\x0a\x1b\x00\x00\x00\x05query'
        expected_response = b'\x00\x00\x00\x40\x1d\x00\x00\x00\x05query\x00\x00\x00\x05query\x00\x00\x00\x29list-extended@putty.projects.tartarus.org'
        with tests.FakeSSHAgentSocket() as agent:
            agent.sendall(request)
            assert agent.recv(1000) == expected_response

    def test_101_request_identities(self) -> None:
        """The agent implements a known list of identities."""
        split_off_string = ssh_agent.SSHAgentClient.unstring_prefix
        with tests.FakeSSHAgentSocket() as agent:
            agent.sendall(b'\x00\x00\x00\x01\x0b')
            # The response is framed: first the length, then the body.
            body_length = int.from_bytes(agent.recv(4), 'big')
            body: bytes | bytearray = bytearray(agent.recv(body_length))
            response_code = _types.SSH_AGENT(body[0])
            assert response_code == _types.SSH_AGENT.IDENTITIES_ANSWER
            identity_count = int.from_bytes(body[1:5], 'big')
            remainder = bytes(body[5:])
            for _ in range(identity_count):
                # Each identity is a key blob followed by a comment.
                key, remainder = split_off_string(remainder)
                _comment, remainder = split_off_string(remainder)
                assert key
                assert key in {
                    k.public_key_data for k in tests.ALL_KEYS.values()
                }
            # The listed identities must account for the whole body.
            assert not remainder

    @Parametrize.SUPPORTED_SSH_TEST_KEYS
    def test_102_sign(
        self,
        ssh_test_key_type: str,
        ssh_test_key: tests.SSHTestKey,
    ) -> None:
        """The agent signs known key/message pairs."""
        del ssh_test_key_type
        assert ssh_test_key.expected_signature is not None
        string = ssh_agent.SSHAgentClient.string
        with tests.FakeSSHAgentSocket() as agent:
            sign_request = (
                b'\x0d'
                + string(ssh_test_key.public_key_data)
                + string(vault.Vault.UUID)
                + b'\x00\x00\x00\x00'
            )
            agent.sendall(string(sign_request))
            sign_response = b'\x0e' + string(ssh_test_key.expected_signature)
            assert agent.recv(1000) == string(sign_response)

    def test_120_close_multiple(self) -> None:
        """The agent can be closed repeatedly."""
        for _ in range(2):
            with tests.FakeSSHAgentSocket() as agent:
                pass
        del agent

    def test_121_closed_agents_cannot_be_interacted_with(self) -> None:
        """Closed agents cannot be interacted with."""
        with tests.FakeSSHAgentSocket() as agent:
            pass
        socket_is_closed = re.escape(
            tests.FakeSSHAgentSocket._SOCKET_IS_CLOSED
        )
        with pytest.raises(ValueError, match=socket_is_closed):
            agent.sendall(b'\x00\x00\x00\x0a\x23\x00\x00\x00\x05query')
        assert agent.recv(100) == b''

    def test_122_no_recv_without_sendall(self) -> None:
        """The agent requires a message before sending a response."""
        protocol_violation = re.escape(
            tests.FakeSSHAgentSocket._PROTOCOL_VIOLATION
        )
        with tests.FakeSSHAgentSocket() as agent:  # noqa: SIM117
            with pytest.raises(AssertionError, match=protocol_violation):
                agent.recv(100)

    @Parametrize.INVALID_SSH_AGENT_MESSAGES
    def test_123_invalid_ssh_agent_messages(
        self,
        message: Buffer,
    ) -> None:
        """The agent responds with errors on invalid messages."""
        failure_response = b'\x00\x00\x00\x01\x05'
        with tests.FakeSSHAgentSocket() as agent:
            agent.sendall(message)
            assert agent.recv(100) == failure_response

    @Parametrize.UNSUPPORTED_SSH_AGENT_MESSAGES
    def test_124_unsupported_ssh_agent_messages(
        self,
        message: Buffer,
    ) -> None:
        """The agent responds with errors on unsupported messages."""
        failure_response = b'\x00\x00\x00\x01\x05'
        with tests.FakeSSHAgentSocket() as agent:
            agent.sendall(message)
            assert agent.recv(100) == failure_response

    @Parametrize.FAKE_AGENT_ADDRESSES
    def test_125_addresses(
        self,
        address: str | None,
        exception: type[Exception] | None,
        match: str,
    ) -> None:
        """The agent accepts addresses."""
        with contextlib.ExitStack() as stack:
            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
            if not address:
                monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
            else:
                monkeypatch.setenv('SSH_AUTH_SOCK', address)
            if exception:
                # Only expect an error if the test case names one.
                expected_error = pytest.raises(
                    exception, match=re.escape(match)
                )
                stack.enter_context(expected_error)
            tests.FakeSSHAgentSocketWithAddress()
class TestStaticFunctionality:
    """Test the static functionality of the `ssh_agent` module."""

    @staticmethod
    def as_ssh_string(bytestring: bytes) -> bytes:
        """Return an encoded SSH string from a bytestring.

        The encoding is the length of the bytestring, as four bytes in
        big-endian order, followed by the bytestring itself.

        This is a helper function for hypothesis data generation.

        """
        length_prefix = len(bytestring).to_bytes(4, 'big')
        return length_prefix + bytestring

    @staticmethod
    def canonicalize1(data: bytes) -> bytes:
        """Return an encoded SSH string, decoded and then re-encoded.

        Decoding uses `unstring`.

        This is a helper function for hypothesis testing.

        References:

          * [David R. MacIver: Another invariant to test for
            encoders][DECODE_ENCODE]

        [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/

        """
        payload = ssh_agent.SSHAgentClient.unstring(data)
        return ssh_agent.SSHAgentClient.string(payload)

    @staticmethod
    def canonicalize2(data: bytes) -> bytes:
        """Return an encoded SSH string, decoded and then re-encoded.

        Decoding uses `unstring_prefix`, and asserts that no trailing
        data remains.

        This is a helper function for hypothesis testing.

        References:

          * [David R. MacIver: Another invariant to test for
            encoders][DECODE_ENCODE]

        [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/

        """
        payload, rest = ssh_agent.SSHAgentClient.unstring_prefix(data)
        assert not rest
        return ssh_agent.SSHAgentClient.string(payload)

    # TODO(the-13th-letter): Re-evaluate if this check is worth keeping.
    # It cannot provide true tamper-resistance, but probably appears to.
    @Parametrize.PUBLIC_KEY_DATA
    def test_100_key_decoding(
        self,
        public_key_struct: tests.SSHTestKey,
    ) -> None:
        """The recorded public key data of the test keys looks sane."""
        # The second field of the public key line is the base64-encoded
        # key data.
        encoded_field = public_key_struct.public_key.split(None, 2)[1]
        decoded = base64.b64decode(encoded_field)
        assert decoded == public_key_struct.public_key_data, (
            "recorded public key data doesn't match"
        )

    @Parametrize.SH_EXPORT_LINES
    def test_190_sh_export_line_parsing(
        self, line: str, env_name: str, value: str | None
    ) -> None:
        """[`tests.parse_sh_export_line`][] works."""
        if value is None:
            with pytest.raises(ValueError, match='Cannot parse sh line:'):
                tests.parse_sh_export_line(line, env_name=env_name)
        else:
            parsed = tests.parse_sh_export_line(line, env_name=env_name)
            assert parsed == value

    def test_200_constructor_posix_no_ssh_auth_sock(
        self,
        skip_if_no_af_unix_support: None,
    ) -> None:
        """Abort if the running agent cannot be located."""
        del skip_if_no_af_unix_support
        connect = socketprovider.SocketProvider.resolve('posix')
        with pytest.MonkeyPatch.context() as monkeypatch:
            monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
            expected_error = pytest.raises(
                KeyError, match='SSH_AUTH_SOCK environment variable'
            )
            with expected_error:
                connect()

    @Parametrize.UINT32_INPUT
    def test_210_uint32(self, input: int, expected: bytes | bytearray) -> None:
        """`uint32` encoding works."""
        encoded = ssh_agent.SSHAgentClient.uint32(input)
        assert encoded == expected

    @hypothesis.given(strategies.integers(min_value=0, max_value=0xFFFFFFFF))
    @hypothesis.example(0xDEADBEEF).via('manual, pre-hypothesis example')
    def test_210a_uint32_from_number(self, num: int) -> None:
        """`uint32` encoding works, starting from numbers."""
        encoded = ssh_agent.SSHAgentClient.uint32(num)
        assert int.from_bytes(encoded, 'big', signed=False) == num

    @hypothesis.given(strategies.binary(min_size=4, max_size=4))
    @hypothesis.example(b'\xde\xad\xbe\xef').via(
        'manual, pre-hypothesis example'
    )
    def test_210b_uint32_from_bytestring(self, bytestring: bytes) -> None:
        """`uint32` encoding works, starting from length four byte strings."""
        number = int.from_bytes(bytestring, 'big', signed=False)
        assert ssh_agent.SSHAgentClient.uint32(number) == bytestring

    @Parametrize.SSH_STRING_INPUT
    def test_211_string(
        self, input: bytes | bytearray, expected: bytes | bytearray
    ) -> None:
        """SSH string encoding works."""
        encoded = ssh_agent.SSHAgentClient.string(input)
        assert bytes(encoded) == expected

    @hypothesis.given(strategies.binary(max_size=0x0001FFFF))
    @hypothesis.example(b'DEADBEEF' * 10000).via(
        'manual, pre-hypothesis example with highest order bit set'
    )
    def test_211a_string_from_bytestring(self, bytestring: bytes) -> None:
        """SSH string encoding works, starting from a byte string."""
        encoded = ssh_agent.SSHAgentClient.string(bytestring)
        # The payload size is bounded by the strategy, so the upper two
        # bytes of the length prefix can only take these values.
        assert encoded.startswith((b'\x00\x00', b'\x00\x01'))
        length_prefix, payload = encoded[:4], encoded[4:]
        assert int.from_bytes(length_prefix, 'big', signed=False) == len(
            bytestring
        )
        assert payload == bytestring

    @Parametrize.SSH_UNSTRING_INPUT
    def test_212_unstring(
        self, input: bytes | bytearray, expected: bytes | bytearray
    ) -> None:
        """SSH string decoding works."""
        decode = ssh_agent.SSHAgentClient.unstring
        decode_prefix = ssh_agent.SSHAgentClient.unstring_prefix
        assert bytes(decode(input)) == expected
        parts = tuple(bytes(part) for part in decode_prefix(input))
        assert parts == (expected, b'')

    @hypothesis.given(strategies.binary(max_size=0x00FFFFFF))
    @hypothesis.example(b'\x00\x00\x00\x07ssh-rsa').via(
        'manual, pre-hypothesis example to attempt to detect double-decoding'
    )
    @hypothesis.example(b'\x00\x00\x00\x01').via(
        'detect no-op encoding via ill-formed SSH string'
    )
    def test_212a_unstring_of_string_of_data(self, bytestring: bytes) -> None:
        """SSH string decoding of encoded SSH strings works.

        References:

          * [David R. MacIver: The Encode/Decode invariant][ENCODE_DECODE]

        [ENCODE_DECODE]: https://hypothesis.works/articles/encode-decode-invariant/

        """
        encode = ssh_agent.SSHAgentClient.string
        decode = ssh_agent.SSHAgentClient.unstring
        decode_prefix = ssh_agent.SSHAgentClient.unstring_prefix
        exact = encode(bytestring)
        assert decode(exact) == bytestring
        assert decode_prefix(exact) == (bytestring, b'')
        # Prefix decoding must hand back any trailing data untouched.
        trailer = b' trailing data'
        with_trailer = encode(bytestring) + trailer
        assert decode_prefix(with_trailer) == (bytestring, trailer)

    @hypothesis.given(
        strategies.binary(max_size=0x00FFFFFF).map(
            # Scoping issues, and the fact that staticmethod objects
            # (before class finalization) are not callable, necessitate
            # wrapping this staticmethod call in a lambda.
            lambda x: TestStaticFunctionality.as_ssh_string(x)  # noqa: PLW0108
        ),
    )
    def test_212b_string_of_unstring_of_data(self, encoded: bytes) -> None:
        """SSH string encoding of decoded SSH strings works.

        All canonicalization helpers must agree with each other, and
        canonicalizing twice must give the same result as doing so once.

        References:

          * [David R. MacIver: Another invariant to test for
            encoders][DECODE_ENCODE]

        [DECODE_ENCODE]: https://hypothesis.works/articles/canonical-serialization/

        """
        canonicalizers = [self.canonicalize1, self.canonicalize2]
        pairs = [
            (first, second)
            for first in canonicalizers
            for second in canonicalizers
        ]
        for first, second in pairs:
            assert first(encoded) == second(encoded)
            assert first(second(encoded)) == first(encoded)

    def test_220_registry_resolve(
        self,
    ) -> None:
        """Resolving entries in the socket provider registry works."""
        provider_registry = socketprovider.SocketProvider.registry
        lookup = socketprovider.SocketProvider.resolve
        with pytest.MonkeyPatch.context() as monkeypatch:
            # The test expects an entry of `None` to resolve with
            # a `NotImplementedError`.
            monkeypatch.setitem(provider_registry, 'fake', None)
            assert callable(lookup('native'))
            with pytest.raises(NotImplementedError):
                lookup('fake')

    @hypothesis.given(
        terminal=strategies.sampled_from([
            'unimplemented',
            'alias',
            'callable',
        ]),
        chain=strategies.lists(
            strategies.sampled_from([
                'c1',
                'c2',
                'c3',
                'c4',
                'c5',
                'c6',
                'c7',
                'c8',
                'c9',
                'c10',
            ]),
            min_size=1,
            unique=True,
        ),
    )
    def test_221_registry_resolve_chains(
        self,
        terminal: Literal['unimplemented', 'alias', 'callable'],
        chain: list[str],
    ) -> None:
        """Resolving a chain of providers works."""
        registry = socketprovider.SocketProvider.registry
        resolve = socketprovider.SocketProvider.resolve
        try:
            implementation = resolve('native')
        except NotImplementedError:  # pragma: no cover
            hypothesis.note(f'{registry = }')
            pytest.fail('Native SSH agent socket provider is unavailable?!')
        # TODO(the-13th-letter): Rewrite using structural pattern matching.
        # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
        if terminal == 'unimplemented':
            target = None
        elif terminal == 'alias':
            target = 'native'
        else:
            target = implementation
        with pytest.MonkeyPatch.context() as monkeypatch:
            # Register each link as pointing to its predecessor, so that
            # the last link starts the longest chain.
            for link in chain:
                monkeypatch.setitem(registry, link, target)
                target = link
            chain_start = chain[-1]
            if terminal != 'unimplemented':
                assert resolve(chain_start) == implementation
            else:
                with pytest.raises(NotImplementedError):
                    resolve(chain_start)

    @Parametrize.GOOD_ENTRY_POINTS
    def test_230_find_all_socket_providers(
        self,
        additional_entry_points: list[importlib.metadata.EntryPoint],
    ) -> None:
        """Finding all SSH agent socket providers works."""
        provider_class = socketprovider.SocketProvider
        resolve = provider_class.resolve
        old_registry = provider_class.registry
        faked_entry_points = tests.faked_entry_point_list(
            additional_entry_points, remove_conflicting_entries=False
        )
        with faked_entry_points as names:
            provider_class._find_all_ssh_agent_socket_providers()
            for name in names:
                assert name in socketprovider.SocketProvider.registry
                assert resolve(name) in {
                    tests.provider_entry_provider,
                    *old_registry.values(),
                }

    @Parametrize.BAD_ENTRY_POINTS
    def test_231_find_all_socket_providers_errors(
        self,
        additional_entry_points: list[importlib.metadata.EntryPoint],
    ) -> None:
        """Finding faulty SSH agent socket providers raises errors."""
        faked_entry_points = tests.faked_entry_point_list(
            additional_entry_points, remove_conflicting_entries=False
        )
        with faked_entry_points:  # noqa: SIM117
            with pytest.raises(AssertionError):
                socketprovider.SocketProvider._find_all_ssh_agent_socket_providers()

    @Parametrize.UINT32_EXCEPTIONS
    def test_310_uint32_exceptions(
        self, input: int, exc_type: type[Exception], exc_pattern: str
    ) -> None:
        """`uint32` encoding fails for out-of-bound values."""
        encode = ssh_agent.SSHAgentClient.uint32
        with pytest.raises(exc_type, match=exc_pattern):
            encode(input)

    @Parametrize.SSH_STRING_EXCEPTIONS
    def test_311_string_exceptions(
        self, input: Any, exc_type: type[Exception], exc_pattern: str
    ) -> None:
        """SSH string encoding fails for non-strings."""
        encode = ssh_agent.SSHAgentClient.string
        with pytest.raises(exc_type, match=exc_pattern):
            encode(input)

    @Parametrize.SSH_UNSTRING_EXCEPTIONS
    def test_312_unstring_exceptions(
        self,
        input: bytes | bytearray,
        exc_type: type[Exception],
        exc_pattern: str,
        has_trailer: bool,
        parts: tuple[bytes | bytearray, bytes | bytearray] | None,
    ) -> None:
        """SSH string decoding fails for invalid values."""
        decode = ssh_agent.SSHAgentClient.unstring
        decode_prefix = ssh_agent.SSHAgentClient.unstring_prefix
        with pytest.raises(exc_type, match=exc_pattern):
            decode(input)
        if not has_trailer:
            with pytest.raises(exc_type, match=exc_pattern):
                decode_prefix(input)
        else:
            # Trailing data is only an error for exact decoding; prefix
            # decoding returns it as the second part.
            decoded_parts = tuple(bytes(x) for x in decode_prefix(input))
            assert decoded_parts == parts

    def test_320_registry_already_registered(
        self,
    ) -> None:
        """The registry forbids overwriting entries."""
        # Work on a copy, so that the real registry stays untouched.
        registry = socketprovider.SocketProvider.registry.copy()
        resolve = socketprovider.SocketProvider.resolve
        register = socketprovider.SocketProvider.register
        the_annoying_os = resolve('the_annoying_os')
        posix = resolve('posix')
        with pytest.MonkeyPatch.context() as monkeypatch:
            monkeypatch.setattr(
                socketprovider.SocketProvider, 'registry', registry
            )
            # Re-registering the very same provider is permitted.
            register('posix')(posix)
            register('the_annoying_os')(the_annoying_os)
            # Registering a different provider under a taken name, or
            # the same provider with a taken alias, is not.
            conflicts = [
                (('posix',), the_annoying_os),
                (('the_annoying_os',), posix),
                (('posix', 'the_annoying_os_named_pipe'), posix),
                (('the_annoying_os', 'unix_domain'), the_annoying_os),
            ]
            for names, provider in conflicts:
                with pytest.raises(ValueError, match='already registered'):
                    register(*names)(provider)

    @hypothesis.given(
        entry=strategies.text('abcdefghijklmnopqrstuvwxyz0123456789_').filter(
            lambda s: s not in socketprovider.SocketProvider.registry
        ),
    )
    def test_321_registry_resolve_non_existant_entries(
        self,
        entry: str,
    ) -> None:
        """Resolving a nonexistent entry fails."""
        resolve = socketprovider.SocketProvider.resolve
        with pytest.raises(socketprovider.NoSuchProviderError):
            resolve(entry)
996class TestAgentInteraction:
997 """Test actually talking to the SSH agent."""
999 @Parametrize.SUPPORTED_SSH_TEST_KEYS
1000 def test_200_sign_data_via_agent(
1001 self,
1002 ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
1003 ssh_test_key_type: str,
1004 ssh_test_key: tests.SSHTestKey,
1005 ) -> None:
1006 """Signing data with specific SSH keys works.
1008 Single tests may abort early (skip) if the indicated key is not
1009 loaded in the agent. Presumably this means the key type is
1010 unsupported.
1012 """
1013 client = ssh_agent_client_with_test_keys_loaded 1i
1014 key_comment_pairs = {bytes(k): bytes(c) for k, c in client.list_keys()} 1i
1015 public_key_data = ssh_test_key.public_key_data 1i
1016 expected_signature = ssh_test_key.expected_signature 1i
1017 derived_passphrase = ssh_test_key.derived_passphrase 1i
1018 assert expected_signature is not None 1i
1019 assert derived_passphrase is not None 1i
1020 if public_key_data not in key_comment_pairs: # pragma: no cover 1i
1021 pytest.skip(f'prerequisite {ssh_test_key_type} SSH key not loaded')
1022 signature = bytes( 1i
1023 client.sign(payload=vault.Vault.UUID, key=public_key_data)
1024 )
1025 assert signature == expected_signature, ( 1i
1026 f'SSH signature mismatch ({ssh_test_key_type})'
1027 )
1028 signature2 = bytes( 1i
1029 client.sign(payload=vault.Vault.UUID, key=public_key_data)
1030 )
1031 assert signature2 == expected_signature, ( 1i
1032 f'SSH signature mismatch ({ssh_test_key_type})'
1033 )
1034 assert ( 1i
1035 vault.Vault.phrase_from_key(public_key_data, conn=client)
1036 == derived_passphrase
1037 ), f'SSH signature mismatch ({ssh_test_key_type})'
@Parametrize.UNSUITABLE_SSH_TEST_KEYS
def test_201_sign_data_via_agent_unsupported(
    self,
    ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
    ssh_test_key_type: str,
    ssh_test_key: tests.SSHTestKey,
) -> None:
    """Using an unsuitable key with [`vault.Vault`][] fails.

    Single tests may abort early (skip) if the indicated key is not
    loaded in the agent.  Presumably this means the key type is
    unsupported.  Single tests may also abort early if the agent
    ensures that the generally unsuitable key is actually suitable
    under this agent.

    """
    agent_client = ssh_agent_client_with_test_keys_loaded
    loaded = {bytes(k): bytes(c) for k, c in agent_client.list_keys()}
    candidate = ssh_test_key.public_key_data
    if candidate not in loaded:  # pragma: no cover
        pytest.skip(f'prerequisite {ssh_test_key_type} SSH key not loaded')
    # Without an agent to consult, the key must be judged unsuitable.
    suitable_in_general = vault.Vault.is_suitable_ssh_key(
        candidate, client=None
    )
    assert not suitable_in_general, (
        f'Expected {ssh_test_key_type} key to be unsuitable in general'
    )
    # This particular agent may still vouch for the key.
    suitable_for_agent = vault.Vault.is_suitable_ssh_key(
        candidate, client=agent_client
    )
    if suitable_for_agent:
        pytest.skip(
            f'agent automatically ensures {ssh_test_key_type} key is suitable'
        )
    with pytest.raises(ValueError, match='unsuitable SSH key'):
        vault.Vault.phrase_from_key(candidate, conn=agent_client)
@Parametrize.SSH_KEY_SELECTION
def test_210_ssh_key_selector(
    self,
    monkeypatch: pytest.MonkeyPatch,
    ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
    key: bytes,
    single: bool,
) -> None:
    """The key selector presents exactly the suitable keys.

    "Suitable" here means suitability for this SSH agent
    specifically.

    """
    client = ssh_agent_client_with_test_keys_loaded

    # Both key sets depend only on the static test key tables, so
    # build them once instead of once per candidate key.
    always_suitable = frozenset(
        v.public_key_data for v in tests.SUPPORTED_KEYS.values()
    )
    dsa_like = frozenset(
        v.public_key_data
        for k, v in tests.UNSUITABLE_KEYS.items()
        if k.startswith(('dsa', 'ecdsa'))
    )

    def key_is_suitable(key: bytes) -> bool:
        """Tell whether the key counts as suitable under this agent.

        Supported test keys always count.  The DSA and ECDSA test
        keys count only if the agent reports deterministic DSA
        signatures.

        """
        return key in always_suitable or (
            client.has_deterministic_dsa_signatures() and key in dsa_like
        )

    # TODO(the-13th-letter): Handle the unlikely(?) case that only
    # one test key is loaded, but `single` is False.  Rename the
    # `index` variable to `input`, store the `input` in there, and
    # make the definition of `text` in the else block dependent on
    # `n` being singular or non-singular.
    list_keys = tests.list_keys_singleton if single else tests.list_keys
    monkeypatch.setattr(ssh_agent.SSHAgentClient, 'list_keys', list_keys)
    keys = [pair.key for pair in list_keys() if key_is_suitable(pair.key)]
    if single:
        index = '1'
        text = 'Use this key? yes\n'
    else:
        index = str(1 + keys.index(key))
        n = len(keys)
        text = f'Your selection? (1-{n}, leave empty to abort): {index}\n'
    b64_key = base64.standard_b64encode(key).decode('ASCII')

    @click.command()
    def driver() -> None:
        """Call [`cli_helpers.select_ssh_key`][] directly, as a command."""
        # Use a distinct name so the parametrized `key` is not shadowed.
        selected_key = cli_helpers.select_ssh_key(client)
        click.echo(base64.standard_b64encode(selected_key).decode('ASCII'))

    # TODO(the-13th-letter): (Continued from above.)  Update input
    # data to use `index`/`input` directly and unconditionally.
    runner = tests.CliRunner(mix_stderr=True)
    result = runner.invoke(
        driver,
        [],
        input=('yes\n' if single else f'{index}\n'),
        catch_exceptions=True,
    )
    for snippet in ('Suitable SSH keys:\n', text, f'\n{b64_key}\n'):
        assert result.clean_exit(output=snippet), 'expected clean exit'
def test_300_constructor_bad_running_agent(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
) -> None:
    """Fail if the agent address is invalid.

    The agent address in `SSH_AUTH_SOCK` is replaced by a bogus one:
    the real socket name with a `~` appended if the socket is given
    as a string, and a fixed invalid address otherwise.

    """
    agent_socket = running_ssh_agent.socket
    if isinstance(agent_socket, str):
        bogus_address = agent_socket + '~'
    else:
        bogus_address = '<invalid//address>'
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setenv('SSH_AUTH_SOCK', bogus_address)
        with pytest.raises(OSError):  # noqa: PT011
            ssh_agent.SSHAgentClient()
def test_301_constructor_no_af_unix_support(self) -> None:
    """Fail without [`socket.AF_UNIX`][] support.

    With `socket.AF_UNIX` removed, requesting the `posix` socket
    provider must raise [`NotImplementedError`][].

    """
    registry = socketprovider.SocketProvider.registry
    assert 'posix' in registry
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setenv('SSH_AUTH_SOCK', "the value doesn't matter")
        # `raising=False`: tolerate platforms that lack `AF_UNIX` anyway.
        monkeypatch.delattr(socket, 'AF_UNIX', raising=False)
        expected_failure = pytest.raises(
            NotImplementedError,
            match='UNIX domain sockets',
        )
        with expected_failure:
            ssh_agent.SSHAgentClient(socket='posix')
def test_302_no_ssh_agent_socket_provider_available(
    self,
) -> None:
    """Fail if no SSH agent socket provider is available.

    A provider named `fake` is registered as `None`, and the client
    is asked to try it three times.  Every collected sub-exception
    must be a [`NotImplementedError`][].

    """
    with pytest.MonkeyPatch.context() as monkeypatch:
        monkeypatch.setitem(
            socketprovider.SocketProvider.registry, 'fake', None
        )
        with pytest.raises(ExceptionGroup) as excinfo:
            ssh_agent.SSHAgentClient(socket=['fake', 'fake', 'fake'])
        sub_exceptions = excinfo.value.exceptions
        assert all(
            isinstance(exc, NotImplementedError) for exc in sub_exceptions
        )
def test_303_explicit_socket(
    self,
    spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
) -> None:
    """Construct a client from an explicitly passed connection.

    The connection object of the spawned agent's own client is
    reused as the `socket` argument.

    """
    existing_connection = spawn_ssh_agent.client._connection
    ssh_agent.SSHAgentClient(socket=existing_connection)
@Parametrize.TRUNCATED_AGENT_RESPONSES
def test_310_truncated_server_response(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response: bytes,
) -> None:
    """Fail on truncated responses from the SSH agent.

    The client's connection is swapped for a pseudo-socket that
    discards everything sent and replays `response` on receive.  The
    request must then fail with [`EOFError`][].

    """
    del running_ssh_agent
    response_stream = io.BytesIO(response)

    class PseudoSocket:
        """Socket stand-in replaying the canned agent response."""

        def sendall(self, *args: Any, **kwargs: Any) -> Any:  # noqa: ARG002
            """Discard the outgoing data."""
            return None

        def recv(self, *args: Any, **kwargs: Any) -> Any:
            """Read the next chunk of the canned response."""
            return response_stream.read(*args, **kwargs)

    # Use the client as a context manager so its real connection is
    # closed at the end of the test.  The client is entered
    # outermost, so that the real connection is restored (by leaving
    # the monkeypatch context) before the client is exited.
    with ssh_agent.SSHAgentClient() as client:
        with pytest.MonkeyPatch.context() as monkeypatch:
            monkeypatch.setattr(client, '_connection', PseudoSocket())
            with pytest.raises(EOFError):
                client.request(255, b'')
@Parametrize.LIST_KEYS_ERROR_RESPONSES
def test_320_list_keys_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response_code: _types.SSH_AGENT,
    response: bytes | bytearray,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on problems during key listing.

    Known problems:

      - The agent refuses, or otherwise indicates the operation
        failed.
      - The agent response is truncated.
      - The agent response is overlong.

    """
    del running_ssh_agent

    # The mock below has its own `response_code` parameter; keep the
    # parametrized value reachable under a different name.
    passed_response_code = response_code

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function.
    def request(
        request_code: int | _types.SSH_AGENTC,
        payload: bytes | bytearray,
        /,
        *,
        response_code: Iterable[int | _types.SSH_AGENT]
        | int
        | _types.SSH_AGENT
        | None = None,
    ) -> tuple[int, bytes | bytearray] | bytes | bytearray:
        """Answer any request with the parametrized response.

        If no acceptable response codes are given, return the
        parametrized code and response as a pair.  If the
        parametrized code is acceptable, return the response alone.
        Otherwise raise [`ssh_agent.SSHAgentFailedError`][].

        """
        del request_code
        del payload
        # Normalize the acceptable codes to a frozenset of ints.
        if isinstance(  # pragma: no branch
            response_code, (int, _types.SSH_AGENT)
        ):
            response_code = frozenset({response_code})
        if response_code is not None:  # pragma: no branch
            response_code = frozenset({
                c if isinstance(c, int) else c.value for c in response_code
            })

        if not response_code:  # pragma: no cover
            return (passed_response_code.value, response)
        if passed_response_code.value not in response_code:
            raise ssh_agent.SSHAgentFailedError(
                passed_response_code.value, response
            )
        return response

    with pytest.MonkeyPatch.context() as monkeypatch:
        # Use the client as a context manager so that its connection
        # is closed once the test is done.
        with ssh_agent.SSHAgentClient() as client:
            monkeypatch.setattr(client, 'request', request)
            with pytest.raises(exc_type, match=exc_pattern):
                client.list_keys()
@Parametrize.SIGN_ERROR_RESPONSES
def test_330_sign_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    key: bytes | bytearray,
    check: bool,
    response_code: _types.SSH_AGENT,
    response: bytes | bytearray,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on problems during signing.

    Known problems:

      - The key is not loaded into the agent.
      - The agent refuses, or otherwise indicates the operation
        failed.

    """
    del running_ssh_agent
    # The mock below has its own `response_code` parameter; keep the
    # parametrized value reachable under a different name.
    passed_response_code = response_code

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function.
    def request(
        request_code: int | _types.SSH_AGENTC,
        payload: bytes | bytearray,
        /,
        *,
        response_code: Iterable[int | _types.SSH_AGENT]
        | int
        | _types.SSH_AGENT
        | None = None,
    ) -> tuple[int, bytes | bytearray] | bytes | bytearray:
        """Answer any request with the parametrized response.

        If no acceptable response codes are given, return the
        parametrized code and response as a pair.  If the
        parametrized code is acceptable, return the response alone.
        Otherwise raise [`ssh_agent.SSHAgentFailedError`][].

        """
        del request_code
        del payload
        # Normalize the acceptable codes to a frozenset of ints.
        if isinstance(  # pragma: no branch
            response_code, (int, _types.SSH_AGENT)
        ):
            response_code = frozenset({response_code})
        if response_code is not None:  # pragma: no branch
            response_code = frozenset({
                c if isinstance(c, int) else c.value for c in response_code
            })

        if not response_code:  # pragma: no cover
            return (passed_response_code.value, response)
        if (
            passed_response_code.value not in response_code
        ):  # pragma: no branch
            raise ssh_agent.SSHAgentFailedError(
                passed_response_code.value, response
            )
        return response  # pragma: no cover

    with pytest.MonkeyPatch.context() as monkeypatch:
        # Use the client as a context manager so that its connection
        # is closed once the test is done.
        with ssh_agent.SSHAgentClient() as client:
            monkeypatch.setattr(client, 'request', request)
            Pair = _types.SSHKeyCommentPair  # noqa: N806
            com = b'no comment'
            # Pretend that exactly the supported test keys are loaded.
            loaded_keys = [
                Pair(v.public_key_data, com).toreadonly()
                for v in tests.SUPPORTED_KEYS.values()
            ]
            monkeypatch.setattr(client, 'list_keys', lambda: loaded_keys)
            with pytest.raises(exc_type, match=exc_pattern):
                client.sign(key, b'abc', check_if_key_loaded=check)
@Parametrize.REQUEST_ERROR_RESPONSES
def test_340_request_error_responses(
    self,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    request_code: _types.SSH_AGENTC,
    response_code: _types.SSH_AGENT,
    exc_type: type[Exception],
    exc_pattern: str,
) -> None:
    """Fail on error responses to generic agent requests.

    The agent is sent `request_code` with an empty payload, with
    `response_code` passed as the `response_code` argument of the
    request.  The parametrized exception is expected.

    """
    del running_ssh_agent

    # TODO(the-13th-letter): Rewrite using parenthesized
    # with-statements.
    # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
    with pytest.raises(exc_type, match=exc_pattern):
        with ssh_agent.SSHAgentClient() as client:
            client.request(request_code, b'', response_code=response_code)
@Parametrize.QUERY_EXTENSIONS_MALFORMED_RESPONSES
def test_350_query_extensions_malformed_responses(
    self,
    monkeypatch: pytest.MonkeyPatch,
    running_ssh_agent: tests.RunningSSHAgentInfo,
    response_data: bytes,
) -> None:
    """Fail on malformed responses while querying extensions."""
    del running_ssh_agent

    # TODO(the-13th-letter): Extract this mock function into a common
    # top-level "request" mock function after removing the
    # payload-specific parts.
    def request(
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: (
            Iterable[_types.SSH_AGENT | int]
            | _types.SSH_AGENT
            | int
            | None
        ) = None,
    ) -> tuple[int, bytes] | bytes:
        """Answer an extension query with the parametrized data.

        Assert that the request is an extension request carrying the
        `query` payload, and that every acceptable response code is
        an extension response or success code.

        """
        extension = _types.SSH_AGENTC.EXTENSION
        assert code in {extension, extension.value}
        extension_response = _types.SSH_AGENT.EXTENSION_RESPONSE
        success = _types.SSH_AGENT.SUCCESS
        acceptable = {
            extension_response,
            extension_response.value,
            success,
            success.value,
        }
        assert payload == b'\x00\x00\x00\x05query'
        if response_code is None:  # pragma: no cover
            return (extension_response.value, response_data)
        # Treat a lone response code like a one-element collection.
        if isinstance(  # pragma: no cover
            response_code, (_types.SSH_AGENT, int)
        ):
            requested = (response_code,)
        else:
            requested = response_code
        for single_code in requested:  # pragma: no cover
            assert single_code in acceptable
        return response_data  # pragma: no cover

    # TODO(the-13th-letter): Rewrite using parenthesized
    # with-statements.
    # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
    with monkeypatch.context() as monkeypatch2:
        with ssh_agent.SSHAgentClient() as client:
            monkeypatch2.setattr(client, 'request', request)
            with pytest.raises(
                RuntimeError,
                match=r'Malformed response|does not match request',
            ):
                client.query_extensions()