Coverage for src/ramses_tx/frame.py: 19%
216 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.
4Provide the base class for commands (constructed/sent packets) and packets.
5"""
7from __future__ import annotations
9import logging
10from typing import TYPE_CHECKING
12from . import exceptions as exc
13from .address import ALL_DEV_ADDR, NON_DEV_ADDR, Address, pkt_addrs
14from .const import COMMAND_REGEX, DEV_ROLE_MAP, DEV_TYPE_MAP
15from .ramses import (
16 CODE_IDX_ARE_COMPLEX,
17 CODE_IDX_ARE_NONE,
18 CODE_IDX_ARE_SIMPLE,
19 CODE_IDX_DOMAIN,
20 CODES_ONLY_FROM_CTL,
21 CODES_SCHEMA,
22 CODES_WITH_ARRAYS,
23 RQ_NO_PAYLOAD,
24)
26# TODO: add _has_idx (as func return only one type, or raise)
28from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
29 I_,
30 RP,
31 RQ,
32 W_,
33 Code,
34)
35from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
36 F8,
37 F9,
38 FA,
39 FC,
40 FF,
41)
43if TYPE_CHECKING:
44 from .const import VerbT
47_LOGGER = logging.getLogger(__name__)
50HeaderT = str
51PayloadT = str
52_PktIdxT = str
class Frame:
    """The Frame class - used as a base by the Command and Packet classes.

    A frame is a space-separated string of verb, sequence number, three addresses,
    code, payload length and payload, for example:

    `RQ --- 01:078710 10:067219 --:------ 3220 005 0000050000`

    Most of the derived attributes (`_has_array`, `_has_ctl`, `_idx`, `_ctx`, `_hdr`)
    are computed lazily on first access and then cached in a same-named attribute
    with a trailing underscore.
    """

    src: Address  # Address | Device
    dst: Address  # Address | Device
    _addrs: tuple[Address, Address, Address]

    def __init__(self, frame: str) -> None:
        """Create a frame from a string.

        :param frame: the frame, as a string (see the class docstring for an example).
        :raises PacketInvalid: if the string does not match COMMAND_REGEX, has an
            invalid address set, or its payload does not have the declared length.
        """

        self._frame: str = frame
        if not COMMAND_REGEX.match(self._frame):
            raise exc.PacketInvalid(f"Bad frame: invalid structure: >>>{frame}<<<")

        # the verb may have a leading space (it is taken from frame[:2], below)
        fields = frame.lstrip().split(" ")

        self.verb: VerbT = frame[:2]  # type: ignore[assignment]
        self.seqn: str = fields[1]  # . frame[3:6]
        self.code: Code = fields[5]  # type: ignore[assignment]
        self.len_: str = fields[6]  # . frame[42:45]  FIXME: len_, _len & len(payload)/2
        self.payload: PayloadT = fields[7]  # frame[46:].split(" ")[0]
        self._len: int = len(self.payload) // 2  # two hex characters per byte

        try:
            self.src, self.dst, *self._addrs = pkt_addrs(  # type: ignore[assignment]
                " ".join(fields[2:5])  # frame[7:36]
            )
        except exc.PacketInvalid as err:  # will be: InvalidAddrSetError
            raise exc.PacketInvalid("Bad frame: invalid address set") from err

        if len(self.payload) != int(self.len_) * 2:
            raise exc.PacketInvalid(
                f"Bad frame: invalid payload: "
                f"len({self.payload}) is not int('{self.len_}' * 2))"
            )

        # caches for the lazily-evaluated properties (None means: not yet determined)
        self._ctx_: bool | str = None  # type: ignore[assignment]
        self._hdr_: str = None  # type: ignore[assignment]
        self._idx_: bool | str = None  # type: ignore[assignment]

        self._has_array_: bool = None  # type: ignore[assignment]
        self._has_ctl_: bool = None  # type: ignore[assignment]  # TODO: remove
        self._has_payload_: bool = None  # type: ignore[assignment]

        self._repr: str = None  # type: ignore[assignment]

    # FIXME: this is messy
    def _validate(self, *, strict_checking: bool = False) -> None:
        """Validate the frame: it may be a cmd or a (response) pkt.

        :param strict_checking: if True, also check that the verb is consistent with
            the address set (helps users avoid constructing bad commands).
        :raises PacketInvalid: if the payload length or the address set is not valid.
        """

        if len(self._frame[46:].split(" ")[0]) != int(self._frame[42:45]) * 2:
            raise exc.PacketInvalid("Bad frame: Payload length mismatch")

        try:
            # self.src, self.dst, *self._addrs = pkt_addrs(self._frame[7:36])
            src, dst, *addrs = pkt_addrs(self._frame[7:36])
        except exc.PacketInvalid as err:  # will be: InvalidAddrSetError
            raise exc.PacketInvalid("Bad frame: Invalid address set") from err

        if not strict_checking:
            return

        # Strict checking: helps users avoid constructing bad commands.
        # These are explicit tests (not assert statements) so that they are still
        # enforced when Python runs with -O, which strips assert statements.
        if addrs[0] == NON_DEV_ADDR:
            is_valid = self.verb == I_
            reason = "wrong verb or dst addr should be present"
        elif addrs[2] == NON_DEV_ADDR:
            is_valid = self.verb == I_ or src is not dst
            reason = "wrong verb or dst addr should not be src"
        elif addrs[0] is addrs[2]:
            is_valid = self.verb == I_
            reason = "wrong verb or dst addr should not be src"
        else:
            is_valid = self.verb in (I_, W_)
            reason = "wrong verb or dst addr should be src"

        if not is_valid:
            raise exc.PacketInvalid(f"Bad frame: Invalid address set: {reason}")

    def __repr__(self) -> str:
        """Return an unambiguous string representation of this object.

        The string is built once from the parsed fields, and then cached.
        """

        if self._repr is None:
            self._repr = " ".join(  # type: ignore[unreachable]
                (
                    self.verb,
                    self.seqn,
                    *(repr(a) for a in self._addrs),
                    self.code,
                    self.len_,
                    self.payload,
                )
            )
        return self._repr

    def __str__(self) -> str:
        """Return a brief readable string representation of this object.

        This is the repr, followed by the header; if determining the header raises
        an AttributeError, the error is appended instead.
        """
        # repr(self) == repr(cls(str(self)))

        try:
            return f"{self!r} # {self._hdr}"  # code|ver|device_id|context
        except AttributeError as err:
            return f"{self!r} < {err}"

    def __eq__(self, other: object) -> bool:
        """Return True if both frames are the same from their 5th character onwards.

        Returns NotImplemented if the other object has no `_frame` attribute.
        """
        if not hasattr(other, "_frame"):
            return NotImplemented
        # NOTE(review): the slice starts inside the seqn field (frame[3:6]) - confirm
        return self._frame[4:] == other._frame[4:]  # type: ignore[no-any-return]

    @property
    def _has_array(self) -> None | bool:  # TODO: a mess - has false negatives
        """Return True if the payload is an array, False otherwise.

        May return false negatives (e.g. arrays of length 1), and None if undetermined.

        An example of a false negative is evohome with only one zone (i.e. the periodic
        2309/30C9/000A packets).

        :raises AssertionError: if the payload is taken to be an array, but its length
            or its addresses are inconsistent with that (pkt_header() catches this).
        """

        if self._has_array_ is not None:  # HACK: overridden by detect_array(msg, prev)
            return self._has_array_

        # False -ves (array length is 1) are an acceptable compromise to extensive checking

        # .W --- 01:145038 34:092243 --:------ 1FC9 006 07230906368E
        # .I --- 01:145038 --:------ 01:145038 1FC9 018 07000806368E-FC3B0006368E-071FC906368E
        # .I --- 01:145038 --:------ 01:145038 1FC9 018 FA000806368E-FC3B0006368E-FA1FC906368E
        # .I --- 34:092243 --:------ 34:092243 1FC9 030 0030C9896853-002309896853-001060896853-0010E0896853-001FC9896853
        if self.code == Code._1FC9:  # type: ignore[unreachable]
            self._has_array_ = self.verb != RQ  # safe to treat all as array, even len=1
            return self._has_array_  # don't do any checks for 1FC9 (they will fail)

        elif self.verb != I_ or self.code not in CODES_WITH_ARRAYS:
            self._has_array_ = False

        elif self._len != CODES_WITH_ARRAYS[self.code][0]:  # NOTE: can be false -ves
            # an array only if the length is a whole multiple of the element length
            a, b = divmod(self._len, CODES_WITH_ARRAYS[self.code][0])
            self._has_array_ = a > 0 and b == 0

        elif (
            self.code in (Code._22C9, Code._3150)
            and self.src.type == DEV_TYPE_MAP.UFC
            and self.dst is self.src
            and self.payload[:1] != "F"
        ):
            self._has_array_ = True

        # elif self.code == Code._000C:  # anachronism: variable array length
        #     return

        else:
            self._has_array_ = False

        if self._has_array_:
            len_ = CODES_WITH_ARRAYS[self.code][0]

            # NOTE: these remain asserts, as pkt_header() catches AssertionError
            assert self._len % len_ == 0, (
                f"{self} < array has length ({self._len}) that is not multiple of {len_}"
            )
            assert (
                self.src.type in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2)
                or self.src == self.dst  # DEX
            ), f"{self} < array is from a non-controller (01)"
            assert (
                self.src.type not in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2)
                or self.dst.id == NON_DEV_ADDR.id  # DEX
            ), f"{self} < array is from a non-controller (02)"

        # .I --- 10:040239 01:223036 --:------ 0009 003 000000        # not array
        # .I --- 01:102458 --:------ 01:102458 0009 006 FC01FF-F901FF
        # .I --- 01:145038 --:------ 01:145038 0009 006 FC00FF-F900FF
        # .I 034 --:------ --:------ 12:126457 2309 006 017EFF-027EFF
        # .I --- 01:223036 --:------ 01:223036 000A 012 081001F40DAC-091001F40DAC  # 2nd fragment
        # .I 024 --:------ --:------ 12:126457 000A 012 010001F40BB8-020001F40BB8
        # .I --- 02:044328 --:------ 02:044328 22C9 018 0001F40A2801-0101F40A2801-0201F40A2801
        # .I --- 23:100224 --:------ 23:100224 2249 007 007EFF7EFFFFFF  # can have 2 zones
        # .I --- 02:044328 --:------ 02:044328 22C9 018 0001F40A2801-0101F40A2801-0201F40A2801
        # .I --- 02:001107 --:------ 02:001107 3150 010 007A-017A-027A-036A-046A

        return self._has_array_

    @property
    def _has_ctl(self) -> None | bool:
        """Return True if the packet is to/from a controller.

        The result is cached; if none of the heuristics below apply, it is False.
        """

        # NB: the difference between these (_has_ctl, src, dst, -:-) and above (_has_ctl, src, -:-, src)
        # 2000-01-01T03:00:00.000000 ...  I --- 37:123456 --:------ 37:123456 31DA 030 00C8400518646427102AF82EE031FFFFFFC800C8C83FFF64640AFF0AFF00
        # 2022-11-20T08:32:06.904058 063 RP --- 32:134446 37:171685 --:------ 31DA 030 00EF007FFF2F1B0226069A07EEFFE4F8000038988F0000EFEF1F2420FC00
        # Maybe only use this for CH/DHW, and not HVAC?

        if self._has_ctl_ is not None:
            return self._has_ctl_

        # TODO: handle RQ/RP to/from HGI/RFG, handle HVAC

        # NOTE(review): the debug messages below format `self` eagerly, which invokes
        # __str__ (and so _hdr) even when debug logging is off - left as is, as the
        # cached properties are re-entrant (see the HACK in _hdr)

        if {self.src.type, self.dst.type} & {  # type: ignore[unreachable]
            DEV_TYPE_MAP.CTL,
            DEV_TYPE_MAP.UFC,
            DEV_TYPE_MAP.PRG,
        }:  # DEX
            _LOGGER.debug(f"{self} # HAS controller (10)")
            self._has_ctl_ = True

        # .I --- 12:010740 --:------ 12:010740 30C9 003 0008D9 # not ctl
        elif self.dst is self.src:  # (not needed?) & self.code == I_:
            _LOGGER.debug(
                f"{self} < "
                + (
                    "HAS"
                    if self.code in CODES_ONLY_FROM_CTL + (Code._31D9, Code._31DA)
                    else "no"
                )
                + " controller (20)"
            )
            self._has_ctl_ = any(
                (
                    self.code == Code._3B00 and self.payload[:2] == FC,
                    self.code in CODES_ONLY_FROM_CTL + (Code._31D9, Code._31DA),
                )
            )

        # .I --- --:------ --:------ 10:050360 1FD4 003 002ABE # no ctl
        # .I 095 --:------ --:------ 12:126457 1F09 003 000BC2 # HAS ctl
        # .I --- --:------ --:------ 20:001473 31D9 003 000001 # ctl? (HVAC)
        elif self.dst.id == NON_DEV_ADDR.id:
            _LOGGER.debug(f"{self} # HAS controller (21)")
            self._has_ctl_ = self.src.type != DEV_TYPE_MAP.OTB  # DEX

        # .I --- 10:037879 --:------ 12:228610 3150 002 0000   # HAS ctl
        # .I --- 04:029390 --:------ 12:126457 1060 003 01FF01 # HAS ctl
        elif self.dst.type in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2):  # DEX
            _LOGGER.debug(f"{self} # HAS controller (22)")
            self._has_ctl_ = True

        # RQ --- 30:258720 10:050360 --:------ 3EF0 001 00           # UNKNOWN (99)
        # RP --- 10:050360 30:258720 --:------ 3EF0 006 000011010A1C # UNKNOWN (99)

        # RQ --- 18:006402 13:049798 --:------ 1FC9 001 00
        # RP --- 13:049798 18:006402 --:------ 1FC9 006 003EF034C286
        # RQ --- 30:258720 10:050360 --:------ 22D9 001 00
        # RP --- 10:050360 30:258720 --:------ 22D9 003 0003E8
        # RQ --- 30:258720 10:050360 --:------ 3220 005 0000120000
        # RP --- 10:050360 30:258720 --:------ 3220 005 0040120166
        # RQ --- 30:258720 10:050360 --:------ 3EF0 001 00
        # RP --- 10:050360 30:258720 --:------ 3EF0 006 000011010A1C

        # .I --- 34:021943 63:262142 --:------ 10E0 038 000001C8380A01... # unknown
        # .I --- 32:168090 30:082155 --:------ 31E0 004 0000C800          # unknown

        if self._has_ctl_ is None:
            # if DEV_MODE and DEV_TYPE_MAP.HGI not in (
            #     self.src.type,
            #     self.dst.type,
            # ):  # DEX
            #     _LOGGER.warning(f"{self} # has_ctl - undetermined (99)")
            self._has_ctl_ = False

        return self._has_ctl_

    @property
    def _has_idx(self) -> bool:
        """Return True if the payload has an index (or has an array), False otherwise."""

        return self._idx is not False

    @property
    def _has_payload(self) -> bool:
        """Return True if the packet has a non-null payload, False otherwise.

        May return false positives. The payload may still have an idx.
        """

        if self._has_payload_ is not None:
            return self._has_payload_

        # there is no payload if any one of these holds
        self._has_payload_ = not any(  # type: ignore[unreachable]
            (
                self._len == 1,
                self.verb == RQ and self.code in RQ_NO_PAYLOAD,
                self.verb == RQ and self._len == 2 and self.code != Code._0016,
                # self.verb == RQ and self._len == 2 and self.code in (
                #   Code._2309, Code._2349, Code._3EF1
                # ),
            )
        )

        return self._has_payload_

    def _force_has_array(self) -> None:
        """Mark the payload as an array, and reset the cached ctx, hdr and idx."""
        self._has_array_ = True
        self._ctx_ = None  # type: ignore[assignment]
        self._hdr_ = None  # type: ignore[assignment]
        self._idx_ = None  # type: ignore[assignment]

    @property
    def _is_fragment(self) -> bool:
        """Return True if the payload *could* be a fragment, False otherwise."""
        # .I 036 --:------ --:------ 12:126457 000A 012 010001F40BB8-020001F40BB8  # max 2 zones

        return (
            self.code in (Code._000A, Code._22C9)
            and self.verb == I_
            and self.src is self.dst
            # and self._has_array  # not needed
        ) or (self.code == Code._0404 and self.verb in (I_, RP))

    @property
    def _ctx(self) -> bool | str:  # incl. self._idx
        """Return the payload's full context, if any (e.g. for 0404: zone_idx/frag_idx).

        Used to store packets in the entity's message DB. It is a superset of _idx.
        """

        if self._ctx_ is not None:
            return self._ctx_

        if self.code in (  # type: ignore[unreachable]
            Code._0005,
            Code._000C,
        ):  # zone_idx, zone_type (device_role)
            self._ctx_ = self.payload[:4]
        elif self.code == Code._0404:  # zone_idx, frag_idx
            self._ctx_ = self._idx + self.payload[10:12]
        else:
            self._ctx_ = self._idx
        return self._ctx_

    @property
    def _hdr(self) -> HeaderT:  # incl. self._ctx
        """Return the QoS header (fingerprint) of this packet (i.e. device_id|code|verb).

        Used for QoS (timeouts, retries), callbacks, etc.
        """

        if self._hdr_ is not None:
            return self._hdr_

        # FIXME: HACK: sometimes RecursionError
        # a provisional value is cached first, so that any re-entrant access to _hdr
        # made during the call to pkt_header() returns it, rather than recursing
        self._hdr_ = "|".join((self.code, self.verb))  # type: ignore[unreachable]
        self._hdr_ = pkt_header(self)
        return self._hdr_

    @property
    def _idx(self) -> bool | str:  # FIXME: a mess
        """Return the payload's index, if any (e.g. zone_idx, domain_id or log_idx).

        Used to route a packet to the correct entity's (i.e. zone/domain) msg handler.

        A result of None (indeterminable) from _pkt_idx() is stored as False.
        """

        if self._idx_ is not None:
            return self._idx_

        self._idx_ = _pkt_idx(self) or False  # type: ignore[unreachable]
        return self._idx_
# TODO: a mess - has false negatives
def _pkt_idx(pkt: Frame) -> None | bool | str:  # _has_array, _has_ctl
    """Determine the 2-byte context of a payload (e.g. zone_idx, domain_id or log_idx).

    The result is one of:
     - a 2-byte string (usually, but not always, pkt.payload[:2])
     - True, when the payload is an array
     - False, when there is no context at all
     - None, when the context cannot be determined

    :raises NotImplementedError: for a complex-idx code that has no handler here.
    :raises PacketPayloadInvalid: if the leading byte of the payload is not
        consistent with what is expected for the code.
    """
    # The three iterables (none, simple, complex) are mutex

    # FIXME: 0016 is broken

    code = pkt.code

    # mutex 2/4, CODE_IDX_COMPLEX: are not payload[:2]
    if code == Code._0005:
        return pkt._has_array

    # .I --- 10:040239 01:223036 --:------ 0009 003 000000
    if code == Code._0009 and pkt.src.type == DEV_TYPE_MAP.OTB:  # DEX
        return False

    payload = pkt.payload
    idx = payload[:2]  # the default location of the idx, if there is one

    if code == Code._000C:  # zone_idx/domain_id (complex, payload[0:4])
        role = payload[2:4]
        if role == DEV_ROLE_MAP.APP:  # "000F"
            return str(FC)  # mypy
        if payload[0:4] == f"01{DEV_ROLE_MAP.HTG}":  # "010E"
            return str(F9)  # mypy
        if role in (DEV_ROLE_MAP.DHW, DEV_ROLE_MAP.HTG):  # "000D", "000E"
            return str(FA)  # mypy
        return idx

    if code == Code._0404:  # assumes only 1 DHW zone (can be 2, but never seen)
        if payload[2:4] == "23":
            return "HW"
        return idx

    if code in (Code._0418, Code._3220):  # log_idx, or msg_id/data_id (payload[4:6])
        return payload[4:6]

    if code == Code._1100:  # TODO; can do in parser
        if payload[:1] == "F":  # only FC
            return idx
        return False

    if code in CODE_IDX_ARE_COMPLEX:  # these should be handled above
        raise NotImplementedError(f"{pkt} # CODE_IDX_COMPLEX")  # a coding error

    # mutex 1/4, CODE_IDX_NONE: always returns False
    if code in CODE_IDX_ARE_NONE:
        schema_wants_00 = CODES_SCHEMA[code].get(pkt.verb, "")[:3] == "^00"
        if schema_wants_00 and idx != "00":
            raise exc.PacketPayloadInvalid(
                f"Packet idx is {pkt.payload[:2]}, but expecting no idx (00) (0xAA)"
            )
        return False

    # mutex 3/4, CODE_IDX_SIMPLE: potentially some false -ves?
    if pkt._has_array:
        return True  # excludes len==1 for 000A, 2309, 30C9

    # TODO: is this needed?: exceptions to CODE_IDX_SIMPLE
    if idx in (F8, F9, FA, FC):  # TODO: F6, F7?, FB, FD
        if code in CODE_IDX_DOMAIN:
            return idx
        raise exc.PacketPayloadInvalid(
            f"Packet idx is {pkt.payload[:2]}, but not expecting a domain id"
        )

    # risk of false -ves, TODO: exclude HVAC?, pkt.src.type == DEV_TYPE_MAP.HGI too?  # DEX
    # 02:    22C9: would be picked up as an array, if len==1 counted
    # 03:    # .I 028 03:094242 --:------ 03:094242 30C9 003 010B22  # ctl
    # 12/22: 000A|1030|2309|30C9 from (addr0 --:), 1060|3150 (addr0 04:)
    # 23:    0009|10A0
    if pkt._has_ctl:
        return idx  # tcs._max_zones checked elsewhere

    if code in (Code._31D9, Code._31DA):
        return idx

    if idx != "00":
        raise exc.PacketPayloadInvalid(
            f"Packet idx is {pkt.payload[:2]}, but expecting no idx (00) (0xAB)"
        )  # TODO: add a test for this

    if code not in CODE_IDX_ARE_SIMPLE:
        # mutex 4/4, CODE_IDX_UNKNOWN: an unknown code
        _LOGGER.info(f"{pkt} # Unable to determine payload index (is probably OK)")

    # for CODE_IDX_SIMPLE: None is less precise than False, but avoids false -ves
    return None
def pkt_header(pkt: Frame, /, rx_header: bool = False) -> None | HeaderT:
    """Build the header of a packet (every packet has one): code|verb|device_id[|ctx].

    Used for QoS, and others.

    With rx_header=True, the header of the expected response packet is built
    instead; the result is then None if no response is expected.

    Examples include:
         I --- 04:155407 --:------ 04:155407 30C9 003 00092F                    # 30C9| I|04:155407
        RP --- 01:223036 18:005567 --:------ 2349 007 0404B000FFFFFF            # 2349|RP|01:223036|04
         I --- 01:223036 --:------ 01:223036 3B00 002 FCC8                      # 3B00| I|01:223036|FC
        RP --- 01:223036 18:005567 --:------ 000C 012 020800125F91020800125F8D  # 000C|RP|01:223036|0208
         I --- 01:223036 --:------ 01:223036 2309 030 00-0640 01-03E8 02-03...  # 2309| I|01:223036 (True)
    """

    if pkt.code == Code._1FC9:
        # .I --- 34:021943 --:------ 34:021943 1FC9 024 00-2309-8855B7 00-1FC9-8855B7
        # .W --- 01:145038 34:021943 --:------ 1FC9 006 00-2309-06368E  # won't know src until it arrives
        # .I --- 34:021943 01:145038 --:------ 1FC9 006 00-2309-8855B7
        if rx_header:
            if pkt.src == pkt.dst:  # and pkt.verb == I_:
                return "|".join((pkt.code, W_, pkt.src.id))
            if pkt.verb == W_:  # and pkt.src != pkt.dst:
                return "|".join((pkt.code, I_, pkt.src.id))  # TODO: why not pkt.dst?
            # if pkt.verb == RQ:  # and pkt.src != pkt.dst:  # TODO: this breaks things
            #     return "|".join((pkt.code, RP, pkt.dst.id))
            return None

        if pkt.src == pkt.dst:
            device_id = ALL_DEV_ADDR.id
        else:
            device_id = pkt.dst.id
        return "|".join((pkt.code, pkt.verb, device_id))

    # RQ and W use the dst.id rather than the src.id, as:
    # - cmd.src could be 18:000730, and echo .src will have changed to (say) 18:123456
    # - cmd.dst is the effector

    # an I, an RP, or a packet sent to self (say: xxxx| W|00:000000|xx)
    not_a_request = pkt.verb in (I_, RP) or pkt.src == pkt.dst

    if rx_header:
        if not_a_request:
            return None  # no response expected
        reply_verb = RP if pkt.verb == RQ else I_
        parts = (pkt.code, reply_verb, pkt.dst.id)
    elif not_a_request:
        parts = (pkt.code, pkt.verb, pkt.src.id)
    else:
        parts = (pkt.code, pkt.verb, pkt.dst.id)

    header = "|".join(parts)

    try:
        if isinstance(pkt._ctx, str):
            return f"{header}|{pkt._ctx}"
    except AssertionError:
        pass  # the context is unavailable: use the header without it
    return header