Coverage for src\derivepassphrase\ssh_agent\__init__.py: 97.797%

185 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 12:17 +0200

1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> 

2# 

3# SPDX-License-Identifier: Zlib 

4 

5"""A bare-bones SSH agent client supporting signing and key listing.""" 

6 

7from __future__ import annotations 

8 

9import collections 

10import contextlib 

11import sys 

12from collections.abc import Sequence 

13from typing import TYPE_CHECKING, ClassVar, overload 

14 

15from typing_extensions import Never, Self, assert_type 

16 

17from derivepassphrase import _types 

18 

19if sys.version_info < (3, 11): 

20 from exceptiongroup import ExceptionGroup 

21 

22if TYPE_CHECKING: 

23 from collections.abc import Iterable, Iterator 

24 from collections.abc import Set as AbstractSet 

25 from types import TracebackType 

26 

27 from typing_extensions import Buffer 

28 

29__all__ = ('SSHAgentClient',) 

30 

31# In SSH bytestrings, the "length" of the byte string is stored as 

32# a 4-byte/32-bit unsigned integer at the beginning. 

33HEAD_LEN = 4 

34 

35 

class TrailingDataError(RuntimeError):
    """The result contained trailing data.

    The error takes no arguments; its message is fixed.

    """

    def __init__(self) -> None:
        """Initialize the error with its fixed message."""
        message = 'Overlong response from SSH agent'
        super().__init__(message)

41 

42 

class SSHAgentFailedError(RuntimeError):
    """The SSH agent failed to complete the requested operation.

    `__str__` interprets the exception arguments as a 2-tuple
    `(code, payload)`: a numeric response code and the response payload
    as a byte string.  Any other argument shape is tolerated and is
    rendered via `repr` instead of raising.

    """

    def __str__(self) -> str:
        """Return a human-readable description of the failure.

        Returns:
            * `[Code <code>] <message>` if the payload is a non-empty
              byte string (decoded as UTF-8 with `surrogateescape`);
            * a fixed message if the arguments are exactly the generic
              failure code with an empty payload;
            * the `repr` of the exception in all other cases,
              including missing or ill-typed arguments.

        """
        # TODO(the-13th-letter): Rewrite using structural pattern matching.
        # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
        if len(self.args) == 2:  # noqa: PLR2004
            code, payload = self.args
            if payload:  # pragma: no cover
                # Defensive programming, and an external source of
                # nondeterminism, so no coverage.
                #
                # Only format arguments that can actually be formatted;
                # `__str__` must never raise on odd argument types.
                if isinstance(code, int) and isinstance(
                    payload, (bytes, bytearray)
                ):
                    msg = payload.decode('utf-8', 'surrogateescape')
                    return f'[Code {code:d}] {msg:s}'
            elif self.args == (
                _types.SSH_AGENT.FAILURE.value,
                b'',
            ):
                return 'The SSH agent failed to complete the request'
        # Defensive programming, and an external source of
        # nondeterminism, so no coverage.
        return repr(self)  # pragma: no cover

    def __repr__(self) -> str:  # pragma: no cover
        """"""  # noqa: D419
        # Only used interactively during debugging, so no coverage.
        return f'{self.__class__.__name__}{self.args!r}'

69 

70 

class SSHAgentClient:
    """A bare-bones SSH agent client supporting signing and key listing.

    The main use case is requesting the agent sign some data, after
    checking that the necessary key is already loaded.

    The main fleshed out methods are [`list_keys`][] and [`sign`][],
    which implement the [`REQUEST_IDENTITIES`]
    [_types.SSH_AGENTC.REQUEST_IDENTITIES] and [`SIGN_REQUEST`]
    [_types.SSH_AGENTC.SIGN_REQUEST] requests.  If you *really* wanted
    to, there is enough infrastructure in place to issue other requests
    as defined in the protocol---it's merely the wrapper functions and
    the protocol numbers table that are missing.

    """

    _connection: _types.SSHAgentSocket
    SOCKET_PROVIDERS: ClassVar = ['native']

    def __init__(  # noqa: C901, PLR0912
        self,
        /,
        *,
        socket: _types.SSHAgentSocket | Sequence[str] | None = None,
    ) -> None:
        """Initialize the client.

        Args:
            socket:
                An optional socket-like object, already connected to the
                SSH agent, or a list of names of socket providers to try.
                If not given, we query platform-specific default
                addresses, if possible.

        Raises:
            derivepassphrase.ssh_agent.socketprovider.NoSuchProviderError:
                The list of socket provider names contained an invalid
                entry.
            KeyError:
                An expected configuration entry or an expected environment
                variable is missing.
            NotImplementedError:
                The named socket provider is not functional or not
                applicable to this `derivepassphrase` installation, e.g.
                because it is generally not implemented yet, or it requires
                a specific operating system, or specific system
                functionality that is not provided by all Python versions,
                or external packages that are unavailable on this
                installation.

                This error may be raised multiple times, as an exception
                group.
            OSError:
                There was an error setting up a socket connection to the
                agent.

        """  # noqa: DOC501
        import socket as _socket  # noqa: PLC0415

        from derivepassphrase.ssh_agent import socketprovider  # noqa: PLC0415

        if isinstance(socket, _socket.socket):
            self._connection = socket
            # Test whether the socket is connected.
            self._connection.getpeername()
        elif isinstance(socket, str):
            # A single provider name: resolve it and connect right away.
            self._connection = socketprovider.SocketProvider.resolve(socket)()
        elif socket is None or isinstance(socket, Sequence):
            if not socket:
                socket = self.SOCKET_PROVIDERS
            assert isinstance(socket, Sequence)  # for the type checker
            excs: list[NotImplementedError] = []
            providers: list[_types.SSHAgentSocketProvider] = []
            # First pass: resolve every name, remembering which ones are
            # not implemented on this installation.
            for candidate in socket:
                try:
                    provider = socketprovider.SocketProvider.resolve(candidate)
                except NotImplementedError as exc:
                    excs.append(exc)
                    continue
                else:
                    providers.append(provider)
            # Second pass: use the first provider that yields
            # a connection.
            for provider in providers:
                try:
                    self._connection = provider()
                except NotImplementedError as exc:
                    excs.append(exc)
                    continue
                else:
                    break
            else:
                msg = 'No supported SSH agent socket provider found.'
                raise (
                    ExceptionGroup(msg, excs)
                    if excs
                    else NotImplementedError(msg)
                )
        elif isinstance(socket, _types.SSHAgentSocket):
            self._connection = socket
        else:  # pragma: no cover
            # Defensive programming, so no coverage.
            assert_type(socket, Never)
            msg = f'invalid socket object: {socket!r}'
            raise TypeError(msg)

    def __enter__(self) -> Self:
        """Close socket connection upon context manager completion.

        Returns:
            Self.

        """
        self._connection.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Close socket connection upon context manager completion.

        Args:
            exc_type: An optional exception type.
            exc_val: An optional exception value.
            exc_tb: An optional exception traceback.

        Returns:
            True if the exception was handled, false if it should
            propagate.

        """
        return bool(
            self._connection.__exit__(exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
        )

    @staticmethod
    def uint32(num: int, /) -> bytes:
        r"""Format the number as a `uint32`, as per the agent protocol.

        Args:
            num: A number.

        Returns:
            The number in SSH agent wire protocol format, i.e. as
            a 32-bit big endian number.

        Raises:
            OverflowError:
                As per [`int.to_bytes`][].

        Examples:
            >>> SSHAgentClient.uint32(16777216)
            b'\x01\x00\x00\x00'

        """
        return int.to_bytes(num, 4, 'big', signed=False)

    @classmethod
    def string(cls, payload: Buffer, /) -> bytes:
        r"""Format the payload as an SSH string, as per the agent protocol.

        Args:
            payload: A bytes-like object.

        Returns:
            The payload, framed in the SSH agent wire protocol format,
            as a bytes object.  The length prefix always counts *bytes*,
            even if the payload is a buffer of wider items.

        Raises:
            TypeError:
                The payload does not support the buffer protocol.
            OverflowError:
                The payload is too long for a 32-bit length prefix.

        Examples:
            >>> SSHAgentClient.string(b'ssh-rsa')
            b'\x00\x00\x00\x07ssh-rsa'

        """
        try:
            view = memoryview(payload)
        except TypeError as e:
            msg = 'invalid payload type'
            raise TypeError(msg) from e
        # Materialize the raw bytes first and measure *those*:
        # `len(view)` counts items, not bytes, and thus yields a wrong
        # length prefix for buffers whose item size is not 1.
        data = view.tobytes()
        return cls.uint32(len(data)) + data

    @classmethod
    def unstring(cls, bytestring: Buffer, /) -> bytes:
        r"""Unpack an SSH string.

        Args:
            bytestring: A framed bytes-like object.

        Returns:
            The payload, as a bytes object.

        Raises:
            ValueError:
                The byte string is not an SSH string.

        Examples:
            >>> SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa')
            b'ssh-rsa'
            >>> SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519'))
            b'ssh-ed25519'

        """
        bytestring = memoryview(bytestring)
        n = len(bytestring)
        msg = 'malformed SSH byte string'
        # The declared length must account for exactly the remainder of
        # the input: no truncation, no trailing data.
        if n < HEAD_LEN or n != HEAD_LEN + int.from_bytes(
            bytestring[:HEAD_LEN], 'big', signed=False
        ):
            raise ValueError(msg)
        return bytes(bytestring[HEAD_LEN:])

    @classmethod
    def unstring_prefix(cls, bytestring: Buffer, /) -> tuple[bytes, bytes]:
        r"""Unpack an SSH string at the beginning of the byte string.

        Args:
            bytestring:
                A bytes-like object, beginning with a framed/SSH byte
                string.

        Returns:
            A 2-tuple `(a, b)`, where `a` is the unframed byte
            string/payload at the beginning of input byte string, and
            `b` is the remainder of the input byte string.

        Raises:
            ValueError:
                The byte string does not begin with an SSH string.

        Examples:
            >>> SSHAgentClient.unstring_prefix(
            ...     b'\x00\x00\x00\x07ssh-rsa____trailing data'
            ... )
            (b'ssh-rsa', b'____trailing data')
            >>> SSHAgentClient.unstring_prefix(
            ...     SSHAgentClient.string(b'ssh-ed25519')
            ... )
            (b'ssh-ed25519', b'')

        """
        bytestring = memoryview(bytestring).toreadonly()
        n = len(bytestring)
        msg = 'malformed SSH byte string'
        if n < HEAD_LEN:
            raise ValueError(msg)
        m = int.from_bytes(bytestring[:HEAD_LEN], 'big', signed=False)
        # Unlike `unstring`, trailing data is fine here, but the
        # declared payload must be fully present.
        if m + HEAD_LEN > n:
            raise ValueError(msg)
        return (
            bytes(bytestring[HEAD_LEN : m + HEAD_LEN]),
            bytes(bytestring[m + HEAD_LEN :]),
        )

    @classmethod
    @contextlib.contextmanager
    def ensure_agent_subcontext(
        cls,
        conn: SSHAgentClient
        | _types.SSHAgentSocket
        | Sequence[str]
        | None = None,
    ) -> Iterator[SSHAgentClient]:
        """Return an SSH agent client subcontext.

        If necessary, construct an SSH agent client first using the
        connection hint.

        Args:
            conn:
                If an existing SSH agent client, then enter a context
                within this client's scope.  After exiting the context,
                the client persists, including its socket.

                If a socket, then construct a client using this socket,
                then enter a context within this client's scope.  After
                exiting the context, the client is destroyed and the
                socket is closed.

                If `None`, or a list of names of socket providers, then
                construct a client according to those connection hints
                ("auto-discovery"), and enter a context within this
                client's scope.  After exiting the context, both the
                client and its socket are destroyed.

        Yields:
            When entering this context, return the SSH agent client.

        Raises:
            derivepassphrase.ssh_agent.socketprovider.NoSuchProviderError:
                As per [`__init__`][].
            KeyError:
                As per [`__init__`][].
            NotImplementedError:
                As per [`__init__`][], including the multiple raise as
                an exception group.
            OSError:
                As per [`__init__`][].

        """  # noqa: DOC501
        # TODO(the-13th-letter): Rewrite using structural pattern matching.
        # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
        if isinstance(conn, SSHAgentClient):
            # The client is merely borrowed: do not enter (and thus do
            # not close) it, so that it outlives this subcontext.
            yield conn
        elif (
            isinstance(conn, (_types.SSHAgentSocket, str, Sequence))
            or conn is None
        ):
            with SSHAgentClient(socket=conn) as client:
                yield client
        else:  # pragma: no cover
            # Defensive programming, so no coverage.
            assert_type(conn, Never)
            msg = f'invalid connection hint: {conn!r}'
            raise TypeError(msg)

    def _agent_is_pageant(self) -> bool:
        """Return True if we are connected to Pageant.

        Warning:
            This is a heuristic, not a verified query or computation.

        """
        return (
            b'list-extended@putty.projects.tartarus.org'
            in self.query_extensions()
        )

    def has_deterministic_dsa_signatures(self) -> bool:
        """Check whether the agent returns deterministic DSA signatures.

        This includes ECDSA signatures.

        Generally, this means that the SSH agent implements [RFC 6979][]
        or a similar system.

        [RFC 6979]: https://www.rfc-editor.org/rfc/rfc6979

        Returns:
            True if a known agent was detected where signatures are
            deterministic for all DSA key types, false otherwise.

        Note: Known agents with deterministic signatures
            | agent           | detected via                                                  |
            |:----------------|:--------------------------------------------------------------|
            | Pageant (PuTTY) | `list-extended@putty.projects.tartarus.org` extension request |

        """  # noqa: E501
        known_good_agents = {
            'Pageant': self._agent_is_pageant,
        }
        return any(  # pragma: no branch
            v() for v in known_good_agents.values()
        )

    def _recv_exactly(self, count: int, /) -> bytes:
        """Read up to `count` bytes from the agent, tolerating short reads.

        A single `recv` call on a stream connection may legitimately
        return fewer bytes than requested, so keep reading until the
        requested amount has arrived or the peer signals end-of-stream
        by returning an empty chunk.

        Args:
            count: The number of bytes to read.

        Returns:
            The bytes read.  Shorter than `count` if and only if the
            stream ended prematurely.

        """
        buffer = bytearray()
        while len(buffer) < count:
            chunk = self._connection.recv(count - len(buffer))
            if not chunk:
                break
            buffer.extend(chunk)
        return bytes(buffer)

    # NOTE: In the overloads below, `response_code` is deliberately
    # required whenever it is not `None`: the implementation's only
    # default is `None`, which selects the 2-tuple return type.

    @overload
    def request(
        self,
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: None = None,
    ) -> tuple[int, bytes]: ...

    @overload
    def request(
        self,
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: Iterable[_types.SSH_AGENT | int],
    ) -> bytes: ...

    @overload
    def request(
        self,
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: _types.SSH_AGENT | int,
    ) -> bytes: ...

    def request(
        self,
        code: int | _types.SSH_AGENTC,
        payload: Buffer,
        /,
        *,
        response_code: (
            Iterable[_types.SSH_AGENT | int] | _types.SSH_AGENT | int | None
        ) = None,
    ) -> tuple[int, bytes] | bytes:
        """Issue a generic request to the SSH agent.

        Args:
            code:
                The request code.  See the SSH agent protocol for
                protocol numbers to use here (and which protocol numbers
                to expect in a response).
            payload:
                A bytes-like object containing the payload, or
                "contents", of the request.  Request-specific.

                It is our responsibility to add any necessary wire
                framing around the request code and the payload,
                not the caller's.
            response_code:
                An optional response code, or a set of response codes,
                that we expect.  If given, and the actual response code
                does not match, raise an error.

        Returns:
            A 2-tuple consisting of the response code and the payload,
            with all wire framing removed.

            If a response code was passed, then only return the payload.

        Raises:
            EOFError:
                The response from the SSH agent is truncated or missing.
                This includes a response message of length zero, which
                lacks the mandatory response code.
            OSError:
                There was a communication error with the SSH agent.
            SSHAgentFailedError:
                We expected specific response codes, but did not receive
                any of them.

        """
        expected: frozenset[int] | None = None
        if response_code is not None:  # pragma: no branch
            if isinstance(  # pragma: no branch
                response_code, (int, _types.SSH_AGENT)
            ):
                response_code = frozenset({response_code})
            # Normalize to plain integers, to compare against the raw
            # response code byte.
            expected = frozenset({
                c if isinstance(c, int) else c.value for c in response_code
            })
        request_message = bytearray([
            code if isinstance(code, int) else code.value
        ])
        request_message.extend(memoryview(payload))
        self._connection.sendall(self.string(request_message))
        header = self._recv_exactly(HEAD_LEN)
        if len(header) < HEAD_LEN:
            msg = 'cannot read response length'
            raise EOFError(msg)
        response_length = int.from_bytes(header, 'big', signed=False)
        response = self._recv_exactly(response_length)
        if len(response) < response_length:
            msg = 'truncated response from SSH agent'
            raise EOFError(msg)
        if not response:
            # A zero-length message has no response code to inspect.
            msg = 'empty response from SSH agent'
            raise EOFError(msg)
        if not expected:  # pragma: no cover
            # Defensive programming, will not actually be triggered by
            # our code, so no coverage.
            return response[0], response[1:]
        if response[0] not in expected:
            raise SSHAgentFailedError(response[0], response[1:])
        return response[1:]

    def list_keys(self) -> Sequence[_types.SSHKeyCommentPair]:
        """Request a list of keys known to the SSH agent.

        Returns:
            A read-only sequence of key/comment pairs.

        Raises:
            EOFError:
                The response from the SSH agent is truncated or missing.
            OSError:
                There was a communication error with the SSH agent.
            TrailingDataError:
                The response from the SSH agent is too long.
            SSHAgentFailedError:
                The agent failed to complete the request.

        """
        response = self.request(
            _types.SSH_AGENTC.REQUEST_IDENTITIES.value,
            b'',
            response_code=_types.SSH_AGENT.IDENTITIES_ANSWER,
        )
        view = memoryview(response)
        offset = 0

        def shift(num: int) -> bytes:
            # Consume and return the next `num` bytes of the response.
            nonlocal offset
            end = offset + num
            if end > len(view):
                msg = 'truncated response from SSH agent'
                raise EOFError(msg)
            chunk = bytes(view[offset:end])
            offset = end
            return chunk

        key_count = int.from_bytes(shift(HEAD_LEN), 'big')
        keys: collections.deque[_types.SSHKeyCommentPair]
        keys = collections.deque()
        for _ in range(key_count):
            key_size = int.from_bytes(shift(HEAD_LEN), 'big')
            key = shift(key_size)
            comment_size = int.from_bytes(shift(HEAD_LEN), 'big')
            comment = shift(comment_size)
            # Both `key` and `comment` are not wrapped as SSH strings.
            keys.append(_types.SSHKeyCommentPair(key, comment))
        if offset < len(view):
            raise TrailingDataError
        return keys

    def sign(
        self,
        /,
        key: Buffer,
        payload: Buffer,
        *,
        flags: int = 0,
        check_if_key_loaded: bool = False,
    ) -> bytes:
        """Request the SSH agent sign the payload with the key.

        Args:
            key:
                The public SSH key to sign the payload with, in the same
                format as returned by, e.g., the [`list_keys`][] method.
                The corresponding private key must have previously been
                loaded into the agent to successfully issue a signature.
            payload:
                A byte string of data to sign.
            flags:
                Optional flags for the signing request.  Currently
                passed on as-is to the agent.  In real-world usage, this
                could be used, e.g., to request more modern hash
                algorithms when signing with RSA keys.  (No such
                real-world usage is currently implemented.)
            check_if_key_loaded:
                If true, check beforehand (via [`list_keys`][]) if the
                corresponding key has been loaded into the agent.

        Returns:
            The binary signature of the payload under the given key.

        Raises:
            EOFError:
                The response from the SSH agent is truncated or missing.
            OSError:
                There was a communication error with the SSH agent.
            TrailingDataError:
                The response from the SSH agent is too long.
            SSHAgentFailedError:
                The agent failed to complete the request.
            KeyError:
                `check_if_key_loaded` is true, and the `key` was not
                loaded into the agent.

        """
        key = memoryview(key)
        payload = memoryview(payload)
        if check_if_key_loaded:
            loaded_keys = frozenset({pair.key for pair in self.list_keys()})
            if bytes(key) not in loaded_keys:
                msg = 'target SSH key not loaded into agent'
                raise KeyError(msg)
        # Wire format of the request body: string(key), string(payload),
        # uint32(flags).
        request_data = bytearray(self.string(key))
        request_data.extend(self.string(payload))
        request_data.extend(self.uint32(flags))
        return bytes(
            self.unstring(
                self.request(
                    _types.SSH_AGENTC.SIGN_REQUEST.value,
                    request_data,
                    response_code=_types.SSH_AGENT.SIGN_RESPONSE,
                )
            )
        )

    def query_extensions(self) -> AbstractSet[bytes]:
        """Request a listing of extensions supported by the SSH agent.

        Returns:
            A read-only set of extension names the SSH agent says it
            supports.

        Raises:
            EOFError:
                The response from the SSH agent is truncated or missing.
            OSError:
                There was a communication error with the SSH agent.
            RuntimeError:
                The response from the SSH agent is malformed.

        Note:
            The set of supported extensions is queried via the `query`
            extension request.  If the agent does not support the query
            extension request, or extension requests in general, then an
            empty set is returned.  This does not however imply that the
            agent doesn't support *any* extension request... merely that
            it doesn't support extension autodiscovery.

        """
        try:
            response_data = self.request(
                _types.SSH_AGENTC.EXTENSION,
                self.string(b'query'),
                response_code={
                    _types.SSH_AGENT.EXTENSION_RESPONSE,
                    _types.SSH_AGENT.SUCCESS,
                },
            )
        except SSHAgentFailedError:
            # Cannot query extension support.  Assume no extensions.
            # This isn't necessarily true, e.g. for OpenSSH's ssh-agent.
            return frozenset()
        extensions: set[bytes] = set()
        msg = 'Malformed response from SSH agent'
        msg2 = 'Extension response message does not match request'
        # The response echoes the extension name first, followed by
        # zero or more SSH strings naming the supported extensions.
        try:
            query, response_data = self.unstring_prefix(response_data)
        except ValueError as e:
            raise RuntimeError(msg) from e
        if bytes(query) != b'query':
            raise RuntimeError(msg2)
        while response_data:
            try:
                extension, response_data = self.unstring_prefix(response_data)
            except ValueError as e:
                raise RuntimeError(msg) from e
            else:
                extensions.add(bytes(extension))
        return frozenset(extensions)

706 return frozenset(extensions) 1bdh