Coverage for src/ramses_tx/transport.py: 23%

793 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - RAMSES-II compatible packet transport. 

3 

4Operates at the pkt layer of: app - msg - pkt - h/w 

5 

6For ser2net, use the following YAML with: ``ser2net -c misc/ser2net.yaml`` 

7 

8.. code-block:: 

9 

10 connection: &con00 

11 accepter: telnet(rfc2217),tcp,5001 

12 timeout: 0 

13 connector: serialdev,/dev/ttyUSB0,115200n81,local 

14 options: 

15 max-connections: 3 

16 

17For ``socat``, see: 

18 

19.. code-block:: 

20 

21 socat -dd pty,raw,echo=0 pty,raw,echo=0 

22 python client.py monitor /dev/pts/0 

23 cat packet.log | cut -d ' ' -f 2- | unix2dos > /dev/pts/1 

24 

25For re-flashing evofw3 via Arduino IDE on *my* atmega328p (YMMV): 

26 

27 - Board: atmega328p (SW UART) 

28 - Bootloader: Old Bootloader 

29 - Processor: atmega328p (5V, 16 MHz) 

30 - Host: 57600 (or 115200, YMMV) 

31 - Pinout: Nano 

32 

33For re-flashing evofw3 via Arduino IDE on *my* atmega32u4 (YMMV): 

34 

35 - Board: atmega32u4 (HW UART) 

36 - Processor: atmega32u4 (5V, 16 MHz) 

37 - Pinout: Pro Micro 

38""" 

39 

40from __future__ import annotations 

41 

42import asyncio 

43import contextlib 

44import fileinput 

45import functools 

46import glob 

47import json 

48import logging 

49import os 

50import re 

51import sys 

52from collections import deque 

53from collections.abc import Awaitable, Callable, Iterable 

54from datetime import datetime as dt, timedelta as td 

55from functools import partial, wraps 

56from io import TextIOWrapper 

57from string import printable 

58from time import perf_counter 

59from typing import TYPE_CHECKING, Any, Final, TypeAlias 

60from urllib.parse import parse_qs, unquote, urlparse 

61 

62from paho.mqtt import MQTTException, client as mqtt 

63 

64try: 

65 from paho.mqtt.enums import CallbackAPIVersion 

66except ImportError: 

67 # Fallback for Paho MQTT < 2.0.0 (Home Assistant compatibility) 

68 CallbackAPIVersion = None # type: ignore[assignment, misc] 

69from serial import ( # type: ignore[import-untyped] 

70 Serial, 

71 SerialException, 

72 serial_for_url, 

73) 

74 

75from . import exceptions as exc 

76from .command import Command 

77from .const import ( 

78 DUTY_CYCLE_DURATION, 

79 MAX_DUTY_CYCLE_RATE, 

80 MAX_TRANSMIT_RATE_TOKENS, 

81 MIN_INTER_WRITE_GAP, 

82 SZ_ACTIVE_HGI, 

83 SZ_IS_EVOFW3, 

84 SZ_SIGNATURE, 

85) 

86from .helpers import dt_now 

87from .packet import Packet 

88from .schemas import ( 

89 SCH_SERIAL_PORT_CONFIG, 

90 SZ_EVOFW_FLAG, 

91 SZ_INBOUND, 

92 SZ_OUTBOUND, 

93 DeviceIdT, 

94 PortConfigT, 

95) 

96from .typing import ExceptionT, SerPortNameT 

97 

98from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

99 I_, 

100 RP, 

101 RQ, 

102 W_, 

103 Code, 

104) 

105 

106if TYPE_CHECKING: 

107 from .protocol import RamsesProtocolT 

108 

109 

110_DEFAULT_TIMEOUT_PORT: Final[float] = 3 

111_DEFAULT_TIMEOUT_MQTT: Final[float] = 60 # Updated from 9s to 60s for robustness 

112 

113_SIGNATURE_GAP_SECS = 0.05 

114_SIGNATURE_MAX_TRYS = 40 # was: 24 

115_SIGNATURE_MAX_SECS = 3 

116 

117SZ_RAMSES_GATEWAY: Final = "RAMSES/GATEWAY" 

118SZ_READER_TASK: Final = "reader_task" 

119 

120 

121# 

122# NOTE: All debug flags should be False for deployment to end-users 

123_DBG_DISABLE_DUTY_CYCLE_LIMIT: Final[bool] = False 

124_DBG_DISABLE_REGEX_WARNINGS: Final[bool] = False 

125_DBG_FORCE_FRAME_LOGGING: Final[bool] = False 

126 

127_LOGGER = logging.getLogger(__name__) 

128 

129 

130try: 

131 import serial_asyncio_fast as serial_asyncio # type: ignore[import-not-found, import-untyped, unused-ignore] 

132 

133 _LOGGER.debug("Using pyserial-asyncio-fast in place of pyserial-asyncio") 

134except ImportError: 

135 import serial_asyncio # type: ignore[import-not-found, import-untyped, unused-ignore, no-redef] 

136 

137 

138# For linux, use a modified version of comports() to include /dev/serial/by-id/* links 

139if os.name == "nt": # sys.platform == 'win32': 

140 from serial.tools.list_ports_windows import comports # type: ignore[import-untyped] 

141 

142elif os.name != "posix": # is unsupported 

143 raise ImportError( 

144 f"Sorry: no implementation for your platform ('{os.name}') available" 

145 ) 

146 

147elif sys.platform.lower()[:5] != "linux": # e.g. osx 

148 from serial.tools.list_ports_posix import comports # type: ignore[import-untyped] 

149 

150else: # is linux 

151 # - see: https://github.com/pyserial/pyserial/pull/700 

152 # - see: https://github.com/pyserial/pyserial/pull/709 

153 

154 from serial.tools.list_ports_linux import SysFS # type: ignore[import-untyped] 

155 

156 def list_links(devices: set[str]) -> list[str]: 

157 """Search for symlinks to ports already listed in devices. 

158 

159 :param devices: A set of real device paths. 

160 :type devices: set[str] 

161 :return: A list of symlinks pointing to the devices. 

162 :rtype: list[str] 

163 """ 

164 

165 links: list[str] = [] 

166 for device in glob.glob("/dev/*") + glob.glob("/dev/serial/by-id/*"): 

167 if os.path.islink(device) and os.path.realpath(device) in devices: 

168 links.append(device) 

169 return links 

170 

171 def comports( # type: ignore[no-any-unimported] 

172 include_links: bool = False, _hide_subsystems: list[str] | None = None 

173 ) -> list[SysFS]: 

174 """Return a list of Serial objects for all known serial ports. 

175 

176 :param include_links: Whether to include symlinks in the results, defaults to False. 

177 :type include_links: bool, optional 

178 :param _hide_subsystems: List of subsystems to hide, defaults to None. 

179 :type _hide_subsystems: list[str] | None, optional 

180 :return: A list of SysFS objects representing the ports. 

181 :rtype: list[SysFS] 

182 """ 

183 

184 if _hide_subsystems is None: 

185 _hide_subsystems = ["platform"] 

186 

187 devices = set() 

188 with open("/proc/tty/drivers") as file: 

189 drivers = file.readlines() 

190 for driver in drivers: 

191 items = driver.strip().split() 

192 if items[4] == "serial": 

193 devices.update(glob.glob(items[1] + "*")) 

194 

195 if include_links: 

196 devices.update(list_links(devices)) 

197 

198 result: list[SysFS] = [ # type: ignore[no-any-unimported] 

199 d for d in map(SysFS, devices) if d.subsystem not in _hide_subsystems 

200 ] 

201 return result 

202 

203 

204async def is_hgi80(serial_port: SerPortNameT) -> bool | None: 

205 """Return True if the device attached to the port has the attributes of a Honeywell HGI80. 

206 

207 Return False if it appears to be an evofw3-compatible device (ATMega etc). 

208 Return None if the type cannot be determined. 

209 

210 :param serial_port: The serial port path or URL. 

211 :type serial_port: SerPortNameT 

212 :return: True if HGI80, False if not (likely evofw3), None if undetermined. 

213 :rtype: bool | None 

214 :raises exc.TransportSerialError: If the serial port cannot be found. 

215 """ 

216 

217 if serial_port[:7] == "mqtt://": 

218 return False # ramses_esp 

219 

220 # TODO: add tests for different serial ports, incl./excl/ by-id 

221 

222 # See: https://github.com/pyserial/pyserial-asyncio/issues/46 

223 if "://" in serial_port: # e.g. "rfc2217://localhost:5001" 

224 try: 

225 serial_for_url(serial_port, do_not_open=True) 

226 except (SerialException, ValueError) as err: 

227 raise exc.TransportSerialError( 

228 f"Unable to find {serial_port}: {err}" 

229 ) from err 

230 return None 

231 

232 if not os.path.exists(serial_port): 

233 raise exc.TransportSerialError(f"Unable to find {serial_port}") 

234 

235 # first, try the easy win... 

236 if "by-id" not in serial_port: 

237 pass 

238 elif "TUSB3410" in serial_port: 

239 return True 

240 elif "evofw3" in serial_port or "FT232R" in serial_port or "NANO" in serial_port: 

241 return False 

242 

243 # otherwise, we can look at device attrs via comports()... 

244 try: 

245 loop = asyncio.get_running_loop() 

246 komports = await loop.run_in_executor( 

247 None, partial(comports, include_links=True) 

248 ) 

249 except ImportError as err: 

250 raise exc.TransportSerialError(f"Unable to find {serial_port}: {err}") from err 

251 

252 # TODO: remove get(): not monkeypatching comports() correctly for /dev/pts/... 

253 vid = {x.device: x.vid for x in komports}.get(serial_port) 

254 

255 # this works, but we may not have all valid VIDs 

256 if not vid: 

257 pass 

258 elif vid == 0x10AC: # Honeywell 

259 return True 

260 elif vid in (0x0403, 0x1B4F): # FTDI, SparkFun 

261 return False 

262 

263 # TODO: remove get(): not monkeypatching comports() correctly for /dev/pts/... 

264 product = {x.device: getattr(x, "product", None) for x in komports}.get(serial_port) 

265 

266 if not product: # is None - VM, or not member of plugdev group? 

267 pass 

268 elif "TUSB3410" in product: # ?needed 

269 return True 

270 elif "evofw3" in product or "FT232R" in product or "NANO" in product: 

271 return False 

272 

273 # could try sending an "!V", expect "# evofw3 0.7.1", but that needs I/O 

274 

275 _LOGGER.warning( 

276 f"{serial_port}: the gateway type is not determinable, will assume evofw3" 

277 + ( 

278 ", TIP: specify the serial port by-id (i.e. /dev/serial/by-id/usb-...)" 

279 if "by-id" not in serial_port 

280 else "" 

281 ) 

282 ) 

283 return None 

284 

285 

286def _normalise(pkt_line: str) -> str: 

287 """Perform any (transparent) frame-level hacks, as required at (near-)RF layer. 

288 

289 Goals: 

290 - ensure an evofw3 provides the same output as a HGI80 (none, presently) 

291 - handle 'strange' packets (e.g. ``I|08:|0008``) 

292 

293 :param pkt_line: The raw packet string from the hardware. 

294 :type pkt_line: str 

295 :return: The normalized packet string. 

296 :rtype: str 

297 """ 

298 

299 # TODO: deprecate as only for ramses_esp <0.4.0 

300 # ramses_esp-specific bugs, see: https://github.com/IndaloTech/ramses_esp/issues/1 

301 pkt_line = re.sub("\r\r", "\r", pkt_line) 

302 if pkt_line[:4] == " 000": 

303 pkt_line = pkt_line[1:] 

304 elif pkt_line[:2] in (I_, RQ, RP, W_): 

305 pkt_line = "" 

306 

307 # pseudo-RAMSES-II packets (encrypted payload?)... 

308 if pkt_line[10:14] in (" 08:", " 31:") and pkt_line[-16:] == "* Checksum error": 

309 pkt_line = pkt_line[:-17] + " # Checksum error (ignored)" 

310 

311 # remove any "/r/n" (leading whitespeace is a problem for commands, but not packets) 

312 return pkt_line.strip() 

313 

314 

315def _str(value: bytes) -> str: 

316 """Decode bytes to a string, ignoring non-printable characters. 

317 

318 :param value: The bytes to decode. 

319 :type value: bytes 

320 :return: The decoded string. 

321 :rtype: str 

322 """ 

323 

324 try: 

325 result = "".join( 

326 c for c in value.decode("ascii", errors="strict") if c in printable 

327 ) 

328 except UnicodeDecodeError: 

329 _LOGGER.warning("%s < Can't decode bytestream (ignoring)", value) 

330 return "" 

331 return result 

332 

333 

334def limit_duty_cycle( 

335 max_duty_cycle: float, time_window: int = DUTY_CYCLE_DURATION 

336) -> Callable[..., Any]: 

337 """Limit the Tx rate to the RF duty cycle regulations (e.g. 1% per hour). 

338 

339 :param max_duty_cycle: Bandwidth available per observation window (percentage as 0.0-1.0). 

340 :type max_duty_cycle: float 

341 :param time_window: Duration of the sliding observation window in seconds, defaults to 60. 

342 :type time_window: int 

343 :return: A decorator that enforces the duty cycle limit. 

344 :rtype: Callable[..., Any] 

345 """ 

346 

347 TX_RATE_AVAIL: int = 38400 # bits per second (deemed) 

348 FILL_RATE: float = TX_RATE_AVAIL * max_duty_cycle # bits per second 

349 BUCKET_CAPACITY: float = FILL_RATE * time_window 

350 

351 def decorator( 

352 fnc: Callable[..., Awaitable[None]], 

353 ) -> Callable[..., Awaitable[None]]: 

354 # start with a full bit bucket 

355 bits_in_bucket: float = BUCKET_CAPACITY 

356 last_time_bit_added = perf_counter() 

357 

358 @wraps(fnc) 

359 async def wrapper( 

360 self: PortTransport, frame: str, *args: Any, **kwargs: Any 

361 ) -> None: 

362 nonlocal bits_in_bucket 

363 nonlocal last_time_bit_added 

364 

365 rf_frame_size = 330 + len(frame[46:]) * 10 

366 

367 # top-up the bit bucket 

368 elapsed_time = perf_counter() - last_time_bit_added 

369 bits_in_bucket = min( 

370 bits_in_bucket + elapsed_time * FILL_RATE, BUCKET_CAPACITY 

371 ) 

372 last_time_bit_added = perf_counter() 

373 

374 if _DBG_DISABLE_DUTY_CYCLE_LIMIT: 

375 bits_in_bucket = BUCKET_CAPACITY 

376 

377 # if required, wait for the bit bucket to refill (not for SETs/PUTs) 

378 if bits_in_bucket < rf_frame_size: 

379 await asyncio.sleep((rf_frame_size - bits_in_bucket) / FILL_RATE) 

380 

381 # consume the bits from the bit bucket 

382 try: 

383 await fnc(self, frame, *args, **kwargs) 

384 finally: 

385 bits_in_bucket -= rf_frame_size 

386 

387 @wraps(fnc) 

388 async def null_wrapper( 

389 self: PortTransport, frame: str, *args: Any, **kwargs: Any 

390 ) -> None: 

391 await fnc(self, frame, *args, **kwargs) 

392 

393 if 0 < max_duty_cycle <= 1: 

394 return wrapper 

395 

396 return null_wrapper 

397 

398 return decorator 

399 

400 

401# used by @track_transmit_rate, current_transmit_rate() 

402_MAX_TRACKED_TRANSMITS = 99 

403_MAX_TRACKED_DURATION = 300 

404 

405 

406# used by @track_system_syncs, @avoid_system_syncs 

407_MAX_TRACKED_SYNCS = 3 

408_global_sync_cycles: deque[Packet] = deque(maxlen=_MAX_TRACKED_SYNCS) 

409 

410 

411# TODO: doesn't look right at all... 

412def avoid_system_syncs(fnc: Callable[..., Awaitable[None]]) -> Callable[..., Any]: 

413 """Take measures to avoid Tx when any controller is doing a sync cycle. 

414 

415 :param fnc: The async function to decorate. 

416 :type fnc: Callable[..., Awaitable[None]] 

417 :return: The decorated function. 

418 :rtype: Callable[..., Any] 

419 """ 

420 

421 DURATION_PKT_GAP = 0.020 # 0.0200 for evohome, or 0.0127 for DTS92 

422 DURATION_LONG_PKT = 0.022 # time to tx I|2309|048 (or 30C9, or 000A) 

423 DURATION_SYNC_PKT = 0.010 # time to tx I|1F09|003 

424 

425 SYNC_WAIT_LONG = (DURATION_PKT_GAP + DURATION_LONG_PKT) * 2 

426 SYNC_WAIT_SHORT = DURATION_SYNC_PKT 

427 SYNC_WINDOW_LOWER = td(seconds=SYNC_WAIT_SHORT * 0.8) # could be * 0 

428 SYNC_WINDOW_UPPER = SYNC_WINDOW_LOWER + td(seconds=SYNC_WAIT_LONG * 1.2) # 

429 

430 @wraps(fnc) 

431 async def wrapper(*args: Any, **kwargs: Any) -> None: 

432 global _global_sync_cycles 

433 

434 def is_imminent(p: Packet) -> bool: 

435 """Return True if a sync cycle is imminent.""" 

436 return bool( 

437 SYNC_WINDOW_LOWER 

438 < (p.dtm + td(seconds=int(p.payload[2:6], 16) / 10) - dt_now()) 

439 < SYNC_WINDOW_UPPER 

440 ) 

441 

442 start = perf_counter() # TODO: remove 

443 

444 # wait for the start of the sync cycle (I|1F09|003, Tx time ~0.009) 

445 while any(is_imminent(p) for p in _global_sync_cycles): 

446 await asyncio.sleep(SYNC_WAIT_SHORT) 

447 

448 # wait for the remainder of sync cycle (I|2309/30C9) to complete 

449 if perf_counter() - start > SYNC_WAIT_SHORT: 

450 await asyncio.sleep(SYNC_WAIT_LONG) 

451 

452 await fnc(*args, **kwargs) 

453 return None 

454 

455 return wrapper 

456 

457 

458def track_system_syncs(fnc: Callable[..., None]) -> Callable[..., Any]: 

459 """Track/remember any new/outstanding TCS sync cycle. 

460 

461 :param fnc: The function to decorate (usually a packet reader). 

462 :type fnc: Callable[..., None] 

463 :return: The decorated function. 

464 :rtype: Callable[..., Any] 

465 """ 

466 

467 @wraps(fnc) 

468 def wrapper(self: PortTransport, pkt: Packet) -> None: 

469 global _global_sync_cycles 

470 

471 def is_pending(p: Packet) -> bool: 

472 """Return True if a sync cycle is still pending (ignores drift).""" 

473 return bool(p.dtm + td(seconds=int(p.payload[2:6], 16) / 10) > dt_now()) 

474 

475 if pkt.code != Code._1F09 or pkt.verb != I_ or pkt._len != 3: 

476 fnc(self, pkt) 

477 return None 

478 

479 _global_sync_cycles = deque( 

480 p for p in _global_sync_cycles if p.src != pkt.src and is_pending(p) 

481 ) 

482 _global_sync_cycles.append(pkt) # TODO: sort 

483 

484 if ( 

485 len(_global_sync_cycles) > _MAX_TRACKED_SYNCS 

486 ): # safety net for corrupted payloads 

487 _global_sync_cycles.popleft() 

488 

489 fnc(self, pkt) 

490 

491 return wrapper 

492 

493 

494# ### Abstractors ##################################################################### 

495# ### Do the bare minimum to abstract each transport from its underlying class 

496 

497 

498class _CallbackTransportAbstractor: 

499 """Do the bare minimum to abstract a transport from its underlying class.""" 

500 

501 def __init__( 

502 self, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any 

503 ) -> None: 

504 """Initialize the callback transport abstractor. 

505 

506 :param loop: The asyncio event loop, defaults to None. 

507 :type loop: asyncio.AbstractEventLoop | None, optional 

508 """ 

509 self._loop = loop or asyncio.get_event_loop() 

510 # Consume 'kwargs' here. Do NOT pass them to object.__init__(). 

511 super().__init__() 

512 

513 

514class _BaseTransport: 

515 """Base class for all transports.""" 

516 

517 def __init__(self, *args: Any, **kwargs: Any) -> None: 

518 super().__init__(*args, **kwargs) 

519 

520 

521class _FileTransportAbstractor: 

522 """Do the bare minimum to abstract a transport from its underlying class.""" 

523 

524 def __init__( 

525 self, 

526 pkt_source: dict[str, str] | str | TextIOWrapper, 

527 protocol: RamsesProtocolT, 

528 loop: asyncio.AbstractEventLoop | None = None, 

529 ) -> None: 

530 """Initialize the file transport abstractor. 

531 

532 :param pkt_source: The source of packets (file path, file object, or dict). 

533 :type pkt_source: dict[str, str] | str | TextIOWrapper 

534 :param protocol: The protocol instance. 

535 :type protocol: RamsesProtocolT 

536 :param loop: The asyncio event loop, defaults to None. 

537 :type loop: asyncio.AbstractEventLoop | None, optional 

538 """ 

539 # per().__init__(extra=extra) # done in _BaseTransport 

540 

541 self._pkt_source = pkt_source 

542 

543 self._protocol = protocol 

544 self._loop = loop or asyncio.get_event_loop() 

545 

546 

547class _PortTransportAbstractor(serial_asyncio.SerialTransport): 

548 """Do the bare minimum to abstract a transport from its underlying class.""" 

549 

550 serial: Serial # type: ignore[no-any-unimported] 

551 

552 def __init__( # type: ignore[no-any-unimported] 

553 self, 

554 serial_instance: Serial, 

555 protocol: RamsesProtocolT, 

556 loop: asyncio.AbstractEventLoop | None = None, 

557 ) -> None: 

558 """Initialize the port transport abstractor. 

559 

560 :param serial_instance: The serial object instance. 

561 :type serial_instance: Serial 

562 :param protocol: The protocol instance. 

563 :type protocol: RamsesProtocolT 

564 :param loop: The asyncio event loop, defaults to None. 

565 :type loop: asyncio.AbstractEventLoop | None, optional 

566 """ 

567 

568 super().__init__(loop or asyncio.get_event_loop(), protocol, serial_instance) 

569 

570 # lf._serial = serial_instance # ._serial, not .serial 

571 

572 # lf._protocol = protocol 

573 # lf._loop = loop or asyncio.get_event_loop() 

574 

575 

576class _MqttTransportAbstractor: 

577 """Do the bare minimum to abstract a transport from its underlying class.""" 

578 

579 def __init__( 

580 self, 

581 broker_url: str, 

582 protocol: RamsesProtocolT, 

583 loop: asyncio.AbstractEventLoop | None = None, 

584 ) -> None: 

585 """Initialize the MQTT transport abstractor. 

586 

587 :param broker_url: The URL of the MQTT broker. 

588 :type broker_url: str 

589 :param protocol: The protocol instance. 

590 :type protocol: RamsesProtocolT 

591 :param loop: The asyncio event loop, defaults to None. 

592 :type loop: asyncio.AbstractEventLoop | None, optional 

593 """ 

594 # per().__init__(extra=extra) # done in _BaseTransport 

595 

596 self._broker_url = urlparse(broker_url) 

597 

598 self._protocol = protocol 

599 self._loop = loop or asyncio.get_event_loop() 

600 

601 

602# ### Base classes (common to all Transports) ######################################### 

603# ### Code shared by all R/O, R/W transport types (File/dict, Serial, MQTT) 

604 

605 

606class _ReadTransport(_BaseTransport): 

607 """Interface for read-only transports.""" 

608 

609 _protocol: RamsesProtocolT = None # type: ignore[assignment] 

610 _loop: asyncio.AbstractEventLoop 

611 

612 _is_hgi80: bool | None = None # NOTE: None (unknown) is as False (is_evofw3) 

613 

614 # __slots__ = ('_extra',) 

615 

616 def __init__( 

617 self, *args: Any, extra: dict[str, Any] | None = None, **kwargs: Any 

618 ) -> None: 

619 """Initialize the read-only transport. 

620 

621 :param extra: Extra info dict, defaults to None. 

622 :type extra: dict[str, Any] | None, optional 

623 """ 

624 super().__init__(*args, loop=kwargs.pop("loop", None)) 

625 

626 self._extra: dict[str, Any] = {} if extra is None else extra 

627 

628 self._evofw_flag = kwargs.pop(SZ_EVOFW_FLAG, None) # gwy.config.evofw_flag 

629 # kwargs.pop("comms_params", None) # FiXME: remove this 

630 

631 self._closing: bool = False 

632 self._reading: bool = False 

633 

634 self._this_pkt: Packet | None = None 

635 self._prev_pkt: Packet | None = None 

636 

637 for key in (SZ_ACTIVE_HGI, SZ_SIGNATURE): 

638 self._extra.setdefault(key, None) 

639 

640 def __repr__(self) -> str: 

641 return f"{self.__class__.__name__}({self._protocol})" 

642 

643 def _dt_now(self) -> dt: 

644 """Return a precise datetime, using last packet's dtm field. 

645 

646 :return: The timestamp of the current packet or a default. 

647 :rtype: dt 

648 """ 

649 

650 try: 

651 return self._this_pkt.dtm # type: ignore[union-attr] 

652 except AttributeError: 

653 return dt(1970, 1, 1, 1, 0) 

654 

655 @property 

656 def loop(self) -> asyncio.AbstractEventLoop: 

657 """The asyncio event loop as declared by SerialTransport. 

658 

659 :return: The event loop. 

660 :rtype: asyncio.AbstractEventLoop 

661 """ 

662 return self._loop 

663 

664 def get_extra_info(self, name: str, default: Any = None) -> Any: 

665 """Get extra information about the transport. 

666 

667 :param name: The name of the information to retrieve. 

668 :type name: str 

669 :param default: Default value if name is not found, defaults to None. 

670 :type default: Any, optional 

671 :return: The value associated with name. 

672 :rtype: Any 

673 """ 

674 if name == SZ_IS_EVOFW3: 

675 return not self._is_hgi80 

676 return self._extra.get(name, default) 

677 

678 def is_closing(self) -> bool: 

679 """Return True if the transport is closing or has closed. 

680 

681 :return: Closing state. 

682 :rtype: bool 

683 """ 

684 return self._closing 

685 

686 def _close(self, exc: exc.RamsesException | None = None) -> None: 

687 """Inform the protocol that this transport has closed. 

688 

689 :param exc: The exception that caused the closure, if any. 

690 :type exc: exc.RamsesException | None, optional 

691 """ 

692 

693 if self._closing: 

694 return 

695 self._closing = True 

696 

697 self.loop.call_soon_threadsafe( 

698 functools.partial(self._protocol.connection_lost, exc) # type: ignore[arg-type] 

699 ) 

700 

701 def close(self) -> None: 

702 """Close the transport gracefully.""" 

703 self._close() 

704 

705 def is_reading(self) -> bool: 

706 """Return True if the transport is receiving. 

707 

708 :return: Reading state. 

709 :rtype: bool 

710 """ 

711 return self._reading 

712 

713 def pause_reading(self) -> None: 

714 """Pause the receiving end (no data to protocol.pkt_received()).""" 

715 self._reading = False 

716 

717 def resume_reading(self) -> None: 

718 """Resume the receiving end.""" 

719 self._reading = True 

720 

721 def _make_connection(self, gwy_id: DeviceIdT | None) -> None: 

722 """Register the connection with the protocol. 

723 

724 :param gwy_id: The ID of the gateway device, if known. 

725 :type gwy_id: DeviceIdT | None 

726 """ 

727 self._extra[SZ_ACTIVE_HGI] = gwy_id # or HGI_DEV_ADDR.id 

728 

729 self.loop.call_soon_threadsafe( # shouldn't call this until we have HGI-ID 

730 functools.partial(self._protocol.connection_made, self, ramses=True) # type: ignore[arg-type] 

731 ) 

732 

733 # NOTE: all transport should call this method when they receive data 

734 def _frame_read(self, dtm_str: str, frame: str) -> None: 

735 """Make a Packet from the Frame and process it (called by each specific Tx). 

736 

737 :param dtm_str: The timestamp string of the frame. 

738 :type dtm_str: str 

739 :param frame: The raw frame string. 

740 :type frame: str 

741 """ 

742 

743 if not frame.strip(): 

744 return 

745 

746 try: 

747 pkt = Packet.from_file(dtm_str, frame) # is OK for when src is dict 

748 

749 except ValueError as err: # VE from dt.fromisoformat() or falsey packet 

750 _LOGGER.debug("%s < PacketInvalid(%s)", frame, err) 

751 return 

752 

753 except exc.PacketInvalid as err: # VE from dt.fromisoformat() 

754 _LOGGER.warning("%s < PacketInvalid(%s)", frame, err) 

755 return 

756 

757 self._pkt_read(pkt) 

758 

759 # NOTE: all protocol callbacks should be invoked from here 

760 def _pkt_read(self, pkt: Packet) -> None: 

761 """Pass any valid Packets to the protocol's callback (_prev_pkt, _this_pkt). 

762 

763 :param pkt: The parsed packet. 

764 :type pkt: Packet 

765 :raises exc.TransportError: If called while closing. 

766 """ 

767 

768 self._this_pkt, self._prev_pkt = pkt, self._this_pkt 

769 

770 # if self._reading is False: # raise, or warn & return? 

771 # raise exc.TransportError("Reading has been paused") 

772 if self._closing is True: # raise, or warn & return? 

773 raise exc.TransportError("Transport is closing or has closed") 

774 

775 # TODO: can we switch to call_soon now QoS has been refactored? 

776 # NOTE: No need to use call_soon() here, and they may break Qos/Callbacks 

777 # NOTE: Thus, excepts need checking 

778 try: # below could be a call_soon? 

779 self.loop.call_soon_threadsafe(self._protocol.pkt_received, pkt) 

780 except AssertionError as err: # protect from upper layers 

781 _LOGGER.exception("%s < exception from msg layer: %s", pkt, err) 

782 except exc.ProtocolError as err: # protect from upper layers 

783 _LOGGER.error("%s < exception from msg layer: %s", pkt, err) 

784 

785 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

786 """ "Transmit a frame via the underlying handler (e.g. serial port, MQTT). 

787 

788 :param frame: The frame to write. 

789 :type frame: str 

790 :param disable_tx_limits: Whether to bypass duty cycle limits, defaults to False. 

791 :type disable_tx_limits: bool, optional 

792 :raises exc.TransportSerialError: Because this transport is read-only. 

793 """ 

794 raise exc.TransportSerialError("This transport is read only") 

795 

796 

797class _FullTransport(_ReadTransport): # asyncio.Transport 

798 """Interface representing a bidirectional transport.""" 

799 

800 def __init__( 

801 self, *args: Any, disable_sending: bool = False, **kwargs: Any 

802 ) -> None: 

803 """Initialize the full transport. 

804 

805 :param disable_sending: Whether to disable sending capabilities, defaults to False. 

806 :type disable_sending: bool, optional 

807 """ 

808 super().__init__(*args, **kwargs) 

809 

810 self._disable_sending = disable_sending 

811 self._transmit_times: deque[dt] = deque(maxlen=_MAX_TRACKED_TRANSMITS) 

812 

813 def _dt_now(self) -> dt: 

814 """Return a precise datetime, using the current dtm. 

815 

816 :return: Current datetime. 

817 :rtype: dt 

818 """ 

819 # _LOGGER.error("Full._dt_now()") 

820 

821 return dt_now() 

822 

823 def get_extra_info(self, name: str, default: Any = None) -> Any: 

824 """Get extra info, including transmit rate calculations. 

825 

826 :param name: Name of info. 

827 :type name: str 

828 :param default: Default value. 

829 :type default: Any, optional 

830 :return: The requested info. 

831 :rtype: Any 

832 """ 

833 if name == "tx_rate": 

834 return self._report_transmit_rate() 

835 return super().get_extra_info(name, default=default) 

836 

837 def _report_transmit_rate(self) -> float: 

838 """Return the transmit rate in transmits per minute. 

839 

840 :return: Transmits per minute. 

841 :rtype: float 

842 """ 

843 

844 dt_now = dt.now() 

845 dtm = dt_now - td(seconds=_MAX_TRACKED_DURATION) 

846 transmit_times = tuple(t for t in self._transmit_times if t > dtm) 

847 

848 if len(transmit_times) <= 1: 

849 return len(transmit_times) 

850 

851 duration: float = (transmit_times[-1] - transmit_times[0]) / td(seconds=1) 

852 return int(len(transmit_times) / duration * 6000) / 100 

853 

854 def _track_transmit_rate(self) -> None: 

855 """Track the Tx rate as period of seconds per x transmits.""" 

856 

857 # period: float = (transmit_times[-1] - transmit_times[0]) / td(seconds=1) 

858 # num_tx: int = len(transmit_times) 

859 

860 self._transmit_times.append(dt.now()) 

861 

862 _LOGGER.debug(f"Current Tx rate: {self._report_transmit_rate():.2f} pkts/min") 

863 

864 # NOTE: Protocols call write_frame(), not write() 

865 def write(self, data: bytes) -> None: 

866 """Write the data to the underlying handler. 

867 

868 :param data: The data to write. 

869 :type data: bytes 

870 :raises exc.TransportError: Always raises, use write_frame instead. 

871 """ 

872 # _LOGGER.error("Full.write(%s)", data) 

873 

874 raise exc.TransportError("write() not implemented, use write_frame() instead") 

875 

876 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

877 """Transmit a frame via the underlying handler (e.g. serial port, MQTT). 

878 

879 Protocols call Transport.write_frame(), not Transport.write(). 

880 

881 :param frame: The frame to transmit. 

882 :type frame: str 

883 :param disable_tx_limits: Whether to disable duty cycle limits, defaults to False. 

884 :type disable_tx_limits: bool, optional 

885 :raises exc.TransportError: If sending is disabled or transport is closed. 

886 """ 

887 

888 if self._disable_sending is True: 

889 raise exc.TransportError("Sending has been disabled") 

890 if self._closing is True: 

891 raise exc.TransportError("Transport is closing or has closed") 

892 

893 self._track_transmit_rate() 

894 

895 await self._write_frame(frame) 

896 

897 async def _write_frame(self, frame: str) -> None: 

898 """Write some data bytes to the underlying transport. 

899 

900 :param frame: The frame to write. 

901 :type frame: str 

902 :raises NotImplementedError: Abstract method. 

903 """ 

904 # _LOGGER.error("Full._write_frame(%s)", frame) 

905 

906 raise NotImplementedError("_write_frame() not implemented here") 

907 

908 

909_RegexRuleT: TypeAlias = dict[str, str] 

910 

911 

912class _RegHackMixin: 

913 """Mixin to apply regex rules to inbound and outbound frames.""" 

914 

915 def __init__( 

916 self, *args: Any, use_regex: dict[str, _RegexRuleT] | None = None, **kwargs: Any 

917 ) -> None: 

918 """Initialize the regex mixin. 

919 

920 :param use_regex: Dictionary containing inbound/outbound regex rules. 

921 :type use_regex: dict[str, _RegexRuleT] | None, optional 

922 """ 

923 super().__init__(*args, **kwargs) 

924 

925 use_regex = use_regex or {} 

926 

927 self._inbound_rule: _RegexRuleT = use_regex.get(SZ_INBOUND, {}) 

928 self._outbound_rule: _RegexRuleT = use_regex.get(SZ_OUTBOUND, {}) 

929 

930 @staticmethod 

931 def _regex_hack(pkt_line: str, regex_rules: _RegexRuleT) -> str: 

932 """Apply regex rules to a packet line. 

933 

934 :param pkt_line: The packet line to process. 

935 :type pkt_line: str 

936 :param regex_rules: The rules to apply. 

937 :type regex_rules: _RegexRuleT 

938 :return: The modified packet line. 

939 :rtype: str 

940 """ 

941 if not regex_rules: 

942 return pkt_line 

943 

944 result = pkt_line 

945 for k, v in regex_rules.items(): 

946 try: 

947 result = re.sub(k, v, result) 

948 except re.error as err: 

949 _LOGGER.warning(f"{pkt_line} < issue with regex ({k}, {v}): {err}") 

950 

951 if result != pkt_line and not _DBG_DISABLE_REGEX_WARNINGS: 

952 _LOGGER.warning(f"{pkt_line} < Changed by use_regex to: {result}") 

953 return result 

954 

955 def _frame_read(self, dtm_str: str, frame: str) -> None: 

956 super()._frame_read(dtm_str, self._regex_hack(frame, self._inbound_rule)) # type: ignore[misc] 

957 

958 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

959 await super().write_frame(self._regex_hack(frame, self._outbound_rule)) # type: ignore[misc] 

960 

961 

962# ### Transports ###################################################################### 

963# ### Implement the transports for File/dict (R/O), Serial, MQTT 

964 

965 

966class FileTransport(_ReadTransport, _FileTransportAbstractor): 

967 """Receive packets from a read-only source such as packet log or a dict.""" 

968 

969 def __init__(self, *args: Any, disable_sending: bool = True, **kwargs: Any) -> None: 

970 """Initialize the file transport. 

971 

972 :param disable_sending: Must be True for FileTransport. 

973 :type disable_sending: bool 

974 :raises exc.TransportSourceInvalid: If disable_sending is False. 

975 """ 

976 super().__init__(*args, **kwargs) 

977 

978 if bool(disable_sending) is False: 

979 raise exc.TransportSourceInvalid("This Transport cannot send packets") 

980 

981 self._extra[SZ_READER_TASK] = self._reader_task = self._loop.create_task( 

982 self._start_reader(), name="FileTransport._start_reader()" 

983 ) 

984 

985 self._make_connection(None) 

986 

987 async def _start_reader(self) -> None: # TODO 

988 """Start the reader task.""" 

989 self._reading = True 

990 try: 

991 await self._reader() 

992 except Exception as err: 

993 self.loop.call_soon_threadsafe( 

994 functools.partial(self._protocol.connection_lost, err) # type: ignore[arg-type] 

995 ) 

996 else: 

997 self.loop.call_soon_threadsafe( 

998 functools.partial(self._protocol.connection_lost, None) 

999 ) 

1000 

1001 # NOTE: self._frame_read() invoked from here 

1002 async def _reader(self) -> None: # TODO 

1003 """Loop through the packet source for Frames and process them.""" 

1004 

1005 if isinstance(self._pkt_source, dict): 

1006 for dtm_str, pkt_line in self._pkt_source.items(): # assume dtm_str is OK 

1007 while not self._reading: 

1008 await asyncio.sleep(0.001) 

1009 self._frame_read(dtm_str, pkt_line) 

1010 await asyncio.sleep(0) 

1011 # NOTE: instable without, big performance penalty if delay >0 

1012 

1013 elif isinstance(self._pkt_source, str): # file_name, used in client parse 

1014 # open file file_name before reading 

1015 try: 

1016 with fileinput.input(files=self._pkt_source, encoding="utf-8") as file: 

1017 for dtm_pkt_line in file: # self._pkt_source: 

1018 # TODO check dtm_str is OK 

1019 while not self._reading: 

1020 await asyncio.sleep(0.001) 

1021 # there may be blank lines in annotated log files 

1022 if (dtm_pkt_line := dtm_pkt_line.strip()) and dtm_pkt_line[ 

1023 :1 

1024 ] != "#": 

1025 self._frame_read(dtm_pkt_line[:26], dtm_pkt_line[27:]) 

1026 # this is where the parsing magic happens! 

1027 await asyncio.sleep(0) 

1028 # NOTE: instable without, big performance penalty if delay >0 

1029 except FileNotFoundError as err: 

1030 _LOGGER.warning(f"Correct the packet file name; {err}") 

1031 elif isinstance(self._pkt_source, TextIOWrapper): # used by client monitor 

1032 for dtm_pkt_line in self._pkt_source: # should check dtm_str is OK 

1033 while not self._reading: 

1034 await asyncio.sleep(0.001) 

1035 # can be blank lines in annotated log files 

1036 if (dtm_pkt_line := dtm_pkt_line.strip()) and dtm_pkt_line[:1] != "#": 

1037 self._frame_read(dtm_pkt_line[:26], dtm_pkt_line[27:]) 

1038 await asyncio.sleep(0) 

1039 # NOTE: instable without, big performance penalty if delay >0 

1040 else: 

1041 raise exc.TransportSourceInvalid( 

1042 f"Packet source is not dict, TextIOWrapper or str: {self._pkt_source:!r}" 

1043 ) 

1044 

1045 def _close(self, exc: exc.RamsesException | None = None) -> None: 

1046 """Close the transport (cancel any outstanding tasks). 

1047 

1048 :param exc: The exception causing closure. 

1049 :type exc: exc.RamsesException | None, optional 

1050 """ 

1051 

1052 super()._close(exc) 

1053 

1054 if self._reader_task: 

1055 self._reader_task.cancel() 

1056 

1057 

1058class PortTransport(_RegHackMixin, _FullTransport, _PortTransportAbstractor): # type: ignore[misc] 

1059 """Send/receive packets async to/from evofw3/HGI80 via a serial port. 

1060 

1061 See: https://github.com/ghoti57/evofw3 

1062 """ 

1063 

1064 _init_fut: asyncio.Future[Packet | None] 

1065 _init_task: asyncio.Task[None] 

1066 

1067 _recv_buffer: bytes = b"" 

1068 

1069 def __init__(self, *args: Any, **kwargs: Any) -> None: 

1070 """Initialize the port transport.""" 

1071 super().__init__(*args, **kwargs) 

1072 

1073 self._leaker_sem = asyncio.BoundedSemaphore() 

1074 self._leaker_task = self._loop.create_task( 

1075 self._leak_sem(), name="PortTransport._leak_sem()" 

1076 ) 

1077 

1078 self._loop.create_task( 

1079 self._create_connection(), name="PortTransport._create_connection()" 

1080 ) 

1081 

1082 async def _create_connection(self) -> None: 

1083 """Invoke the Protocols's connection_made() callback after HGI80 discovery.""" 

1084 

1085 # HGI80s (and also VMs) take longer to send signature packets as they have long 

1086 # initialisation times, so we must wait until they send OK 

1087 

1088 # signature also serves to discover the HGI's device_id (& for pkt log, if any) 

1089 

1090 self._is_hgi80 = await is_hgi80(self.serial.name) 

1091 

1092 async def connect_sans_signature() -> None: 

1093 """Call connection_made() without sending/waiting for a signature.""" 

1094 

1095 self._init_fut.set_result(None) 

1096 self._make_connection(gwy_id=None) 

1097 

1098 async def connect_with_signature() -> None: 

1099 """Poll port with signatures, call connection_made() after first echo.""" 

1100 

1101 # TODO: send a 2nd signature, but with addr0 set to learned GWY address 

1102 # TODO: a HGI80 will silently drop this cmd, so an echo would tell us 

1103 # TODO: that the GWY is evofw3-compatible 

1104 

1105 sig = Command._puzzle() 

1106 self._extra[SZ_SIGNATURE] = sig.payload 

1107 

1108 num_sends = 0 

1109 while num_sends < _SIGNATURE_MAX_TRYS: 

1110 num_sends += 1 

1111 

1112 await self._write_frame(str(sig)) 

1113 await asyncio.sleep(_SIGNATURE_GAP_SECS) 

1114 

1115 if self._init_fut.done(): 

1116 pkt = self._init_fut.result() 

1117 self._make_connection(gwy_id=pkt.src.id if pkt else None) 

1118 return 

1119 

1120 if not self._init_fut.done(): 

1121 self._init_fut.set_result(None) 

1122 

1123 self._make_connection(gwy_id=None) 

1124 return 

1125 

1126 self._init_fut = asyncio.Future() 

1127 if self._disable_sending: 

1128 self._init_task = self._loop.create_task( 

1129 connect_sans_signature(), name="PortTransport.connect_sans_signature()" 

1130 ) 

1131 else: 

1132 self._init_task = self._loop.create_task( 

1133 connect_with_signature(), name="PortTransport.connect_with_signature()" 

1134 ) 

1135 

1136 try: # wait to get (1st) signature echo from evofw3/HGI80, if any 

1137 await asyncio.wait_for(self._init_fut, timeout=_SIGNATURE_MAX_SECS) 

1138 except TimeoutError as err: 

1139 raise exc.TransportSerialError( 

1140 f"Failed to initialise Transport within {_SIGNATURE_MAX_SECS} secs" 

1141 ) from err 

1142 

1143 async def _leak_sem(self) -> None: 

1144 """Used to enforce a minimum time between calls to self.write().""" 

1145 while True: 

1146 await asyncio.sleep(MIN_INTER_WRITE_GAP) 

1147 with contextlib.suppress(ValueError): 

1148 self._leaker_sem.release() 

1149 

1150 # NOTE: self._frame_read() invoked from here 

1151 def _read_ready(self) -> None: 

1152 """Make Frames from the read data and process them.""" 

1153 

1154 def bytes_read(data: bytes) -> Iterable[tuple[dt, bytes]]: 

1155 self._recv_buffer += data 

1156 if b"\r\n" in self._recv_buffer: 

1157 lines = self._recv_buffer.split(b"\r\n") 

1158 self._recv_buffer = lines[-1] 

1159 for line in lines[:-1]: 

1160 yield self._dt_now(), line + b"\r\n" 

1161 

1162 try: 

1163 data: bytes = self.serial.read(self._max_read_size) 

1164 except SerialException as err: 

1165 if not self._closing: 

1166 self._close(exc=err) # have to use _close() to pass in exception 

1167 return 

1168 

1169 if not data: 

1170 return 

1171 

1172 for dtm, raw_line in bytes_read(data): 

1173 if _DBG_FORCE_FRAME_LOGGING: 

1174 _LOGGER.warning("Rx: %s", raw_line) 

1175 elif _LOGGER.getEffectiveLevel() == logging.INFO: # log for INFO not DEBUG 

1176 _LOGGER.info("Rx: %s", raw_line) 

1177 

1178 self._frame_read( 

1179 dtm.isoformat(timespec="milliseconds"), _normalise(_str(raw_line)) 

1180 ) 

1181 

1182 @track_system_syncs 

1183 def _pkt_read(self, pkt: Packet) -> None: 

1184 # NOTE: a signature can override an existing active gateway 

1185 if ( 

1186 not self._init_fut.done() 

1187 and pkt.code == Code._PUZZ 

1188 and pkt.payload == self._extra[SZ_SIGNATURE] 

1189 ): 

1190 self._extra[SZ_ACTIVE_HGI] = pkt.src.id # , by_signature=True) 

1191 self._init_fut.set_result(pkt) 

1192 

1193 super()._pkt_read(pkt) 

1194 

1195 @limit_duty_cycle(MAX_DUTY_CYCLE_RATE) 

1196 @avoid_system_syncs 

1197 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

1198 """Transmit a frame via the underlying handler (e.g. serial port, MQTT). 

1199 

1200 Protocols call Transport.write_frame(), not Transport.write(). 

1201 

1202 :param frame: The frame to transmit. 

1203 :type frame: str 

1204 :param disable_tx_limits: Whether to disable duty cycle limits, defaults to False. 

1205 :type disable_tx_limits: bool, optional 

1206 """ 

1207 

1208 await self._leaker_sem.acquire() # MIN_INTER_WRITE_GAP 

1209 await super().write_frame(frame) 

1210 

1211 # NOTE: The order should be: minimum gap between writes, duty cycle limits, and 

1212 # then the code that avoids the controller sync cycles 

1213 

1214 async def _write_frame(self, frame: str) -> None: 

1215 """Write some data bytes to the underlying transport. 

1216 

1217 :param frame: The frame to write. 

1218 :type frame: str 

1219 """ 

1220 

1221 data = bytes(frame, "ascii") + b"\r\n" 

1222 

1223 log_msg = f"Serial transport transmitting frame: {frame}" 

1224 if _DBG_FORCE_FRAME_LOGGING: 

1225 _LOGGER.warning(log_msg) 

1226 elif _LOGGER.getEffectiveLevel() > logging.DEBUG: 

1227 _LOGGER.info(log_msg) 

1228 else: 

1229 _LOGGER.debug(log_msg) 

1230 

1231 try: 

1232 self._write(data) 

1233 except SerialException as err: 

1234 self._abort(err) 

1235 return 

1236 

1237 def _write(self, data: bytes) -> None: 

1238 """Perform the actual write to the serial port. 

1239 

1240 :param data: The bytes to write. 

1241 :type data: bytes 

1242 """ 

1243 self.serial.write(data) 

1244 

1245 def _abort(self, exc: ExceptionT) -> None: # type: ignore[override] # used by serial_asyncio.SerialTransport 

1246 """Abort the transport. 

1247 

1248 :param exc: The exception causing the abort. 

1249 :type exc: ExceptionT 

1250 """ 

1251 super()._abort(exc) # type: ignore[arg-type] 

1252 

1253 if self._init_task: 

1254 self._init_task.cancel() 

1255 if self._leaker_task: 

1256 self._leaker_task.cancel() 

1257 

1258 def _close(self, exc: exc.RamsesException | None = None) -> None: # type: ignore[override] 

1259 """Close the transport (cancel any outstanding tasks). 

1260 

1261 :param exc: The exception causing closure. 

1262 :type exc: exc.RamsesException | None, optional 

1263 """ 

1264 

1265 super()._close(exc) 

1266 

1267 if self._init_task: 

1268 self._init_task.cancel() 

1269 

1270 if self._leaker_task: 

1271 self._leaker_task.cancel() 

1272 

1273 

1274class MqttTransport(_FullTransport, _MqttTransportAbstractor): 

1275 """Send/receive packets to/from ramses_esp via MQTT. 

1276 For full RX logging, turn on debug logging. 

1277 

1278 See: https://github.com/IndaloTech/ramses_esp 

1279 """ 

1280 

1281 # used in .write_frame() to rate-limit the number of writes 

1282 _MAX_TOKENS: Final[int] = MAX_TRANSMIT_RATE_TOKENS 

1283 _TIME_WINDOW: Final[int] = DUTY_CYCLE_DURATION 

1284 _TOKEN_RATE: Final[float] = _MAX_TOKENS / _TIME_WINDOW 

1285 

1286 def __init__(self, *args: Any, **kwargs: Any) -> None: 

1287 # _LOGGER.error("__init__(%s, %s)", args, kwargs) 

1288 

1289 super().__init__(*args, **kwargs) 

1290 

1291 self._username = unquote(self._broker_url.username or "") 

1292 self._password = unquote(self._broker_url.password or "") 

1293 

1294 self._topic_base = validate_topic_path(self._broker_url.path) 

1295 self._topic_pub = "" 

1296 self._topic_sub = "" 

1297 # Track if we've subscribed to a wildcard data topic (e.g. ".../+/rx") 

1298 self._data_wildcard_topic = "" 

1299 

1300 self._mqtt_qos = int(parse_qs(self._broker_url.query).get("qos", ["0"])[0]) 

1301 

1302 self._connected = False 

1303 self._connecting = False 

1304 self._connection_established = False # Track if initial connection was made 

1305 self._extra[SZ_IS_EVOFW3] = True 

1306 

1307 # Reconnection settings 

1308 self._reconnect_interval = 5.0 # seconds 

1309 self._max_reconnect_interval = 300.0 # 5 minutes max 

1310 self._reconnect_backoff = 1.5 

1311 self._current_reconnect_interval = self._reconnect_interval 

1312 self._reconnect_task: asyncio.Task[None] | None = None 

1313 

1314 # used in .write_frame() to rate-limit the number of writes 

1315 self._timestamp = perf_counter() 

1316 self._max_tokens: float = self._MAX_TOKENS * 2 # allow for the initial burst 

1317 self._num_tokens: float = self._MAX_TOKENS * 2 

1318 

1319 # set log MQTT flag 

1320 self._log_all = kwargs.pop("log_all", False) 

1321 

1322 # instantiate a paho mqtt client 

1323 self.client = mqtt.Client( 

1324 protocol=mqtt.MQTTv5, callback_api_version=CallbackAPIVersion.VERSION2 

1325 ) 

1326 self.client.on_connect = self._on_connect 

1327 self.client.on_connect_fail = self._on_connect_fail 

1328 self.client.on_disconnect = self._on_disconnect 

1329 self.client.on_message = self._on_message 

1330 self.client.username_pw_set(self._username, self._password) 

1331 # connect to the mqtt server 

1332 self._attempt_connection() 

1333 

1334 def _attempt_connection(self) -> None: 

1335 """Attempt to connect to the MQTT broker.""" 

1336 if self._connecting or self._connected: 

1337 return 

1338 

1339 self._connecting = True 

1340 try: 

1341 self.client.connect_async( 

1342 str(self._broker_url.hostname or "localhost"), 

1343 self._broker_url.port or 1883, 

1344 60, 

1345 ) 

1346 self.client.loop_start() 

1347 except Exception as err: 

1348 _LOGGER.error(f"Failed to initiate MQTT connection: {err}") 

1349 self._connecting = False 

1350 self._schedule_reconnect() 

1351 

1352 def _schedule_reconnect(self) -> None: 

1353 """Schedule a reconnection attempt with exponential backoff.""" 

1354 if self._closing or self._reconnect_task: 

1355 return 

1356 

1357 _LOGGER.info( 

1358 f"Scheduling MQTT reconnect in {self._current_reconnect_interval} seconds" 

1359 ) 

1360 self._reconnect_task = self._loop.create_task( 

1361 self._reconnect_after_delay(), name="MqttTransport._reconnect_after_delay()" 

1362 ) 

1363 

1364 async def _reconnect_after_delay(self) -> None: 

1365 """Wait and then attempt to reconnect.""" 

1366 try: 

1367 await asyncio.sleep(self._current_reconnect_interval) 

1368 

1369 # Increase backoff for next time 

1370 self._current_reconnect_interval = min( 

1371 self._current_reconnect_interval * self._reconnect_backoff, 

1372 self._max_reconnect_interval, 

1373 ) 

1374 

1375 _LOGGER.info("Attempting MQTT reconnection...") 

1376 self._attempt_connection() 

1377 except asyncio.CancelledError: 

1378 pass 

1379 finally: 

1380 self._reconnect_task = None 

1381 

1382 def _on_connect( 

1383 self, 

1384 client: mqtt.Client, 

1385 userdata: Any, 

1386 flags: dict[str, Any], 

1387 reason_code: Any, 

1388 properties: Any | None, 

1389 ) -> None: 

1390 """Handle MQTT connection success. 

1391 

1392 :param client: The MQTT client. 

1393 :type client: mqtt.Client 

1394 :param userdata: User data. 

1395 :type userdata: Any 

1396 :param flags: Connection flags. 

1397 :type flags: dict[str, Any] 

1398 :param reason_code: Connection reason code. 

1399 :type reason_code: Any 

1400 :param properties: Connection properties. 

1401 :type properties: Any | None 

1402 """ 

1403 # _LOGGER.error("Mqtt._on_connect(%s, %s, %s, %s)", client, userdata, flags, reason_code.getName()) 

1404 

1405 self._connecting = False 

1406 

1407 if reason_code.is_failure: 

1408 _LOGGER.error(f"MQTT connection failed: {reason_code.getName()}") 

1409 self._schedule_reconnect() 

1410 return 

1411 

1412 _LOGGER.info(f"MQTT connected: {reason_code.getName()}") 

1413 

1414 # Reset reconnect interval on successful connection 

1415 self._current_reconnect_interval = self._reconnect_interval 

1416 

1417 # Cancel any pending reconnect task 

1418 if self._reconnect_task: 

1419 self._reconnect_task.cancel() 

1420 self._reconnect_task = None 

1421 

1422 # Subscribe to base topic to see 'online' messages 

1423 self.client.subscribe(self._topic_base) # hope to see 'online' message 

1424 

1425 # Also subscribe to data topics with wildcard for reliability, but only 

1426 # until a specific device topic is known. Once _topic_sub is set, avoid 

1427 # overlapping subscriptions that would duplicate messages. 

1428 if self._topic_base.endswith("/+") and not ( 

1429 hasattr(self, "_topic_sub") and self._topic_sub 

1430 ): 

1431 data_wildcard = self._topic_base.replace("/+", "/+/rx") 

1432 self.client.subscribe(data_wildcard, qos=self._mqtt_qos) 

1433 self._data_wildcard_topic = data_wildcard 

1434 _LOGGER.debug(f"Subscribed to data wildcard: {data_wildcard}") 

1435 

1436 # If we already have specific topics, re-subscribe to them 

1437 if hasattr(self, "_topic_sub") and self._topic_sub: 

1438 self.client.subscribe(self._topic_sub, qos=self._mqtt_qos) 

1439 _LOGGER.debug(f"Re-subscribed to specific topic: {self._topic_sub}") 

1440 # If we had a wildcard subscription, drop it to prevent duplicates 

1441 if getattr(self, "_data_wildcard_topic", ""): 

1442 try: 

1443 self.client.unsubscribe(self._data_wildcard_topic) 

1444 _LOGGER.debug( 

1445 f"Unsubscribed data wildcard after specific subscribe: {self._data_wildcard_topic}" 

1446 ) 

1447 finally: 

1448 self._data_wildcard_topic = "" 

1449 

1450 def _on_connect_fail( 

1451 self, 

1452 client: mqtt.Client, 

1453 userdata: Any, 

1454 ) -> None: 

1455 """Handle MQTT connection failure. 

1456 

1457 :param client: The MQTT client. 

1458 :type client: mqtt.Client 

1459 :param userdata: User data. 

1460 :type userdata: Any 

1461 """ 

1462 _LOGGER.error("MQTT connection failed") 

1463 

1464 self._connecting = False 

1465 self._connected = False 

1466 

1467 if not self._closing: 

1468 self._schedule_reconnect() 

1469 

1470 def _on_disconnect( 

1471 self, 

1472 client: mqtt.Client, 

1473 userdata: Any, 

1474 *args: Any, 

1475 **kwargs: Any, 

1476 ) -> None: 

1477 """Handle MQTT disconnection. 

1478 

1479 :param client: The MQTT client. 

1480 :type client: mqtt.Client 

1481 :param userdata: User data. 

1482 :type userdata: Any 

1483 """ 

1484 # Handle different paho-mqtt callback signatures 

1485 reason_code = args[0] if len(args) >= 1 else None 

1486 

1487 reason_name = ( 

1488 reason_code.getName() 

1489 if reason_code is not None and hasattr(reason_code, "getName") 

1490 else str(reason_code) 

1491 ) 

1492 _LOGGER.warning(f"MQTT disconnected: {reason_name}") 

1493 

1494 was_connected = self._connected 

1495 self._connected = False 

1496 

1497 # If we were previously connected and had established communication, 

1498 # notify that the device is now offline 

1499 if was_connected and hasattr(self, "_topic_sub") and self._topic_sub: 

1500 device_topic = self._topic_sub[:-3] # Remove "/rx" suffix 

1501 _LOGGER.warning(f"{self}: the MQTT device is offline: {device_topic}") 

1502 

1503 # Pause writing since device is offline 

1504 if hasattr(self, "_protocol"): 

1505 self._protocol.pause_writing() 

1506 

1507 # Only attempt reconnection if we didn't deliberately disconnect 

1508 

1509 if not self._closing: 

1510 # Schedule reconnection for any disconnect (unexpected or failure) 

1511 self._schedule_reconnect() 

1512 

1513 def _create_connection(self, msg: mqtt.MQTTMessage) -> None: 

1514 """Invoke the Protocols's connection_made() callback MQTT is established. 

1515 

1516 :param msg: The online message triggering the connection. 

1517 :type msg: mqtt.MQTTMessage 

1518 """ 

1519 # _LOGGER.error("Mqtt._create_connection(%s)", msg) 

1520 

1521 assert msg.payload == b"online", "Coding error" 

1522 

1523 if self._connected: 

1524 _LOGGER.info("MQTT device came back online - resuming writing") 

1525 self._loop.call_soon_threadsafe(self._protocol.resume_writing) 

1526 return 

1527 

1528 _LOGGER.info("MQTT device is online - establishing connection") 

1529 self._connected = True 

1530 

1531 self._extra[SZ_ACTIVE_HGI] = msg.topic[-9:] 

1532 

1533 self._topic_pub = msg.topic + "/tx" 

1534 self._topic_sub = msg.topic + "/rx" 

1535 

1536 self.client.subscribe(self._topic_sub, qos=self._mqtt_qos) 

1537 

1538 # If we previously subscribed to a wildcard data topic, unsubscribe now 

1539 # to avoid duplicate delivery (wildcard and specific both matching) 

1540 if getattr(self, "_data_wildcard_topic", ""): 

1541 try: 

1542 self.client.unsubscribe(self._data_wildcard_topic) 

1543 _LOGGER.debug( 

1544 f"Unsubscribed data wildcard after device online: {self._data_wildcard_topic}" 

1545 ) 

1546 finally: 

1547 self._data_wildcard_topic = "" 

1548 

1549 # Only call connection_made on first connection, not reconnections 

1550 if not self._connection_established: 

1551 self._connection_established = True 

1552 self._make_connection(gwy_id=msg.topic[-9:]) # type: ignore[arg-type] 

1553 else: 

1554 _LOGGER.info("MQTT reconnected - protocol connection already established") 

1555 

1556 # NOTE: self._frame_read() invoked from here 

1557 def _on_message( 

1558 self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage 

1559 ) -> None: 

1560 """Make a Frame from the MQTT message and process it. 

1561 

1562 :param client: The MQTT client. 

1563 :type client: mqtt.Client 

1564 :param userdata: User data. 

1565 :type userdata: Any 

1566 :param msg: The received message. 

1567 :type msg: mqtt.MQTTMessage 

1568 """ 

1569 # _LOGGER.error( 

1570 # "Mqtt._on_message(%s, %s, %s)", 

1571 # client, 

1572 # userdata, 

1573 # (msg.timestamp, msg.topic, msg.payload), 

1574 # ) 

1575 

1576 if _DBG_FORCE_FRAME_LOGGING: 

1577 _LOGGER.warning("Rx: %s", msg.payload) 

1578 elif self._log_all and _LOGGER.getEffectiveLevel() == logging.INFO: 

1579 # log for INFO not DEBUG 

1580 _LOGGER.info("mq Rx: %s", msg.payload) # TODO remove mq marker? 

1581 

1582 if msg.topic[-3:] != "/rx": # then, e.g. 'RAMSES/GATEWAY/18:017804' 

1583 if msg.payload == b"offline": 

1584 # Check if this offline message is for our current device 

1585 if ( 

1586 hasattr(self, "_topic_sub") 

1587 and self._topic_sub 

1588 and msg.topic == self._topic_sub[:-3] 

1589 ) or not hasattr(self, "_topic_sub"): 

1590 _LOGGER.warning( 

1591 f"{self}: the ESP device is offline (via LWT): {msg.topic}" 

1592 ) 

1593 # Don't set _connected = False here - that's for MQTT connection, not ESP device 

1594 if hasattr(self, "_protocol"): 

1595 self._protocol.pause_writing() 

1596 

1597 # BUG: using create task (self._loop.ct() & asyncio.ct()) causes the 

1598 # BUG: event look to close early 

1599 elif msg.payload == b"online": 

1600 _LOGGER.info( 

1601 f"{self}: the ESP device is online (via status): {msg.topic}" 

1602 ) 

1603 self._create_connection(msg) 

1604 

1605 return 

1606 

1607 # Handle data messages - if we don't have connection established yet but get data, 

1608 # we can infer the gateway from the topic 

1609 if not self._connection_established and msg.topic.endswith("/rx"): 

1610 # Extract gateway ID from topic like "RAMSES/GATEWAY/18:123456/rx" 

1611 topic_parts = msg.topic.split("/") 

1612 if len(topic_parts) >= 3 and topic_parts[-2] not in ("+", "*"): 

1613 gateway_id = topic_parts[-2] # Should be something like "18:123456" 

1614 _LOGGER.info( 

1615 f"Inferring gateway connection from data topic: {gateway_id}" 

1616 ) 

1617 

1618 # Set up topics and connection 

1619 self._topic_pub = f"{'/'.join(topic_parts[:-1])}/tx" 

1620 self._topic_sub = msg.topic 

1621 self._extra[SZ_ACTIVE_HGI] = gateway_id 

1622 

1623 # Mark as connected and establish protocol connection 

1624 self._connected = True 

1625 self._connection_established = True 

1626 self._make_connection(gwy_id=gateway_id) # type: ignore[arg-type] 

1627 

1628 # Ensure we subscribe specifically to the device topic and drop the 

1629 # wildcard subscription to prevent duplicates 

1630 try: 

1631 self.client.subscribe(self._topic_sub, qos=self._mqtt_qos) 

1632 except Exception as err: # pragma: no cover - defensive 

1633 _LOGGER.debug(f"Error subscribing specific topic: {err}") 

1634 if getattr(self, "_data_wildcard_topic", ""): 

1635 try: 

1636 self.client.unsubscribe(self._data_wildcard_topic) 

1637 _LOGGER.debug( 

1638 f"Unsubscribed data wildcard after inferring device: {self._data_wildcard_topic}" 

1639 ) 

1640 finally: 

1641 self._data_wildcard_topic = "" 

1642 

1643 try: 

1644 payload = json.loads(msg.payload) 

1645 except json.JSONDecodeError: 

1646 _LOGGER.warning("%s < Can't decode JSON (ignoring)", msg.payload) 

1647 return 

1648 

1649 # HACK: hotfix for converting RAMSES_ESP dtm into local/naive dtm 

1650 dtm = dt.fromisoformat(payload["ts"]) 

1651 if dtm.tzinfo is not None: 

1652 dtm = dtm.astimezone().replace(tzinfo=None) 

1653 if dtm < dt.now() - td(days=90): 

1654 _LOGGER.warning( 

1655 f"{self}: Have you configured the SNTP settings on the ESP?" 

1656 ) 

1657 # FIXME: convert all dt early, and convert to aware, i.e. dt.now().astimezone() 

1658 

1659 try: 

1660 self._frame_read(dtm.isoformat(), _normalise(payload["msg"])) 

1661 except exc.TransportError: 

1662 # If the transport is closing, we expect this error and can safely ignore it 

1663 # prevents "Uncaught thread exception" in paho.mqtt client 

1664 if not self._closing: 

1665 raise 

1666 

1667 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

1668 """Transmit a frame via the underlying handler (e.g. serial port, MQTT). 

1669 

1670 Writes are rate-limited to _MAX_TOKENS Packets over the last _TIME_WINDOW 

1671 seconds, except when disable_tx_limits is True (for e.g. user commands). 

1672 

1673 Protocols call Transport.write_frame(), not Transport.write(). 

1674 

1675 :param frame: The frame to transmit. 

1676 :type frame: str 

1677 :param disable_tx_limits: Whether to disable rate limiting, defaults to False. 

1678 :type disable_tx_limits: bool, optional 

1679 """ 

1680 

1681 # Check if we're connected before attempting to write 

1682 if not self._connected: 

1683 _LOGGER.debug(f"{self}: Dropping write - MQTT not connected") 

1684 return 

1685 

1686 # top-up the token bucket 

1687 timestamp = perf_counter() 

1688 elapsed, self._timestamp = timestamp - self._timestamp, timestamp 

1689 self._num_tokens = min( 

1690 self._num_tokens + elapsed * self._TOKEN_RATE, self._max_tokens 

1691 ) 

1692 

1693 # if would have to sleep >= 1 second, dump the write instead 

1694 if self._num_tokens < 1.0 - self._TOKEN_RATE and not disable_tx_limits: 

1695 _LOGGER.warning(f"{self}: Discarding write (tokens={self._num_tokens:.2f})") 

1696 return 

1697 

1698 self._num_tokens -= 1.0 

1699 if self._max_tokens > self._MAX_TOKENS: # what is the new max number of tokens 

1700 self._max_tokens = min(self._max_tokens, self._num_tokens) 

1701 self._max_tokens = max(self._max_tokens, self._MAX_TOKENS) 

1702 

1703 # if in token debt, sleep until the debt is paid 

1704 if self._num_tokens < 0.0 and not disable_tx_limits: 

1705 delay = (0 - self._num_tokens) / self._TOKEN_RATE 

1706 _LOGGER.debug(f"{self}: Sleeping (seconds={delay})") 

1707 await asyncio.sleep(delay) 

1708 

1709 await super().write_frame(frame) 

1710 

1711 async def _write_frame(self, frame: str) -> None: 

1712 """Write some data bytes to the underlying transport. 

1713 

1714 :param frame: The frame to write. 

1715 :type frame: str 

1716 """ 

1717 # _LOGGER.error("Mqtt._write_frame(%s)", frame) 

1718 

1719 data = json.dumps({"msg": frame}) 

1720 

1721 if _DBG_FORCE_FRAME_LOGGING: 

1722 _LOGGER.warning("Tx: %s", data) 

1723 elif _LOGGER.getEffectiveLevel() == logging.INFO: # log for INFO not DEBUG 

1724 _LOGGER.info("Tx: %s", data) 

1725 

1726 try: 

1727 self._publish(data) 

1728 except MQTTException as err: 

1729 _LOGGER.error(f"MQTT publish failed: {err}") 

1730 # Don't close the transport, just log the error and continue 

1731 # The broker might come back online 

1732 return 

1733 

1734 def _publish(self, payload: str) -> None: 

1735 """Publish the payload to the MQTT broker. 

1736 

1737 :param payload: The data payload to publish. 

1738 :type payload: str 

1739 """ 

1740 # _LOGGER.error("Mqtt._publish(%s)", message) 

1741 

1742 if not self._connected: 

1743 _LOGGER.debug("Cannot publish - MQTT not connected") 

1744 return 

1745 

1746 info: mqtt.MQTTMessageInfo = self.client.publish( 

1747 self._topic_pub, payload=payload, qos=self._mqtt_qos 

1748 ) 

1749 

1750 if not info: 

1751 _LOGGER.warning("MQTT publish returned no info") 

1752 elif info.rc != mqtt.MQTT_ERR_SUCCESS: 

1753 _LOGGER.warning(f"MQTT publish failed with code: {info.rc}") 

1754 # Check if this indicates a connection issue 

1755 if info.rc in (mqtt.MQTT_ERR_NO_CONN, mqtt.MQTT_ERR_CONN_LOST): 

1756 self._connected = False 

1757 if not self._closing: 

1758 self._schedule_reconnect() 

1759 

1760 def _close(self, exc: exc.RamsesException | None = None) -> None: 

1761 """Close the transport (disconnect from the broker and stop its poller). 

1762 

1763 :param exc: The exception causing closure. 

1764 :type exc: exc.RamsesException | None, optional 

1765 """ 

1766 # _LOGGER.error("Mqtt._close(%s)", exc) 

1767 

1768 super()._close(exc) 

1769 

1770 # Cancel any pending reconnection attempts 

1771 if self._reconnect_task: 

1772 self._reconnect_task.cancel() 

1773 self._reconnect_task = None 

1774 

1775 if not self._connected: 

1776 return 

1777 self._connected = False 

1778 

1779 try: 

1780 self.client.unsubscribe(self._topic_sub) 

1781 self.client.disconnect() 

1782 self.client.loop_stop() 

1783 except Exception as err: 

1784 _LOGGER.debug(f"Error during MQTT cleanup: {err}") 

1785 

1786 

1787class CallbackTransport(_FullTransport, _CallbackTransportAbstractor): 

1788 """A virtual transport that delegates I/O to external callbacks (Inversion of Control). 

1789 

1790 This transport allows ramses_rf to be used with external connection managers 

1791 (like Home Assistant's MQTT integration) without direct dependencies. 

1792 """ 

1793 

1794 def __init__( 

1795 self, 

1796 protocol: RamsesProtocolT, 

1797 io_writer: Callable[[str], Awaitable[None]], 

1798 disable_sending: bool = False, 

1799 **kwargs: Any, 

1800 ) -> None: 

1801 """Initialize the callback transport. 

1802 

1803 :param protocol: The protocol instance. 

1804 :type protocol: RamsesProtocolT 

1805 :param io_writer: Async callable to handle outbound frames. 

1806 :type io_writer: Callable[[str], Awaitable[None]] 

1807 :param disable_sending: Whether to disable sending, defaults to False. 

1808 :type disable_sending: bool, optional 

1809 """ 

1810 # Pass kwargs up the chain. _ReadTransport will extract 'loop' if present. 

1811 # _BaseTransport will pass 'loop' to _CallbackTransportAbstractor, which consumes it. 

1812 super().__init__(disable_sending=disable_sending, **kwargs) 

1813 

1814 self._protocol = protocol 

1815 self._io_writer = io_writer 

1816 

1817 # Section 3.1: "Initial State: Default to a PAUSED state" 

1818 self._reading = False 

1819 

1820 # Section 6.1: Object Lifecycle Logging 

1821 _LOGGER.info(f"CallbackTransport created with io_writer={io_writer}") 

1822 

1823 # NOTE: connection_made is NOT called here. It must be triggered 

1824 # externally (e.g. by the Bridge) via the protocol methods once 

1825 # the external connection is ready. 

1826 

1827 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None: 

1828 """Process a frame for transmission by passing it to the external writer. 

1829 

1830 :param frame: The frame to write. 

1831 :type frame: str 

1832 :param disable_tx_limits: Unused for this transport, kept for API compatibility. 

1833 :type disable_tx_limits: bool, optional 

1834 :raises exc.TransportError: If sending is disabled or the writer fails. 

1835 """ 

1836 if self._disable_sending: 

1837 raise exc.TransportError("Sending has been disabled") 

1838 

1839 # Section 6.1: Boundary Logging (Outgoing) 

1840 _LOGGER.debug(f"Sending frame via external writer: {frame}") 

1841 

1842 try: 

1843 await self._io_writer(frame) 

1844 except Exception as err: 

1845 _LOGGER.error(f"External writer failed to send frame: {err}") 

1846 raise exc.TransportError(f"External writer failed: {err}") from err 

1847 

1848 async def _write_frame(self, frame: str) -> None: 

1849 """Wait for the frame to be written by the external writer. 

1850 

1851 :param frame: The frame to write. 

1852 :type frame: str 

1853 """ 

1854 # Wrapper to satisfy abstract base class, though logic is in write_frame 

1855 await self.write_frame(frame) 

1856 

1857 def receive_frame(self, frame: str, dtm: str | None = None) -> None: 

1858 """Ingest a frame from the external source (Read Path). 

1859 

1860 This is the public method called by the Bridge to inject data. 

1861 

1862 :param frame: The raw frame string to receive. 

1863 :type frame: str 

1864 :param dtm: The timestamp of the frame, defaults to current time. 

1865 :type dtm: str | None, optional 

1866 """ 

1867 _LOGGER.debug( 

1868 f"Received frame from external source: frame='{frame}', timestamp={dtm}" 

1869 ) 

1870 

1871 # Section 4.2: Circuit Breaker implementation (Packet gating) 

1872 if not self._reading: 

1873 _LOGGER.debug(f"Dropping received frame (transport paused): {repr(frame)}") 

1874 return 

1875 

1876 dtm = dtm or dt_now().isoformat() 

1877 

1878 # Section 6.1: Boundary Logging (Incoming) 

1879 _LOGGER.debug( 

1880 f"Ingesting frame into transport: frame='{frame}', timestamp={dtm}" 

1881 ) 

1882 

1883 # Pass to the standard processing pipeline 

1884 self._frame_read(dtm, frame.rstrip()) 

1885 

1886 

1887def validate_topic_path(path: str) -> str: 

1888 """Test the topic path and normalize it. 

1889 

1890 :param path: The candidate topic path. 

1891 :type path: str 

1892 :return: The valid, normalized path. 

1893 :rtype: str 

1894 :raises ValueError: If the path format is invalid. 

1895 """ 

1896 

1897 # The user can supply the following paths: 

1898 # - "" 

1899 # - "/RAMSES/GATEWAY" 

1900 # - "/RAMSES/GATEWAY/+" (the previous two are equivalent to this one) 

1901 # - "/RAMSES/GATEWAY/18:123456" 

1902 

1903 # "RAMSES/GATEWAY/+" -> online, online, ... 

1904 # "RAMSES/GATEWAY/18:017804" -> online 

1905 # "RAMSES/GATEWAY/18:017804/info/+" -> ramses_esp/0.4.0 

1906 # "RAMSES/GATEWAY/+/rx" -> pkts from all gateways 

1907 

1908 new_path = path or SZ_RAMSES_GATEWAY 

1909 if new_path.startswith("/"): 

1910 new_path = new_path[1:] 

1911 if not new_path.startswith(SZ_RAMSES_GATEWAY): 

1912 raise ValueError(f"Invalid topic path: {path}") 

1913 if new_path == SZ_RAMSES_GATEWAY: 

1914 new_path += "/+" 

1915 if len(new_path.split("/")) != 3: 

1916 raise ValueError(f"Invalid topic path: {path}") 

1917 return new_path 

1918 

1919 

1920RamsesTransportT: TypeAlias = ( 

1921 FileTransport | MqttTransport | PortTransport | CallbackTransport 

1922) 

1923 

1924 

1925async def transport_factory( 

1926 protocol: RamsesProtocolT, 

1927 /, 

1928 *, 

1929 port_name: SerPortNameT | None = None, 

1930 port_config: PortConfigT | None = None, 

1931 packet_log: str | None = None, 

1932 packet_dict: dict[str, str] | None = None, 

1933 transport_constructor: Callable[..., Awaitable[RamsesTransportT]] | None = None, 

1934 disable_sending: bool = False, 

1935 extra: dict[str, Any] | None = None, 

1936 loop: asyncio.AbstractEventLoop | None = None, 

1937 log_all: bool = False, 

1938 **kwargs: Any, # HACK: odd/misc params 

1939) -> RamsesTransportT: 

1940 """Create and return a Ramses-specific async packet Transport. 

1941 

1942 :param protocol: The protocol instance that will use this transport. 

1943 :type protocol: RamsesProtocolT 

1944 :param port_name: Serial port name or MQTT URL, defaults to None. 

1945 :type port_name: SerPortNameT | None, optional 

1946 :param port_config: Configuration dictionary for serial port, defaults to None. 

1947 :type port_config: PortConfigT | None, optional 

1948 :param packet_log: Path to a file containing packet logs for playback, defaults to None. 

1949 :type packet_log: str | None, optional 

1950 :param packet_dict: Dictionary of packets for playback, defaults to None. 

1951 :type packet_dict: dict[str, str] | None, optional 

1952 :param transport_constructor: Custom async callable to create a transport, defaults to None. 

1953 :type transport_constructor: Callable[..., Awaitable[RamsesTransportT]] | None, optional 

1954 :param disable_sending: If True, the transport will not transmit packets, defaults to False. 

1955 :type disable_sending: bool | None, optional 

1956 :param extra: Extra configuration options, defaults to None. 

1957 :type extra: dict[str, Any] | None, optional 

1958 :param loop: Asyncio event loop, defaults to None. 

1959 :type loop: asyncio.AbstractEventLoop | None, optional 

1960 :param log_all: If True, log all MQTT messages including non-protocol ones, defaults to False. 

1961 :type log_all: bool, optional 

1962 :param kwargs: Additional keyword arguments for specific transports. 

1963 :type kwargs: Any 

1964 :return: An instantiated RamsesTransportT object. 

1965 :rtype: RamsesTransportT 

1966 :raises exc.TransportSourceInvalid: If the packet source is invalid or multiple sources are specified. 

1967 """ 

1968 

1969 # If a constructor is provided, delegate entirely to it. 

1970 if transport_constructor: 

1971 _LOGGER.debug("transport_factory: Delegating to external transport_constructor") 

1972 return await transport_constructor( 

1973 protocol, disable_sending=disable_sending, extra=extra, **kwargs 

1974 ) 

1975 

1976 # kwargs are specific to a transport. The above transports have: 

1977 # evofw3_flag, use_regex 

1978 

1979 def get_serial_instance( # type: ignore[no-any-unimported] 

1980 ser_name: SerPortNameT, ser_config: PortConfigT | None 

1981 ) -> Serial: 

1982 """Return a Serial instance for the given port name and config. 

1983 

1984 May: raise TransportSourceInvalid("Unable to open serial port...") 

1985 

1986 :param ser_name: Name of the serial port. 

1987 :type ser_name: SerPortNameT 

1988 :param ser_config: Configuration for the serial port. 

1989 :type ser_config: PortConfigT | None 

1990 :return: Configured Serial object. 

1991 :rtype: Serial 

1992 :raises exc.TransportSourceInvalid: If the serial port cannot be opened. 

1993 """ 

1994 # For example: 

1995 # - python client.py monitor 'rfc2217://localhost:5001' 

1996 # - python client.py monitor 'alt:///dev/ttyUSB0?class=PosixPollSerial' 

1997 

1998 ser_config = SCH_SERIAL_PORT_CONFIG(ser_config or {}) 

1999 

2000 try: 

2001 ser_obj = serial_for_url(ser_name, **ser_config) 

2002 except SerialException as err: 

2003 _LOGGER.error( 

2004 "Failed to open %s (config: %s): %s", ser_name, ser_config, err 

2005 ) 

2006 raise exc.TransportSourceInvalid( 

2007 f"Unable to open the serial port: {ser_name}" 

2008 ) from err 

2009 

2010 # FTDI on Posix/Linux would be a common environment for this library... 

2011 with contextlib.suppress(AttributeError, NotImplementedError, ValueError): 

2012 ser_obj.set_low_latency_mode(True) 

2013 

2014 return ser_obj 

2015 

2016 def issue_warning() -> None: 

2017 """Warn of the perils of semi-supported configurations.""" 

2018 _LOGGER.warning( 

2019 f"{'Windows' if os.name == 'nt' else 'This type of serial interface'} " 

2020 "is not fully supported by this library: " 

2021 "please don't report any Transport/Protocol errors/warnings, " 

2022 "unless they are reproducible with a standard configuration " 

2023 "(e.g. linux with a local serial port)" 

2024 ) 

2025 

2026 if len([x for x in (packet_dict, packet_log, port_name) if x is not None]) != 1: 

2027 raise exc.TransportSourceInvalid( 

2028 "Packet source must be exactly one of: packet_dict, packet_log, port_name" 

2029 ) 

2030 

2031 # File 

2032 if (pkt_source := packet_log or packet_dict) is not None: 

2033 return FileTransport(pkt_source, protocol, extra=extra, loop=loop, **kwargs) 

2034 

2035 assert port_name is not None # mypy check 

2036 assert port_config is not None # mypy check 

2037 

2038 # MQTT 

2039 if port_name[:4] == "mqtt": 

2040 # Check for custom timeout in kwargs, fallback to constant 

2041 mqtt_timeout = kwargs.get("timeout", _DEFAULT_TIMEOUT_MQTT) 

2042 

2043 transport = MqttTransport( 

2044 port_name, 

2045 protocol, 

2046 disable_sending=bool( 

2047 disable_sending 

2048 ), # Feature Added: handled disable_sending 

2049 extra=extra, 

2050 loop=loop, 

2051 log_all=log_all, 

2052 **kwargs, 

2053 ) 

2054 

2055 try: 

2056 # Robustness Fix: Wait with timeout, handle failure gracefully 

2057 await protocol.wait_for_connection_made(timeout=mqtt_timeout) 

2058 except Exception: 

2059 # CRITICAL FIX: Close the transport if setup fails to prevent "Zombie" callbacks 

2060 # This prevents the "AttributeError: 'NoneType'..." crash later on 

2061 transport.close() 

2062 raise 

2063 

2064 return transport 

2065 

2066 # Serial 

2067 ser_instance = get_serial_instance(port_name, port_config) 

2068 

2069 if os.name == "nt" or ser_instance.portstr[:7] in ("rfc2217", "socket:"): 

2070 issue_warning() # TODO: add tests for these... 

2071 

2072 transport = PortTransport( # type: ignore[assignment] 

2073 ser_instance, 

2074 protocol, 

2075 disable_sending=bool(disable_sending), 

2076 extra=extra, 

2077 loop=loop, 

2078 **kwargs, 

2079 ) 

2080 

2081 # TODO: remove this? better to invoke timeout after factory returns? 

2082 await protocol.wait_for_connection_made(timeout=_DEFAULT_TIMEOUT_PORT) 

2083 # pytest-cov times out in virtual_rf.py when set below 30.0 on GitHub Actions 

2084 return transport