Coverage for src/ramses_tx/helpers.py: 23%
373 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Protocol/Transport layer - Helper functions."""
4from __future__ import annotations
6import ctypes
7import sys
8import time
9from collections.abc import Iterable, Mapping
10from datetime import date, datetime as dt
11from typing import TYPE_CHECKING, Final, Literal, TypeAlias
13from .address import hex_id_to_dev_id
14from .const import (
15 FAULT_DEVICE_CLASS,
16 FAULT_STATE,
17 FAULT_TYPE,
18 SZ_AIR_QUALITY,
19 SZ_AIR_QUALITY_BASIS,
20 SZ_BYPASS_POSITION,
21 SZ_CO2_LEVEL,
22 SZ_DEVICE_CLASS,
23 SZ_DEVICE_ID,
24 SZ_DEWPOINT_TEMP,
25 SZ_DOMAIN_IDX,
26 SZ_EXHAUST_FAN_SPEED,
27 SZ_EXHAUST_FLOW,
28 SZ_EXHAUST_TEMP,
29 SZ_FAN_INFO,
30 SZ_FAULT_STATE,
31 SZ_FAULT_TYPE,
32 SZ_HEAT_DEMAND,
33 SZ_INDOOR_HUMIDITY,
34 SZ_INDOOR_TEMP,
35 SZ_LOG_IDX,
36 SZ_OUTDOOR_HUMIDITY,
37 SZ_OUTDOOR_TEMP,
38 SZ_POST_HEAT,
39 SZ_PRE_HEAT,
40 SZ_REL_HUMIDITY,
41 SZ_REMAINING_MINS,
42 SZ_SPEED_CAPABILITIES,
43 SZ_SUPPLY_FAN_SPEED,
44 SZ_SUPPLY_FLOW,
45 SZ_SUPPLY_TEMP,
46 SZ_TEMPERATURE,
47 SZ_TIMESTAMP,
48 FaultDeviceClass,
49 FaultState,
50 FaultType,
51)
52from .ramses import _31DA_FAN_INFO
54if TYPE_CHECKING:
55 from .typed_dicts import PayDictT
57# Sensor faults
58SZ_UNRELIABLE: Final = "unreliable"
59SZ_TOO_HIGH: Final = "out_of_range_high"
60SZ_TOO_LOW: Final = "out_of_range_low"
61# Actuator, Valve/damper faults
62SZ_STUCK_VALVE: Final = "stuck_valve" # Damper/Valve jammed
63SZ_STUCK_ACTUATOR: Final = "stuck_actuator" # Actuator jammed
64# Common (to both) faults
65SZ_OPEN_CIRCUIT: Final = "open_circuit"
66SZ_SHORT_CIRCUIT: Final = "short_circuit"
67SZ_UNAVAILABLE: Final = "unavailable"
68SZ_OTHER_FAULT: Final = "other_fault" # Non-specific fault
70DEVICE_FAULT_CODES = {
71 0x0: SZ_OPEN_CIRCUIT, # NOTE: open, short
72 0x1: SZ_SHORT_CIRCUIT,
73 0x2: SZ_UNAVAILABLE,
74 0xD: SZ_STUCK_VALVE,
75 0xE: SZ_STUCK_ACTUATOR,
76 0xF: SZ_OTHER_FAULT,
77}
78SENSOR_FAULT_CODES = {
79 0x0: SZ_SHORT_CIRCUIT, # NOTE: short, open
80 0x1: SZ_OPEN_CIRCUIT,
81 0x2: SZ_UNAVAILABLE,
82 0x3: SZ_TOO_HIGH,
83 0x4: SZ_TOO_LOW,
84 0x5: SZ_UNRELIABLE,
85 # 0xF: SZ_OTHER_FAULT, # No evidence is explicitly part of the specification
86}
89# TODO: consider returning from helpers as TypeGuard[HexByte]
90# fmt: off
91HexByteAlt = Literal[
92 '00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '0A', '0B', '0C', '0D', '0E', '0F',
93 '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '1A', '1B', '1C', '1D', '1E', '1F',
94 '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '2A', '2B', '2C', '2D', '2E', '2F',
95 '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '3A', '3B', '3C', '3D', '3E', '3F',
96 '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '4A', '4B', '4C', '4D', '4E', '4F',
97 '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '5A', '5B', '5C', '5D', '5E', '5F',
98 '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '6A', '6B', '6C', '6D', '6E', '6F',
99 '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '7A', '7B', '7C', '7D', '7E', '7F',
100 '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '8A', '8B', '8C', '8D', '8E', '8F',
101 '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '9A', '9B', '9C', '9D', '9E', '9F',
102 'A0', 'A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'AA', 'AB', 'AC', 'AD', 'AE', 'AF',
103 'B0', 'B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B9', 'BA', 'BB', 'BC', 'BD', 'BE', 'BF',
104 'C0', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'CA', 'CB', 'CC', 'CD', 'CE', 'CF',
105 'D0', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9', 'DA', 'DB', 'DC', 'DD', 'DE', 'DF',
106 'E0', 'E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7', 'E8', 'E9', 'EA', 'EB', 'EC', 'ED', 'EE', 'EF',
107 'F0', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', 'FA', 'FB', 'FC', 'FD', 'FE', 'FF'
108]
109# fmt: on
111HexByte: TypeAlias = str
112HexStr2: TypeAlias = str # two characters, one byte
113HexStr4: TypeAlias = str
114HexStr8: TypeAlias = str
115HexStr12: TypeAlias = str
116HexStr14: TypeAlias = str
119ReturnValueDictT: TypeAlias = Mapping[str, float | str | None]
122class _FILE_TIME(ctypes.Structure):
123 """Data structure for GetSystemTimePreciseAsFileTime()."""
125 _fields_ = [("dwLowDateTime", ctypes.c_uint), ("dwHighDateTime", ctypes.c_uint)]
128file_time = _FILE_TIME()
131def timestamp() -> float:
132 """Return the number of seconds since the Unix epoch.
134 This function attempts to return a high-precision value, using specific
135 system calls on Windows if available.
136 :return: The current timestamp in seconds.
137 :rtype: float
138 """
140 # see: https://www.python.org/dev/peps/pep-0564/
141 if sys.platform == "win32":
142 # Windows uses a different epoch (1601-01-01)
143 ctypes.windll.kernel32.GetSystemTimePreciseAsFileTime(ctypes.byref(file_time))
144 _time = (file_time.dwLowDateTime + (file_time.dwHighDateTime << 32)) / 1e7
145 return float(_time - 134774 * 24 * 60 * 60)
146 else:
147 # Linux/macOS uses the Unix epoch (1970-01-01)
148 return time.time_ns() / 1e9
151def dt_now() -> dt:
152 """Return the current datetime as a local/naive datetime object.
154 This is slower, but potentially more accurate, than dt.now(), and is used mainly for
155 packet timestamps.
157 :return: The current local datetime.
158 :rtype: dt
159 """
160 if sys.platform == "win32":
161 return dt.fromtimestamp(timestamp())
162 else:
163 return dt.now()
166def dt_str() -> str:
167 """Return the current datetime as an isoformat string."""
168 return dt_now().isoformat(timespec="microseconds")
171####################################################################################################
174def hex_to_bool(value: HexStr2) -> bool | None: # either False, True or None
175 """Convert a 2-char hex string into a boolean."""
176 if not isinstance(value, str) or len(value) != 2:
177 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
178 if value == "FF":
179 return None
180 return {"00": False, "C8": True}[value]
183def hex_from_bool(value: bool | None) -> HexStr2: # either 00, C8 or FF
184 """Convert a boolean into a 2-char hex string."""
185 if value is None:
186 return "FF"
187 if not isinstance(value, bool):
188 raise ValueError(f"Invalid value: {value}, is not bool")
189 return {False: "00", True: "C8"}[value]
192def hex_to_date(value: HexStr8) -> str | None: # YY-MM-DD
193 """Convert am 8-char hex string into a date, format YY-MM-DD."""
194 if not isinstance(value, str) or len(value) != 8:
195 raise ValueError(f"Invalid value: {value}, is not an 8-char hex string")
196 if value == "FFFFFFFF":
197 return None
198 return dt(
199 year=int(value[4:8], 16),
200 month=int(value[2:4], 16),
201 day=int(value[:2], 16) & 0b11111, # 1st 3 bits: DayOfWeek
202 ).strftime("%Y-%m-%d")
205# FIXME: factor=1 should return an int
206def hex_to_double(value: HexStr4, factor: int = 1) -> float | None:
207 """Convert a 4-char hex string into a double."""
208 if not isinstance(value, str) or len(value) != 4:
209 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
210 if value == "7FFF":
211 return None
212 return int(value, 16) / factor
215def hex_from_double(value: float | None, factor: int = 1) -> HexStr4:
216 """Convert a double into 4-char hex string."""
217 if value is None:
218 return "7FFF"
219 if not isinstance(value, float | int):
220 raise ValueError(f"Invalid value: {value}, is not a double (a float/int)")
221 return f"{int(value * factor):04X}"
224def hex_to_dtm(value: HexStr12 | HexStr14) -> str | None: # from parsers
225 """Convert a 12/14-char hex string to an isoformat datetime (naive, local)."""
226 # 00141B0A07E3 (...HH:MM:00) for system_mode, zone_mode (schedules?)
227 # 0400041C0A07E3 (...HH:MM:SS) for sync_datetime
229 if not isinstance(value, str) or len(value) not in (12, 14):
230 raise ValueError(f"Invalid value: {value}, is not a 12/14-char hex string")
231 if value[-12:] == "FF" * 6:
232 return None
233 if len(value) == 12:
234 value = f"00{value}"
235 return dt(
236 year=int(value[10:14], 16),
237 month=int(value[8:10], 16),
238 day=int(value[6:8], 16),
239 hour=int(value[4:6], 16) & 0b11111, # 1st 3 bits: DayOfWeek
240 minute=int(value[2:4], 16),
241 second=int(value[:2], 16) & 0b1111111, # 1st bit: used for DST
242 ).isoformat(timespec="seconds")
245def hex_from_dtm(
246 dtm: date | dt | str | None, is_dst: bool = False, incl_seconds: bool = False
247) -> HexStr12 | HexStr14:
248 """Convert a datetime (isoformat str, or naive dtm) to a 12/14-char hex str."""
250 def _dtm_to_hex(year, mon, mday, hour, min, sec, *args: int) -> str: # type: ignore[no-untyped-def]
251 return f"{sec:02X}{min:02X}{hour:02X}{mday:02X}{mon:02X}{year:04X}"
253 if dtm is None:
254 return "FF" * (7 if incl_seconds else 6)
255 if isinstance(dtm, str):
256 dtm = dt.fromisoformat(dtm)
257 dtm_str = _dtm_to_hex(*dtm.timetuple()) # TODO: add DST for tm_isdst
258 if is_dst:
259 dtm_str = f"{int(dtm_str[:2], 16) | 0x80:02X}" + dtm_str[2:]
260 return dtm_str if incl_seconds else dtm_str[2:]
263def hex_to_dts(value: HexStr12) -> str | None:
264 """YY-MM-DD HH:MM:SS."""
265 if not isinstance(value, str) or len(value) != 12:
266 raise ValueError(f"Invalid value: {value}, is not a 12-char hex string")
267 if value == "00000000007F":
268 return None
269 _seqx = int(value, 16)
270 return dt(
271 year=(_seqx & 0b1111111 << 24) >> 24,
272 month=(_seqx & 0b1111 << 36) >> 36,
273 day=(_seqx & 0b11111 << 31) >> 31,
274 hour=(_seqx & 0b11111 << 19) >> 19,
275 minute=(_seqx & 0b111111 << 13) >> 13,
276 second=(_seqx & 0b111111 << 7) >> 7,
277 ).strftime("%y-%m-%dT%H:%M:%S")
280def hex_from_dts(dtm: dt | str | None) -> HexStr12: # TODO: WIP
281 """Convert a datetime (isoformat str, or dtm) to a packed 12-char hex str."""
282 """YY-MM-DD HH:MM:SS."""
283 if dtm is None:
284 return "00000000007F"
285 if isinstance(dtm, str):
286 try:
287 dtm = dt.strptime(dtm, "%y-%m-%dT%H:%M:%S")
288 except ValueError:
289 dtm = dt.fromisoformat(dtm) # type: ignore[arg-type]
291 (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, *_) = dtm.timetuple()
292 result = sum(
293 (
294 tm_year % 100 << 24,
295 tm_mon << 36,
296 tm_mday << 31,
297 tm_hour << 19,
298 tm_min << 13,
299 tm_sec << 7,
300 )
301 )
302 return f"{result:012X}"
305def hex_to_flag8(byte: HexByte, lsb: bool = False) -> list[int]: # TODO: use tuple
306 """Split a hex str (a byte) into a list of 8 bits, MSB as first bit by default.
308 If lsb==True, then the LSB is first.
309 The `lsb` boolean is used so that flag[0] is `zone_idx["00"]`, etc.
310 """
311 if not isinstance(byte, str) or len(byte) != 2:
312 raise ValueError(f"Invalid value: '{byte}', is not a 2-char hex string")
313 if lsb: # make LSB is first bit
314 return list((int(byte, 16) & (1 << x)) >> x for x in range(8))
315 return list((int(byte, 16) & (1 << x)) >> x for x in reversed(range(8)))
318def hex_from_flag8(flags: Iterable[int], lsb: bool = False) -> HexByte:
319 """Convert list of 8 bits, MSB bit 1 by default, to a two-char ASCII hex string.
321 The `lsb` boolean is used so that flag[0] is `zone_idx["00"]`, etc.
322 """
323 if not isinstance(flags, list) or len(flags) != 8:
324 raise ValueError(f"Invalid value: '{flags}', is not a list of 8 bits")
325 if lsb: # LSB is first bit
326 return f"{sum(x << idx for idx, x in enumerate(flags)):02X}"
327 return f"{sum(x << idx for idx, x in enumerate(reversed(flags))):02X}"
330# TODO: add a wrapper for EF, & 0xF0
331def hex_to_percent(
332 value: HexStr2, high_res: bool = True
333) -> float | None: # c.f. valve_demand
334 """Convert a 2-char hex string into a percentage.
336 The range is 0-100%, with resolution of 0.5% (high_res, 00-C8) or 1% (00-64).
337 """
338 if not isinstance(value, str) or len(value) != 2:
339 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
340 if value == "EF": # TODO: when EF, when 7F?
341 return None # TODO: raise NotImplementedError
342 if (raw_result := int(value, 16)) & 0xF0 == 0xF0:
343 return None # TODO: raise errors
344 result = float(raw_result) / (200 if high_res else 100)
345 if result > 1.0: # move to outer wrapper
346 raise ValueError(f"Invalid result: {result} (0x{value}) is > 1")
347 return result
350def hex_from_percent(value: float | None, high_res: bool = True) -> HexStr2:
351 """Convert a percentage into a 2-char hex string.
353 The range is 0-100%, with resolution of 0.5% (high_res, 00-C8) or 1% (00-64).
354 """
355 if value is None:
356 return "EF"
357 if not isinstance(value, float | int) or not 0 <= value <= 1:
358 raise ValueError(f"Invalid value: {value}, is not a percentage")
359 result = int(value * (200 if high_res else 100))
360 return f"{result:02X}"
363def hex_to_str(value: str) -> str: # printable ASCII characters
364 """Return a string of printable ASCII characters."""
365 # result = bytearray.fromhex(value).split(b"\x7F")[0] # TODO: needs checking
366 if not isinstance(value, str):
367 raise ValueError(f"Invalid value: {value}, is not a string")
368 result = bytearray([x for x in bytearray.fromhex(value) if 31 < x < 127])
369 return result.decode("ascii").strip() if result else ""
372def hex_from_str(value: str) -> str:
373 """Convert a string to a variable-length ASCII hex string."""
374 if not isinstance(value, str):
375 raise ValueError(f"Invalid value: {value}, is not a string")
376 return "".join(f"{ord(x):02X}" for x in value) # or: value.encode().hex()
379def hex_to_temp(value: HexStr4) -> bool | float | None: # TODO: remove bool
380 """Convert a 4-byte 2's complement hex string to a float temperature ('C).
382 :param value: The 4-character hex string (e.g., '07D0')
383 :type value: HexStr4
384 :return: The temperature in Celsius, or None if N/A
385 :rtype: float | None
386 :raises ValueError: If input is not a 4-char hex string or temperature is invalid.
387 """
388 if not isinstance(value, str) or len(value) != 4:
389 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
390 if value == "31FF": # means: N/A (== 127.99, 2s complement), signed?
391 return None
392 if value == "7EFF": # possibly only for setpoints? unsigned?
393 return False
394 if value == "7FFF": # also: FFFF?, means: N/A (== 327.67)
395 return None
396 temp: float = int(value, 16)
397 temp = (temp if temp < 2**15 else temp - 2**16) / 100
398 if temp < -273.15:
399 raise ValueError(f"Invalid value: {temp} (0x{value}) is < -273.15")
400 return temp
403def hex_from_temp(value: bool | float | None) -> HexStr4:
404 """Convert a float to a 2's complement 4-byte hex string."""
405 if value is None:
406 return "7FFF" # or: "31FF"?
407 if value is False:
408 return "7EFF"
409 if not isinstance(value, float | int):
410 raise TypeError(f"Invalid temp: {value} is not a float")
411 # if not -(2**7) <= value < 2**7: # TODO: tighten range
412 # raise ValueError(f"Invalid temp: {value} is out of range")
413 temp = int(value * 100)
414 return f"{temp if temp >= 0 else temp + 2**16:04X}"
417########################################################################################
420def parse_fault_log_entry(
421 payload: str,
422) -> PayDictT.FAULT_LOG_ENTRY | PayDictT.FAULT_LOG_ENTRY_NULL:
423 """Return the fault log entry."""
425 assert len(payload) == 44
427 # NOTE: the log_idx will increment as the entry moves down the log, hence '_log_idx'
429 # these are only useful for I_, not RP
430 if (timestamp := hex_to_dts(payload[18:30])) is None:
431 return {f"_{SZ_LOG_IDX}": payload[4:6]} # type: ignore[misc,return-value]
433 result: PayDictT.FAULT_LOG_ENTRY = {
434 f"_{SZ_LOG_IDX}": payload[4:6], # type: ignore[misc]
435 SZ_TIMESTAMP: timestamp,
436 SZ_FAULT_STATE: FAULT_STATE.get(payload[2:4], FaultState.UNKNOWN),
437 SZ_FAULT_TYPE: FAULT_TYPE.get(payload[8:10], FaultType.UNKNOWN),
438 SZ_DOMAIN_IDX: payload[10:12],
439 SZ_DEVICE_CLASS: FAULT_DEVICE_CLASS.get(
440 payload[12:14], FaultDeviceClass.UNKNOWN
441 ),
442 SZ_DEVICE_ID: hex_id_to_dev_id(payload[38:]),
443 "_unknown_3": payload[6:8], # B0 ?priority
444 "_unknown_7": payload[14:18], # 0000
445 "_unknown_15": payload[30:38], # FFFF7000/1/2
446 }
448 return result
451def _faulted_common(param_name: str, value: str) -> dict[str, str]:
452 return {f"{param_name}_fault": f"invalid_{value}"}
455def _faulted_sensor(param_name: str, value: str) -> dict[str, str]:
456 # assert value[:1] in ("8", "F"), value
457 code = int(value[:2], 16) & 0xF
458 fault = SENSOR_FAULT_CODES.get(code, f"invalid_{value}")
459 return {f"{param_name}_fault": fault}
462def _faulted_device(param_name: str, value: str) -> dict[str, str]:
463 assert value[:1] in ("8", "F"), value
464 code = int(value[:2], 16) & 0xF
465 fault: str = DEVICE_FAULT_CODES.get(code, f"invalid_{value}")
466 return {f"{param_name}_fault": fault}
469# TODO: refactor as per 31DA parsers
470def parse_valve_demand(
471 value: HexStr2,
472) -> dict[str, float] | dict[str, str] | dict[str, None]:
473 """Convert a 2-char hex string into a percentage.
475 The range is 0-100%, with resolution of 0.5% (high_res) or 1%.
476 """ # for a damper (restricts flow), or a valve (permits flow)
478 # TODO: remove this...
479 if not isinstance(value, str) or len(value) != 2:
480 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
482 if value == "EF":
483 return {SZ_HEAT_DEMAND: None} # Not Implemented
485 if int(value, 16) & 0xF0 == 0xF0:
486 return _faulted_device(SZ_HEAT_DEMAND, value)
488 result = int(value, 16) / 200 # c.f. hex_to_percent
489 if result == 1.01: # HACK - does it mean maximum?
490 result = 1.0
491 elif result > 1.0:
492 raise ValueError(f"Invalid result: {result} (0x{value}) is > 1")
494 return {SZ_HEAT_DEMAND: result}
497AIR_QUALITY_BASIS: dict[str, str] = {
498 "10": "voc", # volatile compounds
499 "20": "co2", # carbon dioxide
500 "40": "rel_humidity", # relative humidity
501}
504# 31DA[2:6] and 12C8[2:6]
505def parse_air_quality(value: HexStr4) -> PayDictT.AIR_QUALITY:
506 """Return the air quality percentage (0.0 to 1.0) and its basis.
508 The basis of the air quality level should be one of: VOC, CO2 or relative humidity.
509 If air_quality is EF, air_quality_basis should be 00.
511 The sensor value is None if there is no sensor present (is not an error).
512 The dict does not include the key if there is a sensor fault.
514 :param value: The 4-character hex string encoding quality and basis
515 :type value: HexStr4
516 :return: A dictionary containing the air quality and its basis (e.g., CO2, VOC)
517 :rtype: PayDictT.AIR_QUALITY
518 """ # VOC: Volatile organic compounds
520 # TODO: remove this as API used only internally...
521 if not isinstance(value, str) or len(value) != 4:
522 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
524 assert value[:2] != "EF" or value[2:] == "00", value # TODO: raise exception
525 if value == "EF00": # Not implemented
526 return {SZ_AIR_QUALITY: None}
528 if int(value[:2], 16) & 0xF0 == 0xF0:
529 return _faulted_sensor(SZ_AIR_QUALITY, value) # type: ignore[return-value]
531 level = int(value[:2], 16) / 200 # was: hex_to_percent(value[:2])
532 assert level <= 1.0, value[:2] # TODO: raise exception
534 assert value[2:] in ("10", "20", "40"), value[2:] # TODO: remove assert
536 basis: str = AIR_QUALITY_BASIS.get(
537 value[2:], f"unknown_{value[2:]}"
538 ) # TODO: remove get/unknown
540 return {SZ_AIR_QUALITY: level, SZ_AIR_QUALITY_BASIS: basis}
543def air_quality_code(desc: str) -> str:
544 for k, v in AIR_QUALITY_BASIS.items():
545 if v == desc:
546 return k
547 return "00"
550# 31DA[6:10] and 1298[2:6]
551def parse_co2_level(value: HexStr4) -> PayDictT.CO2_LEVEL:
552 """Return the co2 level (ppm).
554 The sensor value is None if there is no sensor present (is not an error).
555 The dict does not include the key if there is a sensor fault.
556 """
558 # TODO: remove this...
559 if not isinstance(value, str) or len(value) != 4:
560 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
562 if value == "7FFF": # Not implemented
563 return {SZ_CO2_LEVEL: None}
565 level = int(value, 16) # was: hex_to_double(value) # is it 2's complement?
567 if int(value[:2], 16) & 0x80 or level >= 0x8000:
568 return _faulted_sensor(SZ_CO2_LEVEL, value) # type: ignore[return-value]
570 # assert int(value[:2], 16) <= 0x8000, value
571 return {SZ_CO2_LEVEL: level}
574def parse_humidity_element(value: str, index: str) -> PayDictT._12A0:
575 """Return the relative humidity (%) and 2 temperatures
577 The result may include current temperature ('C) and include dewpoint temperature ('C).
578 """
579 if index == "01":
580 return _parse_hvac_humidity(SZ_REL_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value]
581 if index == "02":
582 return _parse_hvac_humidity(
583 SZ_OUTDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]
584 ) # type: ignore[return-value]
585 return _parse_hvac_humidity(SZ_INDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value]
588# 31DA[10:12] and 12A0[2:12]
589def parse_indoor_humidity(value: str) -> PayDictT.INDOOR_HUMIDITY:
590 """Return the relative indoor humidity (%).
592 The result may include current temperature ('C), and dewpoint temperature ('C).
593 """
594 return _parse_hvac_humidity(SZ_INDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value]
597# 31DA[12:14] and 1280[2:12]
598def parse_outdoor_humidity(value: str) -> PayDictT.OUTDOOR_HUMIDITY:
599 """Return the relative outdoor humidity (%).
601 The result may include current temperature ('C), and dewpoint temperature ('C).
602 """
603 return _parse_hvac_humidity(SZ_OUTDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value]
606def _parse_hvac_humidity(
607 param_name: str, value: HexStr2, temp: HexStr4, dewpoint: HexStr4
608) -> ReturnValueDictT:
609 """Return the relative humidity, etc. (called by sensor parsers).
611 The sensor value is None if there is no sensor present (is not an error).
612 The dict does not include the key if there is a sensor fault.
613 """
615 # TODO: remove this...
616 if not isinstance(value, str) or len(value) != 2:
617 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
618 if not isinstance(temp, str) or len(temp) not in (0, 4):
619 raise ValueError(f"Invalid temp: {temp}, is not a 4-char hex string")
620 if not isinstance(dewpoint, str) or len(dewpoint) not in (0, 4):
621 raise ValueError(f"Invalid dewpoint: {dewpoint}, is not a 4-char hex string")
623 if value == "EF": # Not implemented
624 return {param_name: None}
626 if int(value, 16) & 0xF0 == 0xF0:
627 return _faulted_sensor(param_name, value)
629 percentage = hex_to_percent(value, False) # TODO: confirm not /200
631 result: dict[str, float | str | None] = {param_name: percentage}
632 if temp:
633 result |= {SZ_TEMPERATURE: hex_to_temp(temp)}
634 if dewpoint:
635 result |= {SZ_DEWPOINT_TEMP: hex_to_temp(dewpoint)}
636 return result
639# 31DA[14:18]
640def parse_exhaust_temp(value: HexStr4) -> PayDictT.EXHAUST_TEMP:
641 """Return the exhaust temperature ('C)."""
642 return _parse_hvac_temp(SZ_EXHAUST_TEMP, value) # type: ignore[return-value]
645# 31DA[18:22]
646def parse_supply_temp(value: HexStr4) -> PayDictT.SUPPLY_TEMP:
647 """Return the supply temperature ('C)."""
648 return _parse_hvac_temp(SZ_SUPPLY_TEMP, value) # type: ignore[return-value]
651# 31DA[22:26]
652def parse_indoor_temp(value: HexStr4) -> PayDictT.INDOOR_TEMP:
653 """Return the indoor temperature ('C)."""
654 return _parse_hvac_temp(SZ_INDOOR_TEMP, value) # type: ignore[return-value]
657# 31DA[26:30] & 1290[2:6]?
658def parse_outdoor_temp(value: HexStr4) -> PayDictT.OUTDOOR_TEMP:
659 """Return the outdoor temperature ('C)."""
660 return _parse_hvac_temp(SZ_OUTDOOR_TEMP, value) # type: ignore[return-value]
663def _parse_hvac_temp(param_name: str, value: HexStr4) -> Mapping[str, float | None]:
664 """Return the temperature ('C) (called by sensor parsers).
666 The sensor value is None if there is no sensor present (is not an error).
667 The dict does not include the key if there is a sensor fault.
668 """
670 # TODO: remove this...
671 if not isinstance(value, str) or len(value) != 4:
672 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
674 if value == "7FFF": # Not implemented
675 return {param_name: None}
676 if value == "31FF": # Other
677 return {param_name: None}
679 if int(value[:2], 16) & 0xF0 == 0x80: # or temperature < -273.15:
680 return _faulted_sensor(param_name, value) # type: ignore[return-value]
682 temp: float = int(value, 16)
683 temp = (temp if temp < 2**15 else temp - 2**16) / 100
684 if temp <= -273: # TODO: < 273.15?
685 return _faulted_sensor(param_name, value) # type: ignore[return-value]
687 return {param_name: temp}
690ABILITIES = {
691 15: "off",
692 14: "low_med_high", # 3,2,1 = high,med,low?
693 13: "timer",
694 12: "boost",
695 11: "auto",
696 10: "speed_4",
697 9: "speed_5",
698 8: "speed_6",
699 7: "speed_7",
700 6: "speed_8",
701 5: "speed_9",
702 4: "speed_10",
703 3: "auto_night",
704 2: "reserved",
705 1: "post_heater",
706 0: "pre_heater",
707}
710# 31DA[30:34]
711def parse_capabilities(value: HexStr4) -> PayDictT.CAPABILITIES:
712 """Return the speed capabilities (a bitmask).
714 The sensor value is None if there is no sensor present (is not an error).
715 The dict does not include the key if there is a sensor fault.
716 """
718 # TODO: remove this...
719 if not isinstance(value, str) or len(value) != 4:
720 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
722 if value == "7FFF": # TODO: Not implemented???
723 return {SZ_SPEED_CAPABILITIES: None}
725 # assert value in ("0002", "4000", "4808", "F000", "F001", "F800", "F808"), value
727 return {
728 SZ_SPEED_CAPABILITIES: [
729 v for k, v in ABILITIES.items() if int(value, 16) & 2**k
730 ]
731 }
734def capability_bits(cap_list: list[str]) -> int:
735 # 0xF800 = 0b1111100000000000
736 cap_res: int = 0
737 for cap in cap_list:
738 for k, v in ABILITIES.items():
739 if v == cap:
740 cap_res |= 2**k # set bit
741 return cap_res
744# 31DA[34:36]
745def parse_bypass_position(value: HexStr2) -> PayDictT.BYPASS_POSITION:
746 """Return the bypass position (%), usually fully open or closed (0%, no bypass).
748 The sensor value is None if there is no sensor present (is not an error).
749 The dict does not include the key if there is a sensor fault.
750 """
752 # TODO: remove this...
753 if not isinstance(value, str) or len(value) != 2:
754 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
756 if value == "EF": # Not implemented
757 return {SZ_BYPASS_POSITION: None}
759 if int(value[:2], 16) & 0xF0 == 0xF0:
760 return _faulted_device(SZ_BYPASS_POSITION, value) # type: ignore[return-value]
762 bypass_pos = int(value, 16) / 200 # was: hex_to_percent(value)
763 assert bypass_pos <= 1.0, value
765 return {SZ_BYPASS_POSITION: bypass_pos}
768# 31DA[36:38] # TODO: WIP (3 more bits), also 22F3 and 22F4?
769def parse_fan_info(value: HexStr2) -> PayDictT.FAN_INFO:
770 """Return the fan state (lookup table for current speed and mode).
772 The sensor value is None if there is no sensor present (is not an error).
773 The dict does not include the key if there is a sensor fault.
774 """
776 # TODO: remove this...
777 if not isinstance(value, str) or len(value) != 2:
778 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
780 # if value == "EF": # TODO: Not implemented???
781 # return {SZ_FAN_INFO: None}
783 assert int(value, 16) & 0xE0 in (
784 0x00,
785 0x20,
786 0x40,
787 0x60,
788 0x80,
789 ), f"invalid fan_info: {int(value, 16) & 0xE0}"
791 flags = list((int(value, 16) & (1 << x)) >> x for x in range(7, 4, -1))
793 return {
794 SZ_FAN_INFO: _31DA_FAN_INFO[
795 int(value, 16) & 0x1F
796 ], # lookup description from code
797 "_unknown_fan_info_flags": flags,
798 }
801def fan_info_to_byte(info: str) -> int:
802 for k, v in _31DA_FAN_INFO.items():
803 if v == info:
804 return int(k) & 0x1F
805 return 0x0000
808def fan_info_flags(flags_list: list[int]) -> int:
809 flag_res: int = 0
810 for index, shift in enumerate(range(7, 4, -1)): # index = 7, 6 and 5
811 if flags_list[index] == 1:
812 flag_res |= 1 << shift # set bits
813 return flag_res
816# 31DA[38:40], also 2210
817def parse_exhaust_fan_speed(value: HexStr2) -> PayDictT.EXHAUST_FAN_SPEED:
818 """Return the exhaust fan speed (% of max speed)."""
819 return _parse_fan_speed(SZ_EXHAUST_FAN_SPEED, value) # type: ignore[return-value]
822# 31DA[40:42]
823def parse_supply_fan_speed(value: HexStr2) -> PayDictT.SUPPLY_FAN_SPEED:
824 """Return the supply fan speed (% of max speed)."""
825 return _parse_fan_speed(SZ_SUPPLY_FAN_SPEED, value) # type: ignore[return-value]
828def _parse_fan_speed(param_name: str, value: HexStr2) -> Mapping[str, float | None]:
829 """Return the fan speed (called by sensor parsers).
831 The sensor value is None if there is no sensor present (is not an error).
832 The dict does not include the key if there is a sensor fault.
833 """
835 # TODO: remove this...
836 if not isinstance(value, str) or len(value) != 2:
837 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
839 if value == "FF": # Not implemented (is definitely FF, not EF!)
840 return {param_name: None}
842 percentage = int(value, 16) / 200 # was: hex_to_percent(value)
843 if percentage > 1.0:
844 return _faulted_common(param_name, value) # type: ignore[return-value]
846 return {param_name: percentage}
849# 31DA[42:46] & 22F3[2:6] # TODO: make 22F3-friendly
850def parse_remaining_mins(value: HexStr4) -> PayDictT.REMAINING_MINUTES:
851 """Return the remaining time for temporary modes (whole minutes).
853 The sensor value is None if there is no sensor present (is not an error).
854 The dict does not include the key if there is a sensor fault.
855 """
857 # TODO: remove this...
858 if not isinstance(value, str) or len(value) != 4:
859 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
861 if value == "0000":
862 return {SZ_REMAINING_MINS: 0}
863 if value == "3FFF":
864 return {SZ_REMAINING_MINS: None}
866 minutes = int(value, 16) # was: hex_to_double(value)
867 assert minutes > 0, value # TODO: raise assert
869 return {SZ_REMAINING_MINS: minutes} # usu. 0-60 mins
872# 31DA[46:48]
873def parse_post_heater(value: HexStr2) -> PayDictT.POST_HEATER:
874 """Return the post-heater state (% of max heat)."""
875 return _parse_fan_heater(SZ_POST_HEAT, value) # type: ignore[return-value]
878# 31DA[48:50]
879def parse_pre_heater(value: HexStr2) -> PayDictT.PRE_HEATER:
880 """Return the pre-heater state (% of max heat)."""
881 return _parse_fan_heater(SZ_PRE_HEAT, value) # type: ignore[return-value]
884def _parse_fan_heater(param_name: str, value: HexStr2) -> Mapping[str, float | None]:
885 """Return the heater state (called by sensor parsers).
887 The sensor value is None if there is no sensor present (is not an error).
888 The dict does not include the key if there is a sensor fault.
889 """
891 # TODO: remove this...
892 if not isinstance(value, str) or len(value) != 2:
893 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string")
895 if value == "EF": # Not implemented
896 return {param_name: None}
898 if int(value, 16) & 0xF0 == 0xF0:
899 return _faulted_sensor(param_name, value) # type: ignore[return-value]
901 percentage = int(value, 16) / 200 # Siber DF EVO 2 is /200, not /100 (Others?)
902 assert percentage <= 1.0, value # TODO: raise exception if > 1.0?
904 return {param_name: percentage} # was: percent_from_hex(value, high_res=False)
907# 31DA[50:54]
908def parse_supply_flow(value: HexStr4) -> PayDictT.SUPPLY_FLOW:
909 """Return the supply flow rate in m^3/hr (Orcon) ?or L/sec (?Itho)."""
910 return _parse_fan_flow(SZ_SUPPLY_FLOW, value) # type: ignore[return-value]
913# 31DA[54:58]
914def parse_exhaust_flow(value: HexStr4) -> PayDictT.EXHAUST_FLOW:
915 """Return the exhaust flow rate in m^3/hr (Orcon) ?or L/sec (?Itho)"""
916 return _parse_fan_flow(SZ_EXHAUST_FLOW, value) # type: ignore[return-value]
919def _parse_fan_flow(param_name: str, value: HexStr4) -> Mapping[str, float | None]:
920 """Return the air flow rate (called by sensor parsers).
922 The sensor value is None if there is no sensor present (is not an error).
923 The dict does not include the key if there is a sensor fault.
924 """
926 # TODO: remove this...
927 if not isinstance(value, str) or len(value) != 4:
928 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string")
930 if value == "7FFF": # Not implemented
931 return {param_name: None}
933 if int(value[:2], 16) & 0x80:
934 return _faulted_sensor(param_name, value) # type: ignore[return-value]
936 flow = int(value, 16) / 100 # was: hex_to_double(value, factor=100)
937 assert flow >= 0, value # TODO: raise exception if < 0?
939 return {param_name: flow}