Coverage for src/ramses_tx/helpers.py: 23%

373 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Protocol/Transport layer - Helper functions.""" 

3 

4from __future__ import annotations 

5 

6import ctypes 

7import sys 

8import time 

9from collections.abc import Iterable, Mapping 

10from datetime import date, datetime as dt 

11from typing import TYPE_CHECKING, Final, Literal, TypeAlias 

12 

13from .address import hex_id_to_dev_id 

14from .const import ( 

15 FAULT_DEVICE_CLASS, 

16 FAULT_STATE, 

17 FAULT_TYPE, 

18 SZ_AIR_QUALITY, 

19 SZ_AIR_QUALITY_BASIS, 

20 SZ_BYPASS_POSITION, 

21 SZ_CO2_LEVEL, 

22 SZ_DEVICE_CLASS, 

23 SZ_DEVICE_ID, 

24 SZ_DEWPOINT_TEMP, 

25 SZ_DOMAIN_IDX, 

26 SZ_EXHAUST_FAN_SPEED, 

27 SZ_EXHAUST_FLOW, 

28 SZ_EXHAUST_TEMP, 

29 SZ_FAN_INFO, 

30 SZ_FAULT_STATE, 

31 SZ_FAULT_TYPE, 

32 SZ_HEAT_DEMAND, 

33 SZ_INDOOR_HUMIDITY, 

34 SZ_INDOOR_TEMP, 

35 SZ_LOG_IDX, 

36 SZ_OUTDOOR_HUMIDITY, 

37 SZ_OUTDOOR_TEMP, 

38 SZ_POST_HEAT, 

39 SZ_PRE_HEAT, 

40 SZ_REL_HUMIDITY, 

41 SZ_REMAINING_MINS, 

42 SZ_SPEED_CAPABILITIES, 

43 SZ_SUPPLY_FAN_SPEED, 

44 SZ_SUPPLY_FLOW, 

45 SZ_SUPPLY_TEMP, 

46 SZ_TEMPERATURE, 

47 SZ_TIMESTAMP, 

48 FaultDeviceClass, 

49 FaultState, 

50 FaultType, 

51) 

52from .ramses import _31DA_FAN_INFO 

53 

54if TYPE_CHECKING: 

55 from .typed_dicts import PayDictT 

56 

57# Sensor faults 

58SZ_UNRELIABLE: Final = "unreliable" 

59SZ_TOO_HIGH: Final = "out_of_range_high" 

60SZ_TOO_LOW: Final = "out_of_range_low" 

61# Actuator, Valve/damper faults 

62SZ_STUCK_VALVE: Final = "stuck_valve" # Damper/Valve jammed 

63SZ_STUCK_ACTUATOR: Final = "stuck_actuator" # Actuator jammed 

64# Common (to both) faults 

65SZ_OPEN_CIRCUIT: Final = "open_circuit" 

66SZ_SHORT_CIRCUIT: Final = "short_circuit" 

67SZ_UNAVAILABLE: Final = "unavailable" 

68SZ_OTHER_FAULT: Final = "other_fault" # Non-specific fault 

69 

70DEVICE_FAULT_CODES = { 

71 0x0: SZ_OPEN_CIRCUIT, # NOTE: open, short 

72 0x1: SZ_SHORT_CIRCUIT, 

73 0x2: SZ_UNAVAILABLE, 

74 0xD: SZ_STUCK_VALVE, 

75 0xE: SZ_STUCK_ACTUATOR, 

76 0xF: SZ_OTHER_FAULT, 

77} 

78SENSOR_FAULT_CODES = { 

79 0x0: SZ_SHORT_CIRCUIT, # NOTE: short, open 

80 0x1: SZ_OPEN_CIRCUIT, 

81 0x2: SZ_UNAVAILABLE, 

82 0x3: SZ_TOO_HIGH, 

83 0x4: SZ_TOO_LOW, 

84 0x5: SZ_UNRELIABLE, 

85 # 0xF: SZ_OTHER_FAULT, # No evidence is explicitly part of the specification 

86} 

87 

88 

89# TODO: consider returning from helpers as TypeGuard[HexByte] 

90# fmt: off 

91HexByteAlt = Literal[ 

92 '00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '0A', '0B', '0C', '0D', '0E', '0F', 

93 '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '1A', '1B', '1C', '1D', '1E', '1F', 

94 '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '2A', '2B', '2C', '2D', '2E', '2F', 

95 '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '3A', '3B', '3C', '3D', '3E', '3F', 

96 '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '4A', '4B', '4C', '4D', '4E', '4F', 

97 '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '5A', '5B', '5C', '5D', '5E', '5F', 

98 '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '6A', '6B', '6C', '6D', '6E', '6F', 

99 '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '7A', '7B', '7C', '7D', '7E', '7F', 

100 '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '8A', '8B', '8C', '8D', '8E', '8F', 

101 '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '9A', '9B', '9C', '9D', '9E', '9F', 

102 'A0', 'A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'AA', 'AB', 'AC', 'AD', 'AE', 'AF', 

103 'B0', 'B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B9', 'BA', 'BB', 'BC', 'BD', 'BE', 'BF', 

104 'C0', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'CA', 'CB', 'CC', 'CD', 'CE', 'CF', 

105 'D0', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9', 'DA', 'DB', 'DC', 'DD', 'DE', 'DF', 

106 'E0', 'E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7', 'E8', 'E9', 'EA', 'EB', 'EC', 'ED', 'EE', 'EF', 

107 'F0', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', 'FA', 'FB', 'FC', 'FD', 'FE', 'FF' 

108] 

109# fmt: on 

110 

111HexByte: TypeAlias = str 

112HexStr2: TypeAlias = str # two characters, one byte 

113HexStr4: TypeAlias = str 

114HexStr8: TypeAlias = str 

115HexStr12: TypeAlias = str 

116HexStr14: TypeAlias = str 

117 

118 

119ReturnValueDictT: TypeAlias = Mapping[str, float | str | None] 

120 

121 

122class _FILE_TIME(ctypes.Structure): 

123 """Data structure for GetSystemTimePreciseAsFileTime().""" 

124 

125 _fields_ = [("dwLowDateTime", ctypes.c_uint), ("dwHighDateTime", ctypes.c_uint)] 

126 

127 

128file_time = _FILE_TIME() 

129 

130 

131def timestamp() -> float: 

132 """Return the number of seconds since the Unix epoch. 

133 

134 This function attempts to return a high-precision value, using specific 

135 system calls on Windows if available. 

136 :return: The current timestamp in seconds. 

137 :rtype: float 

138 """ 

139 

140 # see: https://www.python.org/dev/peps/pep-0564/ 

141 if sys.platform == "win32": 

142 # Windows uses a different epoch (1601-01-01) 

143 ctypes.windll.kernel32.GetSystemTimePreciseAsFileTime(ctypes.byref(file_time)) 

144 _time = (file_time.dwLowDateTime + (file_time.dwHighDateTime << 32)) / 1e7 

145 return float(_time - 134774 * 24 * 60 * 60) 

146 else: 

147 # Linux/macOS uses the Unix epoch (1970-01-01) 

148 return time.time_ns() / 1e9 

149 

150 

151def dt_now() -> dt: 

152 """Return the current datetime as a local/naive datetime object. 

153 

154 This is slower, but potentially more accurate, than dt.now(), and is used mainly for 

155 packet timestamps. 

156 

157 :return: The current local datetime. 

158 :rtype: dt 

159 """ 

160 if sys.platform == "win32": 

161 return dt.fromtimestamp(timestamp()) 

162 else: 

163 return dt.now() 

164 

165 

166def dt_str() -> str: 

167 """Return the current datetime as an isoformat string.""" 

168 return dt_now().isoformat(timespec="microseconds") 

169 

170 

171#################################################################################################### 

172 

173 

174def hex_to_bool(value: HexStr2) -> bool | None: # either False, True or None 

175 """Convert a 2-char hex string into a boolean.""" 

176 if not isinstance(value, str) or len(value) != 2: 

177 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

178 if value == "FF": 

179 return None 

180 return {"00": False, "C8": True}[value] 

181 

182 

183def hex_from_bool(value: bool | None) -> HexStr2: # either 00, C8 or FF 

184 """Convert a boolean into a 2-char hex string.""" 

185 if value is None: 

186 return "FF" 

187 if not isinstance(value, bool): 

188 raise ValueError(f"Invalid value: {value}, is not bool") 

189 return {False: "00", True: "C8"}[value] 

190 

191 

192def hex_to_date(value: HexStr8) -> str | None: # YY-MM-DD 

193 """Convert am 8-char hex string into a date, format YY-MM-DD.""" 

194 if not isinstance(value, str) or len(value) != 8: 

195 raise ValueError(f"Invalid value: {value}, is not an 8-char hex string") 

196 if value == "FFFFFFFF": 

197 return None 

198 return dt( 

199 year=int(value[4:8], 16), 

200 month=int(value[2:4], 16), 

201 day=int(value[:2], 16) & 0b11111, # 1st 3 bits: DayOfWeek 

202 ).strftime("%Y-%m-%d") 

203 

204 

205# FIXME: factor=1 should return an int 

206def hex_to_double(value: HexStr4, factor: int = 1) -> float | None: 

207 """Convert a 4-char hex string into a double.""" 

208 if not isinstance(value, str) or len(value) != 4: 

209 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

210 if value == "7FFF": 

211 return None 

212 return int(value, 16) / factor 

213 

214 

215def hex_from_double(value: float | None, factor: int = 1) -> HexStr4: 

216 """Convert a double into 4-char hex string.""" 

217 if value is None: 

218 return "7FFF" 

219 if not isinstance(value, float | int): 

220 raise ValueError(f"Invalid value: {value}, is not a double (a float/int)") 

221 return f"{int(value * factor):04X}" 

222 

223 

224def hex_to_dtm(value: HexStr12 | HexStr14) -> str | None: # from parsers 

225 """Convert a 12/14-char hex string to an isoformat datetime (naive, local).""" 

226 # 00141B0A07E3 (...HH:MM:00) for system_mode, zone_mode (schedules?) 

227 # 0400041C0A07E3 (...HH:MM:SS) for sync_datetime 

228 

229 if not isinstance(value, str) or len(value) not in (12, 14): 

230 raise ValueError(f"Invalid value: {value}, is not a 12/14-char hex string") 

231 if value[-12:] == "FF" * 6: 

232 return None 

233 if len(value) == 12: 

234 value = f"00{value}" 

235 return dt( 

236 year=int(value[10:14], 16), 

237 month=int(value[8:10], 16), 

238 day=int(value[6:8], 16), 

239 hour=int(value[4:6], 16) & 0b11111, # 1st 3 bits: DayOfWeek 

240 minute=int(value[2:4], 16), 

241 second=int(value[:2], 16) & 0b1111111, # 1st bit: used for DST 

242 ).isoformat(timespec="seconds") 

243 

244 

245def hex_from_dtm( 

246 dtm: date | dt | str | None, is_dst: bool = False, incl_seconds: bool = False 

247) -> HexStr12 | HexStr14: 

248 """Convert a datetime (isoformat str, or naive dtm) to a 12/14-char hex str.""" 

249 

250 def _dtm_to_hex(year, mon, mday, hour, min, sec, *args: int) -> str: # type: ignore[no-untyped-def] 

251 return f"{sec:02X}{min:02X}{hour:02X}{mday:02X}{mon:02X}{year:04X}" 

252 

253 if dtm is None: 

254 return "FF" * (7 if incl_seconds else 6) 

255 if isinstance(dtm, str): 

256 dtm = dt.fromisoformat(dtm) 

257 dtm_str = _dtm_to_hex(*dtm.timetuple()) # TODO: add DST for tm_isdst 

258 if is_dst: 

259 dtm_str = f"{int(dtm_str[:2], 16) | 0x80:02X}" + dtm_str[2:] 

260 return dtm_str if incl_seconds else dtm_str[2:] 

261 

262 

263def hex_to_dts(value: HexStr12) -> str | None: 

264 """YY-MM-DD HH:MM:SS.""" 

265 if not isinstance(value, str) or len(value) != 12: 

266 raise ValueError(f"Invalid value: {value}, is not a 12-char hex string") 

267 if value == "00000000007F": 

268 return None 

269 _seqx = int(value, 16) 

270 return dt( 

271 year=(_seqx & 0b1111111 << 24) >> 24, 

272 month=(_seqx & 0b1111 << 36) >> 36, 

273 day=(_seqx & 0b11111 << 31) >> 31, 

274 hour=(_seqx & 0b11111 << 19) >> 19, 

275 minute=(_seqx & 0b111111 << 13) >> 13, 

276 second=(_seqx & 0b111111 << 7) >> 7, 

277 ).strftime("%y-%m-%dT%H:%M:%S") 

278 

279 

280def hex_from_dts(dtm: dt | str | None) -> HexStr12: # TODO: WIP 

281 """Convert a datetime (isoformat str, or dtm) to a packed 12-char hex str.""" 

282 """YY-MM-DD HH:MM:SS.""" 

283 if dtm is None: 

284 return "00000000007F" 

285 if isinstance(dtm, str): 

286 try: 

287 dtm = dt.strptime(dtm, "%y-%m-%dT%H:%M:%S") 

288 except ValueError: 

289 dtm = dt.fromisoformat(dtm) # type: ignore[arg-type] 

290 

291 (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, *_) = dtm.timetuple() 

292 result = sum( 

293 ( 

294 tm_year % 100 << 24, 

295 tm_mon << 36, 

296 tm_mday << 31, 

297 tm_hour << 19, 

298 tm_min << 13, 

299 tm_sec << 7, 

300 ) 

301 ) 

302 return f"{result:012X}" 

303 

304 

305def hex_to_flag8(byte: HexByte, lsb: bool = False) -> list[int]: # TODO: use tuple 

306 """Split a hex str (a byte) into a list of 8 bits, MSB as first bit by default. 

307 

308 If lsb==True, then the LSB is first. 

309 The `lsb` boolean is used so that flag[0] is `zone_idx["00"]`, etc. 

310 """ 

311 if not isinstance(byte, str) or len(byte) != 2: 

312 raise ValueError(f"Invalid value: '{byte}', is not a 2-char hex string") 

313 if lsb: # make LSB is first bit 

314 return list((int(byte, 16) & (1 << x)) >> x for x in range(8)) 

315 return list((int(byte, 16) & (1 << x)) >> x for x in reversed(range(8))) 

316 

317 

318def hex_from_flag8(flags: Iterable[int], lsb: bool = False) -> HexByte: 

319 """Convert list of 8 bits, MSB bit 1 by default, to a two-char ASCII hex string. 

320 

321 The `lsb` boolean is used so that flag[0] is `zone_idx["00"]`, etc. 

322 """ 

323 if not isinstance(flags, list) or len(flags) != 8: 

324 raise ValueError(f"Invalid value: '{flags}', is not a list of 8 bits") 

325 if lsb: # LSB is first bit 

326 return f"{sum(x << idx for idx, x in enumerate(flags)):02X}" 

327 return f"{sum(x << idx for idx, x in enumerate(reversed(flags))):02X}" 

328 

329 

330# TODO: add a wrapper for EF, & 0xF0 

331def hex_to_percent( 

332 value: HexStr2, high_res: bool = True 

333) -> float | None: # c.f. valve_demand 

334 """Convert a 2-char hex string into a percentage. 

335 

336 The range is 0-100%, with resolution of 0.5% (high_res, 00-C8) or 1% (00-64). 

337 """ 

338 if not isinstance(value, str) or len(value) != 2: 

339 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

340 if value == "EF": # TODO: when EF, when 7F? 

341 return None # TODO: raise NotImplementedError 

342 if (raw_result := int(value, 16)) & 0xF0 == 0xF0: 

343 return None # TODO: raise errors 

344 result = float(raw_result) / (200 if high_res else 100) 

345 if result > 1.0: # move to outer wrapper 

346 raise ValueError(f"Invalid result: {result} (0x{value}) is > 1") 

347 return result 

348 

349 

350def hex_from_percent(value: float | None, high_res: bool = True) -> HexStr2: 

351 """Convert a percentage into a 2-char hex string. 

352 

353 The range is 0-100%, with resolution of 0.5% (high_res, 00-C8) or 1% (00-64). 

354 """ 

355 if value is None: 

356 return "EF" 

357 if not isinstance(value, float | int) or not 0 <= value <= 1: 

358 raise ValueError(f"Invalid value: {value}, is not a percentage") 

359 result = int(value * (200 if high_res else 100)) 

360 return f"{result:02X}" 

361 

362 

363def hex_to_str(value: str) -> str: # printable ASCII characters 

364 """Return a string of printable ASCII characters.""" 

365 # result = bytearray.fromhex(value).split(b"\x7F")[0] # TODO: needs checking 

366 if not isinstance(value, str): 

367 raise ValueError(f"Invalid value: {value}, is not a string") 

368 result = bytearray([x for x in bytearray.fromhex(value) if 31 < x < 127]) 

369 return result.decode("ascii").strip() if result else "" 

370 

371 

372def hex_from_str(value: str) -> str: 

373 """Convert a string to a variable-length ASCII hex string.""" 

374 if not isinstance(value, str): 

375 raise ValueError(f"Invalid value: {value}, is not a string") 

376 return "".join(f"{ord(x):02X}" for x in value) # or: value.encode().hex() 

377 

378 

379def hex_to_temp(value: HexStr4) -> bool | float | None: # TODO: remove bool 

380 """Convert a 4-byte 2's complement hex string to a float temperature ('C). 

381 

382 :param value: The 4-character hex string (e.g., '07D0') 

383 :type value: HexStr4 

384 :return: The temperature in Celsius, or None if N/A 

385 :rtype: float | None 

386 :raises ValueError: If input is not a 4-char hex string or temperature is invalid. 

387 """ 

388 if not isinstance(value, str) or len(value) != 4: 

389 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

390 if value == "31FF": # means: N/A (== 127.99, 2s complement), signed? 

391 return None 

392 if value == "7EFF": # possibly only for setpoints? unsigned? 

393 return False 

394 if value == "7FFF": # also: FFFF?, means: N/A (== 327.67) 

395 return None 

396 temp: float = int(value, 16) 

397 temp = (temp if temp < 2**15 else temp - 2**16) / 100 

398 if temp < -273.15: 

399 raise ValueError(f"Invalid value: {temp} (0x{value}) is < -273.15") 

400 return temp 

401 

402 

403def hex_from_temp(value: bool | float | None) -> HexStr4: 

404 """Convert a float to a 2's complement 4-byte hex string.""" 

405 if value is None: 

406 return "7FFF" # or: "31FF"? 

407 if value is False: 

408 return "7EFF" 

409 if not isinstance(value, float | int): 

410 raise TypeError(f"Invalid temp: {value} is not a float") 

411 # if not -(2**7) <= value < 2**7: # TODO: tighten range 

412 # raise ValueError(f"Invalid temp: {value} is out of range") 

413 temp = int(value * 100) 

414 return f"{temp if temp >= 0 else temp + 2**16:04X}" 

415 

416 

417######################################################################################## 

418 

419 

420def parse_fault_log_entry( 

421 payload: str, 

422) -> PayDictT.FAULT_LOG_ENTRY | PayDictT.FAULT_LOG_ENTRY_NULL: 

423 """Return the fault log entry.""" 

424 

425 assert len(payload) == 44 

426 

427 # NOTE: the log_idx will increment as the entry moves down the log, hence '_log_idx' 

428 

429 # these are only useful for I_, not RP 

430 if (timestamp := hex_to_dts(payload[18:30])) is None: 

431 return {f"_{SZ_LOG_IDX}": payload[4:6]} # type: ignore[misc,return-value] 

432 

433 result: PayDictT.FAULT_LOG_ENTRY = { 

434 f"_{SZ_LOG_IDX}": payload[4:6], # type: ignore[misc] 

435 SZ_TIMESTAMP: timestamp, 

436 SZ_FAULT_STATE: FAULT_STATE.get(payload[2:4], FaultState.UNKNOWN), 

437 SZ_FAULT_TYPE: FAULT_TYPE.get(payload[8:10], FaultType.UNKNOWN), 

438 SZ_DOMAIN_IDX: payload[10:12], 

439 SZ_DEVICE_CLASS: FAULT_DEVICE_CLASS.get( 

440 payload[12:14], FaultDeviceClass.UNKNOWN 

441 ), 

442 SZ_DEVICE_ID: hex_id_to_dev_id(payload[38:]), 

443 "_unknown_3": payload[6:8], # B0 ?priority 

444 "_unknown_7": payload[14:18], # 0000 

445 "_unknown_15": payload[30:38], # FFFF7000/1/2 

446 } 

447 

448 return result 

449 

450 

451def _faulted_common(param_name: str, value: str) -> dict[str, str]: 

452 return {f"{param_name}_fault": f"invalid_{value}"} 

453 

454 

455def _faulted_sensor(param_name: str, value: str) -> dict[str, str]: 

456 # assert value[:1] in ("8", "F"), value 

457 code = int(value[:2], 16) & 0xF 

458 fault = SENSOR_FAULT_CODES.get(code, f"invalid_{value}") 

459 return {f"{param_name}_fault": fault} 

460 

461 

462def _faulted_device(param_name: str, value: str) -> dict[str, str]: 

463 assert value[:1] in ("8", "F"), value 

464 code = int(value[:2], 16) & 0xF 

465 fault: str = DEVICE_FAULT_CODES.get(code, f"invalid_{value}") 

466 return {f"{param_name}_fault": fault} 

467 

468 

469# TODO: refactor as per 31DA parsers 

470def parse_valve_demand( 

471 value: HexStr2, 

472) -> dict[str, float] | dict[str, str] | dict[str, None]: 

473 """Convert a 2-char hex string into a percentage. 

474 

475 The range is 0-100%, with resolution of 0.5% (high_res) or 1%. 

476 """ # for a damper (restricts flow), or a valve (permits flow) 

477 

478 # TODO: remove this... 

479 if not isinstance(value, str) or len(value) != 2: 

480 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

481 

482 if value == "EF": 

483 return {SZ_HEAT_DEMAND: None} # Not Implemented 

484 

485 if int(value, 16) & 0xF0 == 0xF0: 

486 return _faulted_device(SZ_HEAT_DEMAND, value) 

487 

488 result = int(value, 16) / 200 # c.f. hex_to_percent 

489 if result == 1.01: # HACK - does it mean maximum? 

490 result = 1.0 

491 elif result > 1.0: 

492 raise ValueError(f"Invalid result: {result} (0x{value}) is > 1") 

493 

494 return {SZ_HEAT_DEMAND: result} 

495 

496 

497AIR_QUALITY_BASIS: dict[str, str] = { 

498 "10": "voc", # volatile compounds 

499 "20": "co2", # carbon dioxide 

500 "40": "rel_humidity", # relative humidity 

501} 

502 

503 

504# 31DA[2:6] and 12C8[2:6] 

505def parse_air_quality(value: HexStr4) -> PayDictT.AIR_QUALITY: 

506 """Return the air quality percentage (0.0 to 1.0) and its basis. 

507 

508 The basis of the air quality level should be one of: VOC, CO2 or relative humidity. 

509 If air_quality is EF, air_quality_basis should be 00. 

510 

511 The sensor value is None if there is no sensor present (is not an error). 

512 The dict does not include the key if there is a sensor fault. 

513 

514 :param value: The 4-character hex string encoding quality and basis 

515 :type value: HexStr4 

516 :return: A dictionary containing the air quality and its basis (e.g., CO2, VOC) 

517 :rtype: PayDictT.AIR_QUALITY 

518 """ # VOC: Volatile organic compounds 

519 

520 # TODO: remove this as API used only internally... 

521 if not isinstance(value, str) or len(value) != 4: 

522 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

523 

524 assert value[:2] != "EF" or value[2:] == "00", value # TODO: raise exception 

525 if value == "EF00": # Not implemented 

526 return {SZ_AIR_QUALITY: None} 

527 

528 if int(value[:2], 16) & 0xF0 == 0xF0: 

529 return _faulted_sensor(SZ_AIR_QUALITY, value) # type: ignore[return-value] 

530 

531 level = int(value[:2], 16) / 200 # was: hex_to_percent(value[:2]) 

532 assert level <= 1.0, value[:2] # TODO: raise exception 

533 

534 assert value[2:] in ("10", "20", "40"), value[2:] # TODO: remove assert 

535 

536 basis: str = AIR_QUALITY_BASIS.get( 

537 value[2:], f"unknown_{value[2:]}" 

538 ) # TODO: remove get/unknown 

539 

540 return {SZ_AIR_QUALITY: level, SZ_AIR_QUALITY_BASIS: basis} 

541 

542 

543def air_quality_code(desc: str) -> str: 

544 for k, v in AIR_QUALITY_BASIS.items(): 

545 if v == desc: 

546 return k 

547 return "00" 

548 

549 

550# 31DA[6:10] and 1298[2:6] 

551def parse_co2_level(value: HexStr4) -> PayDictT.CO2_LEVEL: 

552 """Return the co2 level (ppm). 

553 

554 The sensor value is None if there is no sensor present (is not an error). 

555 The dict does not include the key if there is a sensor fault. 

556 """ 

557 

558 # TODO: remove this... 

559 if not isinstance(value, str) or len(value) != 4: 

560 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

561 

562 if value == "7FFF": # Not implemented 

563 return {SZ_CO2_LEVEL: None} 

564 

565 level = int(value, 16) # was: hex_to_double(value) # is it 2's complement? 

566 

567 if int(value[:2], 16) & 0x80 or level >= 0x8000: 

568 return _faulted_sensor(SZ_CO2_LEVEL, value) # type: ignore[return-value] 

569 

570 # assert int(value[:2], 16) <= 0x8000, value 

571 return {SZ_CO2_LEVEL: level} 

572 

573 

574def parse_humidity_element(value: str, index: str) -> PayDictT._12A0: 

575 """Return the relative humidity (%) and 2 temperatures 

576 

577 The result may include current temperature ('C) and include dewpoint temperature ('C). 

578 """ 

579 if index == "01": 

580 return _parse_hvac_humidity(SZ_REL_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value] 

581 if index == "02": 

582 return _parse_hvac_humidity( 

583 SZ_OUTDOOR_HUMIDITY, value[:2], value[2:6], value[6:10] 

584 ) # type: ignore[return-value] 

585 return _parse_hvac_humidity(SZ_INDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value] 

586 

587 

588# 31DA[10:12] and 12A0[2:12] 

589def parse_indoor_humidity(value: str) -> PayDictT.INDOOR_HUMIDITY: 

590 """Return the relative indoor humidity (%). 

591 

592 The result may include current temperature ('C), and dewpoint temperature ('C). 

593 """ 

594 return _parse_hvac_humidity(SZ_INDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value] 

595 

596 

597# 31DA[12:14] and 1280[2:12] 

598def parse_outdoor_humidity(value: str) -> PayDictT.OUTDOOR_HUMIDITY: 

599 """Return the relative outdoor humidity (%). 

600 

601 The result may include current temperature ('C), and dewpoint temperature ('C). 

602 """ 

603 return _parse_hvac_humidity(SZ_OUTDOOR_HUMIDITY, value[:2], value[2:6], value[6:10]) # type: ignore[return-value] 

604 

605 

606def _parse_hvac_humidity( 

607 param_name: str, value: HexStr2, temp: HexStr4, dewpoint: HexStr4 

608) -> ReturnValueDictT: 

609 """Return the relative humidity, etc. (called by sensor parsers). 

610 

611 The sensor value is None if there is no sensor present (is not an error). 

612 The dict does not include the key if there is a sensor fault. 

613 """ 

614 

615 # TODO: remove this... 

616 if not isinstance(value, str) or len(value) != 2: 

617 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

618 if not isinstance(temp, str) or len(temp) not in (0, 4): 

619 raise ValueError(f"Invalid temp: {temp}, is not a 4-char hex string") 

620 if not isinstance(dewpoint, str) or len(dewpoint) not in (0, 4): 

621 raise ValueError(f"Invalid dewpoint: {dewpoint}, is not a 4-char hex string") 

622 

623 if value == "EF": # Not implemented 

624 return {param_name: None} 

625 

626 if int(value, 16) & 0xF0 == 0xF0: 

627 return _faulted_sensor(param_name, value) 

628 

629 percentage = hex_to_percent(value, False) # TODO: confirm not /200 

630 

631 result: dict[str, float | str | None] = {param_name: percentage} 

632 if temp: 

633 result |= {SZ_TEMPERATURE: hex_to_temp(temp)} 

634 if dewpoint: 

635 result |= {SZ_DEWPOINT_TEMP: hex_to_temp(dewpoint)} 

636 return result 

637 

638 

639# 31DA[14:18] 

640def parse_exhaust_temp(value: HexStr4) -> PayDictT.EXHAUST_TEMP: 

641 """Return the exhaust temperature ('C).""" 

642 return _parse_hvac_temp(SZ_EXHAUST_TEMP, value) # type: ignore[return-value] 

643 

644 

645# 31DA[18:22] 

646def parse_supply_temp(value: HexStr4) -> PayDictT.SUPPLY_TEMP: 

647 """Return the supply temperature ('C).""" 

648 return _parse_hvac_temp(SZ_SUPPLY_TEMP, value) # type: ignore[return-value] 

649 

650 

651# 31DA[22:26] 

652def parse_indoor_temp(value: HexStr4) -> PayDictT.INDOOR_TEMP: 

653 """Return the indoor temperature ('C).""" 

654 return _parse_hvac_temp(SZ_INDOOR_TEMP, value) # type: ignore[return-value] 

655 

656 

657# 31DA[26:30] & 1290[2:6]? 

658def parse_outdoor_temp(value: HexStr4) -> PayDictT.OUTDOOR_TEMP: 

659 """Return the outdoor temperature ('C).""" 

660 return _parse_hvac_temp(SZ_OUTDOOR_TEMP, value) # type: ignore[return-value] 

661 

662 

663def _parse_hvac_temp(param_name: str, value: HexStr4) -> Mapping[str, float | None]: 

664 """Return the temperature ('C) (called by sensor parsers). 

665 

666 The sensor value is None if there is no sensor present (is not an error). 

667 The dict does not include the key if there is a sensor fault. 

668 """ 

669 

670 # TODO: remove this... 

671 if not isinstance(value, str) or len(value) != 4: 

672 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

673 

674 if value == "7FFF": # Not implemented 

675 return {param_name: None} 

676 if value == "31FF": # Other 

677 return {param_name: None} 

678 

679 if int(value[:2], 16) & 0xF0 == 0x80: # or temperature < -273.15: 

680 return _faulted_sensor(param_name, value) # type: ignore[return-value] 

681 

682 temp: float = int(value, 16) 

683 temp = (temp if temp < 2**15 else temp - 2**16) / 100 

684 if temp <= -273: # TODO: < 273.15? 

685 return _faulted_sensor(param_name, value) # type: ignore[return-value] 

686 

687 return {param_name: temp} 

688 

689 

690ABILITIES = { 

691 15: "off", 

692 14: "low_med_high", # 3,2,1 = high,med,low? 

693 13: "timer", 

694 12: "boost", 

695 11: "auto", 

696 10: "speed_4", 

697 9: "speed_5", 

698 8: "speed_6", 

699 7: "speed_7", 

700 6: "speed_8", 

701 5: "speed_9", 

702 4: "speed_10", 

703 3: "auto_night", 

704 2: "reserved", 

705 1: "post_heater", 

706 0: "pre_heater", 

707} 

708 

709 

710# 31DA[30:34] 

711def parse_capabilities(value: HexStr4) -> PayDictT.CAPABILITIES: 

712 """Return the speed capabilities (a bitmask). 

713 

714 The sensor value is None if there is no sensor present (is not an error). 

715 The dict does not include the key if there is a sensor fault. 

716 """ 

717 

718 # TODO: remove this... 

719 if not isinstance(value, str) or len(value) != 4: 

720 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

721 

722 if value == "7FFF": # TODO: Not implemented??? 

723 return {SZ_SPEED_CAPABILITIES: None} 

724 

725 # assert value in ("0002", "4000", "4808", "F000", "F001", "F800", "F808"), value 

726 

727 return { 

728 SZ_SPEED_CAPABILITIES: [ 

729 v for k, v in ABILITIES.items() if int(value, 16) & 2**k 

730 ] 

731 } 

732 

733 

734def capability_bits(cap_list: list[str]) -> int: 

735 # 0xF800 = 0b1111100000000000 

736 cap_res: int = 0 

737 for cap in cap_list: 

738 for k, v in ABILITIES.items(): 

739 if v == cap: 

740 cap_res |= 2**k # set bit 

741 return cap_res 

742 

743 

744# 31DA[34:36] 

745def parse_bypass_position(value: HexStr2) -> PayDictT.BYPASS_POSITION: 

746 """Return the bypass position (%), usually fully open or closed (0%, no bypass). 

747 

748 The sensor value is None if there is no sensor present (is not an error). 

749 The dict does not include the key if there is a sensor fault. 

750 """ 

751 

752 # TODO: remove this... 

753 if not isinstance(value, str) or len(value) != 2: 

754 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

755 

756 if value == "EF": # Not implemented 

757 return {SZ_BYPASS_POSITION: None} 

758 

759 if int(value[:2], 16) & 0xF0 == 0xF0: 

760 return _faulted_device(SZ_BYPASS_POSITION, value) # type: ignore[return-value] 

761 

762 bypass_pos = int(value, 16) / 200 # was: hex_to_percent(value) 

763 assert bypass_pos <= 1.0, value 

764 

765 return {SZ_BYPASS_POSITION: bypass_pos} 

766 

767 

768# 31DA[36:38] # TODO: WIP (3 more bits), also 22F3 and 22F4? 

769def parse_fan_info(value: HexStr2) -> PayDictT.FAN_INFO: 

770 """Return the fan state (lookup table for current speed and mode). 

771 

772 The sensor value is None if there is no sensor present (is not an error). 

773 The dict does not include the key if there is a sensor fault. 

774 """ 

775 

776 # TODO: remove this... 

777 if not isinstance(value, str) or len(value) != 2: 

778 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

779 

780 # if value == "EF": # TODO: Not implemented??? 

781 # return {SZ_FAN_INFO: None} 

782 

783 assert int(value, 16) & 0xE0 in ( 

784 0x00, 

785 0x20, 

786 0x40, 

787 0x60, 

788 0x80, 

789 ), f"invalid fan_info: {int(value, 16) & 0xE0}" 

790 

791 flags = list((int(value, 16) & (1 << x)) >> x for x in range(7, 4, -1)) 

792 

793 return { 

794 SZ_FAN_INFO: _31DA_FAN_INFO[ 

795 int(value, 16) & 0x1F 

796 ], # lookup description from code 

797 "_unknown_fan_info_flags": flags, 

798 } 

799 

800 

801def fan_info_to_byte(info: str) -> int: 

802 for k, v in _31DA_FAN_INFO.items(): 

803 if v == info: 

804 return int(k) & 0x1F 

805 return 0x0000 

806 

807 

808def fan_info_flags(flags_list: list[int]) -> int: 

809 flag_res: int = 0 

810 for index, shift in enumerate(range(7, 4, -1)): # index = 7, 6 and 5 

811 if flags_list[index] == 1: 

812 flag_res |= 1 << shift # set bits 

813 return flag_res 

814 

815 

816# 31DA[38:40], also 2210 

817def parse_exhaust_fan_speed(value: HexStr2) -> PayDictT.EXHAUST_FAN_SPEED: 

818 """Return the exhaust fan speed (% of max speed).""" 

819 return _parse_fan_speed(SZ_EXHAUST_FAN_SPEED, value) # type: ignore[return-value] 

820 

821 

822# 31DA[40:42] 

823def parse_supply_fan_speed(value: HexStr2) -> PayDictT.SUPPLY_FAN_SPEED: 

824 """Return the supply fan speed (% of max speed).""" 

825 return _parse_fan_speed(SZ_SUPPLY_FAN_SPEED, value) # type: ignore[return-value] 

826 

827 

828def _parse_fan_speed(param_name: str, value: HexStr2) -> Mapping[str, float | None]: 

829 """Return the fan speed (called by sensor parsers). 

830 

831 The sensor value is None if there is no sensor present (is not an error). 

832 The dict does not include the key if there is a sensor fault. 

833 """ 

834 

835 # TODO: remove this... 

836 if not isinstance(value, str) or len(value) != 2: 

837 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

838 

839 if value == "FF": # Not implemented (is definitely FF, not EF!) 

840 return {param_name: None} 

841 

842 percentage = int(value, 16) / 200 # was: hex_to_percent(value) 

843 if percentage > 1.0: 

844 return _faulted_common(param_name, value) # type: ignore[return-value] 

845 

846 return {param_name: percentage} 

847 

848 

849# 31DA[42:46] & 22F3[2:6] # TODO: make 22F3-friendly 

850def parse_remaining_mins(value: HexStr4) -> PayDictT.REMAINING_MINUTES: 

851 """Return the remaining time for temporary modes (whole minutes). 

852 

853 The sensor value is None if there is no sensor present (is not an error). 

854 The dict does not include the key if there is a sensor fault. 

855 """ 

856 

857 # TODO: remove this... 

858 if not isinstance(value, str) or len(value) != 4: 

859 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

860 

861 if value == "0000": 

862 return {SZ_REMAINING_MINS: 0} 

863 if value == "3FFF": 

864 return {SZ_REMAINING_MINS: None} 

865 

866 minutes = int(value, 16) # was: hex_to_double(value) 

867 assert minutes > 0, value # TODO: raise assert 

868 

869 return {SZ_REMAINING_MINS: minutes} # usu. 0-60 mins 

870 

871 

872# 31DA[46:48] 

873def parse_post_heater(value: HexStr2) -> PayDictT.POST_HEATER: 

874 """Return the post-heater state (% of max heat).""" 

875 return _parse_fan_heater(SZ_POST_HEAT, value) # type: ignore[return-value] 

876 

877 

878# 31DA[48:50] 

879def parse_pre_heater(value: HexStr2) -> PayDictT.PRE_HEATER: 

880 """Return the pre-heater state (% of max heat).""" 

881 return _parse_fan_heater(SZ_PRE_HEAT, value) # type: ignore[return-value] 

882 

883 

884def _parse_fan_heater(param_name: str, value: HexStr2) -> Mapping[str, float | None]: 

885 """Return the heater state (called by sensor parsers). 

886 

887 The sensor value is None if there is no sensor present (is not an error). 

888 The dict does not include the key if there is a sensor fault. 

889 """ 

890 

891 # TODO: remove this... 

892 if not isinstance(value, str) or len(value) != 2: 

893 raise ValueError(f"Invalid value: {value}, is not a 2-char hex string") 

894 

895 if value == "EF": # Not implemented 

896 return {param_name: None} 

897 

898 if int(value, 16) & 0xF0 == 0xF0: 

899 return _faulted_sensor(param_name, value) # type: ignore[return-value] 

900 

901 percentage = int(value, 16) / 200 # Siber DF EVO 2 is /200, not /100 (Others?) 

902 assert percentage <= 1.0, value # TODO: raise exception if > 1.0? 

903 

904 return {param_name: percentage} # was: percent_from_hex(value, high_res=False) 

905 

906 

907# 31DA[50:54] 

908def parse_supply_flow(value: HexStr4) -> PayDictT.SUPPLY_FLOW: 

909 """Return the supply flow rate in m^3/hr (Orcon) ?or L/sec (?Itho).""" 

910 return _parse_fan_flow(SZ_SUPPLY_FLOW, value) # type: ignore[return-value] 

911 

912 

913# 31DA[54:58] 

914def parse_exhaust_flow(value: HexStr4) -> PayDictT.EXHAUST_FLOW: 

915 """Return the exhaust flow rate in m^3/hr (Orcon) ?or L/sec (?Itho)""" 

916 return _parse_fan_flow(SZ_EXHAUST_FLOW, value) # type: ignore[return-value] 

917 

918 

919def _parse_fan_flow(param_name: str, value: HexStr4) -> Mapping[str, float | None]: 

920 """Return the air flow rate (called by sensor parsers). 

921 

922 The sensor value is None if there is no sensor present (is not an error). 

923 The dict does not include the key if there is a sensor fault. 

924 """ 

925 

926 # TODO: remove this... 

927 if not isinstance(value, str) or len(value) != 4: 

928 raise ValueError(f"Invalid value: {value}, is not a 4-char hex string") 

929 

930 if value == "7FFF": # Not implemented 

931 return {param_name: None} 

932 

933 if int(value[:2], 16) & 0x80: 

934 return _faulted_sensor(param_name, value) # type: ignore[return-value] 

935 

936 flow = int(value, 16) / 100 # was: hex_to_double(value, factor=100) 

937 assert flow >= 0, value # TODO: raise exception if < 0? 

938 

939 return {param_name: flow}