Coverage for src/ramses_tx/transport.py: 23%
793 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - RAMSES-II compatible packet transport.
4Operates at the pkt layer of: app - msg - pkt - h/w
6For ser2net, use the following YAML with: ``ser2net -c misc/ser2net.yaml``
8.. code-block::
10 connection: &con00
11 accepter: telnet(rfc2217),tcp,5001
12 timeout: 0
13 connector: serialdev,/dev/ttyUSB0,115200n81,local
14 options:
15 max-connections: 3
17For ``socat``, see:
19.. code-block::
21 socat -dd pty,raw,echo=0 pty,raw,echo=0
22 python client.py monitor /dev/pts/0
23 cat packet.log | cut -d ' ' -f 2- | unix2dos > /dev/pts/1
25For re-flashing evofw3 via Arduino IDE on *my* atmega328p (YMMV):
27 - Board: atmega328p (SW UART)
28 - Bootloader: Old Bootloader
29 - Processor: atmega328p (5V, 16 MHz)
30 - Host: 57600 (or 115200, YMMV)
31 - Pinout: Nano
33For re-flashing evofw3 via Arduino IDE on *my* atmega32u4 (YMMV):
35 - Board: atmega32u4 (HW UART)
36 - Processor: atmega32u4 (5V, 16 MHz)
37 - Pinout: Pro Micro
38"""
40from __future__ import annotations
42import asyncio
43import contextlib
44import fileinput
45import functools
46import glob
47import json
48import logging
49import os
50import re
51import sys
52from collections import deque
53from collections.abc import Awaitable, Callable, Iterable
54from datetime import datetime as dt, timedelta as td
55from functools import partial, wraps
56from io import TextIOWrapper
57from string import printable
58from time import perf_counter
59from typing import TYPE_CHECKING, Any, Final, TypeAlias
60from urllib.parse import parse_qs, unquote, urlparse
62from paho.mqtt import MQTTException, client as mqtt
64try:
65 from paho.mqtt.enums import CallbackAPIVersion
66except ImportError:
67 # Fallback for Paho MQTT < 2.0.0 (Home Assistant compatibility)
68 CallbackAPIVersion = None # type: ignore[assignment, misc]
69from serial import ( # type: ignore[import-untyped]
70 Serial,
71 SerialException,
72 serial_for_url,
73)
75from . import exceptions as exc
76from .command import Command
77from .const import (
78 DUTY_CYCLE_DURATION,
79 MAX_DUTY_CYCLE_RATE,
80 MAX_TRANSMIT_RATE_TOKENS,
81 MIN_INTER_WRITE_GAP,
82 SZ_ACTIVE_HGI,
83 SZ_IS_EVOFW3,
84 SZ_SIGNATURE,
85)
86from .helpers import dt_now
87from .packet import Packet
88from .schemas import (
89 SCH_SERIAL_PORT_CONFIG,
90 SZ_EVOFW_FLAG,
91 SZ_INBOUND,
92 SZ_OUTBOUND,
93 DeviceIdT,
94 PortConfigT,
95)
96from .typing import ExceptionT, SerPortNameT
98from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
99 I_,
100 RP,
101 RQ,
102 W_,
103 Code,
104)
106if TYPE_CHECKING:
107 from .protocol import RamsesProtocolT
# Default timeouts for the serial-port and MQTT transports.
# NOTE(review): presumably seconds (the MQTT note below is in seconds) - confirm
# against the code that consumes them, which is not visible in this section.
_DEFAULT_TIMEOUT_PORT: Final[float] = 3
_DEFAULT_TIMEOUT_MQTT: Final[float] = 60  # Updated from 9s to 60s for robustness

# Tuning for the gateway signature exchange.
# NOTE(review): consumers are outside this section; going by the names these are
# the gap between tries, the maximum number of tries and the overall deadline.
_SIGNATURE_GAP_SECS = 0.05
_SIGNATURE_MAX_TRYS = 40  # was: 24
_SIGNATURE_MAX_SECS = 3

SZ_RAMSES_GATEWAY: Final = "RAMSES/GATEWAY"
# Key under which FileTransport stores its reader task in the transport's extra info
SZ_READER_TASK: Final = "reader_task"


#
# NOTE: All debug flags should be False for deployment to end-users
# When True, limit_duty_cycle() refills its bit bucket before every write
_DBG_DISABLE_DUTY_CYCLE_LIMIT: Final[bool] = False
# When True, _RegHackMixin._regex_hack() does not warn when a rule changes a frame
_DBG_DISABLE_REGEX_WARNINGS: Final[bool] = False
_DBG_FORCE_FRAME_LOGGING: Final[bool] = False

# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)
130try:
131 import serial_asyncio_fast as serial_asyncio # type: ignore[import-not-found, import-untyped, unused-ignore]
133 _LOGGER.debug("Using pyserial-asyncio-fast in place of pyserial-asyncio")
134except ImportError:
135 import serial_asyncio # type: ignore[import-not-found, import-untyped, unused-ignore, no-redef]
# For linux, use a modified version of comports() to include /dev/serial/by-id/* links
if os.name == "nt":  # sys.platform == 'win32':
    from serial.tools.list_ports_windows import comports  # type: ignore[import-untyped]

elif os.name != "posix":  # is unsupported
    raise ImportError(
        f"Sorry: no implementation for your platform ('{os.name}') available"
    )

elif sys.platform.lower()[:5] != "linux":  # e.g. osx
    from serial.tools.list_ports_posix import comports  # type: ignore[import-untyped]

else:  # is linux
    # - see: https://github.com/pyserial/pyserial/pull/700
    # - see: https://github.com/pyserial/pyserial/pull/709

    from serial.tools.list_ports_linux import SysFS  # type: ignore[import-untyped]

    def list_links(devices: set[str]) -> list[str]:
        """Search for symlinks to ports already listed in devices.

        Only ``/dev/*`` and ``/dev/serial/by-id/*`` are searched.

        :param devices: A set of real device paths.
        :type devices: set[str]
        :return: A list of symlinks pointing to the devices.
        :rtype: list[str]
        """

        candidates = glob.glob("/dev/*") + glob.glob("/dev/serial/by-id/*")
        return [
            path
            for path in candidates
            if os.path.islink(path) and os.path.realpath(path) in devices
        ]

    def comports(  # type: ignore[no-any-unimported]
        include_links: bool = False, _hide_subsystems: list[str] | None = None
    ) -> list[SysFS]:
        """Return a list of Serial objects for all known serial ports.

        The candidate devices are taken from the drivers of type ``serial``
        listed in ``/proc/tty/drivers``.

        :param include_links: Whether to include symlinks in the results, defaults to False.
        :type include_links: bool, optional
        :param _hide_subsystems: List of subsystems to hide, defaults to None
            (in which case ``["platform"]`` is used).
        :type _hide_subsystems: list[str] | None, optional
        :return: A list of SysFS objects representing the ports.
        :rtype: list[SysFS]
        """

        if _hide_subsystems is None:
            _hide_subsystems = ["platform"]

        devices: set[str] = set()
        with open("/proc/tty/drivers") as file:
            for driver in file:
                items = driver.strip().split()
                # tolerate blank/short lines rather than raising an IndexError
                if len(items) > 4 and items[4] == "serial":
                    devices.update(glob.glob(items[1] + "*"))

        if include_links:
            devices.update(list_links(devices))

        result: list[SysFS] = [  # type: ignore[no-any-unimported]
            d for d in map(SysFS, devices) if d.subsystem not in _hide_subsystems
        ]
        return result
async def is_hgi80(serial_port: SerPortNameT) -> bool | None:
    """Determine whether the gateway on a port looks like a Honeywell HGI80.

    The checks are, in order: the URL scheme, the port name (if it is a by-id
    path), the USB vendor id and finally the USB product string.

    :param serial_port: The serial port path or URL.
    :type serial_port: SerPortNameT
    :return: True if HGI80, False if not (likely evofw3), None if undetermined.
    :rtype: bool | None
    :raises exc.TransportSerialError: If the serial port cannot be found.
    """

    evofw3_tags = ("evofw3", "FT232R", "NANO")

    if serial_port.startswith("mqtt://"):
        return False  # ramses_esp

    # TODO: add tests for different serial ports, incl./excl/ by-id

    # See: https://github.com/pyserial/pyserial-asyncio/issues/46
    if "://" in serial_port:  # e.g. "rfc2217://localhost:5001"
        try:
            serial_for_url(serial_port, do_not_open=True)
        except (SerialException, ValueError) as err:
            raise exc.TransportSerialError(
                f"Unable to find {serial_port}: {err}"
            ) from err
        return None

    if not os.path.exists(serial_port):
        raise exc.TransportSerialError(f"Unable to find {serial_port}")

    # the cheapest check: a by-id port name embeds the product name
    named_by_id = "by-id" in serial_port
    if named_by_id:
        if "TUSB3410" in serial_port:
            return True
        if any(tag in serial_port for tag in evofw3_tags):
            return False

    # otherwise, inspect the device attrs via comports() (blocking, so off-loop)
    try:
        komports = await asyncio.get_running_loop().run_in_executor(
            None, partial(comports, include_links=True)
        )
    except ImportError as err:
        raise exc.TransportSerialError(f"Unable to find {serial_port}: {err}") from err

    # TODO: remove get(): not monkeypatching comports() correctly for /dev/pts/...
    vid_by_device = {port.device: port.vid for port in komports}
    vid = vid_by_device.get(serial_port)

    # this works, but we may not have all valid VIDs
    if vid:
        if vid == 0x10AC:  # Honeywell
            return True
        if vid in (0x0403, 0x1B4F):  # FTDI, SparkFun
            return False

    # TODO: remove get(): not monkeypatching comports() correctly for /dev/pts/...
    product_by_device = {
        port.device: getattr(port, "product", None) for port in komports
    }
    product = product_by_device.get(serial_port)

    if product:  # else is None - VM, or not member of plugdev group?
        if "TUSB3410" in product:  # ?needed
            return True
        if any(tag in product for tag in evofw3_tags):
            return False

    # could try sending an "!V", expect "# evofw3 0.7.1", but that needs I/O

    tip = (
        ""
        if named_by_id
        else ", TIP: specify the serial port by-id (i.e. /dev/serial/by-id/usb-...)"
    )
    _LOGGER.warning(
        f"{serial_port}: the gateway type is not determinable, will assume evofw3"
        + tip
    )
    return None
def _normalise(pkt_line: str) -> str:
    """Apply the (transparent) frame-level fix-ups needed at the (near-)RF layer.

    Goals:
    - ensure an evofw3 provides the same output as a HGI80 (none, presently)
    - handle 'strange' packets (e.g. ``I|08:|0008``)

    :param pkt_line: The raw packet string from the hardware.
    :type pkt_line: str
    :return: The normalized packet string, without leading/trailing whitespace.
    :rtype: str
    """

    # TODO: deprecate as only for ramses_esp <0.4.0
    # ramses_esp-specific bugs, see: https://github.com/IndaloTech/ramses_esp/issues/1
    pkt_line = pkt_line.replace("\r\r", "\r")
    if pkt_line.startswith(" 000"):
        pkt_line = pkt_line[1:]  # drop the spurious leading space
    elif pkt_line[:2] in (I_, RQ, RP, W_):
        pkt_line = ""  # the line starts with a verb: discard it

    # pseudo-RAMSES-II packets (encrypted payload?)...
    from_pseudo_device = pkt_line[10:14] in (" 08:", " 31:")
    if from_pseudo_device and pkt_line.endswith("* Checksum error"):
        pkt_line = pkt_line[:-17] + " # Checksum error (ignored)"

    # remove any "\r\n" (leading whitespace is a problem for commands, but not packets)
    return pkt_line.strip()
def _str(value: bytes) -> str:
    """Decode ASCII bytes to a string, dropping any non-printable characters.

    :param value: The bytes to decode.
    :type value: bytes
    :return: The decoded string, or an empty string if the bytes are not ASCII.
    :rtype: str
    """

    try:
        text = value.decode("ascii", errors="strict")
    except UnicodeDecodeError:
        _LOGGER.warning("%s < Can't decode bytestream (ignoring)", value)
        return ""

    return "".join(filter(printable.__contains__, text))
def limit_duty_cycle(
    max_duty_cycle: float, time_window: int = DUTY_CYCLE_DURATION
) -> Callable[..., Any]:
    """Limit the Tx rate to the RF duty cycle regulations (e.g. 1% per hour).

    Implemented as a bit bucket: it starts full, refills at a constant rate and
    each frame consumes a deemed number of bits. A write waits when the bucket
    holds too few bits for the frame.

    :param max_duty_cycle: Bandwidth available per observation window (percentage as 0.0-1.0).
    :type max_duty_cycle: float
    :param time_window: Duration of the observation window in seconds, defaults
        to DUTY_CYCLE_DURATION.
    :type time_window: int
    :return: A decorator that enforces the duty cycle limit (or a pass-through
        decorator if max_duty_cycle is outside of (0, 1]).
    :rtype: Callable[..., Any]
    """

    tx_rate_avail: int = 38400  # bits per second (deemed)
    fill_rate: float = tx_rate_avail * max_duty_cycle  # bits per second
    bucket_capacity: float = fill_rate * time_window
    limiting_enabled = 0 < max_duty_cycle <= 1

    def decorator(
        fnc: Callable[..., Awaitable[None]],
    ) -> Callable[..., Awaitable[None]]:
        if not limiting_enabled:

            @wraps(fnc)
            async def null_wrapper(
                self: PortTransport, frame: str, *args: Any, **kwargs: Any
            ) -> None:
                await fnc(self, frame, *args, **kwargs)

            return null_wrapper

        # the bit bucket starts full
        bits_in_bucket: float = bucket_capacity
        last_time_bit_added = perf_counter()

        @wraps(fnc)
        async def wrapper(
            self: PortTransport, frame: str, *args: Any, **kwargs: Any
        ) -> None:
            nonlocal bits_in_bucket, last_time_bit_added

            # deemed size of the frame on air: a fixed part, plus 10 per char after 46
            rf_frame_size = 330 + len(frame[46:]) * 10

            # top-up the bit bucket, for the time passed since the last top-up
            since_last_topup = perf_counter() - last_time_bit_added
            topped_up = bits_in_bucket + since_last_topup * fill_rate
            bits_in_bucket = min(topped_up, bucket_capacity)
            last_time_bit_added = perf_counter()

            if _DBG_DISABLE_DUTY_CYCLE_LIMIT:
                bits_in_bucket = bucket_capacity

            # if required, wait for the bit bucket to refill (not for SETs/PUTs)
            shortfall = rf_frame_size - bits_in_bucket
            if shortfall > 0:
                await asyncio.sleep(shortfall / fill_rate)

            # the bits are consumed, even if the write fails
            try:
                await fnc(self, frame, *args, **kwargs)
            finally:
                bits_in_bucket -= rf_frame_size

        return wrapper

    return decorator
# used by @track_transmit_rate, current_transmit_rate()
# - the max. number of transmit timestamps retained by a transport
_MAX_TRACKED_TRANSMITS = 99
# - transmits older than this many seconds are ignored when reporting the Tx rate
_MAX_TRACKED_DURATION = 300


# used by @track_system_syncs, @avoid_system_syncs
# - the max. number of sync cycle packets remembered at any one time
_MAX_TRACKED_SYNCS = 3
# - the sync cycle packets being tracked (module-wide; rebound by @track_system_syncs)
_global_sync_cycles: deque[Packet] = deque(maxlen=_MAX_TRACKED_SYNCS)
411# TODO: doesn't look right at all...
def avoid_system_syncs(fnc: Callable[..., Awaitable[None]]) -> Callable[..., Any]:
    """Take measures to avoid Tx when any controller is doing a sync cycle.

    The wrapper delays the call to ``fnc`` for as long as any packet in
    ``_global_sync_cycles`` indicates that a sync cycle is imminent.

    :param fnc: The async function to decorate.
    :type fnc: Callable[..., Awaitable[None]]
    :return: The decorated function.
    :rtype: Callable[..., Any]
    """

    # all durations are in seconds
    DURATION_PKT_GAP = 0.020  # 0.0200 for evohome, or 0.0127 for DTS92
    DURATION_LONG_PKT = 0.022  # time to tx I|2309|048 (or 30C9, or 000A)
    DURATION_SYNC_PKT = 0.010  # time to tx I|1F09|003

    SYNC_WAIT_LONG = (DURATION_PKT_GAP + DURATION_LONG_PKT) * 2
    SYNC_WAIT_SHORT = DURATION_SYNC_PKT

    # a sync is 'imminent' if its expected start is within this window from now
    SYNC_WINDOW_LOWER = td(seconds=SYNC_WAIT_SHORT * 0.8)  # could be * 0
    SYNC_WINDOW_UPPER = SYNC_WINDOW_LOWER + td(seconds=SYNC_WAIT_LONG * 1.2)  #

    @wraps(fnc)
    async def wrapper(*args: Any, **kwargs: Any) -> None:
        global _global_sync_cycles

        def is_imminent(p: Packet) -> bool:
            """Return True if a sync cycle is imminent."""
            # payload[2:6] is parsed as hex and divided by 10 to give seconds
            # NOTE(review): presumably tenths of a second until the next sync - confirm
            return bool(
                SYNC_WINDOW_LOWER
                < (p.dtm + td(seconds=int(p.payload[2:6], 16) / 10) - dt_now())
                < SYNC_WINDOW_UPPER
            )

        start = perf_counter()  # TODO: remove

        # wait for the start of the sync cycle (I|1F09|003, Tx time ~0.009)
        while any(is_imminent(p) for p in _global_sync_cycles):
            await asyncio.sleep(SYNC_WAIT_SHORT)

        # wait for the remainder of sync cycle (I|2309/30C9) to complete
        # (only if the loop above slept at least once, going by the elapsed time)
        if perf_counter() - start > SYNC_WAIT_SHORT:
            await asyncio.sleep(SYNC_WAIT_LONG)

        await fnc(*args, **kwargs)
        return None

    return wrapper
def track_system_syncs(fnc: Callable[..., None]) -> Callable[..., Any]:
    """Track/remember any new/outstanding TCS sync cycle.

    Only ``I|1F09`` packets with a length of 3 are tracked; all packets, tracked
    or not, are passed on to ``fnc``.

    :param fnc: The function to decorate (usually a packet reader).
    :type fnc: Callable[..., None]
    :return: The decorated function.
    :rtype: Callable[..., Any]
    """

    @wraps(fnc)
    def wrapper(self: PortTransport, pkt: Packet) -> None:
        global _global_sync_cycles

        def is_pending(p: Packet) -> bool:
            """Return True if a sync cycle is still pending (ignores drift)."""
            return bool(p.dtm + td(seconds=int(p.payload[2:6], 16) / 10) > dt_now())

        if pkt.code != Code._1F09 or pkt.verb != I_ or pkt._len != 3:
            fnc(self, pkt)
            return None

        # Drop any expired cycles, and the previous cycle from this source. The
        # rebuilt deque keeps its maxlen, so the append below discards the oldest
        # entry when full (e.g. with corrupted payloads that never expire).
        _global_sync_cycles = deque(
            (p for p in _global_sync_cycles if p.src != pkt.src and is_pending(p)),
            maxlen=_MAX_TRACKED_SYNCS,
        )
        _global_sync_cycles.append(pkt)  # TODO: sort

        fnc(self, pkt)

    return wrapper
494# ### Abstractors #####################################################################
495# ### Do the bare minimum to abstract each transport from its underlying class
class _CallbackTransportAbstractor:
    """Do the bare minimum to abstract a transport from its underlying class."""

    def __init__(
        self, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any
    ) -> None:
        """Store the event loop, and end the chain of keyword arguments.

        :param loop: The asyncio event loop, defaults to None (in which case
            asyncio.get_event_loop() is used).
        :type loop: asyncio.AbstractEventLoop | None, optional
        """
        self._loop = loop if loop else asyncio.get_event_loop()

        # The kwargs stop here: object.__init__() would not accept them.
        super().__init__()
class _BaseTransport:
    """Base class for all transports."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Pass all arguments, unchanged, to the next class in the MRO."""
        super().__init__(*args, **kwargs)
class _FileTransportAbstractor:
    """Do the bare minimum to abstract a transport from its underlying class."""

    def __init__(
        self,
        pkt_source: dict[str, str] | str | TextIOWrapper,
        protocol: RamsesProtocolT,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Store the packet source, the protocol and the event loop.

        :param pkt_source: The source of packets (file path, file object, or dict).
        :type pkt_source: dict[str, str] | str | TextIOWrapper
        :param protocol: The protocol instance.
        :type protocol: RamsesProtocolT
        :param loop: The asyncio event loop, defaults to None (in which case
            asyncio.get_event_loop() is used).
        :type loop: asyncio.AbstractEventLoop | None, optional
        """
        # NOTE: there is no call to super().__init__() from here

        self._loop = loop if loop else asyncio.get_event_loop()
        self._protocol = protocol
        self._pkt_source = pkt_source
class _PortTransportAbstractor(serial_asyncio.SerialTransport):
    """Do the bare minimum to abstract a transport from its underlying class."""

    serial: Serial  # type: ignore[no-any-unimported]

    def __init__(  # type: ignore[no-any-unimported]
        self,
        serial_instance: Serial,
        protocol: RamsesProtocolT,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Initialize the underlying SerialTransport.

        The arguments are re-ordered to suit SerialTransport, which takes them
        as (loop, protocol, serial_instance).

        :param serial_instance: The serial object instance.
        :type serial_instance: Serial
        :param protocol: The protocol instance.
        :type protocol: RamsesProtocolT
        :param loop: The asyncio event loop, defaults to None (in which case
            asyncio.get_event_loop() is used).
        :type loop: asyncio.AbstractEventLoop | None, optional
        """

        event_loop = loop if loop else asyncio.get_event_loop()
        super().__init__(event_loop, protocol, serial_instance)
class _MqttTransportAbstractor:
    """Do the bare minimum to abstract a transport from its underlying class."""

    def __init__(
        self,
        broker_url: str,
        protocol: RamsesProtocolT,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Store the (parsed) broker URL, the protocol and the event loop.

        :param broker_url: The URL of the MQTT broker.
        :type broker_url: str
        :param protocol: The protocol instance.
        :type protocol: RamsesProtocolT
        :param loop: The asyncio event loop, defaults to None (in which case
            asyncio.get_event_loop() is used).
        :type loop: asyncio.AbstractEventLoop | None, optional
        """
        # NOTE: there is no call to super().__init__() from here

        parsed_url = urlparse(broker_url)  # kept as a ParseResult, not as a str
        self._broker_url = parsed_url

        self._protocol = protocol
        self._loop = loop if loop else asyncio.get_event_loop()
602# ### Base classes (common to all Transports) #########################################
603# ### Code shared by all R/O, R/W transport types (File/dict, Serial, MQTT)
class _ReadTransport(_BaseTransport):
    """Interface for read-only transports."""

    _protocol: RamsesProtocolT = None  # type: ignore[assignment]
    _loop: asyncio.AbstractEventLoop

    _is_hgi80: bool | None = None  # NOTE: None (unknown) is as False (is_evofw3)

    # __slots__ = ('_extra',)

    def __init__(
        self, *args: Any, extra: dict[str, Any] | None = None, **kwargs: Any
    ) -> None:
        """Initialize the read-only transport.

        Only ``loop`` is passed up the MRO as a keyword argument; the evofw flag
        is consumed here and any other keyword arguments are discarded.

        :param extra: Extra info dict, defaults to None (a new dict is used).
        :type extra: dict[str, Any] | None, optional
        """
        loop = kwargs.pop("loop", None)
        super().__init__(*args, loop=loop)

        self._extra: dict[str, Any] = extra if extra is not None else {}

        self._evofw_flag = kwargs.pop(SZ_EVOFW_FLAG, None)  # gwy.config.evofw_flag
        # kwargs.pop("comms_params", None)  # FiXME: remove this

        self._closing: bool = False
        self._reading: bool = False

        self._this_pkt: Packet | None = None
        self._prev_pkt: Packet | None = None

        # these keys are always present, even if only as None
        self._extra.setdefault(SZ_ACTIVE_HGI, None)
        self._extra.setdefault(SZ_SIGNATURE, None)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._protocol})"

    def _dt_now(self) -> dt:
        """Return a precise datetime, using last packet's dtm field.

        :return: The timestamp of the current packet, or a fixed default
            (1970-01-01T01:00) if there is none.
        :rtype: dt
        """

        try:
            pkt_dtm: dt = self._this_pkt.dtm  # type: ignore[union-attr]
        except AttributeError:  # e.g. there is no packet yet
            return dt(1970, 1, 1, 1, 0)
        return pkt_dtm

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The asyncio event loop as declared by SerialTransport.

        :return: The event loop.
        :rtype: asyncio.AbstractEventLoop
        """
        return self._loop

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Get extra information about the transport.

        :param name: The name of the information to retrieve.
        :type name: str
        :param default: Default value if name is not found, defaults to None.
        :type default: Any, optional
        :return: The value associated with name.
        :rtype: Any
        """
        if name != SZ_IS_EVOFW3:
            return self._extra.get(name, default)
        return not self._is_hgi80  # an unknown (None) gateway is treated as evofw3

    def is_closing(self) -> bool:
        """Return True if the transport is closing or has closed.

        :return: Closing state.
        :rtype: bool
        """
        return self._closing

    def _close(self, exc: exc.RamsesException | None = None) -> None:
        """Inform the protocol that this transport has closed (once only).

        :param exc: The exception that caused the closure, if any.
        :type exc: exc.RamsesException | None, optional
        """

        if not self._closing:
            self._closing = True

            self.loop.call_soon_threadsafe(
                self._protocol.connection_lost,
                exc,  # type: ignore[arg-type]
            )

    def close(self) -> None:
        """Close the transport gracefully."""
        self._close()

    def is_reading(self) -> bool:
        """Return True if the transport is receiving.

        :return: Reading state.
        :rtype: bool
        """
        return self._reading

    def pause_reading(self) -> None:
        """Pause the receiving end (no data to protocol.pkt_received())."""
        self._reading = False

    def resume_reading(self) -> None:
        """Resume the receiving end."""
        self._reading = True

    def _make_connection(self, gwy_id: DeviceIdT | None) -> None:
        """Register the connection with the protocol.

        :param gwy_id: The ID of the gateway device, if known.
        :type gwy_id: DeviceIdT | None
        """
        self._extra[SZ_ACTIVE_HGI] = gwy_id  # or HGI_DEV_ADDR.id

        # shouldn't call this until we have HGI-ID
        notify_protocol = functools.partial(
            self._protocol.connection_made, self, ramses=True
        )
        self.loop.call_soon_threadsafe(notify_protocol)  # type: ignore[arg-type]

    # NOTE: all transport should call this method when they receive data
    def _frame_read(self, dtm_str: str, frame: str) -> None:
        """Make a Packet from the Frame and process it (called by each specific Tx).

        Blank frames are ignored; frames that cannot be made into a Packet are
        logged and otherwise discarded.

        :param dtm_str: The timestamp string of the frame.
        :type dtm_str: str
        :param frame: The raw frame string.
        :type frame: str
        """

        if frame.strip() == "":
            return

        try:
            pkt = Packet.from_file(dtm_str, frame)  # is OK for when src is dict
        except ValueError as err:  # VE from dt.fromisoformat() or falsey packet
            _LOGGER.debug("%s < PacketInvalid(%s)", frame, err)
        except exc.PacketInvalid as err:  # the frame is not a valid packet
            _LOGGER.warning("%s < PacketInvalid(%s)", frame, err)
        else:
            self._pkt_read(pkt)

    # NOTE: all protocol callbacks should be invoked from here
    def _pkt_read(self, pkt: Packet) -> None:
        """Pass any valid Packets to the protocol's callback (_prev_pkt, _this_pkt).

        The packet is recorded as the current packet before the closing check.

        :param pkt: The parsed packet.
        :type pkt: Packet
        :raises exc.TransportError: If called while closing.
        """

        self._prev_pkt = self._this_pkt
        self._this_pkt = pkt

        # if self._reading is False:  # raise, or warn & return?
        #     raise exc.TransportError("Reading has been paused")
        if self._closing is True:  # raise, or warn & return?
            raise exc.TransportError("Transport is closing or has closed")

        # TODO: can we switch to call_soon now QoS has been refactored?
        # NOTE: No need to use call_soon() here, and they may break Qos/Callbacks
        # NOTE: Thus, excepts need checking
        try:  # below could be a call_soon?
            self.loop.call_soon_threadsafe(self._protocol.pkt_received, pkt)
        except AssertionError as err:  # protect from upper layers
            _LOGGER.exception("%s < exception from msg layer: %s", pkt, err)
        except exc.ProtocolError as err:  # protect from upper layers
            _LOGGER.error("%s < exception from msg layer: %s", pkt, err)

    async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
        """Transmit a frame via the underlying handler (e.g. serial port, MQTT).

        :param frame: The frame to write.
        :type frame: str
        :param disable_tx_limits: Whether to bypass duty cycle limits, defaults to False.
        :type disable_tx_limits: bool, optional
        :raises exc.TransportSerialError: Because this transport is read-only.
        """
        raise exc.TransportSerialError("This transport is read only")
class _FullTransport(_ReadTransport):  # asyncio.Transport
    """Interface representing a bidirectional transport."""

    def __init__(
        self, *args: Any, disable_sending: bool = False, **kwargs: Any
    ) -> None:
        """Initialize the full transport.

        :param disable_sending: Whether to disable sending capabilities, defaults to False.
        :type disable_sending: bool, optional
        """
        super().__init__(*args, **kwargs)

        self._disable_sending = disable_sending
        # timestamps of the most recent transmits (the oldest are discarded)
        self._transmit_times: deque[dt] = deque(maxlen=_MAX_TRACKED_TRANSMITS)

    def _dt_now(self) -> dt:
        """Return a precise datetime, using the current dtm.

        :return: Current datetime.
        :rtype: dt
        """
        # _LOGGER.error("Full._dt_now()")

        return dt_now()

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Get extra info, including transmit rate calculations.

        :param name: Name of info.
        :type name: str
        :param default: Default value.
        :type default: Any, optional
        :return: The requested info.
        :rtype: Any
        """
        if name == "tx_rate":
            return self._report_transmit_rate()
        return super().get_extra_info(name, default=default)

    def _report_transmit_rate(self) -> float:
        """Return the transmit rate in transmits per minute.

        Only transmits within the last _MAX_TRACKED_DURATION seconds are counted.
        If the rate cannot be calculated (fewer than two transmits, or no time
        has elapsed between the first and the last), the count is returned.

        :return: Transmits per minute (truncated to two decimal places).
        :rtype: float
        """

        cutoff = dt.now() - td(seconds=_MAX_TRACKED_DURATION)
        recent = tuple(t for t in self._transmit_times if t > cutoff)

        if len(recent) <= 1:
            return float(len(recent))

        duration: float = (recent[-1] - recent[0]) / td(seconds=1)
        if duration <= 0:  # coarse clock resolution, or the wall clock was adjusted
            return float(len(recent))
        return int(len(recent) / duration * 6000) / 100

    def _track_transmit_rate(self) -> None:
        """Track the Tx rate as period of seconds per x transmits."""

        # period: float = (transmit_times[-1] - transmit_times[0]) / td(seconds=1)
        # num_tx: int = len(transmit_times)

        self._transmit_times.append(dt.now())

        _LOGGER.debug(f"Current Tx rate: {self._report_transmit_rate():.2f} pkts/min")

    # NOTE: Protocols call write_frame(), not write()
    def write(self, data: bytes) -> None:
        """Write the data to the underlying handler.

        :param data: The data to write.
        :type data: bytes
        :raises exc.TransportError: Always raises, use write_frame instead.
        """
        # _LOGGER.error("Full.write(%s)", data)

        raise exc.TransportError("write() not implemented, use write_frame() instead")

    async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
        """Transmit a frame via the underlying handler (e.g. serial port, MQTT).

        Protocols call Transport.write_frame(), not Transport.write().

        :param frame: The frame to transmit.
        :type frame: str
        :param disable_tx_limits: Whether to disable duty cycle limits, defaults
            to False (not used by this implementation).
        :type disable_tx_limits: bool, optional
        :raises exc.TransportError: If sending is disabled or transport is closed.
        """

        if self._disable_sending is True:
            raise exc.TransportError("Sending has been disabled")
        if self._closing is True:
            raise exc.TransportError("Transport is closing or has closed")

        self._track_transmit_rate()

        await self._write_frame(frame)

    async def _write_frame(self, frame: str) -> None:
        """Write some data bytes to the underlying transport.

        :param frame: The frame to write.
        :type frame: str
        :raises NotImplementedError: Abstract method.
        """
        # _LOGGER.error("Full._write_frame(%s)", frame)

        raise NotImplementedError("_write_frame() not implemented here")
909_RegexRuleT: TypeAlias = dict[str, str]
912class _RegHackMixin:
913 """Mixin to apply regex rules to inbound and outbound frames."""
915 def __init__(
916 self, *args: Any, use_regex: dict[str, _RegexRuleT] | None = None, **kwargs: Any
917 ) -> None:
918 """Initialize the regex mixin.
920 :param use_regex: Dictionary containing inbound/outbound regex rules.
921 :type use_regex: dict[str, _RegexRuleT] | None, optional
922 """
923 super().__init__(*args, **kwargs)
925 use_regex = use_regex or {}
927 self._inbound_rule: _RegexRuleT = use_regex.get(SZ_INBOUND, {})
928 self._outbound_rule: _RegexRuleT = use_regex.get(SZ_OUTBOUND, {})
930 @staticmethod
931 def _regex_hack(pkt_line: str, regex_rules: _RegexRuleT) -> str:
932 """Apply regex rules to a packet line.
934 :param pkt_line: The packet line to process.
935 :type pkt_line: str
936 :param regex_rules: The rules to apply.
937 :type regex_rules: _RegexRuleT
938 :return: The modified packet line.
939 :rtype: str
940 """
941 if not regex_rules:
942 return pkt_line
944 result = pkt_line
945 for k, v in regex_rules.items():
946 try:
947 result = re.sub(k, v, result)
948 except re.error as err:
949 _LOGGER.warning(f"{pkt_line} < issue with regex ({k}, {v}): {err}")
951 if result != pkt_line and not _DBG_DISABLE_REGEX_WARNINGS:
952 _LOGGER.warning(f"{pkt_line} < Changed by use_regex to: {result}")
953 return result
955 def _frame_read(self, dtm_str: str, frame: str) -> None:
956 super()._frame_read(dtm_str, self._regex_hack(frame, self._inbound_rule)) # type: ignore[misc]
958 async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
959 await super().write_frame(self._regex_hack(frame, self._outbound_rule)) # type: ignore[misc]
962# ### Transports ######################################################################
963# ### Implement the transports for File/dict (R/O), Serial, MQTT
class FileTransport(_ReadTransport, _FileTransportAbstractor):
    """Receive packets from a read-only source such as packet log or a dict."""

    def __init__(self, *args: Any, disable_sending: bool = True, **kwargs: Any) -> None:
        """Initialize the file transport, and start reading from the source.

        :param disable_sending: Must be True for FileTransport.
        :type disable_sending: bool
        :raises exc.TransportSourceInvalid: If disable_sending is False.
        """
        super().__init__(*args, **kwargs)

        if bool(disable_sending) is False:
            raise exc.TransportSourceInvalid("This Transport cannot send packets")

        # the reader task is also made available via get_extra_info()
        self._extra[SZ_READER_TASK] = self._reader_task = self._loop.create_task(
            self._start_reader(), name="FileTransport._start_reader()"
        )

        self._make_connection(None)

    async def _start_reader(self) -> None:  # TODO
        """Run the reader, then inform the protocol that the connection is lost.

        Any exception raised by the reader is passed on to the protocol.
        """
        self._reading = True
        try:
            await self._reader()
        except Exception as err:
            self.loop.call_soon_threadsafe(
                functools.partial(self._protocol.connection_lost, err)  # type: ignore[arg-type]
            )
        else:
            self.loop.call_soon_threadsafe(
                functools.partial(self._protocol.connection_lost, None)
            )

    async def _read_log_lines(self, lines: Iterable[str]) -> None:
        """Process each line of a packet log as a timestamp and a frame.

        :param lines: The lines of the log, each as 26 chars of timestamp, a
            separator and then the frame.
        :type lines: Iterable[str]
        """
        for dtm_pkt_line in lines:  # TODO: check dtm_str is OK
            while not self._reading:  # i.e. paused
                await asyncio.sleep(0.001)
            # there may be blank lines, or comments, in annotated log files
            if (dtm_pkt_line := dtm_pkt_line.strip()) and dtm_pkt_line[:1] != "#":
                # this is where the parsing magic happens!
                self._frame_read(dtm_pkt_line[:26], dtm_pkt_line[27:])
            await asyncio.sleep(0)
            # NOTE: instable without, big performance penalty if delay >0

    # NOTE: self._frame_read() invoked from here
    async def _reader(self) -> None:  # TODO
        """Loop through the packet source for Frames and process them.

        :raises exc.TransportSourceInvalid: If the packet source is not a dict,
            a str (file name) or a TextIOWrapper.
        """

        if isinstance(self._pkt_source, dict):
            for dtm_str, pkt_line in self._pkt_source.items():  # assume dtm_str is OK
                while not self._reading:  # i.e. paused
                    await asyncio.sleep(0.001)
                self._frame_read(dtm_str, pkt_line)
                await asyncio.sleep(0)
                # NOTE: instable without, big performance penalty if delay >0

        elif isinstance(self._pkt_source, str):  # file_name, used in client parse
            # open file file_name before reading
            try:
                with fileinput.input(files=self._pkt_source, encoding="utf-8") as file:
                    await self._read_log_lines(file)
            except FileNotFoundError as err:
                _LOGGER.warning(f"Correct the packet file name; {err}")

        elif isinstance(self._pkt_source, TextIOWrapper):  # used by client monitor
            await self._read_log_lines(self._pkt_source)

        else:
            # NOTE: !r is a conversion, not a format spec (was: ':!r', which raises)
            raise exc.TransportSourceInvalid(
                f"Packet source is not dict, TextIOWrapper or str: {self._pkt_source!r}"
            )

    def _close(self, exc: exc.RamsesException | None = None) -> None:
        """Close the transport (cancel any outstanding tasks).

        :param exc: The exception causing closure.
        :type exc: exc.RamsesException | None, optional
        """

        super()._close(exc)

        if self._reader_task:
            self._reader_task.cancel()
class PortTransport(_RegHackMixin, _FullTransport, _PortTransportAbstractor):  # type: ignore[misc]
    """Send/receive packets async to/from evofw3/HGI80 via a serial port.

    See: https://github.com/ghoti57/evofw3
    """

    _init_fut: asyncio.Future[Packet | None]
    # These default to None so that _close()/_abort() are safe even if invoked
    # before the corresponding task has been created
    _init_task: asyncio.Task[None] | None = None
    _leaker_task: asyncio.Task[None] | None = None

    _recv_buffer: bytes = b""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the port transport."""
        super().__init__(*args, **kwargs)

        self._leaker_sem = asyncio.BoundedSemaphore()
        self._leaker_task = self._loop.create_task(
            self._leak_sem(), name="PortTransport._leak_sem()"
        )

        # keep a reference: the event loop only holds weak references to tasks
        self._connect_task = self._loop.create_task(
            self._create_connection(), name="PortTransport._create_connection()"
        )

    async def _create_connection(self) -> None:
        """Invoke the Protocols's connection_made() callback after HGI80 discovery.

        :raises exc.TransportSerialError: If the transport is not initialised within
            _SIGNATURE_MAX_SECS seconds.
        """

        # HGI80s (and also VMs) take longer to send signature packets as they have long
        # initialisation times, so we must wait until they send OK

        # signature also serves to discover the HGI's device_id (& for pkt log, if any)

        self._is_hgi80 = await is_hgi80(self.serial.name)

        async def connect_sans_signature() -> None:
            """Call connection_made() without sending/waiting for a signature."""

            self._init_fut.set_result(None)
            self._make_connection(gwy_id=None)

        async def connect_with_signature() -> None:
            """Poll port with signatures, call connection_made() after first echo."""

            # TODO: send a 2nd signature, but with addr0 set to learned GWY address
            # TODO: a HGI80 will silently drop this cmd, so an echo would tell us
            # TODO: that the GWY is evofw3-compatible

            sig = Command._puzzle()
            self._extra[SZ_SIGNATURE] = sig.payload

            for _ in range(_SIGNATURE_MAX_TRYS):
                await self._write_frame(str(sig))
                await asyncio.sleep(_SIGNATURE_GAP_SECS)

                if self._init_fut.done():  # set by _pkt_read(), upon the echo
                    pkt = self._init_fut.result()
                    self._make_connection(gwy_id=pkt.src.id if pkt else None)
                    return

            # no echo was seen: connect anyway, but without a known gateway id
            if not self._init_fut.done():
                self._init_fut.set_result(None)

            self._make_connection(gwy_id=None)

        self._init_fut = asyncio.Future()
        if self._disable_sending:
            self._init_task = self._loop.create_task(
                connect_sans_signature(), name="PortTransport.connect_sans_signature()"
            )
        else:
            self._init_task = self._loop.create_task(
                connect_with_signature(), name="PortTransport.connect_with_signature()"
            )

        try:  # wait to get (1st) signature echo from evofw3/HGI80, if any
            await asyncio.wait_for(self._init_fut, timeout=_SIGNATURE_MAX_SECS)
        except TimeoutError as err:
            raise exc.TransportSerialError(
                f"Failed to initialise Transport within {_SIGNATURE_MAX_SECS} secs"
            ) from err

    async def _leak_sem(self) -> None:
        """Used to enforce a minimum time between calls to self.write()."""
        while True:
            await asyncio.sleep(MIN_INTER_WRITE_GAP)
            # a bounded semaphore raises ValueError if released too many times
            with contextlib.suppress(ValueError):
                self._leaker_sem.release()

    # NOTE: self._frame_read() invoked from here
    def _read_ready(self) -> None:
        """Make Frames from the read data and process them."""

        def bytes_read(data: bytes) -> Iterable[tuple[dt, bytes]]:
            """Yield (timestamp, line) for each complete CRLF-terminated line.

            Any trailing partial line is kept in the buffer for the next read.
            """
            self._recv_buffer += data
            if b"\r\n" in self._recv_buffer:
                lines = self._recv_buffer.split(b"\r\n")
                self._recv_buffer = lines[-1]
                for line in lines[:-1]:
                    yield self._dt_now(), line + b"\r\n"

        try:
            data: bytes = self.serial.read(self._max_read_size)
        except SerialException as err:
            if not self._closing:
                self._close(exc=err)  # have to use _close() to pass in exception
            return

        if not data:
            return

        for dtm, raw_line in bytes_read(data):
            if _DBG_FORCE_FRAME_LOGGING:
                _LOGGER.warning("Rx: %s", raw_line)
            elif _LOGGER.getEffectiveLevel() == logging.INFO:  # log for INFO not DEBUG
                _LOGGER.info("Rx: %s", raw_line)

            self._frame_read(
                dtm.isoformat(timespec="milliseconds"), _normalise(_str(raw_line))
            )

    @track_system_syncs
    def _pkt_read(self, pkt: Packet) -> None:
        """Process a packet, first checking if it is the echo of our signature.

        :param pkt: The received packet.
        :type pkt: Packet
        """
        # NOTE: a signature can override an existing active gateway
        if (
            not self._init_fut.done()
            and pkt.code == Code._PUZZ
            and pkt.payload == self._extra[SZ_SIGNATURE]
        ):
            self._extra[SZ_ACTIVE_HGI] = pkt.src.id  # , by_signature=True)
            self._init_fut.set_result(pkt)

        super()._pkt_read(pkt)

    @limit_duty_cycle(MAX_DUTY_CYCLE_RATE)
    @avoid_system_syncs
    async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
        """Transmit a frame via the underlying handler (e.g. serial port, MQTT).

        Protocols call Transport.write_frame(), not Transport.write().

        :param frame: The frame to transmit.
        :type frame: str
        :param disable_tx_limits: Whether to disable duty cycle limits, defaults to False.
        :type disable_tx_limits: bool, optional
        """

        await self._leaker_sem.acquire()  # MIN_INTER_WRITE_GAP
        await super().write_frame(frame)

    # NOTE: The order should be: minimum gap between writes, duty cycle limits, and
    # then the code that avoids the controller sync cycles

    async def _write_frame(self, frame: str) -> None:
        """Write some data bytes to the underlying transport.

        :param frame: The frame to write.
        :type frame: str
        """

        data = bytes(frame, "ascii") + b"\r\n"

        log_msg = f"Serial transport transmitting frame: {frame}"
        if _DBG_FORCE_FRAME_LOGGING:
            _LOGGER.warning(log_msg)
        elif _LOGGER.getEffectiveLevel() > logging.DEBUG:
            _LOGGER.info(log_msg)
        else:
            _LOGGER.debug(log_msg)

        try:
            self._write(data)
        except SerialException as err:
            self._abort(err)

    def _write(self, data: bytes) -> None:
        """Perform the actual write to the serial port.

        :param data: The bytes to write.
        :type data: bytes
        """
        self.serial.write(data)

    def _cancel_port_tasks(self) -> None:
        """Cancel the initialisation and semaphore-leaker tasks, if they exist."""
        for task in (self._init_task, self._leaker_task):
            if task:
                task.cancel()

    def _abort(self, exc: ExceptionT) -> None:  # type: ignore[override]  # used by serial_asyncio.SerialTransport
        """Abort the transport.

        :param exc: The exception causing the abort.
        :type exc: ExceptionT
        """
        super()._abort(exc)  # type: ignore[arg-type]

        self._cancel_port_tasks()

    def _close(self, exc: exc.RamsesException | None = None) -> None:  # type: ignore[override]
        """Close the transport (cancel any outstanding tasks).

        :param exc: The exception causing closure.
        :type exc: exc.RamsesException | None, optional
        """

        super()._close(exc)

        self._cancel_port_tasks()
class MqttTransport(_FullTransport, _MqttTransportAbstractor):
    """Send/receive packets to/from ramses_esp via MQTT.
    For full RX logging, turn on debug logging.

    See: https://github.com/IndaloTech/ramses_esp
    """

    # used in .write_frame() to rate-limit the number of writes
    _MAX_TOKENS: Final[int] = MAX_TRANSMIT_RATE_TOKENS
    _TIME_WINDOW: Final[int] = DUTY_CYCLE_DURATION
    _TOKEN_RATE: Final[float] = _MAX_TOKENS / _TIME_WINDOW

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the MQTT transport and start connecting to the broker.

        The credentials, topic path and QoS are all taken from the broker URL.
        """
        super().__init__(*args, **kwargs)

        self._username = unquote(self._broker_url.username or "")
        self._password = unquote(self._broker_url.password or "")

        self._topic_base = validate_topic_path(self._broker_url.path)
        self._topic_pub = ""
        self._topic_sub = ""
        # Track if we've subscribed to a wildcard data topic (e.g. ".../+/rx")
        self._data_wildcard_topic = ""

        self._mqtt_qos = int(parse_qs(self._broker_url.query).get("qos", ["0"])[0])

        self._connected = False
        self._connecting = False
        self._connection_established = False  # Track if initial connection was made
        self._extra[SZ_IS_EVOFW3] = True

        # Reconnection settings
        self._reconnect_interval = 5.0  # seconds
        self._max_reconnect_interval = 300.0  # 5 minutes max
        self._reconnect_backoff = 1.5
        self._current_reconnect_interval = self._reconnect_interval
        self._reconnect_task: asyncio.Task[None] | None = None

        # used in .write_frame() to rate-limit the number of writes
        self._timestamp = perf_counter()
        self._max_tokens: float = self._MAX_TOKENS * 2  # allow for the initial burst
        self._num_tokens: float = self._MAX_TOKENS * 2

        # set log MQTT flag
        # NOTE(review): popped after super().__init__(), so the parent initialisers
        # were also passed log_all (if present) - confirm that is intended
        self._log_all = kwargs.pop("log_all", False)

        # instantiate a paho mqtt client; CallbackAPIVersion is None for paho < 2.0,
        # where the constructor does not take a callback_api_version
        client_kwargs: dict[str, Any] = {"protocol": mqtt.MQTTv5}
        if CallbackAPIVersion is not None:
            client_kwargs["callback_api_version"] = CallbackAPIVersion.VERSION2
        self.client = mqtt.Client(**client_kwargs)

        self.client.on_connect = self._on_connect
        self.client.on_connect_fail = self._on_connect_fail
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self.client.username_pw_set(self._username, self._password)
        # connect to the mqtt server
        self._attempt_connection()

    def _attempt_connection(self) -> None:
        """Attempt to connect to the MQTT broker."""
        if self._connecting or self._connected:
            return

        self._connecting = True
        try:
            self.client.connect_async(
                str(self._broker_url.hostname or "localhost"),
                self._broker_url.port or 1883,
                60,
            )
            self.client.loop_start()
        except Exception as err:
            _LOGGER.error(f"Failed to initiate MQTT connection: {err}")
            self._connecting = False
            self._schedule_reconnect()

    def _schedule_reconnect(self) -> None:
        """Schedule a reconnection attempt with exponential backoff."""
        if self._closing or self._reconnect_task:
            return

        _LOGGER.info(
            f"Scheduling MQTT reconnect in {self._current_reconnect_interval} seconds"
        )
        # NOTE(review): this is also reached from the paho callbacks; if those run
        # outside the event loop's thread, create_task() is not thread-safe - confirm
        self._reconnect_task = self._loop.create_task(
            self._reconnect_after_delay(), name="MqttTransport._reconnect_after_delay()"
        )

    async def _reconnect_after_delay(self) -> None:
        """Wait and then attempt to reconnect."""
        try:
            await asyncio.sleep(self._current_reconnect_interval)

            # Increase backoff for next time
            self._current_reconnect_interval = min(
                self._current_reconnect_interval * self._reconnect_backoff,
                self._max_reconnect_interval,
            )

            _LOGGER.info("Attempting MQTT reconnection...")
            self._attempt_connection()
        except asyncio.CancelledError:
            pass
        finally:
            self._reconnect_task = None

    @staticmethod
    def _reason_is_failure(reason_code: Any) -> bool:
        """Return True if an MQTT reason code denotes a failure.

        Uses the code's ``is_failure`` attribute when it has one; otherwise falls
        back to the MQTT v5 convention that values of 0x80 and above are failures.
        """
        is_failure = getattr(reason_code, "is_failure", None)
        if is_failure is not None:
            return bool(is_failure)
        return bool(getattr(reason_code, "value", 0) >= 0x80)

    def _drop_data_wildcard(self, context: str) -> None:
        """Unsubscribe from the wildcard data topic (if any), to avoid duplicates.

        :param context: Why it is being dropped (used in the debug log only).
        :type context: str
        """
        wildcard_topic = getattr(self, "_data_wildcard_topic", "")
        if not wildcard_topic:
            return
        try:
            self.client.unsubscribe(wildcard_topic)
            _LOGGER.debug(
                f"Unsubscribed data wildcard after {context}: {wildcard_topic}"
            )
        finally:  # never retry, even if the unsubscribe raised
            self._data_wildcard_topic = ""

    def _on_connect(
        self,
        client: mqtt.Client,
        userdata: Any,
        flags: dict[str, Any],
        reason_code: Any,
        properties: Any | None,
    ) -> None:
        """Handle MQTT connection success.

        :param client: The MQTT client.
        :type client: mqtt.Client
        :param userdata: User data.
        :type userdata: Any
        :param flags: Connection flags.
        :type flags: dict[str, Any]
        :param reason_code: Connection reason code.
        :type reason_code: Any
        :param properties: Connection properties.
        :type properties: Any | None
        """
        self._connecting = False

        if self._reason_is_failure(reason_code):
            _LOGGER.error(f"MQTT connection failed: {reason_code.getName()}")
            self._schedule_reconnect()
            return

        _LOGGER.info(f"MQTT connected: {reason_code.getName()}")

        # Reset reconnect interval on successful connection
        self._current_reconnect_interval = self._reconnect_interval

        # Cancel any pending reconnect task
        if self._reconnect_task:
            self._reconnect_task.cancel()
            self._reconnect_task = None

        # Subscribe to base topic to see 'online' messages
        self.client.subscribe(self._topic_base)  # hope to see 'online' message

        # Also subscribe to data topics with wildcard for reliability, but only
        # until a specific device topic is known. Once _topic_sub is set, avoid
        # overlapping subscriptions that would duplicate messages.
        if self._topic_base.endswith("/+") and not (
            hasattr(self, "_topic_sub") and self._topic_sub
        ):
            data_wildcard = self._topic_base.replace("/+", "/+/rx")
            self.client.subscribe(data_wildcard, qos=self._mqtt_qos)
            self._data_wildcard_topic = data_wildcard
            _LOGGER.debug(f"Subscribed to data wildcard: {data_wildcard}")

        # If we already have specific topics, re-subscribe to them
        if hasattr(self, "_topic_sub") and self._topic_sub:
            self.client.subscribe(self._topic_sub, qos=self._mqtt_qos)
            _LOGGER.debug(f"Re-subscribed to specific topic: {self._topic_sub}")
            # If we had a wildcard subscription, drop it to prevent duplicates
            self._drop_data_wildcard("specific subscribe")

    def _on_connect_fail(
        self,
        client: mqtt.Client,
        userdata: Any,
    ) -> None:
        """Handle MQTT connection failure.

        :param client: The MQTT client.
        :type client: mqtt.Client
        :param userdata: User data.
        :type userdata: Any
        """
        _LOGGER.error("MQTT connection failed")

        self._connecting = False
        self._connected = False

        if not self._closing:
            self._schedule_reconnect()

    def _on_disconnect(
        self,
        client: mqtt.Client,
        userdata: Any,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Handle MQTT disconnection.

        :param client: The MQTT client.
        :type client: mqtt.Client
        :param userdata: User data.
        :type userdata: Any
        """
        # Handle different paho-mqtt callback signatures: the reason code is not
        # always the first extra argument (callback API v2 passes the disconnect
        # flags first), so prefer the first argument that looks like a reason code
        reason_code = next(
            (arg for arg in args if hasattr(arg, "getName")),
            args[0] if args else None,
        )

        reason_name = (
            reason_code.getName()
            if reason_code is not None and hasattr(reason_code, "getName")
            else str(reason_code)
        )
        _LOGGER.warning(f"MQTT disconnected: {reason_name}")

        was_connected = self._connected
        self._connected = False

        # If we were previously connected and had established communication,
        # notify that the device is now offline
        if was_connected and hasattr(self, "_topic_sub") and self._topic_sub:
            device_topic = self._topic_sub[:-3]  # Remove "/rx" suffix
            _LOGGER.warning(f"{self}: the MQTT device is offline: {device_topic}")

            # Pause writing since device is offline
            if hasattr(self, "_protocol"):
                self._protocol.pause_writing()

        # Only attempt reconnection if we didn't deliberately disconnect
        if not self._closing:
            # Schedule reconnection for any disconnect (unexpected or failure)
            self._schedule_reconnect()

    def _create_connection(self, msg: mqtt.MQTTMessage) -> None:
        """Invoke the Protocols's connection_made() callback MQTT is established.

        :param msg: The online message triggering the connection.
        :type msg: mqtt.MQTTMessage
        """
        assert msg.payload == b"online", "Coding error"

        if self._connected:
            _LOGGER.info("MQTT device came back online - resuming writing")
            self._loop.call_soon_threadsafe(self._protocol.resume_writing)
            return

        _LOGGER.info("MQTT device is online - establishing connection")
        self._connected = True

        # the last 9 characters of the topic are used as the gateway's device id
        self._extra[SZ_ACTIVE_HGI] = msg.topic[-9:]

        self._topic_pub = msg.topic + "/tx"
        self._topic_sub = msg.topic + "/rx"

        self.client.subscribe(self._topic_sub, qos=self._mqtt_qos)

        # If we previously subscribed to a wildcard data topic, unsubscribe now
        # to avoid duplicate delivery (wildcard and specific both matching)
        self._drop_data_wildcard("device online")

        # Only call connection_made on first connection, not reconnections
        if not self._connection_established:
            self._connection_established = True
            self._make_connection(gwy_id=msg.topic[-9:])  # type: ignore[arg-type]
        else:
            _LOGGER.info("MQTT reconnected - protocol connection already established")

    # NOTE: self._frame_read() invoked from here
    def _on_message(
        self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage
    ) -> None:
        """Make a Frame from the MQTT message and process it.

        Status messages ('online'/'offline') are handled separately from the data
        messages (topics ending in '/rx'). Malformed data messages are logged and
        ignored, rather than raising.

        :param client: The MQTT client.
        :type client: mqtt.Client
        :param userdata: User data.
        :type userdata: Any
        :param msg: The received message.
        :type msg: mqtt.MQTTMessage
        """
        if _DBG_FORCE_FRAME_LOGGING:
            _LOGGER.warning("Rx: %s", msg.payload)
        elif self._log_all and _LOGGER.getEffectiveLevel() == logging.INFO:
            # log for INFO not DEBUG
            _LOGGER.info("mq Rx: %s", msg.payload)  # TODO remove mq marker?

        if msg.topic[-3:] != "/rx":  # then, e.g. 'RAMSES/GATEWAY/18:017804'
            if msg.payload == b"offline":
                # Check if this offline message is for our current device
                if (
                    hasattr(self, "_topic_sub")
                    and self._topic_sub
                    and msg.topic == self._topic_sub[:-3]
                ) or not hasattr(self, "_topic_sub"):
                    _LOGGER.warning(
                        f"{self}: the ESP device is offline (via LWT): {msg.topic}"
                    )
                    # Don't set _connected = False here - that's for MQTT connection, not ESP device
                    if hasattr(self, "_protocol"):
                        self._protocol.pause_writing()

            # BUG: using create task (self._loop.ct() & asyncio.ct()) causes the
            # BUG: event look to close early
            elif msg.payload == b"online":
                _LOGGER.info(
                    f"{self}: the ESP device is online (via status): {msg.topic}"
                )
                self._create_connection(msg)

            return

        # Handle data messages - if we don't have connection established yet but get data,
        # we can infer the gateway from the topic
        if not self._connection_established and msg.topic.endswith("/rx"):
            # Extract gateway ID from topic like "RAMSES/GATEWAY/18:123456/rx"
            topic_parts = msg.topic.split("/")
            if len(topic_parts) >= 3 and topic_parts[-2] not in ("+", "*"):
                gateway_id = topic_parts[-2]  # Should be something like "18:123456"
                _LOGGER.info(
                    f"Inferring gateway connection from data topic: {gateway_id}"
                )

                # Set up topics and connection
                self._topic_pub = f"{'/'.join(topic_parts[:-1])}/tx"
                self._topic_sub = msg.topic
                self._extra[SZ_ACTIVE_HGI] = gateway_id

                # Mark as connected and establish protocol connection
                self._connected = True
                self._connection_established = True
                self._make_connection(gwy_id=gateway_id)  # type: ignore[arg-type]

                # Ensure we subscribe specifically to the device topic and drop the
                # wildcard subscription to prevent duplicates
                try:
                    self.client.subscribe(self._topic_sub, qos=self._mqtt_qos)
                except Exception as err:  # pragma: no cover - defensive
                    _LOGGER.debug(f"Error subscribing specific topic: {err}")
                self._drop_data_wildcard("inferring device")

        try:
            payload = json.loads(msg.payload)
        except ValueError:  # JSONDecodeError, or UnicodeDecodeError for bad bytes
            _LOGGER.warning("%s < Can't decode JSON (ignoring)", msg.payload)
            return

        # the payload should be a JSON object with (at least) 'ts' and 'msg' keys
        try:
            dtm = dt.fromisoformat(payload["ts"])
            frame = payload["msg"]
        except (KeyError, TypeError, ValueError):
            _LOGGER.warning("%s < Invalid payload (ignoring)", msg.payload)
            return

        # HACK: hotfix for converting RAMSES_ESP dtm into local/naive dtm
        if dtm.tzinfo is not None:
            dtm = dtm.astimezone().replace(tzinfo=None)
        if dtm < dt.now() - td(days=90):
            _LOGGER.warning(
                f"{self}: Have you configured the SNTP settings on the ESP?"
            )
        # FIXME: convert all dt early, and convert to aware, i.e. dt.now().astimezone()

        try:
            self._frame_read(dtm.isoformat(), _normalise(frame))
        except exc.TransportError:
            # If the transport is closing, we expect this error and can safely ignore it
            # prevents "Uncaught thread exception" in paho.mqtt client
            if not self._closing:
                raise

    async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
        """Transmit a frame via the underlying handler (e.g. serial port, MQTT).

        Writes are rate-limited to _MAX_TOKENS Packets over the last _TIME_WINDOW
        seconds, except when disable_tx_limits is True (for e.g. user commands).

        Protocols call Transport.write_frame(), not Transport.write().

        :param frame: The frame to transmit.
        :type frame: str
        :param disable_tx_limits: Whether to disable rate limiting, defaults to False.
        :type disable_tx_limits: bool, optional
        """

        # Check if we're connected before attempting to write
        if not self._connected:
            _LOGGER.debug(f"{self}: Dropping write - MQTT not connected")
            return

        # top-up the token bucket
        timestamp = perf_counter()
        elapsed, self._timestamp = timestamp - self._timestamp, timestamp
        self._num_tokens = min(
            self._num_tokens + elapsed * self._TOKEN_RATE, self._max_tokens
        )

        # if would have to sleep >= 1 second, dump the write instead
        if self._num_tokens < 1.0 - self._TOKEN_RATE and not disable_tx_limits:
            _LOGGER.warning(f"{self}: Discarding write (tokens={self._num_tokens:.2f})")
            return

        self._num_tokens -= 1.0
        if self._max_tokens > self._MAX_TOKENS:  # what is the new max number of tokens
            # the initial burst allowance shrinks as it is used, down to _MAX_TOKENS
            self._max_tokens = min(self._max_tokens, self._num_tokens)
            self._max_tokens = max(self._max_tokens, self._MAX_TOKENS)

        # if in token debt, sleep until the debt is paid
        if self._num_tokens < 0.0 and not disable_tx_limits:
            delay = (0 - self._num_tokens) / self._TOKEN_RATE
            _LOGGER.debug(f"{self}: Sleeping (seconds={delay})")
            await asyncio.sleep(delay)

        await super().write_frame(frame)

    async def _write_frame(self, frame: str) -> None:
        """Write some data bytes to the underlying transport.

        :param frame: The frame to write.
        :type frame: str
        """
        data = json.dumps({"msg": frame})

        if _DBG_FORCE_FRAME_LOGGING:
            _LOGGER.warning("Tx: %s", data)
        elif _LOGGER.getEffectiveLevel() == logging.INFO:  # log for INFO not DEBUG
            _LOGGER.info("Tx: %s", data)

        try:
            self._publish(data)
        except MQTTException as err:
            # Don't close the transport, just log the error and continue
            # The broker might come back online
            _LOGGER.error(f"MQTT publish failed: {err}")

    def _publish(self, payload: str) -> None:
        """Publish the payload to the MQTT broker.

        :param payload: The data payload to publish.
        :type payload: str
        """
        if not self._connected:
            _LOGGER.debug("Cannot publish - MQTT not connected")
            return

        info: mqtt.MQTTMessageInfo = self.client.publish(
            self._topic_pub, payload=payload, qos=self._mqtt_qos
        )

        if not info:
            _LOGGER.warning("MQTT publish returned no info")
        elif info.rc != mqtt.MQTT_ERR_SUCCESS:
            _LOGGER.warning(f"MQTT publish failed with code: {info.rc}")
            # Check if this indicates a connection issue
            if info.rc in (mqtt.MQTT_ERR_NO_CONN, mqtt.MQTT_ERR_CONN_LOST):
                self._connected = False
                if not self._closing:
                    self._schedule_reconnect()

    def _close(self, exc: exc.RamsesException | None = None) -> None:
        """Close the transport (disconnect from the broker and stop its poller).

        The client is always disconnected and its network loop stopped, even if the
        device is not currently connected; otherwise the loop started by
        _attempt_connection() would be left running.

        :param exc: The exception causing closure.
        :type exc: exc.RamsesException | None, optional
        """
        super()._close(exc)

        # Cancel any pending reconnection attempts
        if self._reconnect_task:
            self._reconnect_task.cancel()
            self._reconnect_task = None

        was_connected, self._connected = self._connected, False

        try:
            if was_connected and self._topic_sub:
                self.client.unsubscribe(self._topic_sub)
            self.client.disconnect()
            self.client.loop_stop()
        except Exception as err:
            _LOGGER.debug(f"Error during MQTT cleanup: {err}")
class CallbackTransport(_FullTransport, _CallbackTransportAbstractor):
    """A virtual transport whose I/O is performed by external code.

    Outbound frames are handed to an async ``io_writer`` callable; inbound frames
    are injected via ``receive_frame()``. This lets ramses_rf sit behind an external
    connection manager (like Home Assistant's MQTT integration) without depending
    on it directly.
    """

    def __init__(
        self,
        protocol: RamsesProtocolT,
        io_writer: Callable[[str], Awaitable[None]],
        disable_sending: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize the callback transport.

        connection_made() is NOT called here: it must be triggered externally
        (e.g. by the Bridge), via the protocol, once the external connection is
        ready.

        :param protocol: The protocol instance.
        :type protocol: RamsesProtocolT
        :param io_writer: Async callable to handle outbound frames.
        :type io_writer: Callable[[str], Awaitable[None]]
        :param disable_sending: Whether to disable sending, defaults to False.
        :type disable_sending: bool, optional
        """
        # any other kwargs (e.g. 'loop') are for the parent classes to consume
        super().__init__(disable_sending=disable_sending, **kwargs)

        self._protocol = protocol
        self._io_writer = io_writer

        # start in a PAUSED state: receive_frame() drops frames until reading is on
        self._reading = False

        _LOGGER.info(f"CallbackTransport created with io_writer={io_writer}")

    async def write_frame(self, frame: str, disable_tx_limits: bool = False) -> None:
        """Hand a frame to the external writer for transmission.

        :param frame: The frame to write.
        :type frame: str
        :param disable_tx_limits: Unused for this transport, kept for API compatibility.
        :type disable_tx_limits: bool, optional
        :raises exc.TransportError: If sending is disabled or the writer fails.
        """
        if self._disable_sending:
            raise exc.TransportError("Sending has been disabled")

        _LOGGER.debug(f"Sending frame via external writer: {frame}")

        try:
            await self._io_writer(frame)
        except Exception as err:
            # whatever the writer raised, callers only have to handle TransportError
            _LOGGER.error(f"External writer failed to send frame: {err}")
            raise exc.TransportError(f"External writer failed: {err}") from err

    async def _write_frame(self, frame: str) -> None:
        """Write a frame via write_frame(), which holds the actual logic.

        :param frame: The frame to write.
        :type frame: str
        """
        await self.write_frame(frame)

    def receive_frame(self, frame: str, dtm: str | None = None) -> None:
        """Inject a frame received by the external source (the read path).

        This is the public entry point for the Bridge. Frames are dropped while
        the transport is paused.

        :param frame: The raw frame string to receive.
        :type frame: str
        :param dtm: The timestamp of the frame, defaults to current time.
        :type dtm: str | None, optional
        """
        _LOGGER.debug(
            f"Received frame from external source: frame='{frame}', timestamp={dtm}"
        )

        if not self._reading:  # packet gating
            _LOGGER.debug(f"Dropping received frame (transport paused): {frame!r}")
            return

        if not dtm:
            dtm = dt_now().isoformat()

        _LOGGER.debug(
            f"Ingesting frame into transport: frame='{frame}', timestamp={dtm}"
        )

        # from here on, it is the standard processing pipeline
        self._frame_read(dtm, frame.rstrip())
def validate_topic_path(path: str) -> str:
    """Check a candidate MQTT topic path and return it in normalized form.

    A single leading '/' is dropped, and a path that is empty, or is just the
    gateway root, becomes ``<root>/+``. The result always has three levels.

    :param path: The candidate topic path.
    :type path: str
    :return: The valid, normalized path.
    :rtype: str
    :raises ValueError: If the path format is invalid.
    """

    # The user can supply the following paths:
    # - ""
    # - "/RAMSES/GATEWAY"
    # - "/RAMSES/GATEWAY/+" (the previous two are equivalent to this one)
    # - "/RAMSES/GATEWAY/18:123456"

    # "RAMSES/GATEWAY/+"                -> online, online, ...
    # "RAMSES/GATEWAY/18:017804"        -> online
    # "RAMSES/GATEWAY/18:017804/info/+" -> ramses_esp/0.4.0
    # "RAMSES/GATEWAY/+/rx"             -> pkts from all gateways

    topic = (path or SZ_RAMSES_GATEWAY).removeprefix("/")

    if not topic.startswith(SZ_RAMSES_GATEWAY):
        raise ValueError(f"Invalid topic path: {path}")

    if topic == SZ_RAMSES_GATEWAY:
        topic = f"{topic}/+"

    if topic.count("/") != 2:  # i.e. it must have exactly three levels
        raise ValueError(f"Invalid topic path: {path}")

    return topic
# Any of the concrete transport classes defined in this module (this is the type
# returned by transport_factory())
RamsesTransportT: TypeAlias = (
    FileTransport | MqttTransport | PortTransport | CallbackTransport
)
async def transport_factory(
    protocol: RamsesProtocolT,
    /,
    *,
    port_name: SerPortNameT | None = None,
    port_config: PortConfigT | None = None,
    packet_log: str | None = None,
    packet_dict: dict[str, str] | None = None,
    transport_constructor: Callable[..., Awaitable[RamsesTransportT]] | None = None,
    disable_sending: bool = False,
    extra: dict[str, Any] | None = None,
    loop: asyncio.AbstractEventLoop | None = None,
    log_all: bool = False,
    **kwargs: Any,  # HACK: odd/misc params
) -> RamsesTransportT:
    """Create and return a Ramses-specific async packet Transport.

    If ``transport_constructor`` is given, it is awaited and its result is
    returned as-is; no packet source validation is done in that case. Otherwise
    exactly one of ``packet_dict``, ``packet_log`` or ``port_name`` must be
    supplied (i.e. not None), which selects a file, MQTT or serial transport.

    :param protocol: The protocol instance that will use this transport.
    :type protocol: RamsesProtocolT
    :param port_name: Serial port name or MQTT URL, defaults to None.
    :type port_name: SerPortNameT | None, optional
    :param port_config: Configuration dictionary for serial port, defaults to None
        (an empty config is then validated). Not used for file/MQTT sources.
    :type port_config: PortConfigT | None, optional
    :param packet_log: Path to a file containing packet logs for playback, defaults to None.
    :type packet_log: str | None, optional
    :param packet_dict: Dictionary of packets for playback, defaults to None.
    :type packet_dict: dict[str, str] | None, optional
    :param transport_constructor: Custom async callable to create a transport, defaults to None.
    :type transport_constructor: Callable[..., Awaitable[RamsesTransportT]] | None, optional
    :param disable_sending: If True, the transport will not transmit packets, defaults to False.
    :type disable_sending: bool, optional
    :param extra: Extra configuration options, defaults to None.
    :type extra: dict[str, Any] | None, optional
    :param loop: Asyncio event loop, defaults to None.
    :type loop: asyncio.AbstractEventLoop | None, optional
    :param log_all: If True, log all MQTT messages including non-protocol ones, defaults to False.
    :type log_all: bool, optional
    :param kwargs: Additional keyword arguments for specific transports. For MQTT,
        a ``timeout`` entry overrides the default connection timeout.
    :type kwargs: Any
    :return: An instantiated RamsesTransportT object.
    :rtype: RamsesTransportT
    :raises exc.TransportSourceInvalid: If the packet source is invalid or multiple sources are specified.
    """

    # If a constructor is provided, delegate entirely to it.
    # Note that loop and log_all are not forwarded to the constructor.
    if transport_constructor:
        _LOGGER.debug("transport_factory: Delegating to external transport_constructor")
        return await transport_constructor(
            protocol, disable_sending=disable_sending, extra=extra, **kwargs
        )

    # kwargs are specific to a transport. The above transports have:
    # evofw3_flag, use_regex

    def get_serial_instance(  # type: ignore[no-any-unimported]
        ser_name: SerPortNameT, ser_config: PortConfigT | None
    ) -> Serial:
        """Return a Serial instance for the given port name and config.

        :param ser_name: Name (or URL) of the serial port.
        :type ser_name: SerPortNameT
        :param ser_config: Configuration for the serial port (None means empty).
        :type ser_config: PortConfigT | None
        :return: Configured Serial object.
        :rtype: Serial
        :raises exc.TransportSourceInvalid: If the serial port cannot be opened.
        """
        # For example:
        # - python client.py monitor 'rfc2217://localhost:5001'
        # - python client.py monitor 'alt:///dev/ttyUSB0?class=PosixPollSerial'

        ser_config = SCH_SERIAL_PORT_CONFIG(ser_config or {})

        try:
            ser_obj = serial_for_url(ser_name, **ser_config)
        except SerialException as err:
            _LOGGER.error(
                "Failed to open %s (config: %s): %s", ser_name, ser_config, err
            )
            raise exc.TransportSourceInvalid(
                f"Unable to open the serial port: {ser_name}"
            ) from err

        # FTDI on Posix/Linux would be a common environment for this library...
        # Best-effort only: not every serial class/platform supports this call.
        with contextlib.suppress(AttributeError, NotImplementedError, ValueError):
            ser_obj.set_low_latency_mode(True)

        return ser_obj

    def issue_warning() -> None:
        """Warn of the perils of semi-supported configurations."""
        _LOGGER.warning(
            f"{'Windows' if os.name == 'nt' else 'This type of serial interface'} "
            "is not fully supported by this library: "
            "please don't report any Transport/Protocol errors/warnings, "
            "unless they are reproducible with a standard configuration "
            "(e.g. linux with a local serial port)"
        )

    sources = [s for s in (packet_dict, packet_log, port_name) if s is not None]
    if len(sources) != 1:
        raise exc.TransportSourceInvalid(
            "Packet source must be exactly one of: packet_dict, packet_log, port_name"
        )

    # File
    # Select by identity (as the count above does), not by truthiness: an empty
    # packet_log/packet_dict is still the chosen source, and must not fall
    # through to the port-based transports below.
    pkt_source = packet_log if packet_log is not None else packet_dict
    if pkt_source is not None:
        return FileTransport(pkt_source, protocol, extra=extra, loop=loop, **kwargs)

    # Guaranteed by the checks above (exactly one source, and it is not a file).
    assert port_name is not None  # mypy check

    # NOTE: port_config may legitimately be None here: it is unused for MQTT,
    # and get_serial_instance() substitutes an empty config for serial ports.

    # MQTT
    if port_name.startswith("mqtt"):
        # Check for custom timeout in kwargs, fallback to constant
        # (the "timeout" entry stays in kwargs and is also passed to MqttTransport)
        mqtt_timeout = kwargs.get("timeout", _DEFAULT_TIMEOUT_MQTT)

        mqtt_transport = MqttTransport(
            port_name,
            protocol,
            disable_sending=bool(disable_sending),
            extra=extra,
            loop=loop,
            log_all=log_all,
            **kwargs,
        )

        try:
            # Wait with a timeout, so that a failed setup does not hang forever
            await protocol.wait_for_connection_made(timeout=mqtt_timeout)
        except Exception:
            # Close the transport if setup fails, to prevent "zombie" callbacks
            # firing later on against a transport that was never handed back
            mqtt_transport.close()
            raise

        return mqtt_transport

    # Serial
    ser_instance = get_serial_instance(port_name, port_config)

    if os.name == "nt" or ser_instance.portstr.startswith(("rfc2217", "socket:")):
        issue_warning()  # TODO: add tests for these...

    port_transport = PortTransport(
        ser_instance,
        protocol,
        disable_sending=bool(disable_sending),
        extra=extra,
        loop=loop,
        **kwargs,
    )

    # TODO: remove this? better to invoke timeout after factory returns?
    # NOTE(review): unlike the MQTT branch, the transport is not closed if this
    # wait fails - confirm whether that is intended before changing it.
    await protocol.wait_for_connection_made(timeout=_DEFAULT_TIMEOUT_PORT)
    # pytest-cov times out in virtual_rf.py when set below 30.0 on GitHub Actions
    return port_transport