Coverage for tests/tests_rf/virtual_rf/virtual_rf.py: 0%
215 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""A virtual RF network useful for testing."""
4# NOTE: does not rely on ramses_rf library
6import asyncio
7import contextlib
8import logging
9import os
10import pty
11import re
12import signal
13import tty
14from collections import deque
15from io import FileIO
16from selectors import EVENT_READ, DefaultSelector
17from typing import Any, Final, TypeAlias, TypedDict
19from serial import Serial, serial_for_url # type: ignore[import-untyped]
21from .const import HgiFwTypes
23_FD: TypeAlias = int # file descriptor
24_PN: TypeAlias = str # port name
26# _FILEOBJ: TypeAlias = int | Any # int | HasFileno
28_GwyAttrsT = TypedDict(
29 "_GwyAttrsT",
30 {
31 "manufacturer": str,
32 "product": str,
33 "vid": int,
34 "pid": int,
35 "description": str,
36 "interface": str | None,
37 "serial_number": str | None,
38 "subsystem": str,
39 "_dev_path": str,
40 "_dev_by-id": str,
41 },
42)
45DEVICE_ID: Final = "device_id"
46FW_TYPE: Final = "fw_type"
47DEVICE_ID_BYTES: Final = "device_id_bytes"
50class _GatewaysT(TypedDict):
51 device_id: str
52 fw_type: HgiFwTypes
53 device_id_bytes: bytes
56_LOGGER = logging.getLogger(__name__)
57_LOGGER.setLevel(logging.DEBUG)
59DEFAULT_GWY_ID = bytes("18:000730", "ascii")
61MAX_NUM_PORTS = 6
64_GWY_ATTRS: dict[str, _GwyAttrsT] = {
65 HgiFwTypes.HGI_80: {
66 "manufacturer": "Texas Instruments",
67 "product": "TUSB3410 Boot Device",
68 "vid": 0x10AC, # Honeywell, Inc.
69 "pid": 0x0102, # HGI80
70 "description": "TUSB3410 Boot Device",
71 "interface": None,
72 "serial_number": "TUSB3410",
73 "subsystem": "usb",
74 #
75 "_dev_path": "/dev/ttyUSB0",
76 "_dev_by-id": "/dev/serial/by-id/usb-Texas_Instruments_TUSB3410_Boot_Device_TUSB3410-if00-port0",
77 },
78 HgiFwTypes.EVOFW3: {
79 "manufacturer": "SparkFun",
80 "product": "evofw3 atmega32u4",
81 "vid": 0x1B4F, # SparkFun Electronics
82 "pid": 0x9206, #
83 "description": "evofw3 atmega32u4",
84 "interface": None,
85 "serial_number": None,
86 "subsystem": "usb-serial",
87 #
88 "_dev_path": "/dev/ttyACM0",
89 "_dev_by-id": "/dev/serial/by-id/usb-SparkFun_evofw3_atmega32u4-if00",
90 },
91 f"{HgiFwTypes.EVOFW3}_alt": {
92 "manufacturer": "FTDI",
93 "product": "FT232R USB UART",
94 "vid": 0x0403, # FTDI
95 "pid": 0x6001, # SSM-D2
96 "description": "FT232R USB UART - FT232R USB UART",
97 "interface": "FT232R USB UART",
98 "serial_number": "A50285BI",
99 "subsystem": "usb-serial",
100 #
101 "_dev_path": "/dev/ttyUSB0",
102 "_dev_by-id": "/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0",
103 },
104 # . /dev/serial/by-id/usb-SHK_NANO_CUL_868-if00-port0
105 # . /dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0
106}
109class VirtualComPortInfo:
110 """A container for emulating pyserial's PortInfo (SysFS) objects."""
112 def __init__(self, port_name: _PN, dev_type: HgiFwTypes | None) -> None:
113 """Supplies a useful subset of PortInfo attrs according to gateway type."""
115 self.device: _PN = port_name # # e.g. /dev/pts/2 (a la /dev/ttyUSB0)
116 self.name: str = port_name[5:] # e.g. pts/2 (a la ttyUSB0)
118 self._set_attrs(_GWY_ATTRS[dev_type or HgiFwTypes.EVOFW3])
120 def _set_attrs(self, gwy_attrs: _GwyAttrsT) -> None:
121 """Set the remaining USB attributes according to the gateway type."""
123 self.manufacturer: str = gwy_attrs["manufacturer"]
124 self.product: str = gwy_attrs["product"]
126 self.vid: int = gwy_attrs["vid"]
127 self.pid: int = gwy_attrs["pid"]
129 self.description: str = gwy_attrs["description"]
130 self.interface: str | None = gwy_attrs["interface"]
131 self.serial_number: str | None = gwy_attrs["serial_number"]
132 self.subsystem: str = gwy_attrs["subsystem"]
135class VirtualRfBase:
136 """A virtual many-to-many network of serial port (a la RF network).
138 Creates a collection of serial ports. When data frames are received from any one
139 port, they are sent to all the other ports.
141 The data frames are in the RAMSES_II format, terminated by `\\r\\n`.
142 """
144 def __init__(self, num_ports: int, log_size: int = 100) -> None:
145 """Create `num_ports` virtual serial ports."""
147 if os.name != "posix":
148 raise RuntimeError(f"Unsupported OS: {os.name} (requires termios)")
150 if 1 > num_ports > MAX_NUM_PORTS:
151 raise ValueError(f"Port limit exceeded: {num_ports}")
153 self._port_info_list: dict[_PN, VirtualComPortInfo] = {}
155 self._loop = asyncio.get_running_loop()
156 self._selector = DefaultSelector()
158 self._master_to_port: dict[_FD, _PN] = {} # # for polling port
159 self._port_to_master: dict[_PN, _FD] = {} # # for logging
160 self._port_to_object: dict[_PN, FileIO] = {} # for I/O (read/write)
161 self._port_to_slave_: dict[_PN, _FD] = {} # # for cleanup only
163 # self._setup_event_handlers() # TODO: needs fixing/testing
164 for idx in range(num_ports):
165 self._create_port(idx)
167 self._log: deque[tuple[_PN, str, bytes]] = deque([], log_size)
168 self._task: asyncio.Task[None] | None = None
170 self._replies: dict[str, bytes] = {}
172 def _create_port(self, port_idx: int, dev_type: HgiFwTypes | None = None) -> None:
173 """Create a port without a HGI80 attached."""
174 master_fd, slave_fd = pty.openpty() # pty, tty
176 tty.setraw(master_fd) # requires termios module, so: works only on *nix
177 os.set_blocking(master_fd, False) # make non-blocking
179 port_name = os.ttyname(slave_fd)
180 self._selector.register(master_fd, EVENT_READ)
182 self._master_to_port[master_fd] = port_name
183 self._port_to_master[port_name] = master_fd
184 self._port_to_object[port_name] = open(master_fd, "rb+", buffering=0) # noqa: SIM115
185 self._port_to_slave_[port_name] = slave_fd
187 self._set_comport_info(port_name, dev_type=dev_type)
189 def comports(
190 self, include_links: bool = False
191 ) -> list[VirtualComPortInfo]: # unsorted
192 """Use this method to monkey patch serial.tools.list_ports.comports()."""
193 return list(self._port_info_list.values())
195 def _set_comport_info(
196 self, port_name: _PN, dev_type: HgiFwTypes | None = None
197 ) -> VirtualComPortInfo:
198 """Add comport info to the list (won't fail if the entry already exists)."""
199 self._port_info_list.pop(port_name, None)
200 self._port_info_list[port_name] = VirtualComPortInfo(port_name, dev_type)
201 return self._port_info_list[port_name]
203 @property
204 def ports(self) -> list[_PN]:
205 """Return a list of the names of the serial ports."""
206 return list(self._port_to_master) # [p.name for p in self.comports]
208 async def stop(self) -> None:
209 """Stop polling ports and distributing data."""
211 if not self._task or self._task.done():
212 return
213 self._task.cancel()
214 with contextlib.suppress(asyncio.CancelledError):
215 await self._task
217 self._cleanup()
219 def _cleanup(self) -> None:
220 """Destroy file objects and file descriptors."""
222 for fp in self._port_to_object.values():
223 fp.close() # also closes corresponding master fd
224 for fd in self._port_to_slave_.values():
225 os.close(fd) # else this slave fd will persist
227 def start(self) -> asyncio.Task[None]:
228 """Start polling ports and distributing data, calls `pull_data_from_port()`."""
230 self._task = self._loop.create_task(self._poll_ports_for_data())
231 return self._task
233 async def _poll_ports_for_data(self) -> None:
234 """Send data received from any one port (as .write(data)) to all other ports."""
236 with contextlib.ExitStack() as stack:
237 for fp in self._port_to_object.values():
238 stack.enter_context(fp)
240 while True:
241 for key, _ in self._selector.select(timeout=0):
242 # if not event_mask & EVENT_READ:
243 # continue
244 self._pull_data_from_src_port(self._master_to_port[key.fileobj]) # type: ignore[index]
245 await asyncio.sleep(0)
246 else:
247 await asyncio.sleep(0.0001)
249 def _pull_data_from_src_port(self, src_port: _PN) -> None:
250 """Pull the data from the sending port and process any frames."""
252 data = self._port_to_object[src_port].read() # read the Tx'd data
253 self._log.append((src_port, "SENT", data))
255 # this assumes all .write(data) are 1+ whole frames terminated with \r\n
256 for frame in (d + b"\r\n" for d in data.split(b"\r\n") if d): # ignore b""
257 if fr := self._proc_before_tx(src_port, frame):
258 self._cast_frame_to_all_ports(src_port, fr) # is not echo only
260 def _cast_frame_to_all_ports(self, src_port: _PN, frame: bytes) -> None:
261 """Pull the frame from the source port and cast it to the RF."""
263 _LOGGER.info(f"{src_port:<11} cast: {frame!r}")
264 for dst_port in self._port_to_master:
265 self._push_frame_to_dst_port(dst_port, frame)
267 # see if there is a faked response (RP/I) for a given command (RQ/W)
268 if not (reply := self._find_reply_for_cmd(frame)):
269 return
271 _LOGGER.info(f"{src_port:<11} rply: {reply!r}")
272 for dst_port in self._port_to_master:
273 self._push_frame_to_dst_port(dst_port, reply) # is not echo only
275 def add_reply_for_cmd(self, cmd: str, reply: str) -> None:
276 """Add a reply packet for a given command frame (for a mocked device).
278 For example (note no RSSI, \\r\\n in reply pkt):
279 cmd regex: r"RQ.* 18:.* 01:.* 0006 001 00"
280 reply pkt: "RP --- 01:145038 18:013393 --:------ 0006 004 00050135",
281 """
283 self._replies[cmd] = reply.encode() + b"\r\n"
285 def _find_reply_for_cmd(self, cmd: bytes) -> bytes | None:
286 """Return a reply packet for a given command frame (for a mocked device)."""
287 for pattern, reply in self._replies.items():
288 if re.match(pattern, cmd.decode()):
289 return reply
290 return None
292 def _push_frame_to_dst_port(self, dst_port: _PN, frame: bytes) -> None:
293 """Push the frame to a single destination port."""
295 if data := self._proc_after_rx(dst_port, frame):
296 self._log.append((dst_port, "RCVD", data))
297 self._port_to_object[dst_port].write(data)
299 def _proc_after_rx(self, rcv_port: _PN, frame: bytes) -> bytes | None:
300 """Allow the device to modify the frame after receiving (e.g. adding RSSI)."""
301 return frame
303 def _proc_before_tx(self, src_port: _PN, frame: bytes) -> bytes | None:
304 """Allow the device to modify the frame before sending (e.g. changing addr0)."""
305 return frame
307 def _setup_event_handlers(self) -> None:
308 def handle_exception(
309 loop: asyncio.BaseEventLoop, context: dict[str, Any]
310 ) -> None:
311 """Handle exceptions on any platform."""
312 _LOGGER.error("Caught an exception: %s, cleaning up...", context["message"])
313 self._cleanup()
314 err = context.get("exception")
315 if err:
316 raise err
318 async def handle_sig_posix(sig: signal.Signals) -> None:
319 """Handle signals on posix platform."""
320 _LOGGER.error("Received a signal: %s, cleaning up...", sig.name)
321 self._cleanup()
322 signal.raise_signal(sig)
324 _LOGGER.debug("Creating exception handler...")
325 self._loop.set_exception_handler(handle_exception)
327 _LOGGER.debug("Creating signal handlers...")
328 if os.name == "posix": # signal.SIGKILL people?
329 for sig in (signal.SIGABRT, signal.SIGINT, signal.SIGTERM):
330 self._loop.add_signal_handler(
331 sig,
332 lambda sig=sig: self._loop.create_task(handle_sig_posix(sig)), # type: ignore[misc]
333 )
334 else: # unsupported OS
335 raise RuntimeError(f"Unsupported OS for this module: {os.name} (termios)")
338class VirtualRf(VirtualRfBase):
339 """A virtual network of serial ports, each with an optional HGI80s or compatible.
341 Frames are modified/dropped according to the expected behaviours of the gateway that
342 is transmitting (addr0) / receiving (RSSI) it.
343 """
345 def __init__(self, num_ports: int, log_size: int = 100, start: bool = True) -> None:
346 """Create a number of virtual serial ports.
348 Each port has the option of a HGI80 or evofw3-based gateway device.
349 """
351 self._gateways: dict[_PN, _GatewaysT] = {}
353 super().__init__(num_ports, log_size)
355 if start:
356 self.start()
358 @property
359 def gateways(self) -> dict[str, _PN]:
360 return {v[DEVICE_ID]: k for k, v in self._gateways.items()}
362 def set_gateway(
363 self,
364 port_name: _PN,
365 device_id: str,
366 fw_type: HgiFwTypes = HgiFwTypes.EVOFW3,
367 ) -> None:
368 """Attach a gateway with a given device_id and FW type to a port.
370 Raise an exception if the device_id is already attached to another port.
371 """
373 if port_name not in self.ports:
374 raise LookupError(f"Port does not exist: {port_name}")
376 if [v for k, v in self.gateways.items() if k != port_name and v == device_id]:
377 raise LookupError(f"Gateway exists on another port: {device_id}")
379 if fw_type not in HgiFwTypes:
380 raise LookupError(f"Unknown FW specified for gateway: {fw_type}")
382 self._gateways[port_name] = {
383 DEVICE_ID: device_id,
384 FW_TYPE: fw_type,
385 DEVICE_ID_BYTES: bytes(device_id, "ascii"),
386 }
388 self._set_comport_info(port_name, dev_type=fw_type)
390 async def dump_frames_to_rf(
391 self, pkts: list[bytes], /, timeout: float | None = None
392 ) -> None: # TODO: WIP
393 """Dump frames as if from a sending port (for mocking)."""
395 async def no_data_left_to_send() -> None:
396 """Wait until there all pending data is read."""
397 while self._selector.select(timeout=0):
398 await asyncio.sleep(0.001)
400 for data in pkts:
401 self._log.append(("/dev/mock", "SENT", data))
402 self._cast_frame_to_all_ports("/dev/mock", data) # is not echo only
404 if timeout:
405 await asyncio.wait_for(no_data_left_to_send(), timeout)
407 def _proc_after_rx(self, rcv_port: _PN, frame: bytes) -> bytes | None:
408 """Return the frame as it would have been modified by a gateway after Rx.
410 Return None if the bytes are not to be Rx by this device.
412 Both FW types will prepend an RSSI to the frame.
413 """
415 if frame[:1] != b"!":
416 return b"000 " + frame
418 # The type of Gateway will inform next steps (NOTE: is not a ramses_rf.Gateway)
419 gwy: _GatewaysT | None = self._gateways.get(rcv_port)
421 if gwy is None or gwy.get(FW_TYPE) != HgiFwTypes.EVOFW3:
422 return None
424 if frame == b"!V":
425 return b"# evofw3 0.7.1\r\n" # self._fxle_objs[port_name].write(data)
426 return None # TODO: return the ! response
428 def _proc_before_tx(self, src_port: _PN, frame: bytes) -> bytes | None:
429 """Return the frame as it would have been modified by a gateway before Tx.
431 Return None if the bytes are not to be Tx to the RF ether (e.g. to echo only).
433 Both FW types will convert addr0 (only) from 18:000730 to its actual device_id.
434 HGI80-based gateways will silently drop frames with addr0 other than 18:000730.
435 """
437 # The type of Gateway will inform next steps (NOTE: is not a ramses_rf.Gateway)
438 gwy: _GatewaysT | None = self._gateways.get(src_port)
440 # Handle trace flags (evofw3 only)
441 if frame[:1] == b"!": # never to be cast, but may be echo'd, or other response
442 if gwy is None or gwy.get(FW_TYPE) != HgiFwTypes.EVOFW3:
443 return None # do not Tx the frame
444 self._push_frame_to_dst_port(src_port, frame)
446 if gwy is None: # TODO: ?should raise: but is probably from test suite
447 return frame
449 # Real HGI80s will silently drop cmds if addr0 is not the 18:000730 sentinel
450 if gwy[FW_TYPE] == HgiFwTypes.HGI_80 and frame[7:16] != DEFAULT_GWY_ID:
451 return None
453 # Both (HGI80 & evofw3) will swap out addr0 (and only addr0)
454 if frame[7:16] == DEFAULT_GWY_ID:
455 frame = frame[:7] + gwy[DEVICE_ID_BYTES] + frame[16:]
457 return frame
460async def main() -> None:
461 """ "Demonstrate the class functionality."""
463 num_ports = 3
465 rf = VirtualRf(num_ports)
466 print(f"Ports are: {rf.ports}")
468 sers: list[Serial] = [serial_for_url(rf.ports[i]) for i in range(num_ports)] # type: ignore[no-any-unimported]
470 for i in range(num_ports):
471 sers[i].write(bytes(f"Hello World {i}! ", "utf-8"))
472 await asyncio.sleep(0.005) # give the write a chance to effect
474 print(f"{sers[i].name}: {sers[i].read(sers[i].in_waiting)}")
475 sers[i].close()
477 await rf.stop()
480if __name__ == "__main__":
481 asyncio.run(main())