Coverage for tests\conftest.py: 93.333%
58 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5from __future__ import annotations
7import base64
8import contextlib
9import datetime
10import importlib
11import operator
12import os
13import shutil
14import socket
15import subprocess
16import sys
17from typing import TYPE_CHECKING, Protocol, TypeVar
19import hypothesis
20import packaging.version
21import pytest
23import tests
24from derivepassphrase import _types, ssh_agent
26if TYPE_CHECKING:
27 from collections.abc import Iterator, Sequence
29startup_ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK', None)
32def _hypothesis_settings_setup() -> None:
33 """
34 Ensure sensible hypothesis settings if running under coverage.
36 In our tests, the sys.monitoring tracer slows down execution speed
37 by a factor of roughly 3, the C tracer by roughly 2.5, and the
38 Python tracer by roughly 40. Ensure that hypothesis default
39 timeouts apply relative to this *new* execution speed, not the old
40 one.
42 In any case, we *also* reduce the state machine step count to 32
43 steps per run, because the current state machines defined in the
44 tests rather benefit from broad testing rather than deep testing.
46 """
47 settings = hypothesis.settings()
48 slowdown: float | None = None
49 if (
50 importlib.util.find_spec('coverage') is not None
51 and settings.deadline is not None
52 and settings.deadline.total_seconds() < 1.0
53 ): # pragma: no cover
54 ctracer_class = (
55 importlib.import_module('coverage.tracer').CTracer
56 if importlib.util.find_spec('coverage.tracer') is not None
57 else type(None)
58 )
59 pytracer_class = importlib.import_module('coverage.pytracer').PyTracer
60 if (
61 getattr(sys, 'monitoring', None) is not None
62 and sys.monitoring.get_tool(sys.monitoring.COVERAGE_ID)
63 == 'coverage.py'
64 ):
65 slowdown = 3.0
66 elif (
67 trace_func := getattr(sys, 'gettrace', lambda: None)()
68 ) is not None and isinstance(trace_func, ctracer_class):
69 slowdown = 2.5
70 elif (
71 trace_func is not None
72 and hasattr(trace_func, '__self__')
73 and isinstance(trace_func.__self__, pytracer_class)
74 ):
75 slowdown = 8.0
76 settings = hypothesis.settings(
77 deadline=slowdown * settings.deadline
78 if slowdown
79 else settings.deadline,
80 stateful_step_count=32,
81 suppress_health_check=(hypothesis.HealthCheck.too_slow,),
82 )
83 hypothesis.settings.register_profile('default', settings)
84 hypothesis.settings.register_profile(
85 'dev', derandomize=True, max_examples=10
86 )
87 hypothesis.settings.register_profile(
88 'debug',
89 parent=hypothesis.settings.get_profile('dev'),
90 verbosity=hypothesis.Verbosity.verbose,
91 )
92 hypothesis.settings.register_profile(
93 'flaky',
94 deadline=(
95 settings.deadline - settings.deadline // 4
96 if settings.deadline is not None
97 else datetime.timedelta(milliseconds=150)
98 ),
99 )
100 ci_profile = hypothesis.settings.get_profile('ci')
101 hypothesis.settings.register_profile(
102 'intense',
103 parent=ci_profile,
104 derandomize=False,
105 max_examples=10 * ci_profile.max_examples,
106 )
109_hypothesis_settings_setup()
112# https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup
113# https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595
114@pytest.fixture(scope='session', autouse=False)
115def term_handler() -> Iterator[None]: # pragma: no cover
116 try:
117 import signal # noqa: PLC0415
119 sigint_handler = signal.getsignal(signal.SIGINT)
120 except (ImportError, OSError):
121 return
122 else:
123 orig_term = signal.signal(signal.SIGTERM, sigint_handler)
124 yield
125 signal.signal(signal.SIGTERM, orig_term)
128@pytest.fixture(scope='session')
129def skip_if_no_af_unix_support() -> None: # pragma: no cover
130 """Skip the test if Python does not support AF_UNIX.
132 Implemented as a fixture instead of a mark because it has
133 consequences for other fixtures, and because another "autouse"
134 session fixture may want to force/simulate non-support of
135 [`socket.AF_UNIX`][].
137 """
138 if not hasattr(socket, 'AF_UNIX'):
139 pytest.skip('socket module does not support AF_UNIX')
142class SpawnFunc(Protocol):
143 """Spawns an SSH agent, if possible."""
145 def __call__(
146 self,
147 executable: str | None,
148 env: dict[str, str],
149 ) -> subprocess.Popen[str] | None:
150 """Spawn the SSH agent.
152 Args:
153 executable:
154 The respective SSH agent executable.
155 env:
156 The new environment for the respective agent. Should
157 typically not include an SSH_AUTH_SOCK variable.
159 Returns:
160 The spawned SSH agent subprocess. If the executable is
161 `None`, then return `None` directly.
163 It is the caller's responsibility to clean up the spawned
164 subprocess.
166 Raises:
167 OSError:
168 The [`subprocess.Popen`][] call failed. See there.
170 """
173def spawn_pageant_on_posix( # pragma: no cover
174 executable: str | None, env: dict[str, str]
175) -> subprocess.Popen[str] | None:
176 """Spawn an isolated (UNIX) Pageant, if possible.
178 We attempt to detect whether Pageant is usable, i.e. whether Pageant
179 has output buffering problems when announcing its authentication
180 socket. This is the case for Pageant 0.81 and earlier.
182 Args:
183 executable:
184 The path to the Pageant executable.
185 env:
186 The new environment for Pageant. Should typically not
187 include an SSH_AUTH_SOCK variable.
189 Returns:
190 The spawned Pageant subprocess. If the executable is `None`, or
191 if we detect that Pageant cannot be sensibly controlled as
192 a subprocess, then return `None` directly.
194 It is the caller's responsibility to clean up the spawned
195 subprocess.
197 """
198 if executable is None or os.name == 'nt': # pragma: no cover
199 return None
201 # Apparently, Pageant 0.81 and lower running in debug mode does
202 # not actively flush its output. As a result, the first two
203 # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only
204 # print once the output buffer is flushed, whenever that is.
205 #
206 # This has been reported to the PuTTY developers. It is fixed in
207 # version 0.82, though the PuTTY developers consider this to be an
208 # abuse of debug mode. A new foreground mode (`--foreground`), also
209 # introduced in 0.82, provides the desired behavior: no forking, and
210 # immediately parsable instructions for SSH_AUTH_SOCK and
211 # SSH_AGENT_PID.
213 help_output = subprocess.run(
214 ['pageant', '--help'],
215 executable=executable,
216 env=env,
217 capture_output=True,
218 text=True,
219 check=False,
220 ).stdout
221 help_lines = help_output.splitlines(True)
222 pageant_version_string = (
223 help_lines[1].strip().removeprefix('Release ')
224 if len(help_lines) >= 2
225 else ''
226 )
227 v0_82 = packaging.version.Version('0.82')
228 pageant_version = packaging.version.Version(pageant_version_string)
230 if pageant_version < v0_82: # pragma: no cover
231 return None
233 return subprocess.Popen(
234 ['pageant', '--foreground', '-s'],
235 executable=executable,
236 stdin=subprocess.DEVNULL,
237 stdout=subprocess.PIPE,
238 shell=False,
239 env=env,
240 text=True,
241 bufsize=1,
242 )
245def spawn_openssh_agent_on_posix( # pragma: no cover
246 executable: str | None, env: dict[str, str]
247) -> subprocess.Popen[str] | None:
248 """Spawn an isolated OpenSSH agent (on UNIX), if possible.
250 Args:
251 executable:
252 The path to the OpenSSH agent executable.
253 env:
254 The new environment for the OpenSSH agent. Should typically
255 not include an SSH_AUTH_SOCK variable.
257 Returns:
258 The spawned OpenSSH agent subprocess. If the executable is
259 `None`, then return `None` directly.
261 It is the caller's responsibility to clean up the spawned
262 subprocess.
264 """
265 if executable is None or os.name == 'nt': # pragma: no cover
266 return None
267 return subprocess.Popen(
268 ['ssh-agent', '-D', '-s'],
269 executable=executable,
270 stdin=subprocess.DEVNULL,
271 stdout=subprocess.PIPE,
272 shell=False,
273 env=env,
274 text=True,
275 bufsize=1,
276 )
279def spawn_noop( # pragma: no cover
280 executable: str | None, env: dict[str, str]
281) -> None:
282 """Placeholder function. Does nothing."""
285spawn_handlers: Sequence[tuple[str, SpawnFunc, tests.KnownSSHAgent]] = [
286 ('pageant', spawn_pageant_on_posix, tests.KnownSSHAgent.Pageant),
287 (
288 'ssh-agent',
289 spawn_openssh_agent_on_posix,
290 tests.KnownSSHAgent.OpenSSHAgent,
291 ),
292 ('fake', spawn_noop, tests.KnownSSHAgent.FakeSSHAgentSocket),
293 ('(system)', spawn_noop, tests.KnownSSHAgent.UNKNOWN),
294]
295"""
296The standard registry of agent spawning functions.
297"""
299Popen = TypeVar('Popen', bound=subprocess.Popen)
302@contextlib.contextmanager
303def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
304 """Terminate and wait for the subprocess upon exiting the context.
306 Args:
307 proc:
308 The subprocess to manage.
310 Returns:
311 A context manager. Upon entering the manager, return the
312 managed subprocess. Upon exiting the manager, terminate the
313 process and wait for it.
315 """
316 try:
317 yield proc
318 finally:
319 proc.terminate()
320 proc.wait()
323class CannotSpawnError(RuntimeError):
324 """Cannot spawn the SSH agent."""
327def spawn_named_agent(
328 exec_name: str,
329 spawn_func: SpawnFunc,
330 agent_type: tests.KnownSSHAgent,
331) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover
332 """Spawn the named SSH agent and check that it is operational.
334 Using the correct agent-specific spawn function from the
335 [`spawn_handlers`][] registry, spawn the named SSH agent (according
336 to its declared type), then set up the communication channel and
337 yield an SSH agent client connected to this agent. After resuming,
338 tear down the communication channel and terminate the SSH agent.
340 The SSH agent's instructions for setting up the communication
341 channel are parsed with [`tests.parse_sh_export_line`][]. See the
342 caveats there.
344 Args:
345 exec_name:
346 The executable to spawn.
347 spawn_func:
348 The agent-specific spawn function.
349 agent_type:
350 The agent type.
352 Yields:
353 A 3-tuple containing the agent type, an SSH agent client
354 connected to this agent, and a boolean indicating whether this
355 agent was actually spawned in an isolated manner.
357 Only one tuple will ever be yielded. After resuming, the
358 connected client will be torn down, as will the agent if it was
359 isolated.
361 Raises:
362 CannotSpawnError:
363 We failed to spawn the agent or otherwise set up the
364 environment/communication channel/etc.
366 """
367 # pytest's fixture system does not seem to guarantee that
368 # environment variables are set up correctly if nested and
369 # parametrized fixtures are used: it is possible that "outer"
370 # parametrized fixtures are torn down only after other "outer"
371 # fixtures of the same parameter set have run. So our fixtures set
372 # SSH_AUTH_SOCK explicitly to the value saved at interpreter
373 # startup.
374 #
375 # Here, we verify at most major steps that SSH_AUTH_SOCK didn't
376 # change under our nose.
377 assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
378 f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
379 )
380 exit_stack = contextlib.ExitStack()
381 agent_env = os.environ.copy()
382 ssh_auth_sock = agent_env.pop('SSH_AUTH_SOCK', None)
383 proc = spawn_func(executable=shutil.which(exec_name), env=agent_env)
384 with exit_stack:
385 if (
386 spawn_func is spawn_noop
387 and agent_type == tests.KnownSSHAgent.FakeSSHAgentSocket
388 ):
389 ssh_auth_sock = None
390 elif not hasattr(socket, 'AF_UNIX'): # pragma: no cover
391 err_msg = 'socket module does not support AF_UNIX'
392 raise CannotSpawnError(err_msg)
393 elif spawn_func is spawn_noop:
394 ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
395 elif proc is None: # pragma: no cover
396 err_msg = f'Cannot spawn usable {exec_name}'
397 raise CannotSpawnError(err_msg)
398 else:
399 exit_stack.enter_context(terminate_on_exit(proc))
400 assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
401 f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
402 )
403 assert proc.stdout is not None
404 ssh_auth_sock_line = proc.stdout.readline()
405 try:
406 ssh_auth_sock = tests.parse_sh_export_line(
407 ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
408 )
409 except ValueError: # pragma: no cover
410 err_msg = f'Cannot parse agent output: {ssh_auth_sock_line!r}'
411 raise CannotSpawnError(err_msg) from None
412 pid_line = proc.stdout.readline()
413 if (
414 'pid' not in pid_line.lower()
415 and '_pid' not in pid_line.lower()
416 ): # pragma: no cover
417 err_msg = f'Cannot parse agent output: {pid_line!r}'
418 raise CannotSpawnError(err_msg)
419 monkeypatch = exit_stack.enter_context(pytest.MonkeyPatch.context())
420 if ssh_auth_sock is not None:
421 monkeypatch.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
422 client = exit_stack.enter_context(
423 ssh_agent.SSHAgentClient.ensure_agent_subcontext()
424 )
425 else:
426 monkeypatch.setenv(
427 'SSH_AUTH_SOCK', tests.FakeSSHAgentSocketWithAddress.ADDRESS
428 )
429 monkeypatch.setattr(
430 ssh_agent.SSHAgentClient,
431 'SOCKET_PROVIDERS',
432 ['fake_with_address'],
433 )
434 client = exit_stack.enter_context(
435 ssh_agent.SSHAgentClient.ensure_agent_subcontext(
436 tests.FakeSSHAgentSocketWithAddress()
437 )
438 )
439 client.list_keys() # sanity test
440 yield tests.SpawnedSSHAgentInfo(
441 agent_type, client, spawn_func is not spawn_noop
442 )
443 assert os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock, (
444 f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
445 )
448def is_agent_permitted(
449 agent_type: tests.KnownSSHAgent,
450) -> bool: # pragma: no cover
451 """May the given SSH agent be spawned by the test harness?
453 If the environment variable `PERMITTED_SSH_AGENTS` is given, it
454 names a comma-separated list of known SSH agent names that the test
455 harness may spawn. Invalid names are silently ignored. If not
456 given, then any agent may be spawned.
458 The fake agents cannot be restricted in this manner, as the test
459 suite depends on their availability.
461 """
462 if not os.environ.get('PERMITTED_SSH_AGENTS'):
463 return True
464 permitted_agents = {tests.KnownSSHAgent.FakeSSHAgentSocket}
465 permitted_agents.update({
466 tests.KnownSSHAgent(x)
467 for x in os.environ['PERMITTED_SSH_AGENTS'].split(',')
468 if x in tests.KnownSSHAgent.__members__
469 })
470 return agent_type in permitted_agents
473@pytest.fixture
474def running_ssh_agent( # pragma: no cover
475) -> Iterator[tests.RunningSSHAgentInfo]:
476 """Ensure a running SSH agent, if possible, as a pytest fixture.
478 Check for a running SSH agent, or spawn a new one if possible. We
479 know how to spawn OpenSSH's agent and PuTTY's Pageant. If spawned
480 this way, the agent does not persist beyond the test.
482 This fixture can neither guarantee a particular running agent, nor
483 can it guarantee a particular set of loaded keys.
485 Yields:
486 A 2-tuple `(ssh_auth_sock, agent_type)`, where `ssh_auth_sock`
487 is the value of the `SSH_AUTH_SOCK` environment variable, to be
488 used to connect to the running agent, and `agent_type` is the
489 agent type.
491 Raises:
492 pytest.skip.Exception:
493 If no agent is running or can be spawned, skip this test.
495 """
497 def prepare_environment(
498 agent_type: tests.KnownSSHAgent,
499 ) -> Iterator[tests.RunningSSHAgentInfo]:
500 with pytest.MonkeyPatch.context() as monkeypatch:
501 if agent_type == tests.KnownSSHAgent.FakeSSHAgentSocket:
502 monkeypatch.setattr(
503 ssh_agent.SSHAgentClient,
504 'SOCKET_PROVIDERS',
505 ['fake_with_address'],
506 )
507 monkeypatch.setenv(
508 'SSH_AUTH_SOCK',
509 tests.FakeSSHAgentSocketWithAddress.ADDRESS,
510 )
511 yield tests.RunningSSHAgentInfo(
512 tests.FakeSSHAgentSocketWithAddress,
513 tests.KnownSSHAgent.FakeSSHAgentSocket,
514 )
515 else:
516 yield tests.RunningSSHAgentInfo(
517 os.environ['SSH_AUTH_SOCK'],
518 agent_type,
519 )
521 with pytest.MonkeyPatch.context() as monkeypatch:
522 # pytest's fixture system does not seem to guarantee that
523 # environment variables are set up correctly if nested and
524 # parametrized fixtures are used: it is possible that "outer"
525 # parametrized fixtures are torn down only after other "outer"
526 # fixtures of the same parameter set have run. So set
527 # SSH_AUTH_SOCK explicitly to the value saved at interpreter
528 # startup.
529 if startup_ssh_auth_sock:
530 monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
531 else:
532 monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
533 for exec_name, spawn_func, agent_type in spawn_handlers:
534 if not is_agent_permitted(agent_type):
535 continue
536 try:
537 for _agent_info in spawn_named_agent(
538 exec_name, spawn_func, agent_type
539 ):
540 yield from prepare_environment(agent_type)
541 except (KeyError, OSError, CannotSpawnError):
542 continue
543 return
544 pytest.skip('No SSH agent running or spawnable')
547@pytest.fixture(params=spawn_handlers, ids=operator.itemgetter(0))
548def spawn_ssh_agent(
549 request: pytest.FixtureRequest,
550) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover
551 """Spawn an isolated SSH agent, if possible, as a pytest fixture.
553 Spawn a new SSH agent isolated from other SSH use by other
554 processes, if possible. We know how to spawn OpenSSH's agent and
555 PuTTY's Pageant, and the "(system)" fallback agent.
557 Yields:
558 A [named tuple][collections.namedtuple] containing information
559 about the spawned agent, e.g. the software product, a client
560 connected to the agent, and whether the agent is isolated from
561 other clients.
563 Raises:
564 pytest.skip.Exception:
565 If the agent cannot be spawned, skip this test.
567 """
569 with pytest.MonkeyPatch.context() as monkeypatch:
570 # pytest's fixture system does not seem to guarantee that
571 # environment variables are set up correctly if nested and
572 # parametrized fixtures are used: it is possible that "outer"
573 # parametrized fixtures are torn down only after other "outer"
574 # fixtures of the same parameter set have run. So set
575 # SSH_AUTH_SOCK explicitly to the value saved at interpreter
576 # startup.
577 if startup_ssh_auth_sock: # pragma: no cover
578 monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
579 else: # pragma: no cover
580 monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
581 try:
582 yield from spawn_named_agent(*request.param)
583 except (KeyError, OSError, CannotSpawnError) as exc:
584 pytest.skip(exc.args[0])
585 return
588@pytest.fixture
589def ssh_agent_client_with_test_keys_loaded( # noqa: C901
590 spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
591) -> Iterator[ssh_agent.SSHAgentClient]:
592 """Provide an SSH agent with loaded test keys, as a pytest fixture.
594 Use the `spawn_ssh_agent` fixture to acquire a usable SSH agent,
595 upload the known test keys into the agent, and return a connected
596 client.
598 The agent may reject several of the test keys due to unsupported or
599 obsolete key types. Rejected keys will be silently ignored, unless
600 all keys are rejected; then the test will be skipped. You must not
601 automatically assume any particular key is present in the agent.
603 Yields:
604 A [named tuple][collections.namedtuple] containing
605 information about the spawned agent, e.g. the software
606 product, a client connected to the agent, and whether the
607 agent is isolated from other clients.
609 Raises:
610 OSError:
611 There was a communication or a socket setup error with the
612 agent.
613 pytest.skip.Exception:
614 If the agent is unusable or if it rejected all test keys,
615 skip this test.
617 Warning:
618 It is the fixture's responsibility to clean up the SSH agent
619 client after the test. Closing the client's socket connection
620 beforehand (e.g. by using the client as a context manager) may
621 lead to exceptions being thrown upon fixture teardown.
623 """
624 agent_type, client, isolated = spawn_ssh_agent
625 successfully_loaded_keys: set[str] = set()
627 try:
628 current_loaded_keys = frozenset({
629 pair.key for pair in client.list_keys()
630 })
631 except (
632 EOFError,
633 OSError,
634 ssh_agent.SSHAgentFailedError,
635 ) as exc: # pragma: no cover
636 # Defensive programming, with an external source of
637 # nondeterminism, so no coverage.
638 pytest.skip(f'SSH agent failed the "list keys" sanity test: {exc!r}')
640 def prepare_payload(
641 payload: bytes | bytearray,
642 *,
643 isolated: bool = True,
644 time_to_live: int = 30,
645 ) -> tuple[_types.SSH_AGENTC, bytes]:
646 return_code = (
647 _types.SSH_AGENTC.ADD_IDENTITY
648 if isolated
649 else _types.SSH_AGENTC.ADD_ID_CONSTRAINED
650 )
651 lifetime_constraint = (
652 b''
653 if isolated
654 else b'\x01' + ssh_agent.SSHAgentClient.uint32(time_to_live)
655 )
656 return (return_code, bytes(payload) + lifetime_constraint)
658 try:
659 for key_type, key_struct in tests.ALL_KEYS.items():
660 private_key_data = key_struct.private_key_blob
661 if private_key_data is None: # pragma: no cover
662 continue
663 request_code, payload = prepare_payload(
664 private_key_data, isolated=isolated, time_to_live=30
665 )
666 try:
667 try:
668 client.request(
669 request_code,
670 payload,
671 response_code=_types.SSH_AGENT.SUCCESS,
672 )
673 except ssh_agent.SSHAgentFailedError: # pragma: no cover
674 # Pageant can fail to accept a key for two separate
675 # reasons:
676 #
677 # - Pageant refuses to accept a key it already holds
678 # in memory. Verify this by listing keys.
679 # - Pageant does not support key constraints (see
680 # references below).
681 #
682 # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-timeout.html
683 # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-key-confirm.html
684 current_loaded_keys = frozenset({
685 pair.key for pair in client.list_keys()
686 })
687 if agent_type == tests.KnownSSHAgent.Pageant and (
688 key_struct.public_key_data in current_loaded_keys
689 ):
690 pass
691 elif agent_type == tests.KnownSSHAgent.Pageant and (
692 not isolated
693 ):
694 request_code, payload = prepare_payload(
695 private_key_data, isolated=True
696 )
697 client.request(
698 request_code,
699 payload,
700 response_code=_types.SSH_AGENT.SUCCESS,
701 )
702 else:
703 raise
704 except (
705 EOFError,
706 OSError,
707 ssh_agent.SSHAgentFailedError,
708 ): # pragma: no cover
709 pass
710 else: # pragma: no cover
711 successfully_loaded_keys.add(key_type)
712 yield client
713 finally:
714 for key_type, key_struct in tests.ALL_KEYS.items():
715 if not isolated and (
716 key_type in successfully_loaded_keys
717 ): # pragma: no cover
718 # The public key blob is the base64-encoded part in
719 # the "public key line".
720 public_key = base64.standard_b64decode(
721 key_struct.public_key.split(None, 2)[1]
722 )
723 request_code = _types.SSH_AGENTC.REMOVE_IDENTITY
724 client.request(
725 request_code,
726 public_key,
727 response_code=frozenset({
728 _types.SSH_AGENT.SUCCESS,
729 _types.SSH_AGENT.FAILURE,
730 }),
731 )