Coverage for src/ramses_tx/packet.py: 33%
115 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.
4Decode/process a packet (packet that was received).
5"""
7from __future__ import annotations
9from datetime import datetime as dt, timedelta as td
10from typing import Any
12from . import exceptions as exc
13from .command import Command
14from .frame import Frame
15from .logger import getLogger # overridden logger.getLogger
16from .opentherm import PARAMS_DATA_IDS, SCHEMA_DATA_IDS, STATUS_DATA_IDS
17from .ramses import CODES_SCHEMA, SZ_LIFESPAN
19from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
20 I_,
21 RP,
22 RQ,
23 W_,
24 Code,
25)
# Pre-built timedelta constants: allocated once at import time so that the
# lifespan lookups below can return them without constructing a new object.
_TD_SECS_000 = td()
_TD_SECS_003 = td(seconds=3)
_TD_SECS_360 = td(minutes=6)
_TD_MINS_005 = td(minutes=5)
_TD_MINS_060 = td(hours=1)
_TD_MINS_360 = td(hours=6)
_TD_DAYS_001 = td(days=1)
# Logger used by Packet._validate() to emit the packet.log lines; it is created
# via the package's own getLogger() override with pkt_log=True.
PKT_LOGGER = getLogger(
    f"{__name__}_log",
    pkt_log=True,
)
class Packet(Frame):
    """The Packet class (pkts that were received); will trap/log invalid pkts.

    On top of the underlying Frame, a packet has a datetime (when it was
    received), an RSSI, and other meta-fields (comment, error text, raw frame).
    """

    _dtm: dt
    _rssi: str

    def __init__(self, dtm: dt, frame: str, **kwargs: Any) -> None:
        """Create a packet from a raw frame string.

        The first three characters of ``frame`` are kept as the RSSI and the
        text from the fifth character onwards is handed to the Frame base class.

        :param dtm: The timestamp when the packet was received
        :type dtm: dt
        :param frame: The raw frame string, prefixed by the RSSI
        :type frame: str
        :param kwargs: Metadata including 'comment', 'err_msg', or 'raw_frame'
        :raises PacketInvalid: If the frame content is malformed.
        """

        super().__init__(frame[4:])  # remove RSSI (3 chars) and its separator

        self._dtm: dt = dtm
        self._rssi: str = frame[0:3]

        self.comment: str = kwargs.get("comment", "")
        self.error_text: str = kwargs.get("err_msg", "")
        self.raw_frame: str = kwargs.get("raw_frame", "")

        # pkt_lifespan() may return a zero timedelta, which is falsy, so such
        # packets end up with a lifespan of False
        self._lifespan: bool | td = pkt_lifespan(self) or False

        self._validate(strict_checking=False)

    def _validate(self, *, strict_checking: bool = False) -> None:
        """Validate the packet and write it to the packet log.

        A packet that passes validation is logged at INFO level. A packet that
        fails is logged at WARNING level, but only if it has a frame or an
        error text, and the exception is then re-raised.

        :param strict_checking: Passed through to the Frame validation
        :type strict_checking: bool
        :raises PacketInvalid: If an error text was supplied, if the frame is
            empty but a comment is present, or if the Frame validation fails.
        """

        try:
            if self.error_text:
                raise exc.PacketInvalid(self.error_text)

            # a frameless packet that only carries a comment is rejected here;
            # the handler below will not log it (no frame, no error text)
            if not self._frame and self.comment:
                raise exc.PacketInvalid("Null packet")

            super()._validate(strict_checking=strict_checking)  # no RSSI

            # FIXME: this is messy
            PKT_LOGGER.info("", extra=self.__dict__)  # the packet.log line

        except exc.PacketInvalid as err:  # and any subclasses thereof
            if self._frame or self.error_text:
                PKT_LOGGER.warning("%s", err, extra=self.__dict__)
            raise  # re-raise the same exception, unchanged

    def __repr__(self) -> str:
        """Return an unambiguous string representation of this object.

        The result is the timestamp, the packet itself and, if it can be
        determined, the header (with its context, if any) as a trailing comment.
        """
        # e.g.: RQ --- 18:000730 01:145038 --:------ 000A 002 0800  # 000A|RQ|01:145038|08
        try:
            hdr = f" # {self._hdr}{f' ({self._ctx})' if self._ctx else ''}"
        except (exc.PacketInvalid, NotImplementedError):
            hdr = ""  # the header is optional: omit it if it cannot be built

        try:
            dtm = self.dtm.isoformat(timespec="microseconds")
        except AttributeError:
            # no usable timestamp on this instance: fall back to datetime.min
            dtm = dt.min.isoformat(timespec="microseconds")

        return f"{dtm} ... {self}{hdr}"

    def __str__(self) -> str:
        """Return a brief readable string representation of this object.

        Currently this is the repr of the underlying Frame (i.e. without the
        timestamp and RSSI), rather than the header.
        """
        return super().__repr__()  # TODO: self._hdr, e.g.: 000A|RQ|01:145038|08

    @property
    def dtm(self) -> dt:
        """Return the timestamp given to the packet when it was created."""
        return self._dtm

    @staticmethod
    def _partition(pkt_line: str) -> tuple[str, str, str]:
        """Partition a packet line into its three parts.

        Format: packet[ < parser-hint: ...][ * evofw3-err_msg][ # evofw3-comment]

        :param pkt_line: The line to split
        :type pkt_line: str
        :return: The (packet, err_msg, comment) strings, each stripped of
            surrounding whitespace; absent parts are empty strings
        :rtype: tuple[str, str, str]
        """

        # split from the right-most optional part inwards
        fragment, _, comment = pkt_line.partition("#")
        fragment, _, err_msg = fragment.partition("*")
        pkt_str, _, _ = fragment.partition("<")  # discard any parser hints

        # return a real tuple, as annotated, rather than a one-shot iterator
        return pkt_str.strip(), err_msg.strip(), comment.strip()

    @classmethod
    def _from_cmd(cls, cmd: Command, dtm: dt | None = None) -> Packet:
        """Create a Packet from a Command.

        :param cmd: The command whose frame is to be used
        :type cmd: Command
        :param dtm: The timestamp for the packet; defaults to the current time
        :type dtm: dt | None
        """
        if dtm is None:
            dtm = dt.now()
        # "..." fills the three characters where a received line has its RSSI
        return cls.from_port(dtm, f"... {cmd._frame}")

    @classmethod
    def from_dict(cls, dtm: str, pkt_line: str) -> Packet:
        """Create a packet from a saved state (a curated dict).

        :param dtm: The timestamp, as an ISO 8601 string
        :type dtm: str
        :param pkt_line: The packet line; any error message in it is ignored
        :type pkt_line: str
        """
        frame, _, comment = cls._partition(pkt_line)
        return cls(dt.fromisoformat(dtm), frame, comment=comment)

    @classmethod
    def from_file(cls, dtm: str, pkt_line: str) -> Packet:
        """Create a packet from a log file line.

        :param dtm: The timestamp, as an ISO 8601 string
        :type dtm: str
        :param pkt_line: The packet line, with optional error message/comment
        :type pkt_line: str
        :raises ValueError: If the line has no frame part.
        """
        frame, err_msg, comment = cls._partition(pkt_line)
        if not frame:
            raise ValueError(f"null frame: >>>{frame}<<<")
        return cls(dt.fromisoformat(dtm), frame, err_msg=err_msg, comment=comment)

    @classmethod
    def from_port(cls, dtm: dt, pkt_line: str, raw_line: bytes | None = None) -> Packet:
        """Create a packet from a USB port (HGI80, evofw3).

        :param dtm: The timestamp when the line was received
        :type dtm: dt
        :param pkt_line: The packet line, with optional error message/comment
        :type pkt_line: str
        :param raw_line: The line as originally read, stored as ``raw_frame``
        :type raw_line: bytes | None
        :raises ValueError: If the line has no frame part.
        """
        frame, err_msg, comment = cls._partition(pkt_line)
        if not frame:
            raise ValueError(f"null frame: >>>{frame}<<<")
        return cls(dtm, frame, err_msg=err_msg, comment=comment, raw_frame=raw_line)
162# TODO: remove None as a possible return value
def pkt_lifespan(pkt: Packet) -> td:  # import OtbGateway??
    """Return how long a packet's data is considered valid for.

    The rules are tried in order: verb first, then a series of code-specific
    cases, then any lifespan given in CODES_SCHEMA, then a 60-minute default.

    :param pkt: The packet instance to evaluate
    :type pkt: Packet
    :return: The duration the packet's data remains valid
    :rtype: td
    """

    verb = pkt.verb
    code = pkt.code

    # requests and writes: zero lifespan
    if verb in (RQ, W_):
        return _TD_SECS_000

    # codes that are kept for a whole day
    #  - 0404 is tombstoned by an incremented 0006
    #  - 10E0: but what if a valid pkt with a corrupt src_id?
    if code in (Code._0005, Code._000C, Code._0404, Code._10E0):
        return _TD_DAYS_001

    if code == Code._0006:
        return _TD_MINS_060

    if code == Code._000A and pkt._has_array:  # sends I /1h
        return _TD_MINS_060

    if code == Code._1F09:  # sends I /sync_cycle
        # a fixed estimate: the payload is not consulted here
        if verb == I_:
            return _TD_SECS_360
        return _TD_SECS_000

    if code == Code._1FC9 and verb == RP:
        return _TD_DAYS_001  # TODO: check other verbs, they seem variable

    if code in (Code._2309, Code._30C9) and pkt._has_array:  # sends I /sync_cycle
        return _TD_SECS_360

    if code == Code._3220:  # FIXME: 2.1 means we can miss two packets
        # the lifespan depends on the id held in payload[4:6] (a hex byte)
        data_id = int(pkt.payload[4:6], 16)
        # TODO: ids in WRITE_MSG_IDS (and Write-Data): _TD_SECS_003 * 2.1
        if data_id in SCHEMA_DATA_IDS:
            return _TD_MINS_360 * 2.1
        if data_id in PARAMS_DATA_IDS:
            return _TD_MINS_060 * 2.1
        if data_id in STATUS_DATA_IDS:
            return _TD_MINS_005 * 2.1
        return _TD_MINS_005 * 2.1  # any other id is treated like a status id

    # TODO: WIP - 0008, 3B00, 3EF0, 3EF1: td(minutes=6.7)

    # otherwise, use the schema's lifespan, but only if it is a timedelta
    schema = CODES_SCHEMA.get(code)
    if schema and SZ_LIFESPAN in schema:
        lifespan: bool | td | None = schema[SZ_LIFESPAN]
        if isinstance(lifespan, td):
            return lifespan

    return _TD_MINS_060  # applies to lots of HVAC packets