Coverage for src\derivepassphrase\ssh_agent\socketprovider.py: 88.496%
85 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 12:17 +0200
1# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2#
3# SPDX-License-Identifier: Zlib
5"""Machinery for providing sockets connected to SSH agents."""
7from __future__ import annotations
9import collections
10import ctypes
11import os
12import socket
13from typing import TYPE_CHECKING, ClassVar, cast
15if TYPE_CHECKING:
16 from collections.abc import Callable
18 from derivepassphrase import _types
20__all__ = ('SocketProvider',)
class NoSuchProviderError(KeyError):
    """The requested SSH agent socket provider is not registered.

    Being a [`KeyError`][], the exception's sole argument is the
    provider name that could not be found.

    """
class AfUnixNotAvailableError(NotImplementedError):
    """UNIX domain sockets are unavailable in this Python installation.

    Raised when the [`socket`][] module lacks the `AF_UNIX` address
    family.

    """
class TheAnnoyingOsNamedPipesNotAvailableError(NotImplementedError):
    """Windows named pipes are unavailable in this Python installation.

    Raised when the [`ctypes`][] module lacks `WinDLL`.

    """
class SocketProvider:
    """Static functionality for providing sockets."""

    @staticmethod
    def unix_domain_ssh_auth_sock(*, timeout: int = 125) -> socket.socket:
        """Return a UNIX domain socket connected to `SSH_AUTH_SOCK`.

        Args:
            timeout:
                A connection timeout for the SSH agent.  Only used for
                "true" sockets, and only if the socket is not yet connected.
                The default value gives ample time for agent connections
                forwarded via SSH on high-latency networks (e.g. Tor).

        Returns:
            A connected UNIX domain socket.

        Raises:
            KeyError:
                The `SSH_AUTH_SOCK` environment variable was not found.
            AfUnixNotAvailableError:
                [This Python version does not support UNIX domain
                sockets][AF_UNIX], necessary to automatically connect to
                a running SSH agent via the `SSH_AUTH_SOCK` environment
                variable.

                [AF_UNIX]: https://docs.python.org/3/library/socket.html#socket.AF_UNIX
            OSError:
                There was an error setting up a socket connection to the
                agent.

        """
        if not hasattr(socket, 'AF_UNIX'):
            msg = 'This Python version does not support UNIX domain sockets.'
            raise AfUnixNotAvailableError(msg)
        else:  # noqa: RET506  # pragma: unless posix no cover
            # Check the environment before allocating the socket, so
            # that a missing variable cannot leave an open socket behind.
            if 'SSH_AUTH_SOCK' not in os.environ:
                msg = 'SSH_AUTH_SOCK environment variable'
                raise KeyError(msg)
            ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
            sock = socket.socket(family=socket.AF_UNIX)
            try:
                sock.settimeout(timeout)
                sock.connect(ssh_auth_sock)
            except BaseException:
                # The caller never sees this socket, so nobody else
                # could close it.
                sock.close()
                raise
            return sock

    @staticmethod
    def the_annoying_os_named_pipes() -> _types.SSHAgentSocket:
        """Return a socket connected to Pageant/OpenSSH on The Annoying OS.

        This may be a write-through socket if the underlying connection
        to Pageant or OpenSSH does not use an actual network socket to
        communicate.

        Raises:
            TheAnnoyingOsNamedPipesNotAvailableError:
                This Python installation does not support Windows named
                pipes (`ctypes.WinDLL` is missing).
            NotImplementedError:
                Windows named pipes are supported by this Python
                installation, but this functionality is not implemented
                yet.

        Warning: Not implemented yet
            This functionality is not implemented yet.  Specifically,
            [we do not yet support any of the communication mechanisms
            used by the leading SSH agent
            implementations.][windows-ssh-agent-support]

            [windows-ssh-agent-support]: https://the13thletter.info/derivepassphrase/0.x/wishlist/windows-ssh-agent-support/

        """
        if not hasattr(ctypes, 'WinDLL'):
            msg = 'This Python version does not support Windows named pipes.'
            raise TheAnnoyingOsNamedPipesNotAvailableError(msg)
        else:  # noqa: RET506  # pragma: unless the-annoying-os no cover
            msg = (
                'Communicating with Pageant or OpenSSH on Windows '
                'is not implemented yet.'
            )
            raise NotImplementedError(msg)

    registry: ClassVar[
        dict[str, _types.SSHAgentSocketProvider | str | None]
    ] = {}
    """A dictionary of callables that provide SSH agent sockets.

    Each entry in the dictionary points either to a callable, a string,
    or `None`: if a callable, then that callable returns a socket; if
    a string, then this entry is an alias for that other entry; if
    `None`, then the entry name is merely registered, but no
    implementation is available.

    Not every callable is necessarily callable on every platform, even
    if an implementation actually is available.

    """

    @classmethod
    def register(
        cls,
        name: str,
        *aliases: str,
    ) -> Callable[
        [_types.SSHAgentSocketProvider], _types.SSHAgentSocketProvider
    ]:
        """Register a callable as an SSH agent socket provider (decorator).

        Names that are not yet known, and names that are merely
        registered without an implementation, are (re-)bound to the
        decorated callable.  Attempting to re-register an existing
        alias, or a name with an implementation, with a different
        implementation is an error.

        Args:
            name:
                The principal name under which to register the passed
                callable.
            aliases:
                Alternate names to register as aliases for the principal
                name.

        Returns:
            A decorator implementing the above.

        """

        def decorator(
            f: _types.SSHAgentSocketProvider,
        ) -> _types.SSHAgentSocketProvider:
            """Register a callable as an SSH agent socket provider.

            Attempting to re-register an existing alias, or a name with
            an implementation, with a different implementation is an
            error.

            Args:
                f: The callable to decorate/register.

            Returns:
                The callable.

            Raises:
                ValueError:
                    The name or alias is already in use.

            """
            for alias in [name, *aliases]:
                try:
                    existing = cls.resolve(alias)
                except (NoSuchProviderError, NotImplementedError):
                    # Either the name is completely unknown, or it is
                    # known but has no implementation yet.  In both
                    # cases the name is free to be claimed.
                    cls.registry[alias] = f if alias == name else name
                else:
                    if existing != f:
                        msg = (
                            f'The SSH agent socket provider {alias!r} '
                            f'is already registered.'
                        )
                        raise ValueError(msg)
            return f

        return decorator

    @classmethod
    def resolve(
        cls, provider: Callable[[], _types.SSHAgentSocket] | str | None, /
    ) -> Callable[[], _types.SSHAgentSocket]:
        """Resolve a socket provider to a proper callable.

        String providers are looked up in the registry, following
        aliases until a non-string entry is reached.

        Args:
            provider: The provider to resolve.

        Returns:
            The callable indicated by this provider.

        Raises:
            NoSuchProviderError:
                The provider is not registered.
            NotImplementedError:
                The provider is registered, but is not functional or not
                applicable to this `derivepassphrase` installation.

        """
        ret = provider
        # Keep track of the registry entry most recently looked up, so
        # that the error message below can name it: by the time we know
        # the entry is unimplemented, `ret` itself is already `None`.
        entry_name: object = provider
        while isinstance(ret, str):
            entry_name = ret
            try:
                ret = cls.registry[ret]
            except KeyError as exc:
                raise NoSuchProviderError(ret) from exc
        if ret is None:
            msg = (
                f'The {entry_name!r} socket provider is not functional on or '
                'not applicable to this derivepassphrase installation.'
            )
            raise NotImplementedError(msg)
        return ret

    ENTRY_POINT_GROUP_NAME = 'derivepassphrase.ssh_agent_socket_providers'
    """
    The group name under which [entry
    points][importlib.metadata.entry_points] for the SSH agent socket
    provider registry should be recorded.  Each target of such an entry
    point should be a [`_types.SSHAgentSocketProviderEntry`][] object.
    """

    @classmethod
    def _find_all_ssh_agent_socket_providers(cls) -> None:
        """Find and load all declared SSH agent socket providers.

        Load all [entry points][importlib.metadata.entry_points] in the
        `derivepassphrase.ssh_agent_socket_providers` group as providers,
        then register them.  The target of each entry point should be
        a [`_types.SSHAgentSocketProviderEntry`][] object.

        The registry is only updated once all entry points have been
        processed without error.

        Raises:
            AssertionError:
                The declared SSH agent socket provider was not, in fact,
                an SSH agent socket provider.

                Alternatively, multiple distributions supplied the same
                SSH agent socket provider, but with different
                implementations.

        """
        import importlib.metadata  # noqa: PLC0415

        # Maps each newly added name to the distribution that supplied
        # it; names absent from this mapping are attributed to
        # derivepassphrase itself.
        origins: dict[str, str | None] = {}
        # New entries are staged in the first (initially empty) map, so
        # that lookups also see the existing registry, but the registry
        # itself stays untouched until the very end.
        entries = collections.ChainMap({}, cls.registry)
        for entry_point in importlib.metadata.entry_points(
            group=cls.ENTRY_POINT_GROUP_NAME
        ):
            provider_entry = cast(
                '_types.SSHAgentSocketProviderEntry', entry_point.load()
            )
            key = provider_entry.key
            value = entry_point.value
            dist = (
                entry_point.dist.name  # type: ignore[union-attr]
                if getattr(entry_point, 'dist', None) is not None
                else None
            )
            origin = origins.get(key, 'derivepassphrase')
            if not callable(provider_entry.provider):
                msg = (
                    f'Not an SSHAgentSocketProvider: '
                    f'{dist = }, {cls.ENTRY_POINT_GROUP_NAME = }, '
                    f'{value = }, {provider_entry = }'
                )
                raise AssertionError(msg)  # noqa: TRY004
            if key in entries:
                if entries[key] != provider_entry.provider:
                    msg = (
                        f'Name clash in SSH agent socket providers '
                        f'for entry {key!r}, both by {dist!r} '
                        f'and by {origin!r}'
                    )
                    raise AssertionError(msg)
            else:
                entries[key] = provider_entry.provider
                origins[key] = dist
            for alias in provider_entry.aliases:
                alias_origin = origins.get(alias, 'derivepassphrase')
                if alias in entries:
                    if entries[alias] != key:
                        msg = (
                            f'Name clash in SSH agent socket providers '
                            f'for entry {alias!r}, both by {dist!r} '
                            f'and by {alias_origin!r}'
                        )
                        raise AssertionError(msg)
                else:
                    entries[alias] = key
                    # Record the origin under the alias itself, so that
                    # a later clash on this alias names the right
                    # distribution.
                    origins[alias] = dist
        cls.registry.update(entries.maps[0])
# Populate the registry with the built-in providers, the names reserved
# for providers without an implementation here, and all known aliases.
SocketProvider.registry.update({
    # actual implementations
    'posix': SocketProvider.unix_domain_ssh_auth_sock,
    'the_annoying_os': SocketProvider.the_annoying_os_named_pipes,
    # known instances (name reserved, no implementation)
    'fake': None,
    'fake_with_address': None,
    # the platform-dependent alias
    'native': 'the_annoying_os' if os.name == 'nt' else 'posix',
    # aliases for 'posix'
    **dict.fromkeys(
        (
            'unix_domain',
            'ssh_auth_sock',
        ),
        'posix',
    ),
    # aliases for 'the_annoying_os'
    **dict.fromkeys(
        (
            'the_annoying_os_named_pipe',
            'pageant_on_the_annoying_os',
            'openssh_on_the_annoying_os',
            'windows',
            'windows_named_pipe',
            'pageant_on_windows',
            'openssh_on_windows',
        ),
        'the_annoying_os',
    ),
})