Coverage for src/ramses_tx/parsers.py: 14%
942 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - payload processors.
4NOTES: aspirations on a consistent Schema, going forward:
6============== ======== =================================== ========================
7 :mode/state: :bool: :mutex (infinitive. vs -ing): :flags:
8mode (config.) enabled disabled, heat, cool, heat_cool... ch_enabled, dhw_enabled
9state (action) active idle, heating, cooling... is_heating, is_cooling
10============== ======== =================================== ========================
12- prefer: enabled: True over xx_enabled: True (if only ever 1 flag)
13- prefer: active: True over is_heating: True (if only ever 1 flag)
14- avoid: is_enabled, is_active
15"""
17from __future__ import annotations
19import logging
20import re
21from collections.abc import Mapping
22from datetime import datetime as dt, timedelta as td
23from typing import TYPE_CHECKING, Any
25from . import exceptions as exc
26from .address import ALL_DEV_ADDR, NON_DEV_ADDR, hex_id_to_dev_id
27from .const import (
28 DEV_ROLE_MAP,
29 DEV_TYPE_MAP,
30 FAULT_DEVICE_CLASS,
31 FAULT_STATE,
32 FAULT_TYPE,
33 SYS_MODE_MAP,
34 SZ_ACCEPT,
35 SZ_ACTIVE,
36 SZ_BINDINGS,
37 SZ_BYPASS_MODE,
38 SZ_BYPASS_STATE,
39 SZ_CHANGE_COUNTER,
40 SZ_CONFIRM,
41 SZ_DATETIME,
42 SZ_DEMAND,
43 SZ_DEVICE_CLASS,
44 SZ_DEVICE_ID,
45 SZ_DEVICE_ROLE,
46 SZ_DEVICES,
47 SZ_DHW_FLOW_RATE,
48 SZ_DOMAIN_ID,
49 SZ_DOMAIN_IDX,
50 SZ_DURATION,
51 SZ_FAN_MODE,
52 SZ_FAN_RATE,
53 SZ_FAULT_STATE,
54 SZ_FAULT_TYPE,
55 SZ_FRAG_LENGTH,
56 SZ_FRAG_NUMBER,
57 SZ_FRAGMENT,
58 SZ_IS_DST,
59 SZ_LANGUAGE,
60 SZ_LOCAL_OVERRIDE,
61 SZ_LOG_ENTRY,
62 SZ_LOG_IDX,
63 SZ_MAX_TEMP,
64 SZ_MIN_TEMP,
65 SZ_MODE,
66 SZ_MULTIROOM_MODE,
67 SZ_NAME,
68 SZ_OEM_CODE,
69 SZ_OFFER,
70 SZ_OPENWINDOW_FUNCTION,
71 SZ_PAYLOAD,
72 SZ_PHASE,
73 SZ_PRESSURE,
74 SZ_RELAY_DEMAND,
75 SZ_REMAINING_DAYS,
76 SZ_REMAINING_PERCENT,
77 SZ_REQ_REASON,
78 SZ_SETPOINT,
79 SZ_SETPOINT_BOUNDS,
80 SZ_SYSTEM_MODE,
81 SZ_TEMPERATURE,
82 SZ_TIMESTAMP,
83 SZ_TOTAL_FRAGS,
84 SZ_UFH_IDX,
85 SZ_UNTIL,
86 SZ_VALUE,
87 SZ_WINDOW_OPEN,
88 SZ_ZONE_CLASS,
89 SZ_ZONE_IDX,
90 SZ_ZONE_MASK,
91 SZ_ZONE_TYPE,
92 ZON_MODE_MAP,
93 ZON_ROLE_MAP,
94 DevRole,
95 FaultDeviceClass,
96)
97from .fingerprints import check_signature
98from .helpers import (
99 hex_to_bool,
100 hex_to_date,
101 hex_to_dtm,
102 hex_to_dts,
103 hex_to_flag8,
104 hex_to_percent,
105 hex_to_str,
106 hex_to_temp,
107 parse_air_quality,
108 parse_bypass_position,
109 parse_capabilities,
110 parse_co2_level,
111 parse_exhaust_fan_speed,
112 parse_exhaust_flow,
113 parse_exhaust_temp,
114 parse_fan_info,
115 parse_fault_log_entry,
116 parse_humidity_element,
117 parse_indoor_humidity,
118 parse_indoor_temp,
119 parse_outdoor_humidity,
120 parse_outdoor_temp,
121 parse_post_heater,
122 parse_pre_heater,
123 parse_remaining_mins,
124 parse_supply_fan_speed,
125 parse_supply_flow,
126 parse_supply_temp,
127 parse_valve_demand,
128)
129from .opentherm import (
130 EN,
131 SZ_DESCRIPTION,
132 SZ_MSG_ID,
133 SZ_MSG_NAME,
134 SZ_MSG_TYPE,
135 OtMsgType,
136 decode_frame,
137)
138from .ramses import _31D9_FAN_INFO_VASCO, _2411_PARAMS_SCHEMA
139from .typed_dicts import PayDictT
140from .version import VERSION
142# Kudos & many thanks to:
143# - Evsdd: 0404 (wow!)
144# - Ierlandfan: 3150, 31D9, 31DA, others
145# - ReneKlootwijk: 3EF0
146# - brucemiranda: 3EF0, others
147# - janvken: 10D0, 1470, 1F70, 22B0, 2411, several others
148# - tomkooij: 3110
149# - RemyDeRuysscher: 10E0, 31DA (and related), others
150# - silverailscolo: 12A0, 31DA, others
153from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
154 I_,
155 RP,
156 RQ,
157 W_,
158 Code,
159)
160from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
161 F6,
162 F8,
163 F9,
164 FA,
165 FB,
166 FC,
167 FF,
168)
170if TYPE_CHECKING:
171 from .message import MessageBase as Message # HACK: merge MsgBase into Msg
# The "description" field of every entry in the 2411 parameter schema, by key.
_2411_TABLE = {
    param_id: schema["description"] for param_id, schema in _2411_PARAMS_SCHEMA.items()
}

# Two-character hex codes mapped to a label; "00" is reserved.
LOOKUP_PUZZ = {
    "10": "engine",  # version str, e.g. v0.14.0
    "11": "impersonating",  # pkt header, e.g. 30C9| I|03:123001 (15 characters, packed)
    "12": "message",  # message only, max len is 16 ascii characters
    "13": "message",  # message only, but without a timestamp, max len 22 chars
    "20": "engine",  # version str, e.g. v0.50.0, has higher-precision timestamp
    "7F": "null",  # packet is null / was nullified: payload to be ignored
}


# Appended to assertion/log messages when an unrecognised payload is seen.
_INFORM_DEV_MSG = "Support the development of ramses_rf by reporting this packet"


# Both names refer to the same module-level logger.
_LOGGER = logging.getLogger(__name__)
_PKT_LOGGER = _LOGGER
191# rf_unknown
def parser_0001(payload: str, msg: Message) -> Mapping[str, bool | str | None]:
    """Parse the 0001 (rf_unknown) packet.

    Payloads whose characters [2:6] are "2000", "8000" or "A000" are decoded with
    the HVAC layout (slot/param fields, gated on ``msg.len``); every other payload
    is decoded with the heat layout (a single dash-separated string).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (only ``msg.len`` is read)
    :type msg: Message
    :return: A mapping of parsed slot and parameter data
    :rtype: Mapping[str, bool | str | None]
    :raises AssertionError: If the payload format does not match expected constants.
    """
    # When in test mode, a 12: will send a W ?every 6 seconds:
    # 12:39:56.099 061  W --- 12:010740 --:------ 12:010740 0001 005 0000000501
    # 12:40:02.098 061  W --- 12:010740 --:------ 12:010740 0001 005 0000000501
    # 12:40:08.099 058  W --- 12:010740 --:------ 12:010740 0001 005 0000000501

    # sent by a THM when in signal strength test mode (0505, except 1st pkt)
    # 13:48:38.518 080  W --- 12:010740 --:------ 12:010740 0001 005 0000000501
    # 13:48:45.518 074  W --- 12:010740 --:------ 12:010740 0001 005 0000000505
    # 13:48:50.518 077  W --- 12:010740 --:------ 12:010740 0001 005 0000000505

    # sent by a CTL before a rf_check
    # 15:12:47.769 053  W --- 01:145038 --:------ 01:145038 0001 005 FC00000505
    # 15:12:47.869 053 RQ --- 01:145038 13:237335 --:------ 0016 002 00FF
    # 15:12:47.880 053 RP --- 13:237335 01:145038 --:------ 0016 002 0017

    # 12:30:18.083 047  W --- 01:145038 --:------ 01:145038 0001 005 0800000505
    # 12:30:23.084 049  W --- 01:145038 --:------ 01:145038 0001 005 0800000505

    # 15:03:33.187 054  W --- 01:145038 --:------ 01:145038 0001 005 FC00000505
    # 15:03:38.188 063  W --- 01:145038 --:------ 01:145038 0001 005 FC00000505
    # 15:03:43.188 064  W --- 01:145038 --:------ 01:145038 0001 005 FC00000505
    # 15:13:19.757 053  W --- 01:145038 --:------ 01:145038 0001 005 FF00000505
    # 15:13:24.758 054  W --- 01:145038 --:------ 01:145038 0001 005 FF00000505
    # 15:13:29.758 068  W --- 01:145038 --:------ 01:145038 0001 005 FF00000505
    # 15:13:34.759 063  W --- 01:145038 --:------ 01:145038 0001 005 FF00000505

    # sent by a CTL
    # 16:49:46.125 057  W --- 04:166090 --:------ 01:032820 0001 005 0100000505
    # 16:53:34.635 058  W --- 04:166090 --:------ 01:032820 0001 005 0100000505

    # loopback (not Tx'd) by a HGI80 whenever its button is pressed
    # 00:22:41.540 ---  I --- --:------ --:------ --:------ 0001 005 00FFFF02FF
    # 00:22:41.757 ---  I --- --:------ --:------ --:------ 0001 005 00FFFF0200
    # 00:22:43.320 ---  I --- --:------ --:------ --:------ 0001 005 00FFFF02FF
    # 00:22:43.415 ---  I --- --:------ --:------ --:------ 0001 005 00FFFF0200

    # From a CM927:
    # W/--:/--:/12:/00-0000-0501 = Test transmit
    # W/--:/--:/12:/00-0000-0505 = Field strength

    # Only these three prefixes select the HVAC layout; anything else is "heat".
    if payload[2:6] in ("2000", "8000", "A000"):
        assert payload[:2] == "00", payload[:2]
        # assert payload[2:4] in ("20", "80", "A0"), payload[2:4]
        # assert payload[4:6] == "00", payload[4:6]
        assert payload[8:10] in ("00", "04", "10", "20", "FF"), payload[8:10]

        result: dict[str, bool | str | None] = {
            "payload": payload,
            "slot_num": payload[6:8],
        }
        # The trailing fields are optional: each is present only if the packet is
        # long enough to carry it.
        if msg.len >= 6:
            result["param_num"] = payload[10:12]
        if msg.len >= 7:
            result["next_slot_num"] = payload[12:14]
        if msg.len >= 8:
            octet = payload[14:16]
            # The octet is hexadecimal, so decode it as base 16: a base-10 int()
            # raises ValueError for any value containing A-F (other than "FF").
            result["boolean_14"] = None if octet == "FF" else bool(int(octet, 16))
        return result

    assert payload[2:6] in ("0000", "FFFF"), payload[2:6]
    assert payload[8:10] in ("00", "02", "05"), payload[8:10]

    return {
        SZ_PAYLOAD: "-".join((payload[:2], payload[2:6], payload[6:8], payload[8:])),
    }
276# outdoor_sensor (outdoor_weather / outdoor_temperature)
def parser_0002(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 0002 (outdoor_sensor) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: The temperature plus the trailing octet when that octet is "02",
        otherwise the undecoded payload under the ``_payload`` key
    :rtype: dict[str, Any]
    """
    trailer = payload[6:]

    # Anything not ending in "02" is handed back as-is, without decoding.
    if trailer != "02":  # or: msg.src.type != DEV_TYPE_MAP.OUT
        return {"_payload": payload}

    return {
        SZ_TEMPERATURE: hex_to_temp(payload[2:6]),
        "_unknown": trailer,
    }
296# zone_name
def parser_0004(payload: str, msg: Message) -> PayDictT._0004:
    """Decode a 0004 (zone_name) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: The zone name, or an empty dict when the name field is all "7F"
    :rtype: PayDictT._0004
    """
    # RQ payload is zz00; limited to 12 chars in evohome UI? if "7F"*20: not a zone
    name_hex = payload[4:]

    if name_hex == "7F" * 20:
        return {}
    return {SZ_NAME: hex_to_str(name_hex)}
312# system_zones (add/del a zone?) # TODO: needs a cleanup
def parser_0005(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Decode a 0005 (system_zones) payload into zone type, mask and class.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (verb, length, source type and array flag are read)
    :type msg: Message
    :return: One dict per 4-byte element for array payloads, otherwise a single dict
    :rtype: dict | list[dict]
    :raises AssertionError: If an array payload is not an I from the expected device type.
    """
    # .I --- 01:145038 --:------ 01:145038 0005 004 00000100
    # RP --- 02:017205 18:073736 --:------ 0005 004 0009001F
    # .I --- 34:064023 --:------ 34:064023 0005 012 000A0000-000F0000-00100000

    def _decode_element(element: str) -> dict:
        """Decode one element (up to 8 hex characters)."""
        if msg.src.type == DEV_TYPE_MAP.UFC:  # DEX, or use: element[2:4] == ...
            mask = hex_to_flag8(element[6:8], lsb=True)
        else:
            mask = hex_to_flag8(element[4:6], lsb=True)
            # ATC928G1000 - 1st gen monochrome model, max 8 zones: a 3-byte
            # packet only has the first mask octet
            if msg.len != 3:
                mask = mask + hex_to_flag8(element[6:8], lsb=True)

        zone_type = element[2:4]
        # NOTE: the DEV_ROLE_MAP lookup is evaluated even if ZON_ROLE_MAP has the key
        role = ZON_ROLE_MAP.get(zone_type, DEV_ROLE_MAP[zone_type])
        return {
            SZ_ZONE_TYPE: zone_type,  # TODO: ?remove & keep zone_class?
            SZ_ZONE_MASK: mask,
            SZ_ZONE_CLASS: role,  # TODO: ?remove & keep zone_type?
        }

    if msg.verb == RQ:  # RQs have a context: zone_type
        return {SZ_ZONE_TYPE: payload[2:4], SZ_ZONE_CLASS: DEV_ROLE_MAP[payload[2:4]]}

    if not msg._has_array:
        return _decode_element(payload)

    assert msg.verb == I_ and msg.src.type == DEV_TYPE_MAP.RND, (
        f"{msg!r} # expecting I/{DEV_TYPE_MAP.RND}:"
    )  # DEX

    elements = []
    for offset in range(0, len(payload), 8):
        elements.append(_decode_element(payload[offset : offset + 8]))
    return elements
356# schedule_sync (any changes?)
def parser_0006(payload: str, msg: Message) -> PayDictT._0006:
    """Decode a 0006 (schedule_sync) payload into the schedule change counter.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: The change counter (None if "FFFF"), or an empty dict for "..FFFFFF"
    :rtype: PayDictT._0006
    :raises AssertionError: If the payload header is invalid.
    """
    # 16:10:34.288 053 RQ --- 30:071715 01:145038 --:------ 0006 001 00
    # 16:10:34.291 053 RP --- 01:145038 30:071715 --:------ 0006 004 00050008

    if payload[2:] == "FFFFFF":  # RP to an invalid RQ
        return {}

    assert payload[2:4] == "05"

    counter_hex = payload[4:]
    if counter_hex == "FFFF":
        change_counter = None
    else:
        change_counter = int(counter_hex, 16)

    return {SZ_CHANGE_COUNTER: change_counter}
381# relay_demand (domain/zone/device)
def parser_0008(payload: str, msg: Message) -> PayDictT._0008:
    """Parse the 0008 (relay_demand) packet.

    A 13-byte packet from a JST-type source is returned as an undecoded
    ordinal/blob pair; every other packet yields the relay demand decoded from
    the second payload octet.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (source type and length are read)
    :type msg: Message
    :return: A dictionary containing the relay demand percentage
    :rtype: PayDictT._0008
    """
    # https://www.domoticaforum.eu/viewtopic.php?f=7&t=5806&start=105#p73681
    # e.g. Electric Heat Zone

    # .I --- 01:145038 --:------ 01:145038 0008 002 0314
    # .I --- 01:145038 --:------ 01:145038 0008 002 F914
    # .I --- 01:054173 --:------ 01:054173 0008 002 FA00
    # .I --- 01:145038 --:------ 01:145038 0008 002 FC14

    # RP --- 13:109598 18:199952 --:------ 0008 002 0000
    # RP --- 13:109598 18:199952 --:------ 0008 002 00C8

    # The length is part of the guard, so no separate length assertion is needed.
    if msg.src.type == DEV_TYPE_MAP.JST and msg.len == 13:  # Honeywell Jasper, DEX
        return {  # type: ignore[typeddict-item]
            "ordinal": f"0x{payload[2:8]}",
            "blob": payload[8:],
        }

    return {SZ_RELAY_DEMAND: hex_to_percent(payload[2:4])}  # 3EF0[2:4], 3EF1[10:12]
414# relay_failsafe
def parser_0009(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Decode a 0009 (relay_failsafe) payload.

    The failsafe mode defines the relay behaviour if the RF communication is lost
    (e.g. when a room thermostat stops communicating due to discharged batteries):

    - False (disabled) - if RF comms are lost, relay will be held in OFF position
    - True (enabled) - if RF comms are lost, relay will cycle at 20% ON, 80% OFF

    This setting may need to be enabled to ensure frost protect mode.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only its array flag is read)
    :type msg: Message
    :return: For array payloads, one dict per 3-byte element (with its domain or
        zone index); otherwise a single dict without an index
    :rtype: dict | list[dict]
    :raises AssertionError: If the domain ID in the payload is invalid.
    """
    # can get: 003 or 006, e.g.: FC01FF-F901FF or FC00FF-F900FF
    # .I --- 23:100224 --:------ 23:100224 0009 003 0100FF  # 2-zone ST9520C
    # .I --- 10:040239 01:223036 --:------ 0009 003 000000

    if not msg._has_array:
        enabled = {"00": False, "01": True}.get(payload[2:4])
        return {
            "failsafe_enabled": enabled,
            "unknown_0": payload[4:],
        }

    def _decode_element(element: str) -> dict:
        """Decode one 6-character element, keyed by its domain or zone index."""
        idx = element[:2]
        assert idx in (F9, FC) or int(idx, 16) < 16
        idx_key = SZ_DOMAIN_ID if element[:1] == "F" else SZ_ZONE_IDX
        return {
            idx_key: idx,
            "failsafe_enabled": {"00": False, "01": True}.get(element[2:4]),
            "unknown_0": element[4:],
        }

    elements = []
    for offset in range(0, len(payload), 6):
        elements.append(_decode_element(payload[offset : offset + 6]))
    return elements
456# zone_params (zone_config)
def parser_000a(
    payload: str, msg: Message
) -> PayDictT._000A | list[PayDictT._000A] | PayDictT.EMPTY:
    """Decode a 000a (zone_params) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (verb, length and array flag are read)
    :type msg: Message
    :return: Zone parameters (min/max temps and option flags); a list with one
        entry per 6-byte element for array payloads; empty for a short RQ
    :rtype: PayDictT._000A | list[PayDictT._000A] | PayDictT.EMPTY
    :raises AssertionError: If the message length is unexpected.
    """

    def _decode_params(element: str) -> PayDictT._000A:  # null_rp: "007FFF7FFF"
        """Decode one element; each option is reported True when its bit is clear."""
        flags = int(element[2:4], 16)
        min_temp = hex_to_temp(element[4:8])
        max_temp = hex_to_temp(element[8:])
        return {
            SZ_MIN_TEMP: min_temp,
            SZ_MAX_TEMP: max_temp,
            SZ_LOCAL_OVERRIDE: (flags & 1) == 0,
            SZ_OPENWINDOW_FUNCTION: (flags & 2) == 0,
            SZ_MULTIROOM_MODE: (flags & 16) == 0,
            "_unknown_bitmap": f"0b{flags:08b}",  # TODO: try W with this
        }  # cannot determine zone_type from this information

    if msg._has_array:  # NOTE: these arrays can span 2 pkts!
        zones = []
        for offset in range(0, len(payload), 12):
            element = payload[offset : offset + 12]
            zones.append({SZ_ZONE_IDX: payload[offset : offset + 2], **_decode_params(element)})
        return zones

    if msg.verb == RQ and msg.len <= 2:  # some RQs have a payload (why?)
        return {}

    assert msg.len == 6, f"{msg!r} # expecting length 006"
    return _decode_params(payload)
498# zone_devices
def parser_000c(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 000c (zone_devices) packet.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (verb and source type are read)
    :type msg: Message
    :return: The zone type, index and device role; for non-RQ packets also the
        list of device ids found in the payload
    :rtype: dict[str, Any]
    :raises PacketPayloadInvalid: If the element length in the payload is malformed.
    :raises AssertionError: If indices or device IDs are invalid.
    """
    # .I --- 34:092243 --:------ 34:092243 000C 018 00-0A-7F-FFFFFF 00-0F-7F-FFFFFF 00-10-7F-FFFFFF  # noqa: E501
    # RP --- 01:145038 18:013393 --:------ 000C 006 00-00-00-10DAFD
    # RP --- 01:145038 18:013393 --:------ 000C 012 01-00-00-10DAF5 01-00-00-10DAFB

    def complex_idx(seqx: str, msg: Message) -> dict:  # complex index
        """domain_id, zone_idx, or ufh_idx|zone_idx."""

        # TODO: 000C to a UFC should be ufh_idx, not zone_idx
        if msg.src.type == DEV_TYPE_MAP.UFC:  # DEX
            assert int(seqx, 16) < 8, f"invalid ufh_idx: '{seqx}' (0x00)"
            return {
                SZ_UFH_IDX: seqx,
                SZ_ZONE_IDX: None if payload[4:6] == "7F" else payload[4:6],
            }

        if payload[2:4] in (DEV_ROLE_MAP.DHW, DEV_ROLE_MAP.HTG):
            # The limit is computed first: writing `x < 1 if cond else 2` binds as
            # `(x < 1) if cond else 2`, which asserts the constant 2 in the HTG
            # case and so never checks the index at all.
            idx_limit = 1 if payload[2:4] == DEV_ROLE_MAP.DHW else 2
            assert int(seqx, 16) < idx_limit, f"invalid _idx: '{seqx}' (0x01)"
            return {SZ_DOMAIN_ID: FA if payload[:2] == "00" else F9}

        if payload[2:4] == DEV_ROLE_MAP.APP:
            assert int(seqx, 16) < 1, f"invalid _idx: '{seqx}' (0x02)"
            return {SZ_DOMAIN_ID: FC}

        assert int(seqx, 16) < 16, f"invalid zone_idx: '{seqx}' (0x03)"
        return {SZ_ZONE_IDX: seqx}

    def _parser(
        seqx: str,
    ) -> dict:  # TODO: assumption that all id/idx are same is wrong!
        """Map the device id of one 12-character element to its [4:6] octet."""
        assert seqx[:2] == payload[:2], (
            f"idx != {payload[:2]} (seqx = {seqx}), short={is_short_000C(payload)}"
        )
        assert int(seqx[:2], 16) < 16
        assert seqx[4:6] == "7F" or seqx[6:] != "F" * 6, f"Bad device_id: {seqx[6:]}"
        return {hex_id_to_dev_id(seqx[6:12]): seqx[4:6]}

    def is_short_000C(payload: str) -> bool:
        """Return True if it is a short 000C (element length is 5, not 6)."""

        # Only a 72-character payload is ambiguous (divisible by both 10+2 and 12).
        if (pkt_len := len(payload)) != 72:
            return pkt_len % 12 != 0

        # 0608-001099C3 0608-001099C5 0608-001099BF 0608-001099BE 0608-001099BD 0608-001099BC  # len(element) = 6
        # 0508-00109901 0800-10990208 0010-99030800 1099-04080010 9905-08001099 0608-00109907  # len(element) = 5
        elif all(payload[i : i + 4] == payload[:4] for i in range(12, pkt_len, 12)):
            return False  # len(element) = 6 (12)

        # 06 08-001099C3 06-08001099 C5-06080010 99-BF060800 10-99BE0608 00-1099BD06 08-001099BC  # len(element) = 6
        # 05 08-00109901 08-00109902 08-00109903 08-00109904 08-00109905 08-00109906 08-00109907  # len(element) = 5
        elif all(payload[i : i + 2] == payload[2:4] for i in range(12, pkt_len, 10)):
            return True  # len(element) = 5 (10)

        raise exc.PacketPayloadInvalid(
            "Unable to determine element length"
        )  # return None

    if payload[2:4] == DEV_ROLE_MAP.HTG and payload[:2] == "01":
        dev_role = DEV_ROLE_MAP[DevRole.HT1]
    else:
        dev_role = DEV_ROLE_MAP[payload[2:4]]

    result = {
        SZ_ZONE_TYPE: payload[2:4],
        **complex_idx(payload[:2], msg),
        SZ_DEVICE_ROLE: dev_role,
    }
    if msg.verb == RQ:  # RQs have a context: index, zone_type, payload is iitt
        return result

    # NOTE: Both these are valid! So collision when len = 036!
    # RP --- 01:239474 18:198929 --:------ 000C 012 06-00-00119A99 06-00-00119B21
    # RP --- 01:069616 18:205592 --:------ 000C 011 01-00-00121B54    00-00121B52
    # RP --- 01:239700 18:009874 --:------ 000C 018 07-08-001099C3 07-08-001099C5 07-08-001099BF
    # RP --- 01:059885 18:010642 --:------ 000C 016 00-00-0011EDAA    00-0011ED92    00-0011EDA0

    # Short elements omit the leading index, so it is re-attached before parsing.
    devs = (
        [_parser(payload[:2] + payload[i : i + 10]) for i in range(2, len(payload), 10)]
        if is_short_000C(payload)
        else [_parser(payload[i : i + 12]) for i in range(0, len(payload), 12)]
    )

    return {
        **result,
        # elements whose [4:6] octet is "7F" are left out of the device list
        SZ_DEVICES: [k for d in devs for k, v in d.items() if v != "7F"],
    }
600# unknown_000e, from STA
def parser_000e(payload: str, msg: Message) -> dict[str, Any]:
    """Pass through a 000e payload after checking it is one of the known values.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: A dictionary holding the unmodified payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload value is not recognized.
    """
    known_payloads = ("000014", "000028")
    assert payload in known_payloads, _INFORM_DEV_MSG

    result = {SZ_PAYLOAD: payload}
    return result
619# rf_check
def parser_0016(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 0016 (rf_check) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only its verb is read)
    :type msg: Message
    :return: Empty for an RQ; otherwise the raw rf_value and an rf_strength
        derived from it, capped at 5
    :rtype: dict[str, Any]
    """
    # TODO: does 0016 include parent_idx?, but RQ|07:|0000?
    # RQ --- 22:060293 01:078710 --:------ 0016 002 0200
    # RP --- 01:078710 22:060293 --:------ 0016 002 021E
    # RQ --- 12:010740 01:145038 --:------ 0016 002 0800
    # RP --- 01:145038 12:010740 --:------ 0016 002 081E
    # RQ --- 07:031785 01:063844 --:------ 0016 002 0000
    # RP --- 01:063844 07:031785 --:------ 0016 002 002A

    if msg.verb == RQ:  # and msg.len == 1:  # TODO: some RQs have a payload
        return {}

    raw = int(payload[2:4], 16)
    band = int(raw / 5) + 1  # one band per 5 units of rf_value
    strength = band if band < 5 else 5

    return {"rf_strength": strength, "rf_value": raw}
648# language (of device/system)
def parser_0100(payload: str, msg: Message) -> PayDictT._0100 | PayDictT.EMPTY:
    """Decode a 0100 (language) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (verb and length are read)
    :type msg: Message
    :return: The language string plus the undecoded remainder; empty for a
        one-byte RQ
    :rtype: PayDictT._0100 | PayDictT.EMPTY
    """
    is_bare_request = msg.verb == RQ and msg.len == 1  # some RQs have a payload
    if is_bare_request:
        return {}

    language = hex_to_str(payload[2:6])
    remainder = payload[6:]
    return {SZ_LANGUAGE: language, "_unknown_0": remainder}
668# unknown_0150, from OTB
def parser_0150(payload: str, msg: Message) -> dict[str, Any]:
    """Pass through a 0150 payload after checking it is the one known value.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: A dictionary holding the unmodified payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not the expected '000000'.
    """
    expected = "000000"
    assert payload == expected, _INFORM_DEV_MSG

    result = {SZ_PAYLOAD: payload}
    return result
687# unknown_01d0, from a HR91 (when its buttons are pushed)
def parser_01d0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 01d0 payload (seen when HR91 buttons are pushed).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: Everything after the first octet, under the ``unknown_0`` key
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload value is not recognized.
    """
    # 23:57:28.869 045  W --- 04:000722 01:158182 --:------ 01D0 002 0003
    # 23:57:28.931 045  I --- 01:158182 04:000722 --:------ 01D0 002 0003
    # 23:57:31.581 048  W --- 04:000722 01:158182 --:------ 01E9 002 0003
    # 23:57:31.643 045  I --- 01:158182 04:000722 --:------ 01E9 002 0000
    # 23:57:31.749 050  W --- 04:000722 01:158182 --:------ 01D0 002 0000
    # 23:57:31.811 045  I --- 01:158182 04:000722 --:------ 01D0 002 0000

    state = payload[2:]
    assert state in ("00", "03"), _INFORM_DEV_MSG

    return {"unknown_0": state}
712# unknown_01e9, from a HR91 (when its buttons are pushed)
def parser_01e9(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 01e9 payload (seen when HR91 buttons are pushed).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not consulted by this parser)
    :type msg: Message
    :return: Everything after the first octet, under the ``unknown_0`` key
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload value is not recognized.
    """
    # 23:57:31.581348 048  W --- 04:000722 01:158182 --:------ 01E9 002 0003
    # 23:57:31.643188 045  I --- 01:158182 04:000722 --:------ 01E9 002 0000

    state = payload[2:]
    assert state in ("00", "03"), _INFORM_DEV_MSG

    return {"unknown_0": state}
733# unknown_01ff, to/from a Itho Spider/Thermostat
def parser_01ff(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 01ff payload (to/from an Itho Spider thermostat or gateway).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only its verb is read)
    :type msg: Message
    :return: Temperature, setpoint bounds and two flags taken from payload[10:12];
        the temperature and the upper bound are None for RP/W packets
    :rtype: dict[str, Any]
    :raises AssertionError: If internal payload constraints are violated.
    """
    # see: https://github.com/zxdavb/ramses_rf/issues/73 & 101

    # lots of '80's, and I see temps are `int(payload[6:8], 16) / 2`, so I wonder if 0x80 is N/A?
    # also is '7F'

    # An earlier decoding attempt, kept for reference:
    #   "dis_temp": None if payload[4:6] == "80" else int(payload[4:6], 16) / 2,
    #   "set_temp": int(payload[6:8], 16) / 2,
    #   "max_temp": int(payload[8:10], 16) / 2,  # 22C9 - temp high
    #   "mode_val": payload[10:12],
    #   "mode_xxx": payload[10:11] in ("9", "B", "D") and payload[11:12] in ("0", "2"),

    # Constraints that apply to every verb.
    assert payload[:4] in ("0080", "0180"), f"{_INFORM_DEV_MSG} ({payload[:4]})"
    assert payload[12:14] == "00", f"{_INFORM_DEV_MSG} ({payload[12:14]})"
    # not asserted - payload[16:22] seen as: "00143C", "002430", "7F8080"  # idx|25.9C?
    assert payload[26:30] == "0000", f"{_INFORM_DEV_MSG} ({payload[26:30]})"
    assert payload[34:46] == "80800280FF80", f"{_INFORM_DEV_MSG} ({payload[34:46]})"
    # not asserted - payload[48:] seen as: "0000", "0020", "0084", "00A4"

    if msg.verb in (I_, RQ):  # from Spider thermostat to gateway
        assert payload[14:16] == "80", f"{_INFORM_DEV_MSG} ({payload[14:16]})"
        # not asserted - payload[22:26] seen as: "2832", "2840"
        # not asserted - payload[30:34] seen as: "0104", "4402", "C102", "C402"
        assert payload[46:48] in ("04", "07"), f"{_INFORM_DEV_MSG} ({payload[46:48]})"

    if msg.verb in (RP, W_):  # from Spider gateway to thermostat
        # not asserted - payload[14:16] seen as: "00", "7F", "80"
        # not asserted - payload[22:26] seen as: "2840", "8080"
        # not asserted - payload[30:34] seen as: "0104", "3100", "3700", "B400"
        assert payload[46:48] in (
            "00",
            "04",
            "07",
        ), f"{_INFORM_DEV_MSG} ({payload[46:48]})"

    # All temperatures are a single octet in units of half a degree.
    lower_bound = int(payload[6:8], 16) / 2  # as: 22C9[2:6] and [6:10] ???
    if msg.verb in (RP, W_):
        upper_bound = None
    else:
        upper_bound = int(payload[8:10], 16) / 2

    if msg.verb in (RP, W_):
        temperature = None
    else:
        temperature = int(payload[4:6], 16) / 2

    planning_bit = int(payload[10:12], 16) & 1 << 6  # set bit means planning is off
    adjusted_bit = int(payload[10:12], 16) & 1 << 5

    return {
        SZ_TEMPERATURE: temperature,
        SZ_SETPOINT_BOUNDS: (lower_bound, upper_bound),
        "time_planning": not bool(planning_bit),
        "temp_adjusted": bool(adjusted_bit),
        "_flags_10": payload[10:12],  #
    }
824# zone_schedule (fragment)
def parser_0404(payload: str, msg: Message) -> PayDictT._0404:
    """Decode one 0404 (zone_schedule) fragment.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only its verb is read)
    :type msg: Message
    :return: The fragment number and total, plus the fragment length and data
        where the verb carries them
    :rtype: PayDictT._0404
    :raises PacketPayloadInvalid: If the fragment length does not match the header.
    :raises AssertionError: If internal context bytes are invalid.
    """
    # Retrieval of Zone schedule (NB: 200008)
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-200008-00-0100
    # RP --- 01:037519 30:185469 --:------ 0404 048 00-200008-29-0103-6E2...
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-200008-00-0203
    # RP --- 01:037519 30:185469 --:------ 0404 048 00-200008-29-0203-4FD...
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-200008-00-0303
    # RP --- 01:037519 30:185469 --:------ 0404 038 00-200008-1F-0303-C10...

    # Retrieval of DHW schedule (NB: 230008)
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-230008-00-0100
    # RP --- 01:037519 30:185469 --:------ 0404 048 00-230008-29-0103-618...
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-230008-00-0203
    # RP --- 01:037519 30:185469 --:------ 0404 048 00-230008-29-0203-ED6...
    # RQ --- 30:185469 01:037519 --:------ 0404 007 00-230008-00-0303
    # RP --- 01:037519 30:185469 --:------ 0404 014 00-230008-07-0303-13F...

    # Write a Zone schedule...
    # .W --- 30:042165 01:076010 --:------ 0404 048 08-200808-29-0104-688...
    # .I --- 01:076010 30:042165 --:------ 0404 007 08-200808-29-0104
    # .W --- 30:042165 01:076010 --:------ 0404 048 08-200808-29-0204-007...
    # .I --- 01:076010 30:042165 --:------ 0404 007 08-200808-29-0204
    # .W --- 30:042165 01:076010 --:------ 0404 048 08-200808-29-0304-8DD...
    # .I --- 01:076010 30:042165 --:------ 0404 007 08-200808-29-0304
    # .W --- 30:042165 01:076010 --:------ 0404 048 08-200808-11-0404-970...
    # .I --- 01:076010 30:042165 --:------ 0404 007 08-200808-11-0400

    # RP --- 01:145038 18:013393 --:------ 0404 007 00-230008-00-01FF  # no schedule

    assert payload[4:6] in ("00", payload[:2]), _INFORM_DEV_MSG

    length_hex = payload[8:10]
    fragment = payload[14:]

    # The header's length (in bytes) must match the fragment, except that an I
    # may declare a length while carrying no fragment at all.
    if int(length_hex, 16) * 2 != len(fragment):
        if not (msg.verb == I_ and len(fragment) == 0):
            raise exc.PacketPayloadInvalid(f"Incorrect fragment length: 0x{payload[8:10]}")

    total_hex = payload[12:14]

    if msg.verb == RQ:  # have a ctx: idx|frag_idx
        frag_number = int(payload[10:12], 16)
        total_frags = None if total_hex == "00" else int(total_hex, 16)
        return {SZ_FRAG_NUMBER: frag_number, SZ_TOTAL_FRAGS: total_frags}

    if msg.verb == I_:  # have a ctx: idx|frag_idx
        frag_number = int(payload[10:12], 16)
        total_frags = int(total_hex, 16)
        frag_length = None if length_hex == "00" else int(length_hex, 16)
        return {
            SZ_FRAG_NUMBER: frag_number,
            SZ_TOTAL_FRAGS: total_frags,
            SZ_FRAG_LENGTH: frag_length,
        }

    if total_hex == FF:  # e.g. no schedule
        return {SZ_FRAG_NUMBER: int(payload[10:12], 16), SZ_TOTAL_FRAGS: None}

    frag_number = int(payload[10:12], 16)
    total_frags = int(total_hex, 16)
    frag_length = None if length_hex == "FF" else int(length_hex, 16)
    return {
        SZ_FRAG_NUMBER: frag_number,
        SZ_TOTAL_FRAGS: total_frags,
        SZ_FRAG_LENGTH: frag_length,
        SZ_FRAGMENT: fragment,
    }
# system_fault (fault_log_entry) - needs refactoring
def parser_0418(payload: str, msg: Message) -> PayDictT._0418 | PayDictT._0418_NULL:
    """Decode a 0418 (system_fault) payload into a fault log entry.

    An RQ yields only its log_idx. For any other verb, a payload whose
    timestamp field (``payload[18:30]``) decodes to None yields a null entry,
    which carries a log_idx only when the verb is I. Otherwise the parsed
    fault is flattened into a tuple of strings.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb selects the result shape)
    :type msg: Message
    :return: A dictionary containing a fault log entry or null entry
    :rtype: PayDictT._0418 | PayDictT._0418_NULL
    """
    # assert int(payload[4:6], 16) < 64, f"Unexpected log_idx: 0x{payload[4:6]}"

    # RQ --- 18:017804 01:145038 --:------ 0418 003 000005                                        # log_idx=0x05
    # RP --- 01:145038 18:017804 --:------ 0418 022 000005B0040000000000CD17B5AE7FFFFF7000000001  # log_idx=0x05

    # RQ --- 18:017804 01:145038 --:------ 0418 003 000006                                        # log_idx=0x06
    # RP --- 01:145038 18:017804 --:------ 0418 022 000000B0000000000000000000007FFFFF7000000000  # log_idx=None (00)

    log_idx = payload[4:6]

    if msg.verb == RQ:  # has a ctx: log_idx
        rq_result: PayDictT._0418_NULL = {SZ_LOG_IDX: log_idx}  # type: ignore[typeddict-item]
        return rq_result

    # NOTE: such payloads have idx=="00": if verb is I, can safely assume log_idx is 0,
    # but for RP it is sentinel for null (we can't know the corresponding RQ's log_idx)
    if hex_to_dts(payload[18:30]) is None:
        null_result: PayDictT._0418_NULL
        if msg.verb == I_:
            null_result = {SZ_LOG_IDX: log_idx, SZ_LOG_ENTRY: None}  # type: ignore[typeddict-unknown-key]
        else:
            null_result = {SZ_LOG_ENTRY: None}
        return null_result

    # unexpected field values are reported, but do not stop the parse
    try:
        assert payload[2:4] in FAULT_STATE, f"fault state: {payload[2:4]}"
        assert payload[8:10] in FAULT_TYPE, f"fault type: {payload[8:10]}"
        assert payload[12:14] in FAULT_DEVICE_CLASS, f"device class: {payload[12:14]}"
        # 1C: 'Comms fault, Actuator': seen with boiler relays
        assert int(payload[10:12], 16) < 16 or (
            payload[10:12] in ("1C", F6, F9, FA, FC)
        ), f"domain id: {payload[10:12]}"
    except AssertionError as err:
        _LOGGER.warning(
            f"{msg!r} < {_INFORM_DEV_MSG} ({err}), with a photo of your fault log"
        )

    # the entry will not be None, because of the guard clauses, above
    fault: PayDictT.FAULT_LOG_ENTRY = parse_fault_log_entry(payload)  # type: ignore[assignment]

    # log_idx is not intrinsic to the fault & increments as the fault moves down the log
    fault.pop(f"_{SZ_LOG_IDX}")  # type: ignore[misc]

    wanted = (SZ_TIMESTAMP, SZ_FAULT_STATE, SZ_FAULT_TYPE)
    fields = [value for key, value in fault.items() if key in wanted]

    device_class = fault[SZ_DEVICE_CLASS]
    if device_class != FaultDeviceClass.ACTUATOR:
        fields.append(device_class)
    else:
        # for actuators, some domains are reported as a device role instead
        domain_idx = fault[SZ_DOMAIN_IDX]
        if domain_idx == FC:
            fields.append(DEV_ROLE_MAP[DevRole.APP])  # actual evohome UI
        elif domain_idx == FA:
            fields.append(DEV_ROLE_MAP[DevRole.HTG])  # speculative
        elif domain_idx == F9:
            fields.append(DEV_ROLE_MAP[DevRole.HT1])  # speculative
        else:
            fields.append(FaultDeviceClass.ACTUATOR)

    # TODO: remove the qualifier (the assert is false)
    if device_class != FaultDeviceClass.CONTROLLER:
        # assert log_entry[SZ_DOMAIN_IDX] == "00", log_entry[SZ_DOMAIN_IDX]
        # key_name = SZ_ZONE_IDX if int(payload[10:12], 16) < 16 else SZ_DOMAIN_ID
        # log_entry.update({key_name: payload[10:12]})
        fields.append(fault[SZ_DOMAIN_IDX])

    # "00:000001 for Controller? "00:000002 for Unknown?
    if fault[SZ_DEVICE_ID] not in ("00:000000", "00:000001", "00:000002"):
        fields.append(fault[SZ_DEVICE_ID])

    fields += [payload[6:8], payload[14:18], payload[30:38]]  # TODO: remove?

    full_result: PayDictT._0418 = {
        SZ_LOG_IDX: log_idx,  # type: ignore[typeddict-item]
        SZ_LOG_ENTRY: tuple(str(field) for field in fields),
    }
    return full_result
# unknown_042f, from STA, VMS
def parser_042f(payload: str, msg: Message) -> dict[str, Any]:
    """Split a 042f payload into its hex-formatted fields.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary of three counters and a trailing unknown field, each
        rendered as a ``0x``-prefixed hex string
    :rtype: dict[str, Any]
    """
    layout = (
        ("counter_1", slice(2, 6)),
        ("counter_3", slice(6, 10)),
        ("counter_5", slice(10, 14)),
        ("unknown_7", slice(14, None)),
    )
    return {key: f"0x{payload[span]}" for key, span in layout}
# TODO: unknown_0b04, from THM (only when it's a CTL?)
def parser_0b04(payload: str, msg: Message) -> dict[str, Any]:
    """Expose the body of a 0b04 payload, uninterpreted.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary holding everything after the first byte, as hex
    :rtype: dict[str, Any]
    """
    # .I --- --:------ --:------ 12:207082 0B04 002 00C8  # batch of 3, every 24h

    body = payload[2:]
    return {"unknown_1": body}
# mixvalve_config (zone), FAN
def parser_1030(payload: str, msg: Message) -> PayDictT._1030:
    """Decode a 1030 (mixvalve_config) payload into named parameters.

    After the leading index byte, the payload is a series of 3-byte records:
    a parameter id, a fixed ``01`` byte, and the value.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is validated)
    :type msg: Message
    :return: A dictionary of mixing valve parameters
    :rtype: PayDictT._1030
    :raises AssertionError: If the message length is unexpected or a record is malformed.
    :raises KeyError: If a record has an unrecognised parameter id.
    """
    # .I --- 01:145038 --:------ 01:145038 1030 016 0A-C80137-C9010F-CA0196-CB0100-CC0101
    # .I --- --:------ --:------ 12:144017 1030 016 01-C80137-C9010F-CA0196-CB010F-CC0101
    # RP --- 32:155617 18:005904 --:------ 1030 007 00-200100-21011F

    names_by_id = {
        "20": "unknown_20",  # HVAC
        "21": "unknown_21",  # HVAC
        "C8": "max_flow_setpoint",  # 55 (0-99) C
        "C9": "min_flow_setpoint",  # 15 (0-50) C
        "CA": "valve_run_time",  # 150 (0-240) sec, aka actuator_run_time
        "CB": "pump_run_time",  # 15 (0-99) sec
        "CC": "boolean_cc",  # ?boolean?
    }

    assert (msg.len - 1) / 3 in (2, 5), msg.len
    # assert payload[30:] in ("00", "01"), payload[30:]

    result: dict = {}
    for start in range(2, len(payload), 6):
        record = payload[start : start + 6]
        assert record[2:4] == "01", record[2:4]
        name = names_by_id[record[:2]]
        result[name] = int(record[4:], 16)
    return result  # type: ignore[return-value]
# device_battery (battery_state)
def parser_1060(payload: str, msg: Message) -> PayDictT._1060:
    """Decode a 1060 (device_battery) payload into the battery state.

    Some devices (04:) will also report battery level.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is validated)
    :type msg: Message
    :return: A dictionary with the battery-low flag and the level (None when
        the level byte is ``00``)
    :rtype: PayDictT._1060
    :raises AssertionError: If the message length or state byte is invalid.
    """

    assert msg.len == 3, msg.len
    assert payload[4:6] in ("00", "01")

    level_hex = payload[2:4]
    if level_hex == "00":
        level = None
    else:
        level = hex_to_percent(level_hex)

    return {
        "battery_low": payload[4:] == "00",
        "battery_level": level,
    }
# max_ch_setpoint (supply high limit)
def parser_1081(payload: str, msg: Message) -> PayDictT._1081:
    """Decode a 1081 (max_ch_setpoint) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the setpoint, decoded via hex_to_temp
    :rtype: PayDictT._1081
    """
    setpoint = hex_to_temp(payload[2:])
    return {SZ_SETPOINT: setpoint}
# unknown_1090 (non-Evohome, e.g. ST9520C)
def parser_1090(payload: str, msg: Message) -> PayDictT._1090:
    """Decode a 1090 payload as an index byte followed by two temperatures.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is validated)
    :type msg: Message
    :return: A dictionary containing two temperature values
    :rtype: PayDictT._1090
    :raises AssertionError: If the message length or payload index is invalid.
    """
    # 14:08:05.176 095 RP --- 23:100224 22:219457 --:------ 1090 005 007FFF01F4
    # 18:08:05.809 095 RP --- 23:100224 22:219457 --:------ 1090 005 007FFF01F4

    # this is an educated guess
    assert msg.len == 5, _INFORM_DEV_MSG
    assert int(payload[:2], 16) < 2, _INFORM_DEV_MSG

    first = hex_to_temp(payload[2:6])
    second = hex_to_temp(payload[6:10])
    return {"temperature_0": first, "temperature_1": second}
# unknown_1098, from OTB
def parser_1098(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 1098 payload; only ``00C8`` is expected.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the raw payload and its interpreted value
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload does not match the expected constant.
    """
    assert payload == "00C8", _INFORM_DEV_MSG

    value_hex = payload[2:]
    # the percentage is always computed; it is used only if not 00/C8
    fallback = hex_to_percent(value_hex)
    known = {"00": False, "C8": True}

    return {
        "_payload": payload,
        "_value": known.get(value_hex, fallback),
    }
# dhw (cylinder) params  # FIXME: a bit messy
def parser_10a0(payload: str, msg: Message) -> PayDictT._10A0 | PayDictT.EMPTY:
    """Decode a 10a0 (dhw_params) payload.

    A 1-byte RQ yields an empty dictionary. Otherwise the fields present
    depend upon the message length: setpoint (2+ bytes), overrun (4+ bytes)
    and differential (6+ bytes).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb and length are consulted)
    :type msg: Message
    :return: A dictionary of DHW parameters or an empty dictionary
    :rtype: PayDictT._10A0 | PayDictT.EMPTY
    :raises AssertionError: If the message length or valve index is invalid.
    """
    # RQ --- 07:045960 01:145038 --:------ 10A0 006 00-1087-00-03E4  # RQ/RP, every 24h
    # RP --- 01:145038 07:045960 --:------ 10A0 006 00-109A-00-03E8
    # RP --- 10:048122 18:006402 --:------ 10A0 003 00-1B58

    # these may not be reliable...
    # RQ --- 01:136410 10:067219 --:------ 10A0 002 0000
    # RQ --- 07:017494 01:078710 --:------ 10A0 006 00-1566-00-03E4

    # RQ --- 07:045960 01:145038 --:------ 10A0 006 00-31FF-00-31FF  # null
    # RQ --- 07:045960 01:145038 --:------ 10A0 006 00-1770-00-03E8
    # RQ --- 07:045960 01:145038 --:------ 10A0 006 00-1374-00-03E4
    # RQ --- 07:030741 01:102458 --:------ 10A0 006 00-181F-00-03E4
    # RQ --- 07:036831 23:100224 --:------ 10A0 006 01-1566-00-03E4  # non-evohome

    # these from a RFG...
    # RQ --- 30:185469 01:037519 --:------ 0005 002 000E
    # RP --- 01:037519 30:185469 --:------ 0005 004 000E0300  # two DHW valves
    # RQ --- 30:185469 01:037519 --:------ 10A0 001 01 (01 )

    if msg.verb == RQ and msg.len == 1:  # some RQs have a payload (why?)
        # 045 RQ --- 07:045960 01:145038 --:------ 10A0 006 0013740003E4
        # 037 RQ --- 18:013393 01:145038 --:------ 10A0 001 00
        # 054 RP --- 01:145038 18:013393 --:------ 10A0 006 0013880003E8
        return {}

    assert msg.len in (1, 3, 6), msg.len  # OTB uses 3, evohome uses 6
    assert payload[:2] in ("00", "01"), payload[:2]  # can be two DHW valves/system

    params: PayDictT._10A0 = {}  # type: ignore[typeddict-item]
    if msg.len < 2:
        return params

    raw_setpoint = hex_to_temp(payload[2:6])  # 255 for OTB? iff no DHW?
    if raw_setpoint == 255:
        params = {SZ_SETPOINT: None}
    else:
        params = {SZ_SETPOINT: raw_setpoint}  # 30.0-85.0 C
    if msg.len < 4:
        return params

    params["overrun"] = int(payload[6:8], 16)  # 0-10 minutes
    if msg.len < 6:
        return params

    params["differential"] = hex_to_temp(payload[8:12])  # 1.0-10.0 C
    return params
# unknown_10b0, from OTB
def parser_10b0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 10b0 payload; only ``0000`` is expected.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the raw payload and interpreted value
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not the expected constant.
    """
    assert payload == "0000", _INFORM_DEV_MSG

    value_hex = payload[2:]
    # the percentage is always computed; it is used only if not 00/C8
    fallback = hex_to_percent(value_hex)
    known = {"00": False, "C8": True}

    return {
        "_payload": payload,
        "_value": known.get(value_hex, fallback),
    }
# filter_change, HVAC
def parser_10d0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 10d0 (filter_change) payload.

    A W yields only whether the counter is being reset. Any other verb
    yields the day counters (each omitted when its byte is FF/FE) and the
    remaining percentage.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb is consulted)
    :type msg: Message
    :return: A dictionary of remaining days, lifetime, and percentage
    :rtype: dict[str, Any]
    """
    # 2022-07-03T22:52:34.571579 045  W --- 37:171871 32:155617 --:------ 10D0 002 00FF
    # 2022-07-03T22:52:34.596526 066  I --- 32:155617 37:171871 --:------ 10D0 006 0047B44F0000
    # then...
    # 2022-07-03T23:14:23.854089 000 RQ --- 37:155617 32:155617 --:------ 10D0 002 0000
    # 2022-07-03T23:14:23.876088 084 RP --- 32:155617 37:155617 --:------ 10D0 006 00B4B4C80000

    # 00-FF resets the counter, 00-47-B4-4F-0000 is the value (71 180 79).
    # Default is 180 180 200. The returned value is the amount of days (180),
    # total amount of days till change (180), percentage (200)

    if msg.verb == W_:
        return {"reset_counter": payload[2:4] != "00"}

    result: dict[str, bool | float | None] = {}

    day_fields = (
        (SZ_REMAINING_DAYS, payload[2:4]),
        ("days_lifetime", payload[4:6]),
    )
    for key, raw in day_fields:
        if raw not in ("FF", "FE"):
            result[key] = int(raw, 16)

    result[SZ_REMAINING_PERCENT] = hex_to_percent(payload[6:8])

    return result
# device_info
def parser_10e0(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 10e0 (device_info) packet.

    The description is the text that follows the fixed-width header, up to the
    first NUL byte; anything after that NUL is returned as ``_unknown``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length and source type are consulted)
    :type msg: Message
    :return: A dictionary of device specifications and manufacturing data
    :rtype: dict[str, Any]
    :raises AssertionError: If the message length is unexpected, or the payload
        is too short once trailing 00s are removed.
    """
    if payload == "00":  # some HVAC devices will RP|10E0|00
        return {}

    assert msg.len in (19, 28, 29, 30, 36, 38), msg.len  # >= 19, msg.len

    payload = re.sub("(00)*$", "", payload)  # remove trailing 00s
    assert len(payload) >= 18 * 2

    # if DEV_MODE:  # TODO
    try:  # DEX
        check_signature(msg.src.type, payload[2:20])
    except ValueError as err:
        _LOGGER.info(
            f"{msg!r} < {_INFORM_DEV_MSG}, with the make/model of device: {msg.src} ({err})"
        )

    # Look for the NUL terminator on byte boundaries only: a plain str search
    # for "00" can also match across two bytes (e.g. 50 0A), which would leave
    # an odd-length description that bytearray.fromhex() cannot decode.
    text_hex = payload[36:]
    nul_at = next(
        (i for i in range(0, len(text_hex), 2) if text_hex[i : i + 2] == "00"),
        None,
    )
    if nul_at is None:
        description, unknown = text_hex, ""
    else:
        description, unknown = text_hex[:nul_at], text_hex[nul_at + 2 :]

    result = {
        SZ_OEM_CODE: payload[14:16],  # 00/FF is CH/DHW, 01/6x is HVAC
        # "_manufacturer_group": payload[2:6],  # 0001-HVAC, 0002-CH/DHW
        "manufacturer_sub_id": payload[6:8],
        "product_id": payload[8:10],  # if CH/DHW: matches device_type (sometimes)
        "date_1": hex_to_date(payload[28:36]) or "0000-00-00",  # hardware?
        "date_2": hex_to_date(payload[20:28]) or "0000-00-00",  # firmware?
        # "software_ver_id": payload[10:12],
        # "list_ver_id": payload[12:14],  # if FF/01 is CH/DHW, then 01/FF
        # # "additional_ver_a": payload[16:18],
        # # "additional_ver_b": payload[18:20],
        # "_signature": payload[2:20],
        "description": bytearray.fromhex(description).decode(),
    }
    if unknown:  # TODO: why only RP|OTB, I|DT4s do this?
        result["_unknown"] = unknown
    return result
# device_id
def parser_10e1(payload: str, msg: Message) -> PayDictT._10E1:
    """Decode a 10e1 (device_id) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the device ID
    :rtype: PayDictT._10E1
    """
    device_id = hex_id_to_dev_id(payload[2:])
    return {SZ_DEVICE_ID: device_id}
# unknown_10e2 - HVAC
def parser_10e2(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 10e2 (HVAC counter) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the counter, taken from the two bytes
        after the index
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not 6 characters or does not start with '00'.
    """
    # .I --- --:------ --:------ 20:231151 10E2 003 00AD74  # every 2 minutes

    assert payload[:2] == "00", _INFORM_DEV_MSG
    assert len(payload) == 6, _INFORM_DEV_MSG

    counter = int(payload[2:], 16)
    return {"counter": counter}
# tpi_params (domain/zone/device)  # FIXME: a bit messy
def parser_1100(
    payload: str, msg: Message
) -> PayDictT._1100 | PayDictT._1100_IDX | PayDictT._JASPER | PayDictT.EMPTY:
    """Decode a 1100 (tpi_params) payload.

    Packets from a JIM-type source are returned as an opaque ordinal/blob
    pair. A 1-byte RQ yields only the domain index (if any). Otherwise the
    TPI timings are decoded, plus the proportional band width when the
    message is longer than 5 bytes.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its source type, verb and length are consulted)
    :type msg: Message
    :return: A dictionary of TPI parameters or domain index
    :rtype: PayDictT._1100 | PayDictT._1100_IDX | PayDictT._JASPER | PayDictT.EMPTY
    :raises AssertionError: If TPI values are outside of recognized ranges.
    """

    if msg.src.type == DEV_TYPE_MAP.JIM:  # Honeywell Jasper, DEX
        assert msg.len == 19, msg.len
        return {
            "ordinal": f"0x{payload[2:8]}",
            "blob": payload[8:],
        }

    # the index is reported as a domain_id only for Fx values (only FC seen)
    idx = payload[:2]
    domain: PayDictT._1100_IDX | PayDictT.EMPTY
    domain = {SZ_DOMAIN_ID: idx} if idx[:1] == "F" else {}  # type: ignore[typeddict-item]

    if msg.verb == RQ and msg.len == 1:  # some RQs have a payload (why?)
        return domain

    # for:             TPI              // heatpump
    #  - cycle_rate:   6 (3, 6, 9, 12)  // ?? (1-9)
    #  - min_on_time:  1 (1-5)          // ?? (1, 5, 10,...30)
    #  - min_off_time: 1 (1-?)          // ?? (0, 5, 10, 15)

    # each timing is stored as quarter-units, and must be a whole number
    cycle_rate = int(payload[2:4], 16) / 4
    assert cycle_rate in range(1, 13), payload[2:4]
    min_on_time = int(payload[4:6], 16) / 4
    assert min_on_time in range(1, 31), payload[4:6]
    min_off_time = int(payload[6:8], 16) / 4
    assert min_off_time in range(0, 16), payload[6:8]

    result: PayDictT._1100 = {
        "cycle_rate": int(cycle_rate),  # cycles/hour
        "min_on_time": min_on_time,  # min
        "min_off_time": min_off_time,  # min
        "_unknown_0": payload[8:10],  # always 00, FF?
    }

    if msg.len > 5:
        pbw = hex_to_temp(payload[10:14])

        assert pbw is None or 1.5 <= pbw <= 3.0, (
            f"unexpected value for PBW: {payload[10:14]}"
        )

        result["proportional_band_width"] = pbw
        result["_unknown_1"] = payload[14:]  # always 01?

    return domain | result
# unknown_11f0, from heatpump relay
def parser_11f0(payload: str, msg: Message) -> dict[str, Any]:
    """Validate a 11f0 (heatpump relay) payload and return it unparsed.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the raw payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload does not match the expected constant string.
    """
    assert payload == "000009000000000000", _INFORM_DEV_MSG

    result = {SZ_PAYLOAD: payload}
    return result
# dhw cylinder temperature
def parser_1260(payload: str, msg: Message) -> PayDictT._1260:
    """Decode a 1260 (dhw_temp) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the DHW temperature
    :rtype: PayDictT._1260
    """
    temperature = hex_to_temp(payload[2:])
    return {SZ_TEMPERATURE: temperature}
# HVAC: outdoor humidity
def parser_1280(payload: str, msg: Message) -> PayDictT._1280:
    """Decode a 1280 (outdoor_humidity) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: The result of parse_outdoor_humidity for everything after the first byte
    :rtype: PayDictT._1280
    """
    body = payload[2:]
    return parse_outdoor_humidity(body)
# outdoor temperature
def parser_1290(payload: str, msg: Message) -> PayDictT._1290:
    """Decode a 1290 (outdoor_temp) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: The result of parse_outdoor_temp for everything after the first byte
    :rtype: PayDictT._1290
    """
    # evohome responds to an RQ, also from OTB
    body = payload[2:]
    return parse_outdoor_temp(body)
# HVAC: co2_level, see: 31DA[6:10]
def parser_1298(payload: str, msg: Message) -> PayDictT._1298:
    """Decode a 1298 (co2_level) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: The result of parse_co2_level for the two bytes after the index
    :rtype: PayDictT._1298
    """
    level_hex = payload[2:6]
    return parse_co2_level(level_hex)
# HVAC: indoor_humidity, array of 3 sets for HRU
def parser_12a0(
    payload: str, msg: Message
) -> PayDictT.INDOOR_HUMIDITY | list[PayDictT._12A0]:
    """Decode a 12a0 (indoor_humidity) payload.

    A payload of up to 14 characters is a single reading; a longer payload is
    treated as a series of 14-character elements, each led by its index.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A single humidity dict or a list of sensor element dicts
    :rtype: PayDictT.INDOOR_HUMIDITY | list[PayDictT._12A0]
    """
    if len(payload) <= 14:
        return parse_indoor_humidity(payload[2:12])

    elements = []
    for offset in range(0, len(payload), 14):
        hvac_idx = payload[offset : offset + 2]
        element = {"hvac_idx": hvac_idx}  # used as index
        element.update(
            parse_humidity_element(payload[offset + 2 : offset + 12], hvac_idx)
        )
        elements.append(element)
    return elements
# window_state (of a device/zone)
def parser_12b0(payload: str, msg: Message) -> PayDictT._12B0:
    """Decode a 12b0 (window_state) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the window open status
    :rtype: PayDictT._12B0
    :raises AssertionError: If the payload state bytes are unrecognized.
    """
    state = payload[2:]
    assert state in ("0000", "C800", "FFFF"), payload[2:]  # "FFFF" means N/A

    window_open = hex_to_bool(payload[2:4])
    return {SZ_WINDOW_OPEN: window_open}
# displayed temperature (on a TR87RF bound to a RFG100)
def parser_12c0(payload: str, msg: Message) -> PayDictT._12C0:
    """Decode a 12c0 (displayed_temp) payload.

    The value byte is in whole degrees when the units byte is ``00``
    (Fahrenheit), and in half degrees otherwise; a value byte of ``80``
    means there is no temperature.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the temperature and its measurement units
    :rtype: PayDictT._12C0
    :raises KeyError: If the units byte is neither ``00`` nor ``01``.
    """
    value_hex, units_hex = payload[2:4], payload[4:6]

    temp: float | None
    if value_hex == "80":
        temp = None
    else:
        temp = int(value_hex, 16)
        if units_hex != "00":  # if payload[4:] == "01":  # units are 0.5 C
            temp = temp / 2

    units_by_code = {"00": "Fahrenheit", "01": "Celsius"}
    result: PayDictT._12C0 = {
        SZ_TEMPERATURE: temp,
        "units": units_by_code[units_hex],  # type: ignore[typeddict-item]
    }
    if len(payload) > 6:
        result["_unknown_6"] = payload[6:]
    return result
# HVAC: air_quality (and air_quality_basis), see: 31DA[2:6]
def parser_12c8(payload: str, msg: Message) -> PayDictT._12C8:
    """Decode a 12c8 (air_quality) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: The result of parse_air_quality for the two bytes after the index
    :rtype: PayDictT._12C8
    """
    quality_hex = payload[2:6]
    return parse_air_quality(quality_hex)
# dhw_flow_rate
def parser_12f0(payload: str, msg: Message) -> PayDictT._12F0:
    """Decode a 12f0 (dhw_flow_rate) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the DHW flow rate, decoded via hex_to_temp
    :rtype: PayDictT._12F0
    """
    flow_rate = hex_to_temp(payload[2:])
    return {SZ_DHW_FLOW_RATE: flow_rate}
# ch_pressure
def parser_1300(payload: str, msg: Message) -> PayDictT._1300:
    """Decode a 1300 (ch_pressure) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A dictionary containing the pressure (decoded via hex_to_temp),
        or None for the ``09F6`` sentinel
    :rtype: PayDictT._1300
    """
    raw = payload[2:]

    # 0x9F6 (2550 dec = 2.55 bar) appears to be a sentinel value
    if raw == "09F6":
        pressure = None
    else:
        pressure = hex_to_temp(raw)
    return {SZ_PRESSURE: pressure}
# programme_scheme, HVAC
def parser_1470(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 1470 (programme_scheme) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb selects which constants are expected)
    :type msg: Message
    :return: A dictionary of the schedule scheme and daily setpoint count
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload format or constants are unrecognized.
    """
    # Seen on Orcon: see 1470, 1F70, 22B0

    schemes = {
        "9": "one_per_week",
        "A": "two_per_week",  # week_day, week_end
        "B": "one_each_day",  # seven_per_week (default?)
    }

    assert payload[8:10] == "80", _INFORM_DEV_MSG
    is_write = msg.verb == W_
    # non-W packets carry fixed values; W packets are otherwise zeroed
    assert is_write or payload[4:8] == "0E60", _INFORM_DEV_MSG
    assert is_write or payload[10:] == "2A0108", _INFORM_DEV_MSG
    assert not is_write or payload[4:] == "000080000000", _INFORM_DEV_MSG

    # schedule...
    # [2:3] - 1, every/all days, 1&6, weekdays/weekends, 1-7, each individual day
    # [3:4] - # setpoints/day (default 3)
    scheme_code, setpoints = payload[2:3], payload[3:4]
    assert scheme_code in schemes and (
        setpoints in ("2", "3", "4", "5", "6")
    ), _INFORM_DEV_MSG

    return {
        "scheme": schemes.get(scheme_code, f"unknown_{payload[2:3]}"),
        "daily_setpoints": setpoints,
        "_value_4": payload[4:8],
        "_value_8": payload[8:10],
        "_value_10": payload[10:],
    }
# system_sync
def parser_1f09(payload: str, msg: Message) -> PayDictT._1F09:
    """Decode a 1f09 (system_sync) payload.

    The countdown is held in tenths of a second; the next sync time is that
    countdown added to the message's timestamp.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length and timestamp are used)
    :type msg: Message
    :return: A dictionary with remaining seconds and the calculated next sync time
    :rtype: PayDictT._1F09
    :raises AssertionError: If the packet length is not 3, or the index is unexpected.
    """
    # 22:51:19.287 067  I --- --:------ --:------ 12:193204 1F09 003 010A69
    # 22:51:19.318 068  I --- --:------ --:------ 12:193204 2309 003 010866
    # 22:51:19.321 067  I --- --:------ --:------ 12:193204 30C9 003 0108C3

    # domain_id from 01:/CTL:
    # - FF for regular sync messages
    # - 00 when responding to a request
    # - F8 after binding a device

    assert msg.len == 3, f"length is {msg.len}, expecting 3"
    assert payload[:2] in ("00", "01", F8, FF)  # W/F8

    remaining = int(payload[2:6], 16) / 10
    due_at = msg.dtm + td(seconds=remaining)

    return {
        "remaining_seconds": remaining,
        "_next_sync": dt.strftime(due_at, "%H:%M:%S"),
    }
# dhw_mode
def parser_1f41(payload: str, msg: Message) -> PayDictT._1F41:
    """Decode a 1f41 (dhw_mode) payload.

    The active flag is omitted when its byte is ``FF``; an until datetime is
    included only for the temporary mode.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is validated against the mode)
    :type msg: Message
    :return: A dictionary containing DHW mode, activity, and until data
    :rtype: PayDictT._1F41
    :raises AssertionError: If payload constants or message lengths are invalid.
    """
    # 053 RP --- 01:145038 18:013393 --:------ 1F41 006 00FF00FFFFFF  # no stored DHW

    mode_hex = payload[4:6]
    active_hex = payload[2:4]

    assert mode_hex in ZON_MODE_MAP, f"{payload[4:6]} (0xjj)"
    assert mode_hex == ZON_MODE_MAP.TEMPORARY or msg.len == 6, (
        f"{msg!r}: expected length 6"
    )
    assert mode_hex != ZON_MODE_MAP.TEMPORARY or msg.len == 12, (
        f"{msg!r}: expected length 12"
    )
    assert payload[6:12] == "FFFFFF", (
        f"{msg!r}: expected FFFFFF instead of '{payload[6:12]}'"
    )

    result: PayDictT._1F41 = {SZ_MODE: ZON_MODE_MAP.get(mode_hex)}  # type: ignore[typeddict-item]

    if active_hex != "FF":
        result[SZ_ACTIVE] = {"00": False, "01": True, "FF": None}[active_hex]

    # if payload[4:6] == ZON_MODE_MAP.COUNTDOWN:
    #     result[SZ_UNTIL] = dtm_from_hex(payload[6:12])
    if mode_hex == ZON_MODE_MAP.TEMPORARY:
        result[SZ_UNTIL] = hex_to_dtm(payload[12:24])

    return result
# programme_config, HVAC
def parser_1f70(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 1f70 (programme_config) payload.

    Unexpected constants are logged as a warning rather than raised; the
    payload is decoded regardless.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb selects which constants are expected)
    :type msg: Message
    :return: A dictionary containing schedule indices and start times
    :rtype: dict[str, Any]
    """
    # Seen on Orcon: see 1470, 1F70, 22B0

    verb = msg.verb
    try:
        assert payload[:2] == "00", f"expected 00, not {payload[:2]}"
        assert payload[2:4] in ("00", "01"), f"expected (00|01), not {payload[2:4]}"
        assert payload[4:8] == "0800", f"expected 0800, not {payload[4:8]}"
        assert payload[10:14] == "0000", f"expected 0000, not {payload[10:14]}"
        assert verb in (RQ, W_) or payload[14:16] == "15"
        assert verb in (I_, RP) or payload[14:16] == "00"
        assert verb == RQ or payload[22:24] == "60"
        assert verb != RQ or payload[22:24] == "00"
        assert verb == RQ or payload[24:26] in ("E4", "E5", "E6"), _INFORM_DEV_MSG
        assert verb == RP or payload[26:] == "000000"
        assert verb != RP or payload[26:] == "008000"

    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    # assert int(payload[16:18], 16) < 7, _INFORM_DEV_MSG

    hours = int(payload[18:20], 16)
    minutes = int(payload[20:22], 16)

    return {
        "day_idx": payload[16:18],  # depends upon 1470[3:4]?
        "setpoint_idx": payload[8:10],  # needs to be mod 1470[3:4]?
        "start_time": f"{hours:02d}:{minutes:02d}",
        "fan_speed_wip": payload[24:26],  # # E4/E5/E6   / 00(RQ)
        "_value_02": payload[2:4],  # # 00/01      / 00(RQ)
        "_value_04": payload[4:8],  # # 0800
        "_value_10": payload[10:14],  # 0000
        "_value_14": payload[14:16],  # 15(RP,I)   / 00(RQ,W)
        "_value_22": payload[22:24],  # 60         / 00(RQ)
        "_value_26": payload[26:],  # # 008000(RP) / 000000(I/RQ/W)
    }
# rf_bind
def parser_1fc9(payload: str, msg: Message) -> PayDictT._1FC9:
    """Decode a 1fc9 (rf_bind) payload.

    The binding phase is derived from the verb and the addresses; the
    payload is then split into 6-byte bindings of (index, code, device id).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb, addresses and length are consulted)
    :type msg: Message
    :return: A dictionary identifying the binding phase (Offer/Accept/Confirm) and bindings
    :rtype: PayDictT._1FC9
    :raises PacketPayloadInvalid: If the binding format is unknown.
    :raises AssertionError: If the payload length or constants are invalid.
    """

    def _binding(seqx: str) -> list[str]:
        idx = seqx[:2]
        if idx != "90":
            assert (
                seqx[6:] == payload[6:12]  # [6:12] is repeated
            ), f"{seqx[6:]} != {payload[6:12]}"  # all with same controller
        known_idx = (
            "21",  # HVAC, Nuaire
            "63",  # HVAC
            "65",  # HVAC, ClimaRad
            "66",  # HVAC, Vasco
            "67",  # HVAC
            "6C",  # HVAC
            "90",  # HEAT
            F6,
            F9,
            FA,
            FB,
            FC,
            FF,
        )  # or: not in DOMAIN_TYPE_MAP: ??
        if idx not in known_idx:
            assert int(idx, 16) < 16, _INFORM_DEV_MSG
        return [idx, seqx[2:6], hex_id_to_dev_id(seqx[6:])]

    if msg.verb == I_:
        if msg.dst.id in (msg.src.id, ALL_DEV_ADDR.id):
            bind_phase = SZ_OFFER
        else:
            bind_phase = SZ_CONFIRM  # len(payload) could be 2 (e.g. 00, 21)
    elif msg.verb == W_ and msg.src is not msg.dst:
        bind_phase = SZ_ACCEPT
    elif msg.verb == RP:
        bind_phase = None
    else:
        raise exc.PacketPayloadInvalid("Unknown binding format")

    if len(payload) == 2 and bind_phase == SZ_CONFIRM:
        return {SZ_PHASE: bind_phase, SZ_BINDINGS: [[payload]]}  # double-bracket OK

    assert msg.len >= 6 and msg.len % 6 == 0, msg.len  # assuming not RQ
    assert msg.verb in (I_, W_, RP), msg.verb  # devices will respond to a RQ!
    # assert (
    #     msg.src.id == hex_id_to_dev_id(payload[6:12])
    # ), f"{payload[6:12]} ({hex_id_to_dev_id(payload[6:12])})"  # NOTE: use_regex

    bindings = []
    for start in range(0, len(payload), 12):
        # if payload[i : i + 2] != "90"  # TODO: WIP, what is 90?
        bindings.append(_binding(payload[start : start + 12]))
    return {SZ_PHASE: bind_phase, SZ_BINDINGS: bindings}
# unknown_1fca, HVAC?
def parser_1fca(payload: str, msg: Message) -> Mapping[str, str]:
    """Decode a 1fca payload into two unknown fields and two device IDs.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not used by this parser)
    :type msg: Message
    :return: A mapping of unknown identifiers and associated device IDs
    :rtype: Mapping[str, str]
    """
    # .W --- 30:248208 34:021943 --:------ 1FCA 009 00-01FF-7BC990-FFFFFF  # sent x2

    first_id = hex_id_to_dev_id(payload[6:12])
    second_id = hex_id_to_dev_id(payload[12:])

    return {
        "_unknown_0": payload[:2],
        "_unknown_1": payload[2:6],
        "device_id_0": first_id,
        "device_id_1": second_id,
    }
1839# unknown_1fd0, from OTB
def parser_1fd0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 1fd0 packet (seen from an OTB).

    Only an all-zero, eight-byte payload is accepted; it is returned verbatim.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary holding the unmodified payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not sixteen ``0`` characters.
    """
    expected = "00" * 8  # i.e. "0000000000000000"
    assert payload == expected, _INFORM_DEV_MSG

    return {SZ_PAYLOAD: payload}
1858# opentherm_sync, otb_sync
def parser_1fd4(payload: str, msg: Message) -> PayDictT._1FD4:
    """Decode a 1fd4 (opentherm_sync) packet.

    Everything after the first byte is read as one big-endian hex integer.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary with the integer ``ticker`` value
    :rtype: PayDictT._1FD4
    """
    ticker_hex = payload[2:]  # skip the leading byte
    return {"ticker": int(ticker_hex, 16)}
1872# WIP: HVAC auto requests (confirmed for Orcon, others?)
def parser_2210(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2210 (HVAC auto request) packet.

    The consistency checks are advisory: the first one that fails is logged as
    a warning and decoding carries on regardless.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb is checked)
    :type msg: Message
    :return: A dictionary of fan speed, request reason, and unknown trailing bytes
    :rtype: dict[str, Any]
    """
    speed_hex = payload[10:12]  # auto requested fan speed %
    reason_hex = payload[20:22]
    tail = payload[82:]

    try:
        assert msg.verb in (RP, I_) or payload == "00"
        assert speed_hex == payload[38:40], (
            f"expected byte 19 {speed_hex}, not {payload[38:40]}"
        )  # Identical [38:40] is for supply?
        assert reason_hex == payload[48:50] and reason_hex in (
            "00",  # idle
            "02",  # requested by CO2 level/sensor
            "03",  # requested by humidity level/sensor
        ), f"expected req_reason (00|02|03), not {reason_hex}"
        assert payload[78:80] in ("00", "02"), (
            f"expected byte 39 (00|02), not {payload[78:80]}"
        )
        assert payload[80:82] in ("01", "08"), (
            f"expected byte 40 (01|08), not {payload[80:82]}"
        )
        assert tail in ("00", "40"), f"expected byte 41- (00|40), not {tail}"

    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    # anything other than CO2 (02) or humidity (03) is reported as idle
    req_reason = {"02": "CO2", "03": "HUM"}.get(reason_hex, "IDL")

    # for Orcon: 29 hex == 41 decimal divided by 2 gives 20.5 (%)
    result = {**parse_exhaust_fan_speed(speed_hex)}
    result[SZ_REQ_REASON] = req_reason
    result["unknown_78"] = payload[78:80]
    result["unknown_80"] = payload[80:82]
    result["unknown_82"] = tail
    return result
1927# now_next_setpoint - Programmer/Hometronics
def parser_2249(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Parse the 2249 (now_next_setpoint) packet.

    Each 7-byte element is laid out as: index (1 byte), current setpoint
    (2 bytes), next setpoint (2 bytes), minutes until the change (2 bytes).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its timestamp and array flag are read)
    :type msg: Message
    :return: A dictionary or list of current/next setpoints and time remaining
    :rtype: dict | list[dict]
    """
    # see: https://github.com/jrosser/honeymon/blob/master/decoder.cpp#L357-L370
    # .I --- 23:100224 --:------ 23:100224 2249 007 00-7EFF-7EFF-FFFF

    def _parser(seqx: str) -> dict[str, bool | float | int | str | None]:
        # seqx must be a whole 14-char element: offsets assume the idx at [0:2]
        minutes = int(seqx[10:], 16)
        next_setpoint = msg.dtm + td(minutes=minutes)
        return {
            "setpoint_now": hex_to_temp(seqx[2:6]),
            "setpoint_next": hex_to_temp(seqx[6:10]),
            "minutes_remaining": minutes,
            "_next_setpoint": dt.strftime(next_setpoint, "%H:%M:%S"),
        }

    # the ST9520C can support two heating zones, so: msg.len in (7, 14)?
    if msg._has_array:
        # Pass the complete element (including its idx byte), exactly as the
        # non-array path does; slicing the idx off would shift every field.
        return [
            {
                SZ_ZONE_IDX: payload[i : i + 2],
                **_parser(payload[i : i + 14]),
            }
            for i in range(0, len(payload), 14)
        ]

    return _parser(payload)
1964# program_enabled, HVAC
def parser_22b0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22b0 (program_enabled) packet.

    The second byte selects the state: ``05`` is enabled, ``06`` is disabled,
    and any other value yields ``None``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary containing the program enabled status
    :rtype: dict[str, Any]
    """
    # Seen on Orcon: see 1470, 1F70, 22B0

    # .W --- 37:171871 32:155617 --:------ 22B0 002 0005  # enable, calendar on
    # .I --- 32:155617 37:171871 --:------ 22B0 002 0005

    # .W --- 37:171871 32:155617 --:------ 22B0 002 0006  # disable, calendar off
    # .I --- 32:155617 37:171871 --:------ 22B0 002 0006

    state_hex = payload[2:4]
    if state_hex == "05":
        enabled = True
    elif state_hex == "06":
        enabled = False
    else:
        enabled = None  # unrecognised value

    return {"enabled": enabled}
1988# setpoint_bounds, TODO: max length = 24?
def parser_22c9(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Decode a 22c9 (setpoint_bounds) packet.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length and array flag are read)
    :type msg: Message
    :return: A dictionary, or a list of them, with mode and (lower, upper) bounds
    :rtype: dict | list[dict]
    :raises AssertionError: If the mode byte or an 8-byte suffix is unrecognized.
    """
    # .I --- 02:001107 --:------ 02:001107 22C9 024 00-0834-0A28-01-0108340A2801-0208340A2801-0308340A2801  # noqa: E501
    # .I --- 02:001107 --:------ 02:001107 22C9 006 04-0834-0A28-01

    # .I --- 21:064743 --:------ 21:064743 22C9 006 00-07D0-0834-02
    # .W --- 21:064743 02:250708 --:------ 22C9 006 03-07D0-0834-02
    # .I --- 02:250708 21:064743 --:------ 22C9 008 03-07D0-7FFF-020203

    # Notes on 008|suffix: only seen as I, only when no array, only as 7FFF(0101|0202)03$

    mode_by_code = {"01": "heat", "02": "cool"}

    def _parser(seqx: str) -> dict:
        code = seqx[10:]
        assert code in ("01", "02"), f"is {code}, expecting 01 or 02"

        mode = mode_by_code[code]  # TODO: or action?
        bounds = (hex_to_temp(seqx[2:6]), hex_to_temp(seqx[6:10]))  # lower, upper
        return {SZ_MODE: mode, SZ_SETPOINT_BOUNDS: bounds}

    if msg._has_array:
        elements = []
        for start in range(0, len(payload), 12):
            chunk = payload[start : start + 12]  # one 6-byte element
            elements.append({SZ_UFH_IDX: chunk[:2], **_parser(chunk)})
        return elements

    assert msg.len != 8 or payload[10:] in ("010103", "020203"), _INFORM_DEV_MSG

    return _parser(payload[:12])
2031# unknown_22d0, UFH system mode (heat/cool)
def parser_22d0(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22d0 (UFH system mode, heat/cool) packet.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary of index, raw flags, and the mode bits taken from them
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload tail or the third byte is unexpected.
    """

    def _parser(seqx: str) -> dict:
        # assert seqx[2:4] in ("00", "03", "10", "13", "14"), _INFORM_DEV_MSG
        assert seqx[4:6] == "00", _INFORM_DEV_MSG

        flags_hex = seqx[2:4]
        decoded = {"idx": seqx[:2], "_flags": hex_to_flag8(flags_hex)}

        flags = int(flags_hex, 16)
        decoded["cool_mode"] = bool(flags & 0x02)
        decoded["heat_mode"] = bool(flags & 0x04)
        decoded["is_active"] = bool(flags & 0x10)
        decoded["_unknown"] = payload[4:]  # taken from the whole payload
        return decoded

    # 4-byte payloads have a small set of known last bytes; any other length
    # must carry the one known tail
    if len(payload) == 8:
        tail_ok = payload[6:] in ("00", "02", "0A")
    else:
        tail_ok = payload[4:] == "001E14030020"
    assert tail_ok, _INFORM_DEV_MSG

    return _parser(payload)
2064# desired boiler setpoint
def parser_22d9(payload: str, msg: Message) -> PayDictT._22D9:
    """Decode a 22d9 (desired boiler setpoint) packet.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary with the setpoint decoded from bytes 1-2
    :rtype: PayDictT._22D9
    """
    setpoint = hex_to_temp(payload[2:6])
    return {SZ_SETPOINT: setpoint}
2078# WIP: unknown, HVAC
def parser_22e0(payload: str, msg: Message) -> Mapping[str, float | None]:
    """Decode a 22e0 packet (WIP: unknown, HVAC).

    Every byte after the first is decoded as a percentage. If that raises a
    ``ValueError``, only bytes 1-3 are decoded, with byte 2 handled by a
    local fallback that divides the raw value by 200.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A mapping of ``percent_<offset>`` keys to decoded values
    :rtype: Mapping[str, float | None]
    :raises AssertionError: If the fallback value exceeds 200 and is not ``E6``.
    :raises ValueError: If the fallback decoding itself cannot parse the bytes.
    """

    # RP --- 32:155617 18:005904 --:------ 22E0 004 00-34-A0-1E
    # RP --- 32:153258 18:005904 --:------ 22E0 004 00-64-A0-1E
    def _parser(seqx: str) -> float:
        raw = int(seqx, 16)
        assert raw <= 200 or seqx == "E6"  # only for 22E0, not 22E5/22E9
        return raw / 200

    try:
        percents = {}
        for offset in range(2, len(payload), 2):
            percents[f"percent_{offset}"] = hex_to_percent(
                payload[offset : offset + 2]
            )
        return percents
    except ValueError:
        first = hex_to_percent(payload[2:4])
        second = _parser(payload[4:6])
        third = hex_to_percent(payload[6:8])
        return {"percent_2": first, "percent_4": second, "percent_6": third}
2111# WIP: unknown, HVAC
def parser_22e5(payload: str, msg: Message) -> Mapping[str, float | None]:
    """Decode a 22e5 packet (WIP: unknown, HVAC).

    This code is decoded exactly like 22e0, so the work is delegated.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object, passed through unchanged
    :type msg: Message
    :return: Whatever ``parser_22e0`` returns for the same arguments
    :rtype: Mapping[str, float | None]
    """
    # RP --- 32:153258 18:005904 --:------ 22E5 004 00-96-C8-14
    # RP --- 32:155617 18:005904 --:------ 22E5 004 00-72-C8-14

    decoded = parser_22e0(payload, msg)
    return decoded
2128# WIP: unknown, HVAC
def parser_22e9(payload: str, msg: Message) -> Mapping[str, float | str | None]:
    """Decode a 22e9 packet (WIP: unknown, HVAC).

    When the second byte is ``01`` the next two bytes are returned as raw hex;
    otherwise the payload is decoded exactly like 22e0.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object, passed through to ``parser_22e0``
    :type msg: Message
    :return: A mapping of raw unknown bytes, or of percentage values
    :rtype: Mapping[str, float | str | None]
    """
    if payload[2:4] != "01":
        return parser_22e0(payload, msg)

    byte_2, byte_3 = payload[4:6], payload[6:8]
    return {"unknown_4": byte_2, "unknown_6": byte_3}
2147# fan_speed (switch_mode), HVAC
def parser_22f1(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 22f1 (fan_speed) packet.

    A vendor "scheme" is chosen from the first address of the message and the
    third payload byte; that choice selects the fan-mode lookup table and the
    set of acceptable third-byte values. All sanity checks are advisory: a
    failure is logged as a warning and parsing continues.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the fan mode, scheme, and internal indices
    :rtype: dict[str, Any]
    """
    try:
        assert payload[0:2] in ("00", "63")
        # when a third byte is present, the second byte must not exceed it
        assert not payload[4:] or int(payload[2:4], 16) <= int(payload[4:], 16), (
            "mode_idx > mode_max"
        )
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    # The lookup tables are imported at function scope, one per branch, and
    # bound to the same local name so the code below is scheme-agnostic.
    if msg._addrs[0] == NON_DEV_ADDR:  # and payload[4:6] == "04":
        from .ramses import _22F1_MODE_ITHO as _22F1_FAN_MODE  # TODO: only if 04

        _22f1_mode_set: tuple[str, ...] = ("", "04")
        _22f1_scheme = "itho"

    # elif msg._addrs[0] == NON_DEV_ADDR:  # and payload[4:6] == "04":
    #     _22F1_FAN_MODE = {
    #         f"{x:02X}": f"speed_{x}" for x in range(int(payload[4:6], 16) + 1)
    #     } | {"00": "off"}

    #     _22f1_mode_set = (payload[4:6], )
    #     _22f1_scheme = "itho_2"

    elif payload[4:6] == "0A":
        from .ramses import _22F1_MODE_NUAIRE as _22F1_FAN_MODE

        _22f1_mode_set = ("", "0A")
        _22f1_scheme = "nuaire"

    elif payload[4:6] == "06":
        from .ramses import _22F1_MODE_VASCO as _22F1_FAN_MODE

        _22f1_mode_set = (
            "",
            "00",
            "06",
        )  # "00" seen incidentally on a ClimaRad 4-button remote: OFF?
        _22f1_scheme = "vasco"

    else:  # the default scheme when nothing above matched
        from .ramses import _22F1_MODE_ORCON as _22F1_FAN_MODE

        _22f1_mode_set = ("", "04", "07", "0B")  # 0B?
        _22f1_scheme = "orcon"

    try:
        assert payload[2:4] in _22F1_FAN_MODE, f"unknown fan_mode: {payload[2:4]}"
        assert payload[4:6] in _22f1_mode_set, f"unknown mode_set: {payload[4:6]}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    return {
        # an unrecognised mode byte is reported as "unknown_<byte>"
        SZ_FAN_MODE: _22F1_FAN_MODE.get(payload[2:4], f"unknown_{payload[2:4]}"),
        "_scheme": _22f1_scheme,
        # only the low nibble of the mode byte is kept as the index
        "_mode_idx": f"{int(payload[2:4], 16) & 0x0F:02X}",
        "_mode_max": payload[4:6] or None,
        # "_payload": payload,
    }
2218# WIP: unknown, HVAC (flow rate?)
def parser_22f2(payload: str, msg: Message) -> list:  # TODO: only dict
    """Decode a 22f2 packet (WIP: unknown, HVAC, flow rate?).

    The payload is read as consecutive 3-byte elements: an index byte followed
    by a two-byte measurement.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: One dictionary (index and measurement) per 3-byte element
    :rtype: list
    :raises AssertionError: If an element's index byte is not ``00`` or ``01``.
    """
    # ClimeRad minibox uses 22F2 for speed feedback

    def _parser(seqx: str) -> dict:
        hvac_idx = seqx[:2]
        assert hvac_idx in ("00", "01"), f"is {hvac_idx}, expecting 00/01"

        return {"hvac_idx": hvac_idx, "measure": hex_to_temp(seqx[2:])}

    chunks = (payload[start : start + 6] for start in range(0, len(payload), 6))
    return [_parser(chunk) for chunk in chunks]
2242# fan_boost, HVAC
def parser_22f3(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22f3 (fan_boost) packet.

    The second byte is a bit field: bits 0-2 select the speed used until the
    timer expires, bits 3-5 the speed used afterwards, and bits 6-7 the units
    of the third byte. A failed trailing-bytes check is only logged.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of boost settings, duration, and fan modes
    :rtype: dict[str, Any]
    """
    # NOTE: for boost timer for high
    try:
        assert msg.len <= 7 or payload[14:] == "0000", f"byte 7: {payload[14:]}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    flags = int(payload[2:4], 0x10)

    new_speed_modes = {  # from now, until timer expiry
        0x00: "fan_boost",  # set fan off, or 'boost' mode?
        0x01: "per_request?",  # set fan as per payload[6:10]?
        0x02: "per_request",  # set fan as per payload[6:10]
    }
    fallback_speed_modes = {  # after timer expiry
        0x00: "per_vent_speed",  # set fan as per current fan mode
        0x08: "fan_off",  # set fan off?
        0x10: "per_request",  # set fan as per payload[10:14]
        0x18: "per_vent_speed?",  # set fan as per current fan mode/speed?
    }
    unit_names = {
        0x00: "minutes",
        0x40: "hours",
        0x80: "index",  # TODO: days, day-of-week, day-of-month?
    }

    new_speed = new_speed_modes.get(flags & 0x07)  # 0b0000-0111
    fallback_speed = fallback_speed_modes.get(flags & 0x38)  # 0b0011-1000
    units = unit_names.get(flags & 0xC0)  # 0b1100-0000

    duration = int(payload[4:6], 16)
    if units == "hours":
        duration *= 60  # normalise hours to minutes

    result = {}

    if msg.len >= 3:
        duration_key = "index" if units == "index" else "minutes"
        result = {
            duration_key: duration,
            "flags": hex_to_flag8(payload[2:4]),
            "new_speed_mode": new_speed,
            "fallback_speed_mode": fallback_speed,
        }

    if msg._addrs[0] == NON_DEV_ADDR and msg.len <= 3:
        result["_scheme"] = "itho"

    new_mode_hex = payload[6:10]
    if msg.len >= 5 and new_mode_hex != "0000":  # new speed
        mode_info = parser_22f1(f"00{new_mode_hex}", msg)
        result["_scheme"] = mode_info.get("_scheme")
        result["fan_mode"] = mode_info.get("fan_mode")

    fallback_mode_hex = payload[10:14]
    if msg.len >= 7 and fallback_mode_hex != "0000":  # fallback speed
        mode_info = parser_22f1(f"00{fallback_mode_hex}", msg)
        result["fallback_fan_mode"] = mode_info.get("fan_mode")

    return result
2305# WIP: unknown, HVAC
def parser_22f4(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22f4 packet (WIP: unknown, HVAC).

    A three-byte working value is first selected from the payload (13-byte
    packets with an all-zero tail are rearranged); the fan mode comes from bits
    5-6 of its second byte and the fan rate from the low bits of its third.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is read)
    :type msg: Message
    :return: A dictionary containing interpreted fan mode and rate
    :rtype: dict[str, Any]
    :raises AssertionError: If the extracted mode or rate is invalid.
    """
    MODE_LOOKUP = {
        0x00: "off",
        0x20: "paused",
        0x40: "auto",
        0x60: "manual",
    }
    RATE_LOOKUP = {
        0x00: "speed 0",  # "off"?,
        0x01: "speed 1",  # "low", or trickle?
        0x02: "speed 2",  # "medium-low", or low?
        0x03: "speed 3",  # "medium",
        0x04: "speed 4",  # "medium-high", or high?
        0x05: "boost",  # "boost", aka purge?
    }

    if msg.len == 13 and payload[14:] == "000000000000":
        # ClimaRad Ventura fan & remote
        if payload[10:12] == "00":
            _pl = payload[:4] + payload[12:14]
        else:
            _pl = payload[8:14]
    else:
        _pl = payload[:6]

    mode = int(_pl[2:4], 16) & 0x60
    assert mode in MODE_LOOKUP, mode

    # NOTE(review): the 0x03 mask limits rate to 0-3, so the 0x04/0x05 entries
    # above can never be selected - confirm whether 0x07 was intended
    rate = int(_pl[4:6], 16) & 0x03
    assert mode != 0x60 or rate in RATE_LOOKUP, rate

    fan_mode = MODE_LOOKUP[mode]
    fan_rate = RATE_LOOKUP.get(rate)
    return {SZ_FAN_MODE: fan_mode, SZ_FAN_RATE: fan_rate}
2349# bypass_mode, HVAC
def parser_22f7(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22f7 (bypass_mode) packet.

    The mode is always reported. State and position are added unless this is
    a write whose trailing bytes are empty or ``EF``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb is read)
    :type msg: Message
    :return: A dictionary of bypass mode and, where present, state and position
    :rtype: dict[str, Any]
    """
    mode_names = {"00": "off", "C8": "on", "FF": "auto"}
    state_names = {"00": "off", "C8": "on"}

    state_hex = payload[4:]
    result = {SZ_BYPASS_MODE: mode_names.get(payload[2:4])}

    if msg.verb == W_ and state_hex in ("", "EF"):
        return result  # a write with no state to report

    result[SZ_BYPASS_STATE] = state_names.get(state_hex)
    result.update(**parse_bypass_position(state_hex))  # type: ignore[arg-type]

    return result
2370# WIP: unknown_mode, HVAC
def parser_22f8(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 22f8 packet (WIP: unknown mode, HVAC).

    No interpretation is attempted: the second and third bytes are returned
    as raw hex strings.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary of the two raw byte values
    :rtype: dict[str, Any]
    """
    # from: https://github.com/arjenhiemstra/ithowifi/blob/master/software/NRG_itho_wifi/src/IthoPacket.h

    # message command bytes specific for AUTO RFT (536-0150)
    # ithoMessageAUTORFTAutoNightCommandBytes[] = {0x22, 0xF8, 0x03, 0x63, 0x02, 0x03};
    # .W --- 32:111111 37:111111 --:------ 22F8 003 630203

    # message command bytes specific for DemandFlow remote (536-0146)
    # ithoMessageDFLowCommandBytes[] = {0x22, 0xF8, 0x03, 0x00, 0x01, 0x02};
    # ithoMessageDFHighCommandBytes[] = {0x22, 0xF8, 0x03, 0x00, 0x02, 0x02};

    second_byte, third_byte = payload[2:4], payload[4:6]
    return {"value_02": second_byte, "value_04": third_byte}
2397# setpoint (of device/zones)
def parser_2309(
    payload: str, msg: Message
) -> PayDictT._2309 | list[PayDictT._2309] | PayDictT.EMPTY:
    """Decode a 2309 (setpoint) packet.

    Array messages yield one entry per 3-byte element; a one-byte RQ yields an
    empty dictionary; anything else yields a single setpoint.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (verb, length and array flag are read)
    :type msg: Message
    :return: A setpoint dictionary, list of setpoints, or an empty dictionary
    :rtype: PayDictT._2309 | list[PayDictT._2309] | PayDictT.EMPTY
    """
    if msg._has_array:
        setpoints = []
        for start in range(0, len(payload), 6):
            setpoints.append(
                {
                    SZ_ZONE_IDX: payload[start : start + 2],
                    SZ_SETPOINT: hex_to_temp(payload[start + 2 : start + 6]),
                }
            )
        return setpoints

    # RQ --- 22:131874 01:063844 --:------ 2309 003 020708
    is_bare_request = msg.verb == RQ and msg.len == 1
    if is_bare_request:  # some RQs have a payload (why?)
        return {}

    return {SZ_SETPOINT: hex_to_temp(payload[2:])}
2426# zone_mode # TODO: messy
def parser_2349(payload: str, msg: Message) -> PayDictT._2349 | PayDictT.EMPTY:
    """Decode a 2349 (zone_mode) packet.

    Short RQs yield an empty dictionary. Otherwise the mode and setpoint are
    always decoded, a duration is added when the three countdown bytes are
    not ``FFFFFF``, and 13-byte packets also carry an ``until`` entry.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its verb and length are read)
    :type msg: Message
    :return: A dictionary containing zone mode, setpoint, and override details
    :rtype: PayDictT._2349 | PayDictT.EMPTY
    :raises AssertionError: If the message length or mode is invalid.
    """
    # RQ --- 34:225071 30:258557 --:------ 2349 001 00
    # RP --- 30:258557 34:225071 --:------ 2349 013 007FFF00FFFFFFFFFFFFFFFFFF
    # RP --- 30:253184 34:010943 --:------ 2349 013 00064000FFFFFF00110E0507E5
    # .I --- 10:067219 --:------ 10:067219 2349 004 00000001

    if msg.verb == RQ and msg.len <= 2:  # some RQs have a payload (why?)
        return {}

    assert msg.len in (7, 13), f"expected len 7,13, got {msg.len}"

    mode = payload[6:8]
    assert mode in ZON_MODE_MAP, f"unknown zone_mode: {mode}"
    result: PayDictT._2349 = {
        SZ_MODE: ZON_MODE_MAP.get(mode),  # type: ignore[typeddict-item]
        SZ_SETPOINT: hex_to_temp(payload[2:6]),
    }

    if msg.len >= 7:  # has a dtm if mode == "04"
        countdown_hex = payload[8:14]
        if countdown_hex == "FF" * 3:  # 03/FFFFFF OK if W?
            assert mode != ZON_MODE_MAP.COUNTDOWN, f"{mode} (0x00)"
        else:
            assert mode == ZON_MODE_MAP.COUNTDOWN, f"{mode} (0x01)"
            result[SZ_DURATION] = int(countdown_hex, 16)

    if msg.len >= 13:
        if payload[14:] == "FF" * 6:  # no end time supplied
            assert mode in (
                ZON_MODE_MAP.FOLLOW,
                ZON_MODE_MAP.PERMANENT,
            ), f"{mode} (0x02)"
            result[SZ_UNTIL] = None  # TODO: remove?
        else:
            assert mode != ZON_MODE_MAP.PERMANENT, f"{mode} (0x03)"
            result[SZ_UNTIL] = hex_to_dtm(payload[14:26])

    return result
2475# unknown_2389, from 03:
def parser_2389(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2389 packet (unknown, seen from 03: devices).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary with bytes 1-2 decoded by the temperature helper
    :rtype: dict[str, Any]
    """
    value = hex_to_temp(payload[2:6])
    return {"_unknown": value}
2491# unknown_2400, from OTB, FAN
def parser_2400(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2400 packet (unknown, seen from OTB and FAN devices).

    No interpretation is attempted; the payload is returned verbatim.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary holding the unmodified payload
    :rtype: dict[str, Any]
    """
    # RP --- 32:155617 18:005904 --:------ 2400 045 00001111-1010929292921110101020110010000080100010100000009191111191910011119191111111111100  # Orcon FAN
    # RP --- 10:048122 18:006402 --:------ 2400 004 0000000F
    # assert payload == "0000000F", _INFORM_DEV_MSG

    result = {SZ_PAYLOAD: payload}
    return result
2511# unknown_2401, from OTB
def parser_2401(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2401 packet (unknown, seen from OTB).

    The sanity checks are advisory: the first one that fails is logged as a
    warning and decoding carries on.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only used in the log message)
    :type msg: Message
    :return: A dictionary of decoded flags, valve demand and the raw flag value
    :rtype: dict[str, Any]
    """
    flags_hex = payload[4:6]

    try:
        assert payload[2:4] == "00", f"byte 1: {payload[2:4]}"
        # the high nibble of byte 2 is expected to be clear
        assert (int(flags_hex, 16) & 0b11110000) == 0, (
            f"byte 2: {hex_to_flag8(flags_hex)}"
        )
        assert int(payload[6:], 0x10) <= 200, f"byte 3: {payload[6:]}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    result = {"_flags_2": hex_to_flag8(flags_hex)}
    result.update(parse_valve_demand(payload[6:8]))  # ~3150|FC
    result["_value_2"] = int(flags_hex, 0x10)
    return result
2539# unknown_2410, from OTB, FAN
def parser_2410(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2410 packet (unknown, seen from OTB and FAN devices).

    Four 4-byte values are returned as raw hex, together with the tail and a
    (signed, length, type) breakdown of each of its bytes. A failed format
    check is logged as a warning rather than raised.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (only used in the log message)
    :type msg: Message
    :return: A dictionary of current, min, and max values and metadata
    :rtype: dict[str, Any]
    """
    # RP --- 10:048122 18:006402 --:------ 2410 020 00-00000000-00000000-00000001-00000001-00000C  # OTB
    # RP --- 32:155617 18:005904 --:------ 2410 020 00-00003EE8-00000000-FFFFFFFF-00000000-1002A6  # Orcon Fan

    type_names = {0b000: "a", 0b001: "b", 0b010: "c", 0b100: "d"}

    def unstuff(seqx: str) -> tuple:
        val = int(seqx, 16)
        # if val & 0x40:
        #     raise TypeError
        signed = bool(val & 0x80)  # bit 7
        length = ((val >> 3) & 0x07) or 1  # bits 3-5, with 0 treated as 1
        type_code = val & 0x07  # bits 0-2
        return signed, length, type_names.get(type_code, type_code)

    format_ok = (
        payload[:6] == "00" * 3
        and payload[10:18] == "00" * 4
        and payload[18:26] in ("00000001", "FFFFFFFF")
        and payload[26:34] in ("00000001", "00000000")
    )
    try:
        assert format_ok, _INFORM_DEV_MSG
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    result = {"tail": payload[34:]}
    result["xxx_34"] = unstuff(payload[34:36])
    result["xxx_36"] = unstuff(payload[36:38])
    result["xxx_38"] = unstuff(payload[38:])
    result["cur_value"] = payload[2:10]
    result["min_value"] = payload[10:18]
    result["max_value"] = payload[18:26]
    result["oth_value"] = payload[26:34]
    return result
2585# fan_params, HVAC
def parser_2411(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 2411 (fan_params) packet.

    The parameter ID is the third payload byte. For an RQ only the ID and its
    description are returned. Otherwise the fifth byte names a data type that
    selects how the value (and, for packets longer than 9 bytes, the min, max
    and precision fields) are decoded. Unknown parameter IDs, unknown data
    types and decoding errors are all logged as warnings and produce a partial
    result with raw hex values instead of raising.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the parameter ID, description, and decoded value
    :rtype: dict[str, Any]
    """
    # There is a relationship between 0001 and 2411
    # RQ --- 37:171871 32:155617 --:------ 0001 005 0020000A04
    # RP --- 32:155617 37:171871 --:------ 0001 008 0020000A004E0B00  # 0A -> 2411|4E
    # RQ --- 37:171871 32:155617 --:------ 2411 003 00004E  # 11th menu option (i.e. 0x0A)
    # RP --- 32:155617 37:171871 --:------ 2411 023 00004E460000000001000000000000000100000001A600

    def counter(x: str) -> int:
        # plain hex integer
        return int(x, 16)

    def centile(x: str) -> float:
        # hex integer scaled down by ten
        return int(x, 16) / 10

    # data-type byte -> (number of trailing hex chars to decode, decoder)
    _2411_DATA_TYPES = {
        "00": (2, counter),  # 4E (0-1), 54 (15-60)
        "01": (2, centile),  # 52 (0.0-25.0) (%)
        "0F": (2, hex_to_percent),  # xx (0.0-1.0) (%)
        "10": (4, counter),  # 31 (0-1800) (days)
        # "20": (4, counter),  # unknown data type, uncomment when we have more info
        "92": (4, hex_to_temp),  # 75 (0-30) (C)
    }  # TODO: _2411_TYPES.get(payload[8:10], (8, no_op))

    # Handle unknown parameters gracefully instead of asserting
    param_id = payload[4:6]
    try:
        description = _2411_TABLE.get(param_id, "Unknown")
        if param_id not in _2411_TABLE:
            _LOGGER.warning(
                f"2411 message received with unknown parameter ID: {param_id}. "
                f"This parameter is not in the known parameter schema. "
                f"Message: {msg!r}"
            )
    except Exception as err:
        _LOGGER.warning(f"Error looking up 2411 parameter {param_id}: {err}")
        description = "Unknown"

    result = {
        "parameter": param_id,
        "description": description,
    }

    # a request carries no value to decode
    if msg.verb == RQ:
        return result

    try:
        # Handle unknown data types gracefully instead of asserting
        if payload[8:10] not in _2411_DATA_TYPES:
            warningmsg = (
                f"{msg!r} < {_INFORM_DEV_MSG} (param {param_id} has unknown data_type: {payload[8:10]}). "
                f"This parameter uses an unrecognized data type. "
                f"Please report this packet and any context about what changed on your system."
            )
            # Return partial result with raw hex values for unknown data types
            if msg.len == 9:
                result |= {
                    "value": f"0x{payload[10:18]}",  # Raw hex value
                    "_value_06": payload[6:10],
                    "_unknown_data_type": payload[8:10],
                }
            else:
                result |= {
                    "value": f"0x{payload[10:18]}",  # Raw hex value
                    "_value_06": payload[6:10],
                    "min_value": f"0x{payload[18:26]}",  # Raw hex value
                    "max_value": f"0x{payload[26:34]}",  # Raw hex value
                    "precision": f"0x{payload[34:42]}",  # Raw hex value
                    "_value_42": payload[42:],
                    # Flexible footer - capture everything after precision
                }
            _LOGGER.warning(f"{warningmsg}. Found values: {result}")
            return result

        # Handle known data types normally
        length, parser = _2411_DATA_TYPES[payload[8:10]]
        result |= {
            # each field is 8 hex chars wide; only its last `length` are decoded
            "value": parser(payload[10:18][-length:]),  # type: ignore[operator]
            "_value_06": payload[6:10],
        }

        # a 9-byte packet ends after the value field
        if msg.len == 9:
            return result

        return (
            result
            | {
                "min_value": parser(payload[18:26][-length:]),  # type: ignore[operator]
                "max_value": parser(payload[26:34][-length:]),  # type: ignore[operator]
                "precision": parser(payload[34:42][-length:]),  # type: ignore[operator]
                "_value_42": payload[42:],
                # Flexible footer - capture everything after precision
                # eg. older Orcon models may have a footer of 2 bytes
            }
        )
    except Exception as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} (Error parsing 2411: {err})")
        # Return partial result for any parsing errors
        result["value"] = f"0x{payload[10:18]}"  # Raw hex value
        result["_parse_error"] = f"Parser error: {err}"
        return result
2696# unknown_2420, from OTB
def parser_2420(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2420 packet (unknown, seen from OTB).

    Only one payload is accepted: ``00000010`` followed by 34 zero bytes. It is
    returned verbatim.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary holding the unmodified payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload differs from the expected constant.
    """
    expected = "00000010" + "00" * 34
    assert payload == expected, _INFORM_DEV_MSG

    return {SZ_PAYLOAD: payload}
2715# _state (of cooling?), from BDR91T, hometronics CTL
def parser_2d49(payload: str, msg: Message) -> PayDictT._2D49:
    """Decode a 2d49 packet (a state, seen from BDR91T and hometronics CTL).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (not read by this parser)
    :type msg: Message
    :return: A dictionary with the state decoded from the second byte
    :rtype: PayDictT._2D49
    :raises AssertionError: If bytes 1-2 are not one of the four known values.
    """
    known_tails = ("0000", "00FF", "C800", "C8FF")
    assert payload[2:] in known_tails, _INFORM_DEV_MSG

    state = hex_to_bool(payload[2:4])
    return {"state": state}
2734# system_mode
def parser_2e04(payload: str, msg: Message) -> PayDictT._2E04:
    """Decode a 2e04 (system_mode) packet.

    Two layouts are accepted: 8 bytes (evohome) and 16 bytes (hometronics).
    An ``until`` entry is added for every mode except auto, heat-off and
    auto-with-reset; it is ``None`` when byte 7 is ``00``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object (its length is read)
    :type msg: Message
    :return: A dictionary containing the system mode and optional end time
    :rtype: PayDictT._2E04
    :raises AssertionError: If the system mode or packet length is invalid.
    """
    # if msg.verb == W_:

    # .I --- 01:020766 --:------ 01:020766 2E04 016 FFFFFFFFFFFFFF0007FFFFFFFFFFFF04  # Manual          # noqa: E501
    # .I --- 01:020766 --:------ 01:020766 2E04 016 FFFFFFFFFFFFFF0000FFFFFFFFFFFF04  # Automatic/times # noqa: E501

    mode = payload[:2]

    if msg.len == 8:  # evohome
        assert mode in SYS_MODE_MAP, f"Unknown system mode: {mode}"

    elif msg.len == 16:  # hometronics, lifestyle ID:
        assert 0 <= int(mode, 16) <= 15 or mode == FF, mode
        assert payload[16:18] in (SYS_MODE_MAP.AUTO, SYS_MODE_MAP.CUSTOM), payload[
            16:18
        ]
        assert payload[30:32] == SYS_MODE_MAP.DAY_OFF, payload[30:32]
        # assert False

    else:
        # msg.len in (8, 16)  # evohome 8, hometronics 16
        assert False, f"Packet length is {msg.len} (expecting 8, 16)"

    result: PayDictT._2E04 = {SZ_SYSTEM_MODE: SYS_MODE_MAP[mode]}

    modes_without_until = (
        SYS_MODE_MAP.AUTO,
        SYS_MODE_MAP.HEAT_OFF,
        SYS_MODE_MAP.AUTO_WITH_RESET,
    )
    if mode not in modes_without_until:
        if payload[14:16] != "00":
            until = hex_to_dtm(payload[2:14])
        else:
            until = None
        result.update({SZ_UNTIL: until})

    return result  # TODO: double-check the final "00"
2778# presence_detect, HVAC sensor, or Timed boost for Vasco D60
def parser_2e10(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 2E10 payload (presence_detect / HVAC sensor / timed boost).

    Only three payloads are recognised: "0001", "000000" and "000100".

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary with the presence flag and any trailing hex
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not in a recognized format.
    """
    known_payloads = ("0001", "000000", "000100")
    assert payload in known_payloads, _INFORM_DEV_MSG

    detected = int(payload[2:4]) != 0  # byte 1 is the presence flag
    trailing = payload[4:]  # empty for the 2-byte form

    return {
        "presence_detected": detected,
        "_unknown_4": trailing,
    }
2798# current temperature (of device, zone/s)
def parser_30c9(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Decode a 30C9 (temperature) payload.

    When the message is flagged as an array, the payload is read as
    consecutive 3-byte records (index byte + 2-byte temperature); otherwise
    everything after the first byte is a single temperature.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary or list of temperatures by zone index
    :rtype: dict | list[dict]
    """
    if not msg._has_array:
        return {SZ_TEMPERATURE: hex_to_temp(payload[2:])}

    records = (payload[pos : pos + 6] for pos in range(0, len(payload), 6))
    return [
        {
            SZ_ZONE_IDX: record[:2],
            SZ_TEMPERATURE: hex_to_temp(record[2:]),
        }
        for record in records
    ]
2821# ufc_demand, HVAC (Itho autotemp / spider)
def parser_3110(payload: str, msg: Message) -> PayDictT._3110:
    """Decode a 3110 (ufc_demand) payload.

    The byte-level sanity checks are advisory: a failed check is logged as a
    warning and decoding continues.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: The operating mode and, for heating/cooling, the demand percentage
    :rtype: PayDictT._3110
    """
    # .I --- 02:250708 --:------ 02:250708 3110 004 0000C820  # cooling, 100%
    # .I --- 21:042656 --:------ 21:042656 3110 004 00000010  # heating, 0%

    demand_hex = payload[4:6]
    mode_hex = payload[6:]

    try:
        assert payload[2:4] == "00", f"byte 1: {payload[2:4]}"  # ?circuit_idx?
        assert int(demand_hex, 16) <= 200, f"byte 2: {demand_hex}"
        assert mode_hex in ("00", "10", "20"), f"byte 3: {mode_hex}"
        # a non-zero demand is only expected when heating or cooling
        assert mode_hex in ("10", "20") or demand_hex == "00", f"byte 3: {mode_hex}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    mode_by_bits = {
        0x00: "disabled",
        0x10: "heating",
        0x20: "cooling",
    }
    mode = mode_by_bits.get(int(payload[6:8], 16) & 0x30, "unknown")

    if mode in ("cooling", "heating"):
        return {SZ_MODE: mode, SZ_DEMAND: hex_to_percent(demand_hex)}

    return {SZ_MODE: mode}
2863# unknown_3120, from STA, FAN
def parser_3120(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 3120 payload (meaning unknown; seen from STA and FAN devices).

    Each byte is compared against the values seen so far; the first mismatch
    is logged as a warning and the raw segments are returned regardless.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of raw internal segments
    :rtype: dict[str, Any]
    """
    # .I --- 34:136285 --:------ 34:136285 3120 007 0070B0000000FF  # every ~3:45:00!
    # RP --- 20:008749 18:142609 --:------ 3120 007 0070B000009CFF
    # .I --- 37:258565 --:------ 37:258565 3120 007 0080B0010003FF

    byte_0, byte_1, byte_2 = payload[:2], payload[2:4], payload[4:6]
    byte_3, byte_4, byte_5 = payload[6:8], payload[8:10], payload[10:12]
    tail = payload[12:]

    try:
        assert byte_0 == "00", f"byte 0: {byte_0}"
        assert byte_1 in ("00", "70", "80"), f"byte 1: {byte_1}"
        assert byte_2 == "B0", f"byte 2: {byte_2}"
        assert byte_3 in ("00", "01"), f"byte 3: {byte_3}"
        assert byte_4 == "00", f"byte 4: {byte_4}"
        assert byte_5 in ("00", "03", "0A", "9C"), f"byte 5: {byte_5}"
        assert tail == "FF", f"byte 6: {tail}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    return {
        "unknown_0": payload[2:10],
        "unknown_5": byte_5,
        "unknown_2": tail,
    }
2897# WIP: unknown, HVAC
def parser_313e(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 313E payload (WIP: unknown, HVAC).

    Bytes 1-4 are read as a number of minutes and byte 5 as a number of
    seconds; both are subtracted from the message timestamp.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing calculated Zulu time and raw internal values
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload prefix or expected constant suffix is invalid.
    """
    assert payload[:2] == "00"
    assert payload[12:] == "003C800000"

    minutes_hex = payload[2:10]
    seconds_hex = payload[10:12]
    suffix = payload[12:]

    elapsed = td(minutes=int(minutes_hex, 16), seconds=int(seconds_hex, 16))
    zulu = (msg.dtm - elapsed).isoformat()

    return {
        "zulu": zulu,
        "value_02": minutes_hex,
        "value_10": seconds_hex,
        "value_12": suffix,
    }
2924# datetime
def parser_313f(payload: str, msg: Message) -> PayDictT._313F:  # TODO: look for TZ
    """Parse the 313f (datetime) packet.

    Byte 1 of the payload is checked against the values expected for the
    source device type (CTL, DTS/DT2, RFG); the remaining bytes are decoded
    as a datetime.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the datetime, the DST flag (True, or None
        when the top bit of byte 2 is clear) and the raw byte 1
    :rtype: PayDictT._313F
    :raises AssertionError: If the payload context is unexpected for the source device type.
    """
    # 2020-03-28T03:59:21.315178 045 RP --- 01:158182 04:136513 --:------ 313F 009 00FC3500A41C0307E4
    # 2020-03-29T04:58:30.486343 045 RP --- 01:158182 04:136485 --:------ 313F 009 00FC8400C51D0307E4
    # 2022-09-20T20:50:32.800676 065 RP --- 01:182924 18:068640 --:------ 313F 009 00F9203234140907E6
    # 2020-05-31T11:37:50.351511 056  I --- --:------ --:------ 12:207082 313F 009 0038021ECB1F0507E4

    # https://www.automatedhome.co.uk/vbulletin/showthread.php?5085-My-HGI80-equivalent-Domoticz-setup-without-HGI80&p=36422&viewfull=1#post36422
    # every day at ~4am TRV/RQ->CTL/RP, approx 5-10secs apart (CTL respond at any time)

    assert msg.src.type != DEV_TYPE_MAP.CTL or payload[2:4] in (
        "F0",
        "F9",
        "FC",
    ), f"{payload[2:4]} unexpected for CTL"  # DEX
    assert (
        msg.src.type not in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2) or payload[2:4] == "38"
    ), f"{payload[2:4]} unexpected for DTS"  # DEX
    # assert (
    #     msg.src.type != DEV_TYPE_MAP.FAN or payload[2:4] == "7C"
    # ), f"{payload[2:4]} unexpected for FAN"  # DEX
    # the message must be an f-string, else the placeholder is emitted verbatim
    assert msg.src.type != DEV_TYPE_MAP.RFG or payload[2:4] == "60", (
        f"{payload[2:4]} unexpected for RFG"
    )  # DEX

    return {
        SZ_DATETIME: hex_to_dtm(payload[4:18]),
        # top bit of byte 2 flags DST; a clear bit is reported as None, not False
        SZ_IS_DST: True if int(payload[4:6], 16) & 0x80 else None,
        "_unknown_0": payload[2:4],
    }
2966# heat_demand (of device, FC domain) - valve status (%open)
def parser_3150(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Decode a 3150 (heat_demand) payload.

    Array messages are read as consecutive 2-byte records (index byte +
    demand byte); otherwise the bytes after the first are a single demand.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary or list of dictionaries containing zone indices and valve demand
    :rtype: dict | list[dict]
    """
    # event-driven, and periodically; FC domain is maximum of all zones
    # TODO: all have a valid domain will UFC/CTL respond to an RQ, for FC, for a zone?

    # .I --- 04:136513 --:------ 01:158182 3150 002 01CA < often seen CA, artefact?

    def _idx_entry(idx: str) -> dict[str, str]:
        # assert idx[:2] == FC or (int(idx[:2], 16) < MAX_ZONES)  # <5, 8 for UFC
        if msg.src.type == DEV_TYPE_MAP.UFC:  # DEX
            idx_name = "ufx_idx"
        else:
            idx_name = SZ_ZONE_IDX
        # an index starting with "F" is keyed as a domain, not a zone/circuit
        key = SZ_DOMAIN_ID if idx[:1] == "F" else idx_name
        return {key: idx[:2]}

    if not msg._has_array:
        return parse_valve_demand(payload[2:])  # TODO: check UFC/FC is == CTL/FC

    records = (payload[pos : pos + 4] for pos in range(0, len(payload), 4))
    return [
        {
            **_idx_entry(record[:2]),
            **parse_valve_demand(record[2:]),
        }
        for record in records
    ]
2999# fan state (ventilation status), HVAC
def parser_31d9(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 31d9 (fan state) packet.

    The amount of data decoded depends on ``msg.len``: 3-byte packets return
    after the fan-mode lookup, 4-byte packets add ``_unknown_3``, and longer
    packets also add ``_unknown_4`` and ``unknown_16``.

    The byte-level checks in this parser are advisory: each failed assertion
    is caught and logged as a warning, and decoding continues.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing fan mode, speed, and status flags
    :rtype: dict[str, Any]
    """
    # NOTE: Itho and ClimaRad use 0x00-C8 for %, whilst Nuaire uses 0x00-64
    try:
        assert payload[4:6] == "FF" or int(payload[4:6], 16) <= 200, (
            f"byte 2: {payload[4:6]}"
        )
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    # byte 1 is a bit-field; individual bits are exposed as booleans below
    bitmap = int(payload[2:4], 16)

    # NOTE: 31D9[4:6] is fan_speed (ClimaRad minibox, Itho) *or* fan_mode (Orcon, Vasco)
    result = {
        **parse_exhaust_fan_speed(payload[4:6]),  # for itho
        SZ_FAN_MODE: payload[4:6],  # orcon, vasco/climarad
        "passive": bool(bitmap & 0x02),
        "damper_only": bool(bitmap & 0x04),  # i.e. valve only
        "filter_dirty": bool(bitmap & 0x20),
        "frost_cycle": bool(bitmap & 0x40),
        "has_fault": bool(bitmap & 0x80),
        "_flags": hex_to_flag8(payload[2:4]),
    }

    # Fan Mode Lookup 1 for Vasco codes
    if msg.len == 3:  # usu: I -->20: (no seq#)
        # only a self-addressed broadcast (addr0 == addr2, addr1 is the
        # non-device address) gets its fan_mode translated via the lookup
        if (
            (payload[:4] == "0000" or payload[:4] == "0080")  # Senza, meaning of 0x80?
            and msg._addrs[0] == msg._addrs[2]
            and msg._addrs[1] == NON_DEV_ADDR
        ):
            # _31D9_FAN_INFO for Vasco D60 HRU and ClimaRad Minibox, S-Fan, (REM: RQ only, msg.len==1)
            try:
                assert int(payload[4:6], 16) & 0xFF in _31D9_FAN_INFO_VASCO, (
                    f"unknown 31D9 fan_mode lookup key: {payload[4:6]}"
                )
            except AssertionError as err:
                _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")
            # unknown keys fall back to an "unknown_XX" label rather than failing
            fan_mode = _31D9_FAN_INFO_VASCO.get(
                int(payload[4:6], 16) & 0xFF, f"unknown_{payload[4:6]}"
            )
            result[SZ_FAN_MODE] = fan_mode  # replace
        # if not replaced, 31D9 FAN_MODE is a 2 digit string HEX
        return result

    try:
        assert payload[6:8] in ("00", "07", "0A", "FE"), f"byte 3: {payload[6:8]}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    result.update({"_unknown_3": payload[6:8]})

    if msg.len == 4:  # usu: I -->20: (no seq#)
        return result

    try:
        assert payload[8:32] in ("00" * 12, "20" * 12), f"byte 4: {payload[8:32]}"
        assert payload[32:] in ("00", "04", "08"), f"byte 16: {payload[32:]}"
    except AssertionError as err:
        _LOGGER.warning(f"{msg!r} < {_INFORM_DEV_MSG} ({err})")

    return {
        **result,
        "_unknown_4": payload[8:32],
        "unknown_16": payload[32:],
    }
3077# ventilation state (extended), HVAC
def parser_31da(payload: str, msg: Message) -> PayDictT._31DA:
    """Decode a 31DA (extended ventilation state) payload.

    Each fixed-position field is handed to its own helper and the resulting
    dictionaries are merged in the order listed below. Any hex beyond the
    58th character is returned under ``_extra``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of all decoded ventilation parameters
    :rtype: PayDictT._31DA
    """
    # see: https://github.com/python/typing/issues/1445
    decoded_fields = (
        parse_exhaust_fan_speed(payload[38:40]),  # maybe 31D9[4:6] for some?
        parse_fan_info(payload[36:38]),  # 22F3-ish
        parse_air_quality(payload[2:6]),  # 12C8[2:6]
        parse_co2_level(payload[6:10]),  # 1298[2:6]
        parse_indoor_humidity(payload[10:12]),  # 12A0?
        parse_outdoor_humidity(payload[12:14]),
        parse_exhaust_temp(payload[14:18]),  # to outside
        parse_supply_temp(payload[18:22]),  # to home
        parse_indoor_temp(payload[22:26]),  # in home
        parse_outdoor_temp(payload[26:30]),  # 1290?
        parse_capabilities(payload[30:34]),
        parse_bypass_position(payload[34:36]),  # 22F7-ish
        parse_supply_fan_speed(payload[40:42]),
        parse_remaining_mins(payload[42:46]),  # mins, ~22F3[2:6]
        parse_post_heater(payload[46:48]),
        parse_pre_heater(payload[48:50]),
        parse_supply_flow(payload[50:54]),  # NOTE: is supply, not exhaust
        parse_exhaust_flow(payload[54:58]),  # NOTE: order switched from others
    )

    result: dict[str, Any] = {}
    for field in decoded_fields:
        result.update(field)

    if len(payload) != 58:
        result["_extra"] = payload[58:]  # sporadic [58:60] always 00

    return result  # type: ignore[return-value]
3115 # From an Orcon 15RF Display
3116 # 1 Software version
3117 # 4 RH value in home (%) SZ_INDOOR_HUMIDITY
3118 # 5 RH value supply air (%) SZ_OUTDOOR_HUMIDITY
3119 # 6 Exhaust air temperature out (°C) SZ_EXHAUST_TEMPERATURE
3120 # 7 Supply air temperature to home (°C) SZ_SUPPLY_TEMPERATURE
3121 # 8 Temperature from home (°C) SZ_INDOOR_TEMPERATURE
3122 # 9 Temperature outside (°C) SZ_OUTDOOR_TEMPERATURE
3123 # 10 Bypass position SZ_BYPASS_POSITION
3124 # 11 Exhaust fan speed (%) SZ_EXHAUST_FAN_SPEED
3125 # 12 Fan supply speed (%) SZ_SUPPLY_FAN_SPEED
3126 # 13 Remaining after run time (min.) SZ_REMAINING_TIME - for humidity scenario
3127 # 14 Preheater control (MaxComfort) (%) SZ_PRE_HEAT
3128 # 16 Actual supply flow rate (m3/h) SZ_SUPPLY_FLOW (Orcon is m3/h, data is L/s)
3129 # 17 Current discharge flow rate (m3/h) SZ_EXHAUST_FLOW
3132# vent_demand, HVAC
def parser_31e0(payload: str, msg: Message) -> dict | list[dict]:  # TODO: only dict
    """Parse the 31e0 (vent_demand) packet.

    Payloads longer than 4 bytes are treated as an array of 4-byte elements,
    each decoded independently; shorter payloads are a single element.

    "van" means "of".
     - 0 = min. van min. potm would be:
     - 0 = minimum of minimum potentiometer

    See: https://www.industrialcontrolsonline.com/honeywell-t991a
     - modulates air temperatures in ducts

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary or list of dictionaries containing flags and demand percentage
    :rtype: dict | list[dict]
    :raises AssertionError: If the payload suffix is not a recognized constant.
    """

    # coding note:
    # case 0x31E0:  ' 12768:
    # {
    #     string str4;
    #     unchecked
    #     {
    #         result.Fan = Conversions.ToString((double)(int)data[checked(start + 1)] / 2.0);
    #         str4 = "";
    #     }
    #     str4 = (data[start + 2] & 0xF) switch
    #     {
    #         0 => str4 + "0 = min. potm. ",
    #         1 => str4 + "0 = min. van min. potm ",
    #         2 => str4 + "0 = min. fan ",
    #         _ => "",
    #     };
    #     switch (data[start + 2] & 0xF0)
    #     {
    #     case 16:
    #         str4 += "100 = max. potm";
    #         break;
    #     case 32:
    #         str4 += "100 = max. van max. potm ";
    #         break;
    #     case 48:
    #         str4 += "100 = max. fan ";
    #         break;
    #     }
    #     result.Data = str4;
    #     break;
    # }

    # .I --- 37:005302 32:132403 --:------ 31E0 008 00-0000-00 01-0064-00  # RF15 CO2 to Orcon HRC400 series SmartComfort Valve

    # .I --- 29:146052 32:023459 --:------ 31E0 003 00-0000
    # .I --- 29:146052 32:023459 --:------ 31E0 003 00-00C8

    # .I --- 32:168240 30:079129 --:------ 31E0 004 00-0000-FF
    # .I --- 32:168240 30:079129 --:------ 31E0 004 00-0000-FF
    # .I --- 32:166025 --:------ 30:079129 31E0 004 00-0000-00

    # .I --- 32:168090 30:082155 --:------ 31E0 004 00-00C8-00
    # .I --- 37:258565 37:261128 --:------ 31E0 004 00-0001-00

    def _parser(seqx: str) -> dict:
        """Decode one element (up to 4 bytes) of the payload."""
        assert seqx[6:] in ("", "00", "FF")
        return {
            # "hvac_idx": seqx[:2],
            "flags": seqx[2:4],
            "vent_demand": hex_to_percent(seqx[4:6]),
            # take the element's own trailing byte, not the tail of the whole
            # payload (they only coincide for a single-element packet)
            "_unknown_3": seqx[6:],
        }

    if len(payload) > 8:
        return [_parser(payload[x : x + 8]) for x in range(0, len(payload), 8)]
    return _parser(payload)
3208# supplied boiler water (flow) temp
def parser_3200(payload: str, msg: Message) -> PayDictT._3200:
    """Decode a 3200 (supplied_temp) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the water flow temperature
    :rtype: PayDictT._3200
    """
    flow_temp = hex_to_temp(payload[2:])  # everything after the first byte
    return {SZ_TEMPERATURE: flow_temp}
3222# return (boiler) water temp
def parser_3210(payload: str, msg: Message) -> PayDictT._3210:
    """Decode a 3210 (return_temp) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the return water temperature
    :rtype: PayDictT._3210
    """
    return_temp = hex_to_temp(payload[2:])  # everything after the first byte
    return {SZ_TEMPERATURE: return_temp}
3236# opentherm_msg, from OTB (and OT_RND)
def parser_3220(payload: str, msg: Message) -> dict[str, Any]:
    """Parse an OpenTherm message packet.

    Bytes 1-4 of the payload are handed to ``decode_frame``; the result is
    then validated differently for RQ messages and for all other verbs.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of decoded OpenTherm data and descriptions
    :rtype: dict[str, Any]
    :raises AssertionError: If internal OpenTherm consistency checks fail.
    :raises PacketPayloadInvalid: If the OpenTherm frame is malformed or uses unknown IDs.
    """
    try:
        ot_type, ot_id, ot_value, ot_schema = decode_frame(payload[2:10])
    except AssertionError as err:
        # re-raised with a prefix so the origin is clear to the caller
        raise AssertionError(f"OpenTherm: {err}") from err
    except ValueError as err:
        raise exc.PacketPayloadInvalid(f"OpenTherm: {err}") from err

    # NOTE: Unknown-DataId isn't an invalid payload & is useful to train the OTB device
    if ot_schema is None and ot_type != OtMsgType.UNKNOWN_DATAID:  # type: ignore[unreachable]
        raise exc.PacketPayloadInvalid(
            f"OpenTherm: Unknown data-id: 0x{ot_id:02X} ({ot_id})"
        )

    result = {
        SZ_MSG_ID: ot_id,
        SZ_MSG_TYPE: str(ot_type),
        SZ_MSG_NAME: ot_value.pop(SZ_MSG_NAME, None),  # removed from ot_value
    }

    if msg.verb == RQ:  # RQs have a context: msg_id (and a payload)
        assert (
            ot_type != OtMsgType.READ_DATA
            or payload[6:10] == "0000"  # likely true for RAMSES
        ), f"OpenTherm: Invalid msg-type|data-value: {ot_type}|{payload[6:10]}"

        if ot_type != OtMsgType.READ_DATA:
            assert ot_type in (
                OtMsgType.WRITE_DATA,
                OtMsgType.INVALID_DATA,
            ), f"OpenTherm: Invalid msg-type for RQ: {ot_type}"

            result.update(ot_value)  # TODO: find some of these packets to review

        # NOTE(review): ot_schema is dereferenced here although the check above
        # lets None through for UNKNOWN_DATAID - confirm decode_frame never
        # returns None in that case
        result[SZ_DESCRIPTION] = ot_schema.get(EN)
        return result

    # if msg.verb != RP:
    #     raise

    # message types that carry no usable data value
    _LIST = (OtMsgType.DATA_INVALID, OtMsgType.UNKNOWN_DATAID, OtMsgType.RESERVED)
    assert ot_type not in _LIST or payload[6:10] in (
        "0000",
        "FFFF",
    ), f"OpenTherm: Invalid msg-type|data-value: {ot_type}|{payload[6:10]}"

    # HACK: These OT data id can pop in/out of 47AB, which is an invalid value
    if payload[6:] == "47AB" and ot_id in (0x12, 0x13, 0x19, 0x1A, 0x1B, 0x1C):
        ot_value[SZ_VALUE] = None
    # HACK: This OT data id can be 1980, which is an invalid value
    # NOTE(review): `and ot_id` is true for every data-id except 0; the comment
    # suggests one specific id (CH pressure) was intended - confirm
    if payload[6:] == "1980" and ot_id:  # CH pressure is 25.5 bar!
        ot_value[SZ_VALUE] = None
    # HACK: Done above, not in OT.decode_frame() as they aren't in the OT specification

    if ot_type not in _LIST:
        assert ot_type in (
            OtMsgType.READ_ACK,
            OtMsgType.WRITE_ACK,
        ), f"OpenTherm: Invalid msg-type for RP: {ot_type}"

        result.update(ot_value)

        try:  # These are checking flags in payload of data-id 0x00
            assert ot_id != 0 or (
                [result[SZ_VALUE][i] for i in (2, 3, 4, 5, 6, 7)] == [0] * 6
                # and [result[SZ_VALUE][i] for i in (1, )] == [1]
            ), result[SZ_VALUE]

            assert ot_id != 0 or (
                [result[SZ_VALUE][8 + i] for i in (0, 4, 5, 6, 7)] == [0] * 5
                # and [result[SZ_VALUE][8 + i] for i in (1, 2, 3)] == [0] * 3
            ), result[SZ_VALUE]

        except AssertionError:
            # advisory only: unexpected flags are reported, not rejected
            _LOGGER.warning(
                f"{msg!r} < {_INFORM_DEV_MSG}, with a description of your system"
            )

    result[SZ_DESCRIPTION] = ot_schema.get(EN)
    return result
3330# unknown_3221, from OTB, FAN
def parser_3221(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 3221 payload (meaning unknown; seen from OTB and FAN devices).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the raw payload and the extracted numeric value
    :rtype: dict[str, Any]
    :raises AssertionError: If the extracted value exceeds the valid 0xC8 threshold.
    """
    # RP --- 10:052644 18:198151 --:------ 3221 002 000F
    # RP --- 10:048122 18:006402 --:------ 3221 002 0000
    # RP --- 32:155617 18:005904 --:------ 3221 002 000A

    value = int(payload[2:], 16)
    assert value <= 0xC8, _INFORM_DEV_MSG

    return {
        "_payload": payload,
        SZ_VALUE: value,
    }
3354# WIP: unknown, HVAC
def parser_3222(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 3222 packet (WIP: unknown, HVAC).

    A 3-byte packet carries a single value with a zero length byte. Longer
    packets carry an offset byte, a length byte and a run of data bytes; the
    data is returned left-padded with ".." for every skipped (offset) byte.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing offset, length, and raw data
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload prefix is not '00'.
    """
    assert payload[:2] == "00"

    # e.g. RP|3222|00FE00 (payload = 3 bytes)
    if msg.len == 3:
        assert payload[4:] == "00"  # aka length 0

        return {
            "_value": f"0x{payload[2:4]}",
        }

    # the offset byte is hexadecimal, like every other field in the payload;
    # parsing it as decimal fails for 0A-FF and miscounts 10-99
    offset = int(payload[2:4], 16)

    # e.g. RP|3222|000604000F100E (payload > 3 bytes)
    return {
        "offset": f"0x{payload[2:4]}",  # bytes
        "length": f"0x{payload[4:6]}",  # bytes
        "_data": f"{'..' * offset}{payload[6:]}",
    }
3384# unknown_3223, from OTB
def parser_3223(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 3223 payload (meaning unknown; seen from OTB devices).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the raw payload and the extracted value
    :rtype: dict[str, Any]
    :raises AssertionError: If the value exceeds the valid 0xC8 threshold.
    """
    value = int(payload[2:], 16)
    assert value <= 0xC8, _INFORM_DEV_MSG

    return {
        "_payload": payload,
        SZ_VALUE: value,
    }
3404# actuator_sync (aka sync_tpi: TPI cycle sync)
def parser_3b00(payload: str, msg: Message) -> PayDictT._3B00:
    """Decode a 3B00 packet (actuator_sync, aka sync_tpi).

    This signal marks the start or end of a TPI cycle to synchronize relay behavior.

    The heat relay regularly broadcasts a 3B00 at the end(?) of every TPI cycle, the
    frequency of which is determined by the (TPI) cycle rate in 1100.

    The CTL subsequently broadcasts a 3B00 (i.e. at the start of every TPI cycle).

    The OTB does not send these packets, but the CTL sends a regular broadcast anyway
    for the benefit of any zone actuators (e.g. zone valve zones).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary containing the sync state and domain ID
    :rtype: PayDictT._3B00
    :raises AssertionError: If the payload length or constants are invalid for the device type.
    """
    # system timing master: the device that sends I/FCC8 pkt controls the heater relay

    # 053  I --- 13:209679 --:------ 13:209679 3B00 002 00C8
    # 045  I --- 01:158182 --:------ 01:158182 3B00 002 FCC8
    # 052  I --- 13:209679 --:------ 13:209679 3B00 002 00C8
    # 045  I --- 01:158182 --:------ 01:158182 3B00 002 FCC8

    # 063  I --- 01:078710 --:------ 01:078710 3B00 002 FCC8
    # 064  I --- 01:078710 --:------ 01:078710 3B00 002 FCC8

    idx = payload[:2]
    sync_byte = payload[2:]

    # the index byte expected from each source device type (default "00")
    expected_idx_by_type = {
        DEV_TYPE_MAP.CTL: FC,
        DEV_TYPE_MAP.BDR: "00",
        DEV_TYPE_MAP.PRG: FC,
    }

    assert msg.len == 2, msg.len
    assert idx == expected_idx_by_type.get(msg.src.type, "00")  # DEX
    assert sync_byte == "C8", sync_byte  # Could it be a percentage?

    # has complex idx: only a self-addressed I from a CTL/PRG carries a domain
    idx_part: dict
    if (
        msg.verb == I_
        and msg.src.type in (DEV_TYPE_MAP.CTL, DEV_TYPE_MAP.PRG)
        and msg.src is msg.dst
    ):  # DEX
        assert idx[:2] == FC
        idx_part = {SZ_DOMAIN_ID: FC}
    else:
        assert idx[:2] == "00"
        idx_part = {}

    return {
        **idx_part,  # type: ignore[typeddict-item]
        "actuator_sync": hex_to_bool(sync_byte),
    }
3461# actuator_state
def parser_3ef0(payload: str, msg: Message) -> PayDictT._3EF0 | PayDictT._JASPER:
    """Parse the 3ef0 (actuator_state) packet.

    Packets from a JIM (Honeywell Jasper) device are returned as an ordinal
    plus an opaque blob. Otherwise the packet must be 3, 6 or 9 bytes long,
    and each extra 3-byte group adds further keys to the result.

    The final group of flag checks is advisory: a failure there is logged as
    a warning and the decoded result is still returned.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of modulation levels, flags, and setpoints
    :rtype: PayDictT._3EF0 | PayDictT._JASPER
    :raises AssertionError: If payload constants, flags, or message lengths are unrecognized.
    """
    result: dict[str, Any]

    if msg.src.type == DEV_TYPE_MAP.JIM:  # Honeywell Jasper
        assert msg.len == 20, f"expecting len 20, got: {msg.len}"
        return {
            "ordinal": f"0x{payload[2:8]}",
            "blob": payload[8:],
        }

    # TODO: These two should be picked up by the regex
    assert msg.len in (3, 6, 9), f"Invalid payload length: {msg.len}"
    # assert payload[:2] == "00", f"Invalid payload context: {payload[:2]}"

    # NOTE: some [2:4] appear to intend 0x00-0x64 (high_res=False), instead of 0x00-0xC8
    # NOTE: for best compatibility, all will be switched to 0x00-0xC8 (high_res=True)

    if msg.len == 3:  # I|BDR|003 (the following are the only two payloads ever seen)
        # .I --- 13:042805 --:------ 13:042805 3EF0 003 0000FF
        # .I --- 13:023770 --:------ 13:023770 3EF0 003 00C8FF
        assert payload[2:4] in ("00", "C8"), f"byte 1: {payload[2:4]} (not 00/C8)"
        assert payload[4:6] == "FF", f"byte 2: {payload[4:6]} (not FF)"

    else:  # msg.len >= 6: # RP|OTB|006 (to RQ|CTL/HGI/RFG)
        # RP --- 10:004598 34:003611 --:------ 3EF0 006 0000100000FF
        # RP --- 10:004598 34:003611 --:------ 3EF0 006 0000110000FF
        # RP --- 10:138822 01:187666 --:------ 3EF0 006 0064100C00FF
        # RP --- 10:138822 01:187666 --:------ 3EF0 006 0064100200FF
        assert payload[4:6] in ("00", "10", "11"), f"byte 2: {payload[4:6]}"

    # decoded the same way for every length: 00-64/C8 (or FF)
    mod_level = hex_to_percent(payload[2:4], high_res=True)

    result = {
        "modulation_level": mod_level,  # 0008[2:4], 3EF1[10:12]
        "_flags_2": payload[4:6],
    }

    if msg.len >= 6:  # RP|OTB|006 (to RQ|CTL/HGI/RFG)
        # RP --- 10:138822 01:187666 --:------ 3EF0 006 000110FA00FF  # ?corrupt

        # for OTB (there's no reliable) modulation_level <-> flame_state)

        flags_3 = int(payload[6:8], 0x10)
        result.update(
            {
                "_flags_3": hex_to_flag8(payload[6:8]),
                "ch_active": bool(flags_3 & 1 << 1),
                "dhw_active": bool(flags_3 & 1 << 2),
                "cool_active": bool(flags_3 & 1 << 4),
                "flame_on": bool(flags_3 & 1 << 3),  # flame_on
                "_unknown_4": payload[8:10],  # FF, 00, 01, 0A
                "_unknown_5": payload[10:12],  # FF, 13, 1C, ?others
            }  # TODO: change to flame_active?
        )

    if msg.len >= 9:  # I/RP|OTB|009 (R8820A only?)
        assert int(payload[12:14], 16) & 0b11111100 == 0, f"byte 6: {payload[12:14]}"
        assert int(payload[12:14], 16) & 0b00000010 == 2, f"byte 6: {payload[12:14]}"
        assert 10 <= int(payload[14:16], 16) <= 90, f"byte 7: {payload[14:16]}"
        # report the byte actually tested (payload[18:] is empty for 9 bytes)
        assert int(payload[16:18], 16) in (0, 100), f"byte 8: {payload[16:18]}"

        result.update(
            {
                "_flags_6": hex_to_flag8(payload[12:14]),
                "ch_enabled": bool(int(payload[12:14], 0x10) & 1 << 0),
                "ch_setpoint": int(payload[14:16], 0x10),
                "max_rel_modulation": hex_to_percent(payload[16:18], high_res=True),
            }
        )

    try:  # Trying to decode flags...
        # assert payload[4:6] != "11" or (
        #     payload[2:4] == "00"
        # ), f"bytes 1+2: {payload[2:6]}"  # 97% is 00 when 11, but not always

        assert payload[4:6] in ("00", "10", "11", "FF"), f"byte 2: {payload[4:6]}"

        assert "_flags_3" not in result or (
            payload[6:8] == "FF" or int(payload[6:8], 0x10) & 0b10100000 == 0
        ), f"byte 3: {result['_flags_3']}"
        # only 10:040239 does 0b01000000, only Itho Autotemp does 0b00010000

        assert "_unknown_4" not in result or (
            payload[8:10] in ("FF", "00", "01", "02", "04", "0A")
        ), f"byte 4: {payload[8:10]}"
        # only 10:040239 does 04

        assert "_unknown_5" not in result or (
            payload[10:12] in ("00", "13", "1C", "FF")
        ), f"byte 5: {payload[10:12]}"

        assert "_flags_6" not in result or (
            int(payload[12:14], 0x10) & 0b11111100 == 0
        ), f"byte 6: {result['_flags_6']}"

    except AssertionError as err:
        _LOGGER.warning(
            f"{msg!r} < {_INFORM_DEV_MSG} ({err}), with a description of your system"
        )
    return result  # type: ignore[return-value]
3573# actuator_cycle
def parser_3ef1(payload: str, msg: Message) -> PayDictT._3EF1 | PayDictT._JASPER:
    """Parse the 3ef1 (actuator_cycle) packet.

    Packets from JIM and JST (Honeywell Jasper) devices are returned as an
    ordinal plus an opaque blob. For all other sources the final byte selects
    the validation applied: "FF" is treated as a BDR, anything else as an OTB.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context
    :type msg: Message
    :return: A dictionary of modulation levels and cycle/actuator countdowns
    :rtype: PayDictT._3EF1 | PayDictT._JASPER
    :raises AssertionError: If the countdown values exceed recognized thresholds.
    """
    if msg.src.type == DEV_TYPE_MAP.JIM:  # Honeywell Jasper, DEX
        assert msg.len == 18, f"expecting len 18, got: {msg.len}"
        return {
            "ordinal": f"0x{payload[2:8]}",
            "blob": payload[8:],
        }

    if (
        msg.src.type == DEV_TYPE_MAP.JST
    ):  # and msg.len == 12:  # or (12, 20) Jasper, DEX
        assert msg.len == 12, f"expecting len 12, got: {msg.len}"
        return {
            "ordinal": f"0x{payload[2:8]}",
            "blob": payload[8:],
        }

    percent = hex_to_percent(payload[10:12])

    if payload[12:] == "FF":  # is BDR
        assert percent is None or percent in (0, 1), f"byte 5: {payload[10:12]}"

    else:  # is OTB
        # assert (
        #     re.compile(r"^00[0-9A-F]{10}10").match(payload)
        # ), "doesn't match: " + r"^00[0-9A-F]{10}10"
        assert payload[2:6] == "7FFF", f"byte 1: {payload[2:6]}"
        assert payload[6:10] == "003C", f"byte 3: {payload[6:10]}"  # 60 seconds
        assert percent is None or percent <= 1, f"byte 5: {payload[10:12]}"

    # "7FFF" is the 'no value' sentinel for both countdowns
    cycle_countdown = None if payload[2:6] == "7FFF" else int(payload[2:6], 16)
    if cycle_countdown is not None:
        if cycle_countdown > 0x7FFF:
            # values above 0x7FFF are read as negative (16-bit two's complement)
            cycle_countdown -= 0x10000
        assert cycle_countdown < 7200, f"byte 1: {payload[2:6]}"  # 7200 seconds

    actuator_countdown = None if payload[6:10] == "7FFF" else int(payload[6:10], 16)
    if actuator_countdown is not None:
        if actuator_countdown > 0x7FFF:  # "87B3", "9DFA", "DCE1", "E638", "F8F7"
            # actuator_countdown = 0x10000 - actuator_countdown + cycle_countdown
            # placeholder: out-of-range values are replaced by the cycle
            # countdown, which may itself be None
            actuator_countdown = cycle_countdown  # Needs work
        # assert actuator_countdown <= cycle_countdown, f"byte 3: {payload[6:10]}"

    return {
        "modulation_level": percent,  # 0008[2:4], 3EF0[2:4]
        "actuator_countdown": actuator_countdown,
        "cycle_countdown": cycle_countdown,
        "_unknown_0": payload[12:],
    }
3635# timestamp, HVAC
def parser_4401(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 4401 (HVAC timestamp) payload.

    RP packets are not decoded: an empty dict is returned for them.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (only ``verb`` is read)
    :type msg: Message
    :return: The src/dst timestamps plus the not-yet-understood bytes, or ``{}`` for an RP
    :rtype: dict[str, Any]
    :raises AssertionError: If a fixed byte, or a verb-dependent byte, has an unexpected value.
    """
    if msg.verb == RP:
        return {}

    # 2022-07-28T14:21:38.895354 095  W --- 37:010164 37:010151 --:------ 4401 020 10 7E-E99E90C8 00-E99E90C7-3BFF 7E-E99E90C8-000B
    # 2022-07-28T14:21:57.414447 076 RQ --- 20:225479 20:257336 --:------ 4401 020 10 2E-E99E90DB 00-00000000-0000 00-00000000-000B
    # 2022-07-28T14:21:57.625474 045  I --- 20:257336 20:225479 --:------ 4401 020 10 2E-E99E90DB 00-E99E90DA-F0FF BD-00000000-000A
    # 2022-07-28T14:22:02.932576 088 RQ --- 37:010188 20:257336 --:------ 4401 020 10 22-E99E90E0 00-00000000-0000 00-00000000-000B
    # 2022-07-28T14:22:03.053744 045  I --- 20:257336 37:010188 --:------ 4401 020 10 22-E99E90E0 00-E99E90E0-75FF BD-00000000-000A
    # 2022-07-28T14:22:20.516363 045 RQ --- 20:255710 20:257400 --:------ 4401 020 10 0B-E99E90F2 00-00000000-0000 00-00000000-000B
    # 2022-07-28T14:22:20.571640 085  I --- 20:255251 20:229597 --:------ 4401 020 10 39-E99E90F1 00-E99E90F1-5CFF 40-00000000-000A
    # 2022-07-28T14:22:20.648696 058  I --- 20:257400 20:255710 --:------ 4401 020 10 0B-E99E90F2 00-E99E90F1-D4FF DA-00000000-000B

    # 2022-11-03T23:00:04.854479 088 RQ --- 20:256717 37:013150 --:------ 4401 020 10 00-00259261 00-00000000-0000 00-00000000-0063
    # 2022-11-03T23:00:05.102491 045  I --- 37:013150 20:256717 --:------ 4401 020 10 00-00259261 00-000C9E4C-1800 00-00000000-0063
    # 2022-11-03T23:00:17.820659 072  I --- 20:256112 20:255825 --:------ 4401 020 10 00-00F1EB91 00-00E8871B-B700 00-00000000-0063
    # 2022-11-03T23:01:25.495391 065  I --- 20:257732 20:257680 --:------ 4401 020 10 00-002E9C98 00-00107923-9E00 00-00000000-0063
    # 2022-11-03T23:01:33.753467 066 RQ --- 20:257732 20:256112 --:------ 4401 020 10 00-0010792C 00-00000000-0000 00-00000000-0063
    # 2022-11-03T23:01:33.997485 072  I --- 20:256112 20:257732 --:------ 4401 020 10 00-0010792C 00-00E88767-AD00 00-00000000-0063
    # 2022-11-03T23:01:52.391989 090  I --- 20:256717 20:255301 --:------ 4401 020 10 00-009870E1 00-002592CC-6300 00-00000000-0063

    def _epoch_to_str(hex_secs: str) -> None | str:
        """Render 4 hex bytes of seconds-since-1970 as a naive local-time string."""
        if hex_secs == "00000000":  # all-zero means 'no timestamp'
            return None
        seconds = int(hex_secs, 16)
        # the offset byte(s) at payload[22:26] (*15 mins?) are deliberately not applied
        return str(dt.fromtimestamp(seconds))

    # 10 7E-E99E90C8 00-E99E90C7-3BFF 7E-E99E90C8-000B
    # hex(int(dt.fromisoformat("2022-07-28T14:21:38.895354").timestamp())).upper()
    # '0x62E20ED2'

    time_src_hex = payload[14:22]
    flag_byte = payload[24:26]
    time_rcv_hex = payload[28:36]
    hops_byte = payload[38:40]

    # bytes that are constant in every packet seen so far
    assert payload[:2] == "10", payload[:2]
    assert payload[12:14] == "00", payload[12:14]
    assert payload[36:38] == "00", payload[36:38]

    # byte 12 depends upon the verb
    assert msg.verb != I_ or flag_byte in ("00", "7C", "FF"), flag_byte
    assert msg.verb != W_ or flag_byte in ("7C", "FF"), flag_byte
    assert msg.verb != RQ or flag_byte == "00", flag_byte

    # an RQ carries no src time; a W always carries a receive time
    assert msg.verb != RQ or time_src_hex == "00000000", time_src_hex
    assert msg.verb != W_ or time_rcv_hex != "00000000", time_rcv_hex

    assert hops_byte in ("08", "09", "0A", "0B", "63"), hops_byte

    # disabled: payload[2:4] == payload[26:28] does not always hold (e.g. 2E vs BD above)

    decoded: dict[str, Any] = {}
    decoded["last_update_dst"] = payload[2:4]
    decoded["time_dst"] = _epoch_to_str(payload[4:12])
    decoded["_unknown_12"] = payload[12:14]  # usu.00
    decoded["time_src"] = _epoch_to_str(time_src_hex)
    decoded["offset"] = payload[22:24]  # *15 mins?
    decoded["_unknown_24"] = flag_byte
    decoded["last_update_src"] = payload[26:28]
    decoded["time_dst_receive_src"] = _epoch_to_str(time_rcv_hex)
    decoded["_unknown_36"] = payload[36:38]  # usu.00
    decoded["hops_dst_src"] = hops_byte
    return decoded
3707# temperatures (see: 4e02) - Itho spider/autotemp
def parser_4e01(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 4e01 (Itho temperatures) payload.

    The payload is a leading ``00``, then one 2-byte temperature per group, then a
    trailing ``00``.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (only ``len`` is read)
    :type msg: Message
    :return: A dictionary holding the list of per-group temperatures
    :rtype: dict[str, Any]
    :raises AssertionError: If the length is not 2 + 2*n bytes, or a framing byte is not ``00``.
    """
    # .I --- 02:248945 02:250708 --:------ 4E01 018 00-7FFF7FFF7FFF09077FFF7FFF7FFF7FFF-00  # 23.11, 8-group
    # .I --- 02:250984 02:250704 --:------ 4E01 018 00-7FFF7FFF7FFF7FFF08387FFF7FFF7FFF-00  # 21.04

    # len 018 is 8 groups (2 + 8*2 bytes); len 026 would be 12 groups (2 + 12*2 bytes)
    data_len = msg.len - 2
    num_groups = int(data_len / 2)
    assert data_len == num_groups * 2, _INFORM_DEV_MSG

    first, last = 0, 2 + num_groups * 4  # hex-char offsets of the two framing bytes

    assert payload[first : first + 2] == "00", _INFORM_DEV_MSG
    assert payload[last : last + 2] == "00", _INFORM_DEV_MSG

    temperatures = []
    for offset in range(2, last, 4):  # 4 hex chars per temperature
        temperatures.append(hex_to_temp(payload[offset : offset + 4]))

    return {"temperatures": temperatures}
3737# setpoint_bounds (see: 4e01) - Itho spider/autotemp
def parser_4e02(
    payload: str, msg: Message
) -> dict[str, Any]:  # sent as triplets, 1 min apart
    """Decode a 4e02 (Itho setpoint bounds) payload.

    The payload is a leading ``00``, one 2-byte lower bound per group, a mode byte,
    then one 2-byte upper bound per group.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (only ``len`` is read)
    :type msg: Message
    :return: A dictionary containing the mode and the (lower, upper) bounds per group
    :rtype: dict[str, Any]
    :raises AssertionError: If the length, the leading byte or the mode byte is unexpected.
    """
    # .I --- 02:248945 02:250708 --:------ 4E02 034 00-7FFF7FFF7FFF07D07FFF7FFF7FFF7FFF-02-7FFF7FFF7FFF08347FFF7FFF7FFF7FFF  # 20.00-21.00
    # .I --- 02:250984 02:250704 --:------ 4E02 034 00-7FFF7FFF7FFF076C7FFF7FFF7FFF7FFF-02-7FFF7FFF7FFF07D07FFF7FFF7FFF7FFF  #

    # len 034 is 8 groups (2 + 8*4 bytes); len 050 would be 12 groups (2 + 12*4 bytes)
    data_len = msg.len - 2
    num_groups = int(data_len / 4)
    assert data_len == num_groups * 4, _INFORM_DEV_MSG

    lower_at, upper_at = 0, 2 + num_groups * 4  # hex-char offsets of the two halves

    assert payload[lower_at : lower_at + 2] == "00", _INFORM_DEV_MSG  # expect no context
    mode_hex = payload[upper_at : upper_at + 2]
    assert mode_hex in ("02", "03", "04", "05"), _INFORM_DEV_MSG  # mode: cool/heat?

    pairs = []  # (lower, upper) setpoints, one per group
    for i in range(2, upper_at, 4):
        lower = hex_to_temp(payload[lower_at + i : lower_at + i + 4])
        upper = hex_to_temp(payload[upper_at + i : upper_at + i + 4])
        pairs.append((lower, upper))

    # NOTE(review): "03" and "05" both map to "cool+" - confirm "05" is not meant to be a heat variant
    mode_names = {"02": "cool", "03": "cool+", "04": "heat", "05": "cool+"}

    return {
        SZ_MODE: mode_names[mode_hex],
        SZ_SETPOINT_BOUNDS: [p if p != (None, None) else None for p in pairs],
    }
3782# hvac_4e04
def parser_4e04(payload: str, msg: Message) -> dict[str, Any]:
    """Decode a 4e04 (HVAC mode) payload.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (unused)
    :type msg: Message
    :return: A dictionary with the mode name and the undecoded remainder of the payload
    :rtype: dict[str, Any]
    :raises AssertionError: If the mode byte or the trailing value is unrecognized.
    """
    MODE = {
        "00": "off",
        "01": "heat",
        "02": "cool",
    }
    SPECIAL_VALUES = (
        "FB",  # error code?
        "FC",  # error code?
        "FD",  # error code?
        "FE",  # error code?
        "FF",  # N/A?
    )

    mode_hex, value_hex = payload[2:4], payload[4:]

    assert mode_hex in MODE, _INFORM_DEV_MSG
    # the remainder is either a small number (< 0x40) or one of the special values
    assert int(value_hex, 16) < 0x40 or value_hex in SPECIAL_VALUES

    return {
        SZ_MODE: MODE.get(mode_hex, "Unknown"),
        "_unknown_2": value_hex,
    }
3815# WIP: AT outdoor low - Itho spider/autotemp
def parser_4e0d(payload: str, msg: Message) -> dict[str, Any]:
    """Pass the 4e0d payload through undecoded (work in progress).

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (unused)
    :type msg: Message
    :return: A dictionary with the raw payload under ``_payload``
    :rtype: dict[str, Any]
    """
    # .I --- 02:250704 02:250984 --:------ 4E0D 002 0100  # Itho Autotemp: only(?) master -> slave
    # .I --- 02:250704 02:250984 --:------ 4E0D 002 0101  # why does it have a context?

    result: dict[str, Any] = {"_payload": payload}
    return result
3834# AT fault circulation - Itho spider/autotemp
def parser_4e14(payload: str, msg: Message) -> dict[str, Any]:
    """Placeholder for the 4e14 (AT fault circulation) packet; nothing is decoded yet.

    Reference pseudo-code for a future decoder (not implemented here)::

        result = "AT fault circulation";
        result = (((payload[2:] & 0x01) != 0x01) ? " Fault state : no fault " : " Fault state : fault ")
        result = (((payload[2:] & 0x02) != 0x02) ? (text4 + "Circulation state : no fault ") : (text4 + " Circulation state : fault "))

    :param payload: The raw hex payload (currently ignored)
    :type payload: str
    :param msg: The message object containing context (currently ignored)
    :type msg: Message
    :return: Always an empty dictionary
    :rtype: dict[str, Any]
    """
    nothing_decoded: dict[str, Any] = {}
    return nothing_decoded
3851# wpu_state (hvac state) - Itho spider/autotemp
def parser_4e15(payload: str, msg: Message) -> dict[str, Any]:
    """Parse the 4e15 (WPU state) packet.

    The flag byte (everything after the first payload byte) is a bitmask:
    0x01 is cooling, 0x02 is heating and 0x04 is DHW. If no bit is set the WPU is 'Off'.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (unused)
    :type msg: Message
    :return: A dictionary of boolean flags for cooling, heating, and DHW activity
    :rtype: dict[str, Any]
    :raises TypeError: If the payload indicates simultaneous heating and cooling.
    :raises AssertionError: If unknown bit flags are present, or heating and DHW are both set.
    """
    # .I --- 21:034158 02:250676 --:------ 4E15 002 0000  # WPU "off" (maybe heating, but compressor off)
    # .I --- 21:064743 02:250708 --:------ 4E15 002 0001  # WPU cooling active
    # .I --- 21:057565 02:250677 --:------ 4E15 002 0002  # WPU heating, compressor active
    # .I --- 21:064743 02:250708 --:------ 4E15 002 0004  # WPU in "DHW mode" boiler active
    # .I --- 21:033160 02:250704 --:------ 4E15 002 0005  # 0x03, and 0x06 not seen in the wild

    # If none of these, then is 'Off'
    SZ_COOLING = "is_cooling"
    SZ_DHW_ING = "is_dhw_ing"
    SZ_HEATING = "is_heating"
    # SZ_PUMPING = "is_pumping"

    flags = int(payload[2:], 16)  # parse once; ValueError if not valid hex

    assert flags & 0xF8 == 0x00, _INFORM_DEV_MSG  # check for unknown bit flags
    if flags & 0x03 == 0x03:  # is_cooling *and* is_heating (+/- DHW)
        # TODO: Use local exception & ?Move to higher layer
        raise TypeError(
            f"4E15 flags {payload[2:]}: cooling and heating are both active"
        )
    assert flags & 0x07 != 0x06, _INFORM_DEV_MSG  # can't heat and DHW

    return {
        "_flags": hex_to_flag8(payload[2:]),
        # SZ_PUMPING: bool(flags & 0x08),
        SZ_DHW_ING: bool(flags & 0x04),
        SZ_HEATING: bool(flags & 0x02),
        SZ_COOLING: bool(flags & 0x01),
    }
3895# TODO: hvac_4e16 - Itho spider/autotemp
def parser_4e16(payload: str, msg: Message) -> dict[str, Any]:
    """Pass the 4e16 payload through undecoded, after checking it is all zeros.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (unused)
    :type msg: Message
    :return: A dictionary with the raw payload under ``_payload``
    :rtype: dict[str, Any]
    :raises AssertionError: If the payload is not the expected 7-byte null sequence.
    """
    # .I --- 02:250984 02:250704 --:------ 4E16 007 00000000000000  # Itho Autotemp: slave -> master

    expected = "00000000000000"  # the only payload seen so far
    assert payload == expected, _INFORM_DEV_MSG

    result: dict[str, Any] = {"_payload": payload}
    return result
3916# TODO: Fan characteristics - Itho
def parser_4e20(payload: str, msg: Message) -> dict[str, Any]:
    """Placeholder for the 4e20 (Itho fan characteristics) packet; nothing is decoded yet.

    Reference pseudo-code for a future decoder (not implemented here)::

        result = "Fan characteristics: "
        result += [C[ABC][210] hex_to_sint32[i:i+4] for i in range(2, 34, 4)]

    :param payload: The raw hex payload (currently ignored)
    :type payload: str
    :param msg: The message object containing context (currently ignored)
    :type msg: Message
    :return: Always an empty dictionary
    :rtype: dict[str, Any]
    """
    nothing_decoded: dict[str, Any] = {}
    return nothing_decoded
3933# TODO: Potentiometer control - Itho
def parser_4e21(payload: str, msg: Message) -> dict[str, Any]:
    """Placeholder for the 4e21 (Itho potentiometer control) packet; nothing is decoded yet.

    Reference pseudo-code for a future decoder (not implemented here)::

        result = "Potentiometer control: "
        result += "Rel min: " + hex_to_sint16(data[2:4])  # 16 bit, 2's complement
        result += "Min of rel min: " + hex_to_sint16(data[4:6])
        result += "Abs min: " + hex_to_sint16(data[6:8])
        result += "Rel max: " + hex_to_sint16(data[8:10])
        result += "Max rel: " + hex_to_sint16(data[10:12])
        result += "Abs max: " + hex_to_sint16(data[12:14])

    :param payload: The raw hex payload (currently ignored)
    :type payload: str
    :param msg: The message object containing context (currently ignored)
    :type msg: Message
    :return: Always an empty dictionary
    :rtype: dict[str, Any]
    """
    nothing_decoded: dict[str, Any] = {}
    return nothing_decoded
3955# # faked puzzle pkt shouldn't be decorated
def parser_7fff(payload: str, _: Message) -> dict[str, Any]:
    """Parse the 7fff (puzzle) packet.

    Packets that do not start with ``00``, or whose type byte is not in ``LOOKUP_PUZZ``,
    are treated as invalid/deprecated: they are logged at debug level and returned
    with their text decoded but without a timestamp.

    :param payload: The raw hex payload
    :type payload: str
    :param _: The message object (unused)
    :type _: Message
    :return: A dictionary containing the message type, timestamp, and metadata
    :rtype: dict[str, Any]
    :raises ValueError: If the timestamp field is not valid hex (e.g. the payload is too short).
    """
    if payload[:2] != "00":
        _LOGGER.debug("Invalid/deprecated Puzzle packet")
        return {
            "msg_type": payload[:2],
            SZ_PAYLOAD: hex_to_str(payload[2:]),
        }

    type_hex = payload[2:4]

    if type_hex not in LOOKUP_PUZZ:
        _LOGGER.debug("Invalid/deprecated Puzzle packet")
        return {
            "msg_type": type_hex,
            "message": hex_to_str(payload[4:]),
        }

    result: dict[str, None | str] = {}

    # The type byte is hexadecimal, so it must be parsed in base 16. Parsing it as
    # decimal made "20" compare as 20 (< 0x20), selecting the wrong timestamp scale,
    # and made types with hex letters (e.g. "7F", handled below) raise ValueError.
    if int(type_hex, 16) >= 0x20:
        dtm = dt.fromtimestamp(int(payload[4:16], 16) / 1e7)  # TZ-naive
        result["datetime"] = dtm.isoformat(timespec="milliseconds")
    elif type_hex != "13":  # type 13 carries no timestamp
        dtm = dt.fromtimestamp(int(payload[4:16], 16) / 1000)  # TZ-naive
        result["datetime"] = dtm.isoformat(timespec="milliseconds")

    msg_type = LOOKUP_PUZZ.get(type_hex, SZ_PAYLOAD)

    if type_hex == "11":
        mesg = hex_to_str(payload[16:])
        result[msg_type] = f"{mesg[:4]}|{mesg[4:6]}|{mesg[6:]}"

    elif type_hex == "13":  # no timestamp, so the text starts straight after the type
        result[msg_type] = hex_to_str(payload[4:])

    elif type_hex == "7F":  # kept as raw hex, not decoded to text
        result[msg_type] = payload[4:]

    else:
        result[msg_type] = hex_to_str(payload[16:])

    return {**result, "parser": f"v{VERSION}"}
def parser_unknown(payload: str, msg: Message) -> dict[str, Any]:
    """Fallback parser for packet codes that have no dedicated parser.

    Two generic shapes are recognised when the payload starts with ``00``: a 2-byte
    packet (decoded as a bool or an int) and a 3-byte packet (decoded as a temperature).
    Anything else is returned raw, flagged with a parse error.

    :param payload: The raw hex payload
    :type payload: str
    :param msg: The message object containing context (``len`` and ``code`` are read)
    :type msg: Message
    :return: A dictionary containing the raw payload and either a value or code information
    :rtype: dict[str, Any]
    """
    # TODO: it may be useful to generically search payloads for hex_ids, commands, etc.

    # These are generic parsers
    tail = payload[2:]

    if msg.len == 2 and payload[:2] == "00":
        known_values = {"00": False, "C8": True}
        value = known_values[tail] if tail in known_values else int(tail, 16)
        return {"_payload": payload, "_value": value}

    if msg.len == 3 and payload[:2] == "00":
        return {"_payload": payload, "_value": hex_to_temp(tail)}

    fallback: dict[str, Any] = {"_payload": payload}
    fallback["_unknown_code"] = msg.code
    fallback["_parse_error"] = "No parser available for this packet type"
    return fallback
# Dispatch table used by parse_payload(): upper-cased 4-character code -> parser function.
# It is built from every callable currently in scope whose name is "parser_" plus exactly
# 4 more characters (len == 11), e.g. parser_4e01 -> "4E01". Longer names such as
# parser_unknown fail the length test and so are not registered.
# NOTE: locals() is evaluated when this statement runs, so only parsers defined above
# this point are picked up - keep it below all the parser_xxxx definitions.
_PAYLOAD_PARSERS = {
    k[7:].upper(): v
    for k, v in locals().items()
    if callable(v) and k.startswith("parser_") and len(k) == 11
}
def parse_payload(msg: Message) -> dict | list[dict]:
    """Apply the appropriate parser defined in this module to the message.

    The parser is looked up by ``msg.code``; ``parser_unknown`` is used when there is
    no match. An ``AssertionError`` raised by a parser is not propagated: it is logged
    as a warning and a fallback dict (raw payload, error text, code) is returned.

    If the result is a dict and ``msg.seqn`` is numeric, the sequence number is added
    under ``seqx_num``.

    :param msg: A Message object containing packet data and extra attributes
    :type msg: Message
    :return: A dict of key:value pairs or a list of such dicts
    :rtype: dict | list[dict]
    """
    result: dict | list[dict]

    try:
        result = _PAYLOAD_PARSERS.get(msg.code, parser_unknown)(msg._pkt.payload, msg)
    except AssertionError as err:
        _LOGGER.warning(
            f"{msg!r} < {_INFORM_DEV_MSG} ({err}). "
            f"This packet could not be parsed completely. "
            f"Please report this message and any context about what changed on your system when this occurred."
        )
        # Return partial result with error info
        result = {
            "_payload": msg._pkt.payload,
            "_parse_error": f"AssertionError: {err}",
            "_unknown_code": msg.code,
        }

    # applies to both the parsed and the fallback result
    if isinstance(result, dict) and msg.seqn.isnumeric():  # e.g. 22F1/3
        result["seqx_num"] = msg.seqn

    return result