Coverage for src/ramses_tx/message.py: 31%
159 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Decode/process a message (payload into JSON)."""
4from __future__ import annotations
6import logging
7import re
8from datetime import datetime as dt, timedelta as td
9from functools import lru_cache
10from typing import TYPE_CHECKING
12from . import exceptions as exc
13from .address import Address
14from .command import Command
15from .const import DEV_TYPE_MAP, SZ_DHW_IDX, SZ_DOMAIN_ID, SZ_UFH_IDX, SZ_ZONE_IDX
16from .packet import Packet
17from .parsers import parse_payload
18from .ramses import CODE_IDX_ARE_COMPLEX, CODES_SCHEMA, RQ_IDX_COMPLEX
20# TODO:
21# long-format msg.__str__ - alias columns don't line up
24from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
25 I_,
26 RP,
27 RQ,
28 W_,
29 Code,
30)
32if TYPE_CHECKING:
33 from ramses_rf import Gateway
35 from .const import IndexT, VerbT # noqa: F401, pylint: disable=unused-import
38__all__ = ["Message"]
41CODE_NAMES = {k: v["name"] for k, v in CODES_SCHEMA.items()}
43MSG_FORMAT_10 = "|| {:10s} | {:10s} | {:2s} | {:16s} | {:^4s} || {}"
45_TD_SECS_003 = td(seconds=3)
48_LOGGER = logging.getLogger(__name__)
51class MessageBase:
52 """The Message class; will trap/log invalid msgs."""
54 def __init__(self, pkt: Packet) -> None:
55 """Create a message from a valid packet.
57 :param pkt: The packet to process into a message
58 :type pkt: Packet
59 :raises PacketInvalid: If the packet payload cannot be parsed.
60 """
62 self._pkt = pkt
64 self.src: Address = pkt.src
65 self.dst: Address = pkt.dst
66 self._addrs: tuple[Address, Address, Address] = pkt._addrs
68 self.dtm: dt = pkt.dtm
70 self.verb: VerbT = pkt.verb
71 self.seqn: str = (
72 pkt.seqn
73 ) # the msg is part of a set for 1 Code, received in order
74 self.code: Code = pkt.code
75 self.len: int = pkt._len
77 self._payload = self._validate(
78 self._pkt.payload
79 ) # ? may raise InvalidPacketError
81 self._str: str = None # type: ignore[assignment]
83 def __repr__(self) -> str:
84 """Return an unambiguous string representation of this object."""
85 return str(self._pkt) # repr or str?
87 def __str__(self) -> str:
88 """Return a brief readable string representation of this object."""
90 def ctx(pkt: Packet) -> str:
91 ctx = {True: "[..]", False: "", None: "??"}.get(pkt._ctx, pkt._ctx) # type: ignore[arg-type]
92 if not ctx and pkt.payload[:2] not in ("00", "FF"):
93 return f"({pkt.payload[:2]})"
94 return ctx
96 if self._str is not None:
97 return self._str
99 if self.src.id == self._addrs[0].id: # type: ignore[unreachable]
100 name_0 = self._name(self.src)
101 name_1 = (
102 "" if self.dst is self.src else self._name(self.dst)
103 ) # use 'is', issue_cc 318
104 else:
105 name_0 = ""
106 name_1 = self._name(self.src)
108 code_name = CODE_NAMES.get(self.code, f"unknown_{self.code}")
109 self._str = MSG_FORMAT_10.format(
110 name_0, name_1, self.verb, code_name, ctx(self._pkt), self.payload
111 )
112 return self._str
114 def __eq__(self, other: object) -> bool:
115 if not isinstance(other, Message):
116 return NotImplemented
117 return (self.src, self.dst, self.verb, self.code, self._pkt.payload) == (
118 other.src,
119 other.dst,
120 other.verb,
121 other.code,
122 other._pkt.payload,
123 )
125 def __lt__(self, other: object) -> bool:
126 if not isinstance(other, Message):
127 return NotImplemented
128 return self.dtm < other.dtm
130 def _name(self, addr: Address) -> str:
131 """Return a friendly name for an Address, or a Device."""
132 return f" {addr.id}" # can't do 'CTL:123456' instead of ' 01:123456'
134 @property
135 def payload(self): # type: ignore[no-untyped-def] # FIXME -> dict | list:
136 """Return the payload."""
137 return self._payload
139 @property
140 def _has_payload(self) -> bool:
141 """Return False if there is no payload (may falsely Return True).
143 The message (i.e. the raw payload) may still have an idx.
144 """
146 return self._pkt._has_payload
148 @property
149 def _has_array(self) -> bool:
150 """
151 :return: True if the message's raw payload is an array.
152 """
154 return bool(self._pkt._has_array)
156 @property
157 def _idx(self) -> dict[str, str]:
158 """Get the domain_id/zone_idx/other_idx of a message payload, if any.
159 Used to identify the zone/domain that a message applies to.
161 :return: an empty dict if there is none such, or None if undetermined.
162 """
164 # .I --- 01:145038 --:------ 01:145038 3B00 002 FCC8
166 IDX_NAMES = {
167 Code._0002: "other_idx", # non-evohome: hometronics
168 Code._10A0: SZ_DHW_IDX, # can be 2 DHW zones per system, albeit unusual
169 Code._1260: SZ_DHW_IDX, # can be 2 DHW zones per system, albeit unusual
170 Code._1F41: SZ_DHW_IDX, # can be 2 DHW zones per system, albeit unusual
171 Code._22C9: SZ_UFH_IDX, # UFH circuit
172 Code._2389: "other_idx", # anachronistic
173 Code._2D49: "other_idx", # non-evohome: hometronics
174 Code._31D9: "hvac_id",
175 Code._31DA: "hvac_id",
176 Code._3220: "msg_id",
177 } # ALSO: SZ_DOMAIN_ID, SZ_ZONE_IDX
179 if self.code in (Code._31D9, Code._31DA): # shouldn't be needed?
180 assert isinstance(self._pkt._idx, str) # mypy hint
181 return {"hvac_id": self._pkt._idx}
183 if self._pkt._idx in (True, False) or self.code in CODE_IDX_ARE_COMPLEX:
184 return {} # above was: CODE_IDX_COMPLEX + (Code._3150):
186 if self.code in (Code._3220,): # FIXME: should be _SIMPLE
187 return {}
189 # .I 068 03:201498 --:------ 03:201498 30C9 003 0106D6 # rare
191 # .I --- 00:034798 --:------ 12:126457 2309 003 0201F4
192 if not {self.src.type, self.dst.type} & {
193 DEV_TYPE_MAP.CTL,
194 DEV_TYPE_MAP.UFC,
195 DEV_TYPE_MAP.HCW, # ?remove (see above, rare)
196 DEV_TYPE_MAP.DTS,
197 DEV_TYPE_MAP.HGI,
198 DEV_TYPE_MAP.DT2,
199 DEV_TYPE_MAP.PRG,
200 }: # FIXME: DEX should be deprecated to use device type rather than class
201 assert self._pkt._idx == "00", "What!! (AA)"
202 return {}
204 # .I 035 --:------ --:------ 12:126457 30C9 003 017FFF
205 if self.src.type == self.dst.type and self.src.type not in (
206 DEV_TYPE_MAP.CTL,
207 DEV_TYPE_MAP.UFC,
208 DEV_TYPE_MAP.HCW, # ?remove (see above, rare)
209 DEV_TYPE_MAP.HGI,
210 DEV_TYPE_MAP.PRG,
211 ): # DEX
212 assert self._pkt._idx == "00", "What!! (AB)"
213 return {}
215 # .I --- 04:029362 --:------ 12:126457 3150 002 0162
216 # if not getattr(self.src, "_is_controller", True) and not getattr(
217 # self.dst, "_is_controller", True
218 # ):
219 # assert self._pkt._idx == "00", "What!! (BA)"
220 # return {}
222 # .I --- 04:029362 --:------ 12:126457 3150 002 0162
223 # if not (
224 # getattr(self.src, "_is_controller", True)
225 # or getattr(self.dst, "_is_controller", True)
226 # ):
227 # assert self._pkt._idx == "00", "What!! (BB)"
228 # return {}
230 if self.src.type == self.dst.type and not getattr(
231 self.src, "_is_controller", True
232 ): # DEX
233 assert self._pkt._idx == "00", "What!! (BC)"
234 return {}
236 # TODO: also 000C (but is a complex idx)
237 # TODO: also 3150 (when not domain, and will be array if so)
238 if self.code in (Code._000A, Code._2309) and self.src.type == DEV_TYPE_MAP.UFC:
239 assert isinstance(self._pkt._idx, str) # mypy hint
240 return {IDX_NAMES[Code._22C9]: self._pkt._idx}
242 assert isinstance(self._pkt._idx, str) # mypy hint
243 idx_name = SZ_DOMAIN_ID if self._pkt._idx[:1] == "F" else SZ_ZONE_IDX
244 index_name = IDX_NAMES.get(self.code, idx_name)
246 return {index_name: self._pkt._idx}
248 # TODO: needs work...
249 def _validate(self, raw_payload: str) -> dict | list[dict]: # type: ignore[type-arg]
250 """Validate a message packet payload, and parse it if valid.
252 :return: a dict containing key: value pairs, or a list of those created from the payload
253 :raises an InvalidPacketError exception if it is not valid.
254 """
256 try: # parse the payload
257 # TODO: only accept invalid packets to/from HGI when flag raised
258 _check_msg_payload(self, self._pkt.payload) # ? InvalidPayloadError
260 if not self._has_payload and (
261 self.verb == RQ and self.code not in RQ_IDX_COMPLEX
262 ):
263 return {}
265 result = parse_payload(self) # invoke the code parsers
267 if isinstance(result, list):
268 return result
269 if isinstance(result, dict):
270 return {**self._idx, **result}
272 raise TypeError(f"Invalid payload type: {type(result)}")
274 except exc.PacketInvalid as err:
275 _LOGGER.warning("%s < %s", self._pkt, err)
276 raise err
278 except AssertionError as err:
279 # beware: HGI80 can send 'odd' but parseable packets +/- get invalid reply
280 _LOGGER.exception("%s < %s", self._pkt, f"{err.__class__.__name__}({err})")
281 raise exc.PacketInvalid("Bad packet") from err
283 except (AttributeError, LookupError, TypeError, ValueError) as err: # TODO: dev
284 _LOGGER.exception(
285 "%s < Coding error: %s", self._pkt, f"{err.__class__.__name__}({err})"
286 )
287 raise exc.PacketInvalid from err
289 except NotImplementedError as err: # parser_unknown (unknown packet code)
290 _LOGGER.warning("%s < Unknown packet code (cannot parse)", self._pkt)
291 raise exc.PacketInvalid from err
294class Message(MessageBase):
295 """Extend the Message class, so is useful to a stateful Gateway.
297 Adds _expired attr to the Message class.
298 """
300 CANT_EXPIRE = -1 # sentinel value for fraction_expired
302 HAS_EXPIRED = 2.0 # fraction_expired >= HAS_EXPIRED
303 # .HAS_DIED = 1.0 # fraction_expired >= 1.0 (is expected lifespan)
304 IS_EXPIRING = 0.8 # fraction_expired >= 0.8 (and < HAS_EXPIRED)
306 _gwy: Gateway
307 _fraction_expired: float | None = None
309 @classmethod
310 def _from_cmd(cls, cmd: Command, dtm: dt | None = None) -> Message:
311 """Create a Message from a Command."""
312 return cls(Packet._from_cmd(cmd, dtm=dtm))
314 @classmethod
315 def _from_pkt(cls, pkt: Packet) -> Message:
316 """Create a Message from a Packet."""
317 return cls(pkt)
319 @property
320 def _expired(self) -> bool:
321 """Return True if the message is dated (or False otherwise)."""
322 # fraction_expired = (dt_now - self.dtm - _TD_SECONDS_003) / self._pkt._lifespan
323 # TODO: keep none >7d, even 10E0, etc.
325 def fraction_expired(lifespan: td) -> float:
326 """Return the packet's age as fraction of its 'normal' life span."""
327 return (self._gwy._dt_now() - self.dtm - _TD_SECS_003) / lifespan
329 # 1. Look for easy win...
330 if self._fraction_expired is not None:
331 if self._fraction_expired == self.CANT_EXPIRE:
332 return False
333 if self._fraction_expired >= self.HAS_EXPIRED:
334 return True
336 # 2. Need to update the fraction_expired...
337 if self.code == Code._1F09 and self.verb != RQ: # sync_cycle is a special case
338 # RQs won't have remaining_seconds, RP/Ws have only partial cycle times
339 self._fraction_expired = fraction_expired(
340 td(seconds=self.payload["remaining_seconds"]),
341 )
343 elif self._pkt._lifespan is False: # Can't expire
344 self._fraction_expired = self.CANT_EXPIRE
346 elif self._pkt._lifespan is True: # Can't expire
347 raise NotImplementedError
349 else:
350 self._fraction_expired = fraction_expired(self._pkt._lifespan)
352 return self._fraction_expired >= self.HAS_EXPIRED
355@lru_cache(maxsize=256)
356def re_compile_re_match(regex: str, string: str) -> bool: # Optional[Match[Any]]
357 # TODO: confirm this does speed things up
358 # Python has its own caching of re.compile, _MAXCACHE = 512
359 # https://github.com/python/cpython/blob/3.10/Lib/re.py
360 return re.compile(regex).match(string) # type: ignore[return-value]
363def _check_msg_payload(msg: MessageBase, payload: str) -> None:
364 """Validate a packet's payload against its verb/code pair.
366 :param msg: The message object being validated
367 :type msg: MessageBase
368 :param payload: The raw hex payload string
369 :type payload: str
370 :raises PacketInvalid: If the code is unknown or verb/code pair is invalid.
371 :raises PacketPayloadInvalid: If the payload does not match the expected regex.
372 """
374 _ = repr(msg._pkt) # HACK: ? raise InvalidPayloadError
376 if msg.code not in CODES_SCHEMA:
377 raise exc.PacketInvalid(f"Unknown code: {msg.code}")
379 try:
380 regex = CODES_SCHEMA[msg.code][msg.verb]
381 except KeyError:
382 raise exc.PacketInvalid(
383 f"Unknown verb/code pair: {msg.verb}/{msg.code}"
384 ) from None
386 if not re_compile_re_match(regex, payload):
387 raise exc.PacketPayloadInvalid(f"Payload doesn't match '{regex}': {payload}")