Coverage for tests/tests_rf/virtual_rf/virtual_rf.py: 0%

215 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""A virtual RF network useful for testing.""" 

3 

4# NOTE: does not rely on ramses_rf library 

5 

6import asyncio 

7import contextlib 

8import logging 

9import os 

10import pty 

11import re 

12import signal 

13import tty 

14from collections import deque 

15from io import FileIO 

16from selectors import EVENT_READ, DefaultSelector 

17from typing import Any, Final, TypeAlias, TypedDict 

18 

19from serial import Serial, serial_for_url # type: ignore[import-untyped] 

20 

21from .const import HgiFwTypes 

22 

23_FD: TypeAlias = int # file descriptor 

24_PN: TypeAlias = str # port name 

25 

26# _FILEOBJ: TypeAlias = int | Any # int | HasFileno 

27 

28_GwyAttrsT = TypedDict( 

29 "_GwyAttrsT", 

30 { 

31 "manufacturer": str, 

32 "product": str, 

33 "vid": int, 

34 "pid": int, 

35 "description": str, 

36 "interface": str | None, 

37 "serial_number": str | None, 

38 "subsystem": str, 

39 "_dev_path": str, 

40 "_dev_by-id": str, 

41 }, 

42) 

43 

44 

# Keys of the per-port gateway records (they match the fields of _GatewaysT).
DEVICE_ID: Final = "device_id"
FW_TYPE: Final = "fw_type"
DEVICE_ID_BYTES: Final = "device_id_bytes"

48 

49 

class _GatewaysT(TypedDict):
    """The record kept for the gateway attached to a virtual port."""

    device_id: str  # the device id, as text
    fw_type: HgiFwTypes  # the firmware type the gateway emulates
    device_id_bytes: bytes  # the same device id, pre-encoded for frame rewriting

54 

55 

# Module logger; DEBUG is forced here so nothing from this test helper is dropped.
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)

# The sentinel addr0 that gateways replace with their own device id before Tx.
DEFAULT_GWY_ID = b"18:000730"

# Upper bound on the number of virtual ports in one network.
MAX_NUM_PORTS = 6

62 

63 

# USB attributes to emulate, keyed by gateway firmware type. The "_alt" entry
# is an additional evofw3 variant; no HgiFwTypes member maps to it directly.
_GWY_ATTRS: dict[str, _GwyAttrsT] = {
    HgiFwTypes.HGI_80: {
        "manufacturer": "Texas Instruments",
        "product": "TUSB3410 Boot Device",
        "vid": 0x10AC,  # Honeywell, Inc.
        "pid": 0x0102,  # HGI80
        "description": "TUSB3410 Boot Device",
        "interface": None,
        "serial_number": "TUSB3410",
        "subsystem": "usb",
        #
        "_dev_path": "/dev/ttyUSB0",
        "_dev_by-id": "/dev/serial/by-id/usb-Texas_Instruments_TUSB3410_Boot_Device_TUSB3410-if00-port0",
    },
    HgiFwTypes.EVOFW3: {
        "manufacturer": "SparkFun",
        "product": "evofw3 atmega32u4",
        "vid": 0x1B4F,  # SparkFun Electronics
        "pid": 0x9206,  #
        "description": "evofw3 atmega32u4",
        "interface": None,
        "serial_number": None,
        "subsystem": "usb-serial",
        #
        "_dev_path": "/dev/ttyACM0",
        "_dev_by-id": "/dev/serial/by-id/usb-SparkFun_evofw3_atmega32u4-if00",
    },
    f"{HgiFwTypes.EVOFW3}_alt": {
        "manufacturer": "FTDI",
        "product": "FT232R USB UART",
        "vid": 0x0403,  # FTDI
        "pid": 0x6001,  # SSM-D2
        "description": "FT232R USB UART - FT232R USB UART",
        "interface": "FT232R USB UART",
        "serial_number": "A50285BI",
        "subsystem": "usb-serial",
        #
        "_dev_path": "/dev/ttyUSB0",
        "_dev_by-id": "/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0",
    },
    # . /dev/serial/by-id/usb-SHK_NANO_CUL_868-if00-port0
    # . /dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0
}

107 

108 

class VirtualComPortInfo:
    """A stand-in for pyserial's PortInfo (SysFS) objects, for one virtual port."""

    # Attributes populated by _set_attrs() (declared here for type checkers only).
    manufacturer: str
    product: str
    vid: int
    pid: int
    description: str
    interface: str | None
    serial_number: str | None
    subsystem: str

    def __init__(self, port_name: _PN, dev_type: HgiFwTypes | None) -> None:
        """Expose a useful subset of PortInfo attrs for the given gateway type.

        When `dev_type` is falsy, the evofw3 attributes are used.
        """

        self.device: _PN = port_name  # e.g. /dev/pts/2 (a la /dev/ttyUSB0)
        self.name: str = port_name[5:]  # e.g. pts/2 (a la ttyUSB0)

        fw_type = HgiFwTypes.EVOFW3 if not dev_type else dev_type
        self._set_attrs(_GWY_ATTRS[fw_type])

    def _set_attrs(self, gwy_attrs: _GwyAttrsT) -> None:
        """Copy the USB attributes for the gateway type onto this object."""

        self.manufacturer = gwy_attrs["manufacturer"]
        self.product = gwy_attrs["product"]

        self.vid = gwy_attrs["vid"]
        self.pid = gwy_attrs["pid"]

        self.description = gwy_attrs["description"]
        self.interface = gwy_attrs["interface"]
        self.serial_number = gwy_attrs["serial_number"]
        self.subsystem = gwy_attrs["subsystem"]

133 

134 

class VirtualRfBase:
    """A virtual many-to-many network of serial ports (a la an RF network).

    Creates a collection of serial ports. When data frames are received from any one
    port, they are sent to all the other ports.

    The data frames are in the RAMSES_II format, terminated by `\\r\\n`.
    """

    def __init__(self, num_ports: int, log_size: int = 100) -> None:
        """Create `num_ports` virtual serial ports.

        Must be called while an asyncio event loop is running.

        Raises:
            RuntimeError: the OS is not posix (or there is no running loop).
            ValueError: `num_ports` is not between 1 and MAX_NUM_PORTS.
        """

        if os.name != "posix":
            raise RuntimeError(f"Unsupported OS: {os.name} (requires termios)")

        # NOTE: this was `1 > num_ports > MAX_NUM_PORTS`, which means
        # `1 > n and n > MAX`: never true, so the limit was never enforced
        if not 1 <= num_ports <= MAX_NUM_PORTS:
            raise ValueError(
                f"Port limit exceeded: {num_ports} (must be 1-{MAX_NUM_PORTS})"
            )

        self._port_info_list: dict[_PN, VirtualComPortInfo] = {}

        self._loop = asyncio.get_running_loop()
        self._selector = DefaultSelector()

        self._master_to_port: dict[_FD, _PN] = {}  # for polling port
        self._port_to_master: dict[_PN, _FD] = {}  # for logging
        self._port_to_object: dict[_PN, FileIO] = {}  # for I/O (read/write)
        self._port_to_slave_: dict[_PN, _FD] = {}  # for cleanup only

        # self._setup_event_handlers()  # TODO: needs fixing/testing
        for idx in range(num_ports):
            self._create_port(idx)

        # a bounded history of (port, "SENT" | "RCVD", data) tuples
        self._log: deque[tuple[_PN, str, bytes]] = deque([], log_size)
        self._task: asyncio.Task[None] | None = None

        # maps a command regex to the (encoded) reply to fake for it
        self._replies: dict[str, bytes] = {}

    def _create_port(self, port_idx: int, dev_type: HgiFwTypes | None = None) -> None:
        """Create a port without a HGI80 attached.

        Opens a pty pair, registers the master end for polling and records both
        ends (the slave's tty name is the port name). `port_idx` is not used.
        """
        master_fd, slave_fd = pty.openpty()  # pty, tty

        tty.setraw(master_fd)  # requires termios module, so: works only on *nix
        os.set_blocking(master_fd, False)  # make non-blocking

        port_name = os.ttyname(slave_fd)
        self._selector.register(master_fd, EVENT_READ)

        self._master_to_port[master_fd] = port_name
        self._port_to_master[port_name] = master_fd
        self._port_to_object[port_name] = open(master_fd, "rb+", buffering=0)  # noqa: SIM115
        self._port_to_slave_[port_name] = slave_fd

        self._set_comport_info(port_name, dev_type=dev_type)

    def comports(
        self, include_links: bool = False
    ) -> list[VirtualComPortInfo]:  # unsorted
        """Use this method to monkey patch serial.tools.list_ports.comports().

        `include_links` is accepted for signature compatibility and is ignored.
        """
        return list(self._port_info_list.values())

    def _set_comport_info(
        self, port_name: _PN, dev_type: HgiFwTypes | None = None
    ) -> VirtualComPortInfo:
        """Add comport info to the list (won't fail if the entry already exists)."""
        # pop first, so that a replaced entry moves to the end of the dict
        self._port_info_list.pop(port_name, None)
        self._port_info_list[port_name] = VirtualComPortInfo(port_name, dev_type)
        return self._port_info_list[port_name]

    @property
    def ports(self) -> list[_PN]:
        """Return a list of the names of the serial ports."""
        return list(self._port_to_master)  # [p.name for p in self.comports]

    async def stop(self) -> None:
        """Stop polling ports and distributing data, then release the ports.

        The cleanup is always performed (and is safe to repeat): previously it
        was skipped when the polling task had never been started or had already
        finished, leaving the port file descriptors open.
        """

        task = self._task
        if task is not None and not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task

        self._cleanup()

    def _cleanup(self) -> None:
        """Destroy file objects and file descriptors (safe to call repeatedly)."""

        for fp in self._port_to_object.values():
            fp.close()  # also closes corresponding master fd; a no-op if closed

        # Forget each slave fd before closing it, so a repeated call can never
        # close a descriptor number that the OS has since re-issued elsewhere.
        while self._port_to_slave_:
            _, fd = self._port_to_slave_.popitem()
            os.close(fd)  # else this slave fd will persist

    def start(self) -> asyncio.Task[None]:
        """Start polling ports and distributing data, calls `pull_data_from_port()`."""

        self._task = self._loop.create_task(self._poll_ports_for_data())
        return self._task

    async def _poll_ports_for_data(self) -> None:
        """Send data received from any one port (as .write(data)) to all other ports.

        Runs until cancelled; the port file objects are closed on exit.
        """

        with contextlib.ExitStack() as stack:
            for fp in self._port_to_object.values():
                stack.enter_context(fp)

            while True:
                for key, _ in self._selector.select(timeout=0):
                    # if not event_mask & EVENT_READ:
                    #     continue
                    self._pull_data_from_src_port(self._master_to_port[key.fileobj])  # type: ignore[index]
                    await asyncio.sleep(0)

                # Always yield between polls. (This was the `else:` of a `for`
                # with no `break`, which runs on every pass - same behaviour.)
                await asyncio.sleep(0.0001)

    def _pull_data_from_src_port(self, src_port: _PN) -> None:
        """Pull the data from the sending port and process any frames."""

        data = self._port_to_object[src_port].read()  # read the Tx'd data
        if data is None:  # a non-blocking read with nothing available
            return
        self._log.append((src_port, "SENT", data))

        # this assumes all .write(data) are 1+ whole frames terminated with \r\n
        for frame in (d + b"\r\n" for d in data.split(b"\r\n") if d):  # ignore b""
            if fr := self._proc_before_tx(src_port, frame):
                self._cast_frame_to_all_ports(src_port, fr)  # is not echo only

    def _cast_frame_to_all_ports(self, src_port: _PN, frame: bytes) -> None:
        """Cast the frame to every port (including the source), plus any faked reply."""

        _LOGGER.info("%-11s cast: %r", src_port, frame)
        for dst_port in self._port_to_master:
            self._push_frame_to_dst_port(dst_port, frame)

        # see if there is a faked response (RP/I) for a given command (RQ/W)
        if not (reply := self._find_reply_for_cmd(frame)):
            return

        _LOGGER.info("%-11s rply: %r", src_port, reply)
        for dst_port in self._port_to_master:
            self._push_frame_to_dst_port(dst_port, reply)  # is not echo only

    def add_reply_for_cmd(self, cmd: str, reply: str) -> None:
        """Add a reply packet for a given command frame (for a mocked device).

        For example (note no RSSI, \\r\\n in reply pkt):
          cmd regex: r"RQ.* 18:.* 01:.* 0006 001 00"
          reply pkt: "RP --- 01:145038 18:013393 --:------ 0006 004 00050135",
        """

        self._replies[cmd] = reply.encode() + b"\r\n"

    def _find_reply_for_cmd(self, cmd: bytes) -> bytes | None:
        """Return a reply packet for a given command frame (for a mocked device).

        The first registered regex to match the start of the frame wins.
        """
        if not self._replies:  # nothing registered: do not even decode the frame
            return None

        cmd_text = cmd.decode()  # decode once, not once per pattern
        for pattern, reply in self._replies.items():
            if re.match(pattern, cmd_text):
                return reply
        return None

    def _push_frame_to_dst_port(self, dst_port: _PN, frame: bytes) -> None:
        """Push the frame to a single destination port (unless it is dropped on Rx)."""

        if data := self._proc_after_rx(dst_port, frame):
            self._log.append((dst_port, "RCVD", data))
            self._port_to_object[dst_port].write(data)

    def _proc_after_rx(self, rcv_port: _PN, frame: bytes) -> bytes | None:
        """Allow the device to modify the frame after receiving (e.g. adding RSSI)."""
        return frame

    def _proc_before_tx(self, src_port: _PN, frame: bytes) -> bytes | None:
        """Allow the device to modify the frame before sending (e.g. changing addr0)."""
        return frame

    def _setup_event_handlers(self) -> None:
        """Install loop exception/signal handlers that clean up the ports.

        Currently unused: the call in __init__ is disabled (needs fixing/testing).
        """

        def handle_exception(
            loop: asyncio.BaseEventLoop, context: dict[str, Any]
        ) -> None:
            """Handle exceptions on any platform."""
            _LOGGER.error("Caught an exception: %s, cleaning up...", context["message"])
            self._cleanup()
            err = context.get("exception")
            if err:
                raise err

        async def handle_sig_posix(sig: signal.Signals) -> None:
            """Handle signals on posix platform."""
            _LOGGER.error("Received a signal: %s, cleaning up...", sig.name)
            self._cleanup()
            signal.raise_signal(sig)

        _LOGGER.debug("Creating exception handler...")
        self._loop.set_exception_handler(handle_exception)

        _LOGGER.debug("Creating signal handlers...")
        if os.name == "posix":  # signal.SIGKILL people?
            for sig in (signal.SIGABRT, signal.SIGINT, signal.SIGTERM):
                self._loop.add_signal_handler(
                    sig,
                    lambda sig=sig: self._loop.create_task(handle_sig_posix(sig)),  # type: ignore[misc]
                )
        else:  # unsupported OS
            raise RuntimeError(f"Unsupported OS for this module: {os.name} (termios)")

336 

337 

class VirtualRf(VirtualRfBase):
    """A virtual network of serial ports, each with an optional HGI80 or compatible.

    Frames are modified/dropped according to the expected behaviours of the gateway that
    is transmitting (addr0) / receiving (RSSI) it.
    """

    def __init__(self, num_ports: int, log_size: int = 100, start: bool = True) -> None:
        """Create a number of virtual serial ports.

        Each port has the option of a HGI80 or evofw3-based gateway device.
        Polling begins immediately unless `start` is False.
        """

        # must exist before the base class creates the ports
        self._gateways: dict[_PN, _GatewaysT] = {}

        super().__init__(num_ports, log_size)

        if start:
            self.start()

    @property
    def gateways(self) -> dict[str, _PN]:
        """Return a mapping of gateway device_id to the name of its port."""
        return {v[DEVICE_ID]: k for k, v in self._gateways.items()}

    def set_gateway(
        self,
        port_name: _PN,
        device_id: str,
        fw_type: HgiFwTypes = HgiFwTypes.EVOFW3,
    ) -> None:
        """Attach a gateway with a given device_id and FW type to a port.

        Re-attaching the same device_id to the same port is permitted.

        Raises:
            LookupError: the port does not exist, the device_id is already
                attached to another port, or the FW type is unknown.
        """

        if port_name not in self.ports:
            raise LookupError(f"Port does not exist: {port_name}")

        # NOTE: this check used to iterate self.gateways (device_id -> port) whilst
        # comparing the key to port_name and the value to device_id, so it could
        # never match; walk the port -> gateway records directly instead
        if any(
            port != port_name and gwy[DEVICE_ID] == device_id
            for port, gwy in self._gateways.items()
        ):
            raise LookupError(f"Gateway exists on another port: {device_id}")

        if fw_type not in HgiFwTypes:
            raise LookupError(f"Unknown FW specified for gateway: {fw_type}")

        self._gateways[port_name] = {
            DEVICE_ID: device_id,
            FW_TYPE: fw_type,
            DEVICE_ID_BYTES: bytes(device_id, "ascii"),
        }

        # refresh the emulated USB attributes to match the new FW type
        self._set_comport_info(port_name, dev_type=fw_type)

    async def dump_frames_to_rf(
        self, pkts: list[bytes], /, timeout: float | None = None
    ) -> None:  # TODO: WIP
        """Dump frames as if from a sending port (for mocking).

        If a (non-zero) `timeout` is given, wait up to that long for the ports to
        have no unread data; asyncio.wait_for() raises if the wait times out.
        """

        async def no_data_left_to_send() -> None:
            """Wait until all pending data has been read."""
            while self._selector.select(timeout=0):
                await asyncio.sleep(0.001)

        for data in pkts:
            self._log.append(("/dev/mock", "SENT", data))
            self._cast_frame_to_all_ports("/dev/mock", data)  # is not echo only

        if timeout:
            await asyncio.wait_for(no_data_left_to_send(), timeout)

    def _proc_after_rx(self, rcv_port: _PN, frame: bytes) -> bytes | None:
        """Return the frame as it would have been modified by a gateway after Rx.

        Return None if the bytes are not to be Rx by this device.

        Both FW types will prepend an RSSI to the frame.
        """

        if frame[:1] != b"!":
            return b"000 " + frame

        # The type of Gateway will inform next steps (NOTE: is not a ramses_rf.Gateway)
        gwy: _GatewaysT | None = self._gateways.get(rcv_port)

        if gwy is None or gwy.get(FW_TYPE) != HgiFwTypes.EVOFW3:
            return None

        # NOTE(review): frames pulled from a port have b"\r\n" re-appended, so
        # this exact match looks unreachable via that path - confirm the intent
        if frame == b"!V":
            return b"# evofw3 0.7.1\r\n"
        return None  # TODO: return the ! response

    def _proc_before_tx(self, src_port: _PN, frame: bytes) -> bytes | None:
        """Return the frame as it would have been modified by a gateway before Tx.

        Return None if the bytes are not to be Tx to the RF ether (e.g. to echo only).

        Both FW types will convert addr0 (only) from 18:000730 to its actual device_id.
        HGI80-based gateways will silently drop frames with addr0 other than 18:000730.
        """

        # The type of Gateway will inform next steps (NOTE: is not a ramses_rf.Gateway)
        gwy: _GatewaysT | None = self._gateways.get(src_port)

        # Handle trace flags (evofw3 only)
        if frame[:1] == b"!":  # never to be cast, but may be echo'd, or other response
            if gwy is None or gwy.get(FW_TYPE) != HgiFwTypes.EVOFW3:
                return None  # do not Tx the frame
            self._push_frame_to_dst_port(src_port, frame)
            # NOTE(review): execution continues below, so the frame is still
            # returned for casting despite the comment above - left unchanged,
            # as faked replies may rely on it; confirm the intent

        if gwy is None:  # TODO: ?should raise: but is probably from test suite
            return frame

        # bytes 7-15 of a frame hold addr0 (as per the slices used below)
        addr0 = frame[7:16]

        # Real HGI80s will silently drop cmds if addr0 is not the 18:000730 sentinel
        if gwy[FW_TYPE] == HgiFwTypes.HGI_80 and addr0 != DEFAULT_GWY_ID:
            return None

        # Both (HGI80 & evofw3) will swap out addr0 (and only addr0)
        if addr0 == DEFAULT_GWY_ID:
            frame = frame[:7] + gwy[DEVICE_ID_BYTES] + frame[16:]

        return frame

458 

459 

async def main() -> None:
    """Demonstrate the class functionality."""

    num_ports = 3

    rf = VirtualRf(num_ports)
    print(f"Ports are: {rf.ports}")

    sers: list[Serial] = [serial_for_url(rf.ports[i]) for i in range(num_ports)]  # type: ignore[no-any-unimported]

    # write to each port in turn, then show what that same port has received
    for i, ser in enumerate(sers):
        ser.write(f"Hello World {i}! ".encode("utf-8"))
        await asyncio.sleep(0.005)  # give the write a chance to effect

        print(f"{ser.name}: {ser.read(ser.in_waiting)}")
        ser.close()

    await rf.stop()


if __name__ == "__main__":
    asyncio.run(main())