Coverage for tests\conftest.py: 93.333%

58 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5from __future__ import annotations 

6 

7import base64 

8import contextlib 

9import datetime 

10import importlib 

11import operator 

12import os 

13import shutil 

14import socket 

15import subprocess 

16import sys 

17from typing import TYPE_CHECKING, Protocol, TypeVar 

18 

19import hypothesis 

20import packaging.version 

21import pytest 

22 

23import tests 

24from derivepassphrase import _types, ssh_agent 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Iterator, Sequence 

28 

# Value of SSH_AUTH_SOCK (or None if unset) at the time this module is
# imported.  Recorded once so that later code can compare against it.
startup_ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK')

30 

31 

def _hypothesis_settings_setup() -> None:
    """
    Ensure sensible hypothesis settings if running under coverage.

    In our tests, the sys.monitoring tracer slows down execution speed
    by a factor of roughly 3, the C tracer by roughly 2.5, and the
    Python tracer by roughly 40.  Ensure that hypothesis default
    timeouts apply relative to this *new* execution speed, not the old
    one.

    In any case, we *also* reduce the state machine step count to 32
    steps per run, because the current state machines defined in the
    tests rather benefit from broad testing rather than deep testing.

    The resulting settings are registered as the `default` profile.
    Additionally, the following profiles are registered:

      - `dev`: derandomized, with 10 examples per test.
      - `debug`: like `dev`, but with verbose output.
      - `flaky`: the default deadline reduced by a quarter (or 150 ms
        if the default has no deadline).
      - `intense`: like hypothesis's `ci` profile, but randomized and
        with ten times as many examples.

    """
    # `import importlib` alone does not guarantee that the `util`
    # submodule is bound as an attribute of the package; import it
    # explicitly instead of relying on some other module having done so.
    import importlib.util  # noqa: PLC0415

    settings = hypothesis.settings()
    slowdown: float | None = None
    if (
        importlib.util.find_spec('coverage') is not None
        and settings.deadline is not None
        and settings.deadline.total_seconds() < 1.0
    ):  # pragma: no cover
        # The C tracer is optional; fall back to a class that no trace
        # function can be an instance of.
        ctracer_class = (
            importlib.import_module('coverage.tracer').CTracer
            if importlib.util.find_spec('coverage.tracer') is not None
            else type(None)
        )
        pytracer_class = importlib.import_module('coverage.pytracer').PyTracer
        if (
            getattr(sys, 'monitoring', None) is not None
            and sys.monitoring.get_tool(sys.monitoring.COVERAGE_ID)
            == 'coverage.py'
        ):
            slowdown = 3.0
        elif (
            trace_func := getattr(sys, 'gettrace', lambda: None)()
        ) is not None and isinstance(trace_func, ctracer_class):
            slowdown = 2.5
        elif (
            trace_func is not None
            and hasattr(trace_func, '__self__')
            and isinstance(trace_func.__self__, pytracer_class)
        ):
            # NOTE(review): the docstring cites a slowdown of roughly
            # 40 for the Python tracer, but a factor of 8.0 is applied
            # here -- confirm which one is intended.
            slowdown = 8.0
    settings = hypothesis.settings(
        deadline=slowdown * settings.deadline
        if slowdown
        else settings.deadline,
        stateful_step_count=32,
        suppress_health_check=(hypothesis.HealthCheck.too_slow,),
    )
    hypothesis.settings.register_profile('default', settings)
    hypothesis.settings.register_profile(
        'dev', derandomize=True, max_examples=10
    )
    hypothesis.settings.register_profile(
        'debug',
        parent=hypothesis.settings.get_profile('dev'),
        verbosity=hypothesis.Verbosity.verbose,
    )
    hypothesis.settings.register_profile(
        'flaky',
        deadline=(
            settings.deadline - settings.deadline // 4
            if settings.deadline is not None
            else datetime.timedelta(milliseconds=150)
        ),
    )
    ci_profile = hypothesis.settings.get_profile('ci')
    hypothesis.settings.register_profile(
        'intense',
        parent=ci_profile,
        derandomize=False,
        max_examples=10 * ci_profile.max_examples,
    )


_hypothesis_settings_setup()

110 

111 

112# https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup 

113# https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595 

@pytest.fixture(scope='session', autouse=False)
def term_handler() -> Iterator[None]:  # pragma: no cover
    """Treat SIGTERM like SIGINT for the duration of the test session.

    Install the currently active SIGINT handler as the SIGTERM handler
    as well, and restore the original SIGTERM handler on teardown.  See
    the pytest references above this fixture for the motivation.

    If the [`signal`][] module cannot be imported, or the SIGINT
    handler cannot be queried, then this fixture does nothing.

    Yields:
        Nothing of interest; the fixture is used for its side effect.

    """
    try:
        import signal  # noqa: PLC0415

        sigint_handler = signal.getsignal(signal.SIGINT)
    except (ImportError, OSError):
        # A generator fixture must yield exactly once, even when there
        # is nothing to set up; otherwise pytest reports a fixture
        # error instead of running the test.
        yield
        return
    orig_term = signal.signal(signal.SIGTERM, sigint_handler)
    yield
    signal.signal(signal.SIGTERM, orig_term)

126 

127 

@pytest.fixture(scope='session')
def skip_if_no_af_unix_support() -> None:  # pragma: no cover
    """Skip the requesting test unless [`socket.AF_UNIX`][] exists.

    This is a fixture rather than a mark: it has consequences for
    other fixtures, and another "autouse" session fixture may want to
    force/simulate non-support of [`socket.AF_UNIX`][].

    """
    if hasattr(socket, 'AF_UNIX'):
        return
    pytest.skip('socket module does not support AF_UNIX')

140 

141 

class SpawnFunc(Protocol):
    """The call signature of an SSH agent spawning function."""

    def __call__(
        self,
        executable: str | None,
        env: dict[str, str],
    ) -> subprocess.Popen[str] | None:
        """Spawn the SSH agent, if possible.

        Args:
            executable:
                The SSH agent executable to run, or `None` if there is
                none.
            env:
                The environment to run the agent in.  It should
                typically not contain an SSH_AUTH_SOCK variable.

        Returns:
            The subprocess of the spawned SSH agent, or `None` if
            the executable is `None`.

            Cleaning up the spawned subprocess is the caller's
            responsibility.

        Raises:
            OSError:
                The [`subprocess.Popen`][] call failed.  See there.

        """
        ...

171 

172 

def spawn_pageant_on_posix(  # pragma: no cover
    executable: str | None, env: dict[str, str]
) -> subprocess.Popen[str] | None:
    """Spawn an isolated (UNIX) Pageant, if possible.

    We attempt to detect whether Pageant is usable, i.e. whether Pageant
    has output buffering problems when announcing its authentication
    socket.  This is the case for Pageant 0.81 and earlier.

    The version is read from the second line of the `--help` output,
    after stripping a leading `Release ` prefix.

    Args:
        executable:
            The path to the Pageant executable.
        env:
            The new environment for Pageant.  Should typically not
            include an SSH_AUTH_SOCK variable.

    Returns:
        The spawned Pageant subprocess.  If the executable is `None`, if
        we are running on Windows, if the Pageant version cannot be
        determined, or if we detect that Pageant cannot be sensibly
        controlled as a subprocess, then return `None` directly.

        It is the caller's responsibility to clean up the spawned
        subprocess.

    Raises:
        OSError:
            A [`subprocess`][] call failed.  See there.

    """
    if executable is None or os.name == 'nt':  # pragma: no cover
        return None

    # Apparently, Pageant 0.81 and lower running in debug mode does
    # not actively flush its output.  As a result, the first two
    # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only
    # print once the output buffer is flushed, whenever that is.
    #
    # This has been reported to the PuTTY developers.  It is fixed in
    # version 0.82, though the PuTTY developers consider this to be an
    # abuse of debug mode.  A new foreground mode (`--foreground`), also
    # introduced in 0.82, provides the desired behavior: no forking, and
    # immediately parsable instructions for SSH_AUTH_SOCK and
    # SSH_AGENT_PID.

    help_output = subprocess.run(
        ['pageant', '--help'],
        executable=executable,
        env=env,
        capture_output=True,
        text=True,
        check=False,
    ).stdout
    help_lines = help_output.splitlines(True)
    pageant_version_string = (
        help_lines[1].strip().removeprefix('Release ')
        if len(help_lines) >= 2
        else ''
    )
    try:
        pageant_version = packaging.version.Version(pageant_version_string)
    except packaging.version.InvalidVersion:  # pragma: no cover
        # No (parsable) release line, e.g. too little help output or
        # a non-release build.  We cannot establish that this Pageant
        # is new enough, so treat it as unusable instead of letting
        # the (ValueError-derived) parse error escape to the caller.
        return None

    if pageant_version < packaging.version.Version('0.82'):  # pragma: no cover
        return None

    return subprocess.Popen(
        ['pageant', '--foreground', '-s'],
        executable=executable,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        shell=False,
        env=env,
        text=True,
        bufsize=1,
    )

243 

244 

def spawn_openssh_agent_on_posix(  # pragma: no cover
    executable: str | None, env: dict[str, str]
) -> subprocess.Popen[str] | None:
    """Spawn an isolated OpenSSH agent (on UNIX), if possible.

    The agent is started as `ssh-agent -D -s`, with no standard input
    and with its standard output available as a line-buffered text
    pipe.

    Args:
        executable:
            The path to the OpenSSH agent executable.
        env:
            The environment to run the OpenSSH agent in.  It should
            typically not contain an SSH_AUTH_SOCK variable.

    Returns:
        The subprocess of the spawned OpenSSH agent, or `None` if the
        executable is `None` or if we are running on Windows.

        Cleaning up the spawned subprocess is the caller's
        responsibility.

    """
    spawnable = executable is not None and os.name != 'nt'
    if not spawnable:  # pragma: no cover
        return None
    agent_argv = ['ssh-agent', '-D', '-s']
    return subprocess.Popen(
        agent_argv,
        executable=executable,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        shell=False,
        env=env,
        text=True,
        bufsize=1,
    )

277 

278 

def spawn_noop(  # pragma: no cover
    executable: str | None, env: dict[str, str]
) -> None:
    """Placeholder spawn function: ignore the arguments, spawn nothing."""
    return None

283 

284 

# Each entry is (executable name, spawn function, agent type).
spawn_handlers: Sequence[tuple[str, SpawnFunc, tests.KnownSSHAgent]] = [
    # Real agents, spawned as subprocesses.
    (
        'pageant',
        spawn_pageant_on_posix,
        tests.KnownSSHAgent.Pageant,
    ),
    (
        'ssh-agent',
        spawn_openssh_agent_on_posix,
        tests.KnownSSHAgent.OpenSSHAgent,
    ),
    # Entries that do not spawn a subprocess.
    (
        'fake',
        spawn_noop,
        tests.KnownSSHAgent.FakeSSHAgentSocket,
    ),
    (
        '(system)',
        spawn_noop,
        tests.KnownSSHAgent.UNKNOWN,
    ),
]
"""
The standard registry of agent spawning functions.
"""

298 

Popen = TypeVar('Popen', bound=subprocess.Popen)


@contextlib.contextmanager
def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
    """Terminate and wait for the subprocess upon exiting the context.

    Args:
        proc:
            The subprocess to manage.

    Returns:
        A context manager.  Upon entering the manager, return the
        managed subprocess.  Upon exiting the manager, terminate the
        process, wait for it, and close any pipes to it.

    """
    try:
        yield proc
    finally:
        try:
            proc.terminate()
            proc.wait()
        finally:
            # Close our ends of the pipes as well, so that they do not
            # linger (and trigger ResourceWarnings) until garbage
            # collection.  Errors while closing, e.g. a broken pipe
            # when flushing buffered input to the now-dead process, are
            # of no interest at this point.
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is not None:
                    with contextlib.suppress(OSError):
                        stream.close()

321 

322 

class CannotSpawnError(RuntimeError):
    """Raised when the SSH agent cannot be spawned."""

325 

326 

def spawn_named_agent(
    exec_name: str,
    spawn_func: SpawnFunc,
    agent_type: tests.KnownSSHAgent,
) -> Iterator[tests.SpawnedSSHAgentInfo]:  # pragma: no cover
    """Spawn the named SSH agent and check that it is operational.

    Using the correct agent-specific spawn function from the
    [`spawn_handlers`][] registry, spawn the named SSH agent (according
    to its declared type), then set up the communication channel and
    yield an SSH agent client connected to this agent.  After resuming,
    tear down the communication channel and terminate the SSH agent.

    The SSH agent's instructions for setting up the communication
    channel are parsed with [`tests.parse_sh_export_line`][].  See the
    caveats there.

    Args:
        exec_name:
            The executable to spawn.
        spawn_func:
            The agent-specific spawn function.
        agent_type:
            The agent type.

    Yields:
        A 3-tuple containing the agent type, an SSH agent client
        connected to this agent, and a boolean indicating whether this
        agent was actually spawned in an isolated manner.

        Only one tuple will ever be yielded.  After resuming, the
        connected client will be torn down, as will the agent if it was
        isolated.

    Raises:
        CannotSpawnError:
            We failed to spawn the agent or otherwise set up the
            environment/communication channel/etc.
        KeyError:
            The already running (system) agent was requested, but
            SSH_AUTH_SOCK is not set.
        OSError:
            The spawn function failed to start the subprocess.

    """
    # pytest's fixture system does not seem to guarantee that
    # environment variables are set up correctly if nested and
    # parametrized fixtures are used: it is possible that "outer"
    # parametrized fixtures are torn down only after other "outer"
    # fixtures of the same parameter set have run.  So our fixtures set
    # SSH_AUTH_SOCK explicitly to the value saved at interpreter
    # startup.
    #
    # Here, we verify at most major steps that SSH_AUTH_SOCK didn't
    # change under our nose.
    assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
        f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
    )
    exit_stack = contextlib.ExitStack()
    # The agent itself must not see any pre-existing SSH_AUTH_SOCK.
    agent_env = os.environ.copy()
    agent_env.pop('SSH_AUTH_SOCK', None)
    proc = spawn_func(executable=shutil.which(exec_name), env=agent_env)
    ssh_auth_sock: str | None
    with exit_stack:
        if proc is not None:
            # Register the cleanup before anything else can fail, so
            # that a spawned agent is terminated on every exit path,
            # including the error branches below.
            exit_stack.enter_context(terminate_on_exit(proc))
        if (
            spawn_func is spawn_noop
            and agent_type == tests.KnownSSHAgent.FakeSSHAgentSocket
        ):
            # The fake agent needs no socket address from us.
            ssh_auth_sock = None
        elif not hasattr(socket, 'AF_UNIX'):  # pragma: no cover
            err_msg = 'socket module does not support AF_UNIX'
            raise CannotSpawnError(err_msg)
        elif spawn_func is spawn_noop:
            # Use the already running agent; a KeyError here signals
            # to the caller that there is none.
            ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
        elif proc is None:  # pragma: no cover
            err_msg = f'Cannot spawn usable {exec_name}'
            raise CannotSpawnError(err_msg)
        else:
            assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
                f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
            )
            assert proc.stdout is not None
            # The agent is expected to print two lines: first the
            # SSH_AUTH_SOCK export, then a line mentioning its PID.
            ssh_auth_sock_line = proc.stdout.readline()
            try:
                ssh_auth_sock = tests.parse_sh_export_line(
                    ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
                )
            except ValueError:  # pragma: no cover
                err_msg = f'Cannot parse agent output: {ssh_auth_sock_line!r}'
                raise CannotSpawnError(err_msg) from None
            pid_line = proc.stdout.readline()
            if 'pid' not in pid_line.lower():  # pragma: no cover
                err_msg = f'Cannot parse agent output: {pid_line!r}'
                raise CannotSpawnError(err_msg)
        monkeypatch = exit_stack.enter_context(pytest.MonkeyPatch.context())
        if ssh_auth_sock is not None:
            monkeypatch.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
            client = exit_stack.enter_context(
                ssh_agent.SSHAgentClient.ensure_agent_subcontext()
            )
        else:
            monkeypatch.setenv(
                'SSH_AUTH_SOCK', tests.FakeSSHAgentSocketWithAddress.ADDRESS
            )
            monkeypatch.setattr(
                ssh_agent.SSHAgentClient,
                'SOCKET_PROVIDERS',
                ['fake_with_address'],
            )
            client = exit_stack.enter_context(
                ssh_agent.SSHAgentClient.ensure_agent_subcontext(
                    tests.FakeSSHAgentSocketWithAddress()
                )
            )
        client.list_keys()  # sanity test
        yield tests.SpawnedSSHAgentInfo(
            agent_type, client, spawn_func is not spawn_noop
        )
    assert os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock, (
        f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
    )

446 

447 

def is_agent_permitted(
    agent_type: tests.KnownSSHAgent,
) -> bool:  # pragma: no cover
    """May the given SSH agent be spawned by the test harness?

    If the environment variable `PERMITTED_SSH_AGENTS` is given, it
    names a comma-separated list of known SSH agent names that the test
    harness may spawn.  Whitespace around the names is ignored, and
    invalid names are silently ignored.  If not given (or empty), then
    any agent may be spawned.

    The fake agents cannot be restricted in this manner, as the test
    suite depends on their availability.

    Args:
        agent_type:
            The type of the agent about to be spawned.

    Returns:
        True if the agent may be spawned, false otherwise.

    """
    permitted_names = os.environ.get('PERMITTED_SSH_AGENTS')
    if not permitted_names:
        return True
    # The names are validated against the enum member *names*, so
    # convert them via the member names as well.  (Conversion by enum
    # value would raise ValueError for any member whose value differs
    # from its name.)
    known_agents = tests.KnownSSHAgent.__members__
    permitted_agents = {tests.KnownSSHAgent.FakeSSHAgentSocket}
    permitted_agents.update(
        known_agents[name]
        for name in (part.strip() for part in permitted_names.split(','))
        if name in known_agents
    )
    return agent_type in permitted_agents

471 

472 

@pytest.fixture
def running_ssh_agent(  # pragma: no cover
) -> Iterator[tests.RunningSSHAgentInfo]:
    """Ensure a running SSH agent, if possible, as a pytest fixture.

    Check for a running SSH agent, or spawn a new one if possible.  We
    know how to spawn OpenSSH's agent and PuTTY's Pageant.  If spawned
    this way, the agent does not persist beyond the test.

    The agents from the [`spawn_handlers`][] registry are tried in
    order, skipping those that are not permitted (see
    [`is_agent_permitted`][]); the first one that can be set up is
    used.

    This fixture can neither guarantee a particular running agent, nor
    can it guarantee a particular set of loaded keys.

    Yields:
        A 2-tuple `(ssh_auth_sock, agent_type)`, where `ssh_auth_sock`
        is the value of the `SSH_AUTH_SOCK` environment variable, to be
        used to connect to the running agent, and `agent_type` is the
        agent type.  For the fake agent, the first item is the fake
        socket class instead.

    Raises:
        pytest.skip.Exception:
            If no agent is running or can be spawned, skip this test.

    """

    def prepare_environment(
        agent_type: tests.KnownSSHAgent,
    ) -> Iterator[tests.RunningSSHAgentInfo]:
        # Yield the agent info for the test, with the fake socket
        # provider patched in for the duration if the fake agent is
        # in use.
        with pytest.MonkeyPatch.context() as monkeypatch:
            if agent_type == tests.KnownSSHAgent.FakeSSHAgentSocket:
                monkeypatch.setattr(
                    ssh_agent.SSHAgentClient,
                    'SOCKET_PROVIDERS',
                    ['fake_with_address'],
                )
                monkeypatch.setenv(
                    'SSH_AUTH_SOCK',
                    tests.FakeSSHAgentSocketWithAddress.ADDRESS,
                )
                yield tests.RunningSSHAgentInfo(
                    tests.FakeSSHAgentSocketWithAddress,
                    tests.KnownSSHAgent.FakeSSHAgentSocket,
                )
            else:
                yield tests.RunningSSHAgentInfo(
                    os.environ['SSH_AUTH_SOCK'],
                    agent_type,
                )

    with pytest.MonkeyPatch.context() as monkeypatch:
        # pytest's fixture system does not seem to guarantee that
        # environment variables are set up correctly if nested and
        # parametrized fixtures are used: it is possible that "outer"
        # parametrized fixtures are torn down only after other "outer"
        # fixtures of the same parameter set have run.  So set
        # SSH_AUTH_SOCK explicitly to the value saved at interpreter
        # startup.
        if startup_ssh_auth_sock:
            monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
        else:
            monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
        for exec_name, spawn_func, agent_type in spawn_handlers:
            if not is_agent_permitted(agent_type):
                continue
            agent_handed_out = False
            try:
                for _agent_info in spawn_named_agent(
                    exec_name, spawn_func, agent_type
                ):
                    agent_handed_out = True
                    yield from prepare_environment(agent_type)
            except (KeyError, OSError, CannotSpawnError):
                if agent_handed_out:
                    # The failure occurred after the test already got
                    # its agent, i.e. during teardown.  Falling through
                    # to the next agent would make this fixture yield
                    # a second time, so report the actual error.
                    raise
                continue
            return
        pytest.skip('No SSH agent running or spawnable')

545 

546 

@pytest.fixture(params=spawn_handlers, ids=operator.itemgetter(0))
def spawn_ssh_agent(
    request: pytest.FixtureRequest,
) -> Iterator[tests.SpawnedSSHAgentInfo]:  # pragma: no cover
    """Spawn an isolated SSH agent, if possible, as a pytest fixture.

    Spawn a new SSH agent isolated from other SSH use by other
    processes, if possible.  We know how to spawn OpenSSH's agent and
    PuTTY's Pageant, and the "(system)" fallback agent.

    The fixture is parametrized over the [`spawn_handlers`][]
    registry, so a test using it runs once per registered agent.

    Yields:
        A [named tuple][collections.namedtuple] containing information
        about the spawned agent, e.g. the software product, a client
        connected to the agent, and whether the agent is isolated from
        other clients.

    Raises:
        pytest.skip.Exception:
            If the agent cannot be spawned, skip this test.

    """

    with pytest.MonkeyPatch.context() as monkeypatch:
        # pytest's fixture system does not seem to guarantee that
        # environment variables are set up correctly if nested and
        # parametrized fixtures are used: it is possible that "outer"
        # parametrized fixtures are torn down only after other "outer"
        # fixtures of the same parameter set have run.  So set
        # SSH_AUTH_SOCK explicitly to the value saved at interpreter
        # startup.
        if startup_ssh_auth_sock:  # pragma: no cover
            monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
        else:  # pragma: no cover
            monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
        try:
            yield from spawn_named_agent(*request.param)
        except (KeyError, OSError, CannotSpawnError) as exc:
            # The skip reason must be a string.  Only our own
            # CannotSpawnError is known to carry a human-readable
            # message; the first argument of the other exceptions may
            # be e.g. an errno number or a bare dictionary key, or it
            # may be missing altogether.
            reason = str(exc) if isinstance(exc, CannotSpawnError) else ''
            pytest.skip(reason or f'Cannot spawn SSH agent: {exc!r}')

586 

587 

588@pytest.fixture 

589def ssh_agent_client_with_test_keys_loaded( # noqa: C901 

590 spawn_ssh_agent: tests.SpawnedSSHAgentInfo, 

591) -> Iterator[ssh_agent.SSHAgentClient]: 

592 """Provide an SSH agent with loaded test keys, as a pytest fixture. 

593 

594 Use the `spawn_ssh_agent` fixture to acquire a usable SSH agent, 

595 upload the known test keys into the agent, and return a connected 

596 client. 

597 

598 The agent may reject several of the test keys due to unsupported or 

599 obsolete key types. Rejected keys will be silently ignored, unless 

600 all keys are rejected; then the test will be skipped. You must not 

601 automatically assume any particular key is present in the agent. 

602 

603 Yields: 

604 A [named tuple][collections.namedtuple] containing 

605 information about the spawned agent, e.g. the software 

606 product, a client connected to the agent, and whether the 

607 agent is isolated from other clients. 

608 

609 Raises: 

610 OSError: 

611 There was a communication or a socket setup error with the 

612 agent. 

613 pytest.skip.Exception: 

614 If the agent is unusable or if it rejected all test keys, 

615 skip this test. 

616 

617 Warning: 

618 It is the fixture's responsibility to clean up the SSH agent 

619 client after the test. Closing the client's socket connection 

620 beforehand (e.g. by using the client as a context manager) may 

621 lead to exceptions being thrown upon fixture teardown. 

622 

623 """ 

624 agent_type, client, isolated = spawn_ssh_agent 

625 successfully_loaded_keys: set[str] = set() 

626 

627 try: 

628 current_loaded_keys = frozenset({ 

629 pair.key for pair in client.list_keys() 

630 }) 

631 except ( 

632 EOFError, 

633 OSError, 

634 ssh_agent.SSHAgentFailedError, 

635 ) as exc: # pragma: no cover 

636 # Defensive programming, with an external source of 

637 # nondeterminism, so no coverage. 

638 pytest.skip(f'SSH agent failed the "list keys" sanity test: {exc!r}') 

639 

640 def prepare_payload( 

641 payload: bytes | bytearray, 

642 *, 

643 isolated: bool = True, 

644 time_to_live: int = 30, 

645 ) -> tuple[_types.SSH_AGENTC, bytes]: 

646 return_code = ( 

647 _types.SSH_AGENTC.ADD_IDENTITY 

648 if isolated 

649 else _types.SSH_AGENTC.ADD_ID_CONSTRAINED 

650 ) 

651 lifetime_constraint = ( 

652 b'' 

653 if isolated 

654 else b'\x01' + ssh_agent.SSHAgentClient.uint32(time_to_live) 

655 ) 

656 return (return_code, bytes(payload) + lifetime_constraint) 

657 

658 try: 

659 for key_type, key_struct in tests.ALL_KEYS.items(): 

660 private_key_data = key_struct.private_key_blob 

661 if private_key_data is None: # pragma: no cover 

662 continue 

663 request_code, payload = prepare_payload( 

664 private_key_data, isolated=isolated, time_to_live=30 

665 ) 

666 try: 

667 try: 

668 client.request( 

669 request_code, 

670 payload, 

671 response_code=_types.SSH_AGENT.SUCCESS, 

672 ) 

673 except ssh_agent.SSHAgentFailedError: # pragma: no cover 

674 # Pageant can fail to accept a key for two separate 

675 # reasons: 

676 # 

677 # - Pageant refuses to accept a key it already holds 

678 # in memory. Verify this by listing keys. 

679 # - Pageant does not support key constraints (see 

680 # references below). 

681 # 

682 # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-timeout.html 

683 # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-key-confirm.html 

684 current_loaded_keys = frozenset({ 

685 pair.key for pair in client.list_keys() 

686 }) 

687 if agent_type == tests.KnownSSHAgent.Pageant and ( 

688 key_struct.public_key_data in current_loaded_keys 

689 ): 

690 pass 

691 elif agent_type == tests.KnownSSHAgent.Pageant and ( 

692 not isolated 

693 ): 

694 request_code, payload = prepare_payload( 

695 private_key_data, isolated=True 

696 ) 

697 client.request( 

698 request_code, 

699 payload, 

700 response_code=_types.SSH_AGENT.SUCCESS, 

701 ) 

702 else: 

703 raise 

704 except ( 

705 EOFError, 

706 OSError, 

707 ssh_agent.SSHAgentFailedError, 

708 ): # pragma: no cover 

709 pass 

710 else: # pragma: no cover 

711 successfully_loaded_keys.add(key_type) 

712 yield client 

713 finally: 

714 for key_type, key_struct in tests.ALL_KEYS.items(): 

715 if not isolated and ( 

716 key_type in successfully_loaded_keys 

717 ): # pragma: no cover 

718 # The public key blob is the base64-encoded part in 

719 # the "public key line". 

720 public_key = base64.standard_b64decode( 

721 key_struct.public_key.split(None, 2)[1] 

722 ) 

723 request_code = _types.SSH_AGENTC.REMOVE_IDENTITY 

724 client.request( 

725 request_code, 

726 public_key, 

727 response_code=frozenset({ 

728 _types.SSH_AGENT.SUCCESS, 

729 _types.SSH_AGENT.FAILURE, 

730 }), 

731 )