Coverage for src/ramses_tx/message.py: 31%

159 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Decode/process a message (payload into JSON).""" 

3 

4from __future__ import annotations 

5 

6import logging 

7import re 

8from datetime import datetime as dt, timedelta as td 

9from functools import lru_cache 

10from typing import TYPE_CHECKING 

11 

12from . import exceptions as exc 

13from .address import Address 

14from .command import Command 

15from .const import DEV_TYPE_MAP, SZ_DHW_IDX, SZ_DOMAIN_ID, SZ_UFH_IDX, SZ_ZONE_IDX 

16from .packet import Packet 

17from .parsers import parse_payload 

18from .ramses import CODE_IDX_ARE_COMPLEX, CODES_SCHEMA, RQ_IDX_COMPLEX 

19 

20# TODO: 

21# long-format msg.__str__ - alias columns don't line up 

22 

23 

24from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

25 I_, 

26 RP, 

27 RQ, 

28 W_, 

29 Code, 

30) 

31 

32if TYPE_CHECKING: 

33 from ramses_rf import Gateway 

34 

35 from .const import IndexT, VerbT # noqa: F401, pylint: disable=unused-import 

36 

37 

38__all__ = ["Message"] 

39 

40 

# Lookup of each code in CODES_SCHEMA to the "name" entry of its schema; used by
# MessageBase.__str__ to render the code column.
CODE_NAMES = {k: v["name"] for k, v in CODES_SCHEMA.items()}

# Row layout used by MessageBase.__str__: two 10-wide name columns, the 2-wide
# verb, the 16-wide code name, a centred 4-wide context marker, then the payload.
MSG_FORMAT_10 = "|| {:10s} | {:10s} | {:2s} | {:16s} | {:^4s} || {}"

# Subtracted from a message's age before Message._expired divides it by the
# lifespan (i.e. a 3 second allowance).
_TD_SECS_003 = td(seconds=3)


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

49 

50 

class MessageBase:
    """The Message class; will trap/log invalid msgs.

    Wraps a Packet: copies its addresses, timestamp, verb, seqn, code and length,
    and holds the payload as parsed (and validated) by `_validate()`.

    NOTE: because `__eq__` is defined without `__hash__`, instances are unhashable.
    """

    def __init__(self, pkt: Packet) -> None:
        """Create a message from a valid packet.

        :param pkt: The packet to process into a message
        :type pkt: Packet
        :raises PacketInvalid: If the packet payload cannot be parsed.
        """

        self._pkt = pkt

        self.src: Address = pkt.src
        self.dst: Address = pkt.dst
        self._addrs: tuple[Address, Address, Address] = pkt._addrs

        self.dtm: dt = pkt.dtm

        self.verb: VerbT = pkt.verb
        self.seqn: str = (
            pkt.seqn
        )  # the msg is part of a set for 1 Code, received in order
        self.code: Code = pkt.code
        self.len: int = pkt._len

        self._payload = self._validate(
            self._pkt.payload
        )  # ? may raise PacketInvalid

        # lazily-built cache for __str__ (None until first rendered)
        self._str: str = None  # type: ignore[assignment]

    def __repr__(self) -> str:
        """Return an unambiguous string representation of this object."""
        return str(self._pkt)  # repr or str?

    def __str__(self) -> str:
        """Return a brief readable string representation of this object.

        The string is built once, using MSG_FORMAT_10, and cached in `_str`.
        """

        def ctx(pkt: Packet) -> str:
            """Return the context marker of a packet, for the context column."""
            label = {True: "[..]", False: "", None: "??"}.get(pkt._ctx, pkt._ctx)  # type: ignore[arg-type]
            # no context, but the payload starts with something other than 00/FF
            if not label and pkt.payload[:2] not in ("00", "FF"):
                return f"({pkt.payload[:2]})"
            return label

        if self._str is not None:
            return self._str

        # the src goes in the first name column only if it is the first address
        if self.src.id == self._addrs[0].id:  # type: ignore[unreachable]
            name_0 = self._name(self.src)
            name_1 = (
                "" if self.dst is self.src else self._name(self.dst)
            )  # use 'is', issue_cc 318
        else:
            name_0 = ""
            name_1 = self._name(self.src)

        code_name = CODE_NAMES.get(self.code, f"unknown_{self.code}")
        self._str = MSG_FORMAT_10.format(
            name_0, name_1, self.verb, code_name, ctx(self._pkt), self.payload
        )
        return self._str

    def __eq__(self, other: object) -> bool:
        """Return True if src, dst, verb, code and raw payload are all equal.

        The timestamp (dtm) is not compared. Returns NotImplemented for objects
        that are not messages.
        """
        # test against this class (not the Message subclass), so that base-class
        # instances are also compared by value
        if not isinstance(other, MessageBase):
            return NotImplemented
        return (self.src, self.dst, self.verb, self.code, self._pkt.payload) == (
            other.src,
            other.dst,
            other.verb,
            other.code,
            other._pkt.payload,
        )

    def __lt__(self, other: object) -> bool:
        """Return True if this message has an earlier timestamp (dtm) than other."""
        if not isinstance(other, MessageBase):
            return NotImplemented
        return self.dtm < other.dtm

    def _name(self, addr: Address) -> str:
        """Return a friendly name for an Address, or a Device."""
        return f" {addr.id}"  # can't do 'CTL:123456' instead of ' 01:123456'

    @property
    def payload(self):  # type: ignore[no-untyped-def]  # FIXME -> dict | list:
        """Return the payload (as parsed by `_validate()`)."""
        return self._payload

    @property
    def _has_payload(self) -> bool:
        """Return False if there is no payload (may falsely Return True).

        The message (i.e. the raw payload) may still have an idx.
        """

        return self._pkt._has_payload

    @property
    def _has_array(self) -> bool:
        """
        :return: True if the message's raw payload is an array.
        """

        return bool(self._pkt._has_array)

    @property
    def _idx(self) -> dict[str, str]:
        """Get the domain_id/zone_idx/other_idx of a message payload, if any.
        Used to identify the zone/domain that a message applies to.

        :return: a single-item dict of index name to index value, or an empty dict
            if there is no such index.
        :raises AssertionError: If the packet's idx is not what is expected for
            its addresses (converted to PacketInvalid by `_validate()`).
        """

        # .I --- 01:145038 --:------ 01:145038 3B00 002 FCC8

        # codes whose index has a name other than domain_id/zone_idx
        IDX_NAMES = {
            Code._0002: "other_idx",  # non-evohome: hometronics
            Code._10A0: SZ_DHW_IDX,  # can be 2 DHW zones per system, albeit unusual
            Code._1260: SZ_DHW_IDX,  # can be 2 DHW zones per system, albeit unusual
            Code._1F41: SZ_DHW_IDX,  # can be 2 DHW zones per system, albeit unusual
            Code._22C9: SZ_UFH_IDX,  # UFH circuit
            Code._2389: "other_idx",  # anachronistic
            Code._2D49: "other_idx",  # non-evohome: hometronics
            Code._31D9: "hvac_id",
            Code._31DA: "hvac_id",
            Code._3220: "msg_id",
        }  # ALSO: SZ_DOMAIN_ID, SZ_ZONE_IDX

        if self.code in (Code._31D9, Code._31DA):  # shouldn't be needed?
            assert isinstance(self._pkt._idx, str)  # mypy hint
            return {"hvac_id": self._pkt._idx}

        # a boolean idx, or a complex idx, is not reported here
        if self._pkt._idx in (True, False) or self.code in CODE_IDX_ARE_COMPLEX:
            return {}  # above was: CODE_IDX_COMPLEX + (Code._3150):

        if self.code in (Code._3220,):  # FIXME: should be _SIMPLE
            return {}

        # .I 068 03:201498 --:------ 03:201498 30C9 003 0106D6  # rare

        # .I --- 00:034798 --:------ 12:126457 2309 003 0201F4
        # neither the src nor the dst is one of the listed device types
        if not {self.src.type, self.dst.type} & {
            DEV_TYPE_MAP.CTL,
            DEV_TYPE_MAP.UFC,
            DEV_TYPE_MAP.HCW,  # ?remove (see above, rare)
            DEV_TYPE_MAP.DTS,
            DEV_TYPE_MAP.HGI,
            DEV_TYPE_MAP.DT2,
            DEV_TYPE_MAP.PRG,
        }:  # FIXME: DEX should be deprecated to use device type rather than class
            assert self._pkt._idx == "00", "What!! (AA)"
            return {}

        # .I 035 --:------ --:------ 12:126457 30C9 003 017FFF
        if self.src.type == self.dst.type and self.src.type not in (
            DEV_TYPE_MAP.CTL,
            DEV_TYPE_MAP.UFC,
            DEV_TYPE_MAP.HCW,  # ?remove (see above, rare)
            DEV_TYPE_MAP.HGI,
            DEV_TYPE_MAP.PRG,
        ):  # DEX
            assert self._pkt._idx == "00", "What!! (AB)"
            return {}

        # .I --- 04:029362 --:------ 12:126457 3150 002 0162
        # if not getattr(self.src, "_is_controller", True) and not getattr(
        #     self.dst, "_is_controller", True
        # ):
        #     assert self._pkt._idx == "00", "What!! (BA)"
        #     return {}

        # .I --- 04:029362 --:------ 12:126457 3150 002 0162
        # if not (
        #     getattr(self.src, "_is_controller", True)
        #     or getattr(self.dst, "_is_controller", True)
        # ):
        #     assert self._pkt._idx == "00", "What!! (BB)"
        #     return {}

        # an src without an _is_controller attr is given the benefit of the doubt
        if self.src.type == self.dst.type and not getattr(
            self.src, "_is_controller", True
        ):  # DEX
            assert self._pkt._idx == "00", "What!! (BC)"
            return {}

        # TODO: also 000C (but is a complex idx)
        # TODO: also 3150 (when not domain, and will be array if so)
        if self.code in (Code._000A, Code._2309) and self.src.type == DEV_TYPE_MAP.UFC:
            assert isinstance(self._pkt._idx, str)  # mypy hint
            return {IDX_NAMES[Code._22C9]: self._pkt._idx}

        assert isinstance(self._pkt._idx, str)  # mypy hint
        # an idx starting with F is a domain, otherwise it is a zone
        idx_name = SZ_DOMAIN_ID if self._pkt._idx[:1] == "F" else SZ_ZONE_IDX
        index_name = IDX_NAMES.get(self.code, idx_name)

        return {index_name: self._pkt._idx}

    # TODO: needs work...
    def _validate(self, raw_payload: str) -> dict | list[dict]:  # type: ignore[type-arg]
        """Validate a message packet payload, and parse it if valid.

        NOTE: raw_payload is not read directly; the payload checked/parsed is
        that of the message's own packet.

        :param raw_payload: The raw payload of the packet
        :return: a dict containing key: value pairs, or a list of those created from the payload
        :raises PacketInvalid: If the payload is not valid, or cannot be parsed.
        """

        try:  # parse the payload
            # TODO: only accept invalid packets to/from HGI when flag raised
            _check_msg_payload(self, self._pkt.payload)  # ? InvalidPayloadError

            if not self._has_payload and (
                self.verb == RQ and self.code not in RQ_IDX_COMPLEX
            ):
                return {}

            result = parse_payload(self)  # invoke the code parsers

            if isinstance(result, list):
                return result
            if isinstance(result, dict):
                return {**self._idx, **result}  # the parsed values take precedence

            raise TypeError(f"Invalid payload type: {type(result)}")

        except exc.PacketInvalid as err:
            _LOGGER.warning("%s < %s", self._pkt, err)
            raise  # re-raise as-is

        except AssertionError as err:
            # beware: HGI80 can send 'odd' but parseable packets +/- get invalid reply
            _LOGGER.exception("%s < %s", self._pkt, f"{err.__class__.__name__}({err})")
            raise exc.PacketInvalid("Bad packet") from err

        except (AttributeError, LookupError, TypeError, ValueError) as err:  # TODO: dev
            _LOGGER.exception(
                "%s < Coding error: %s", self._pkt, f"{err.__class__.__name__}({err})"
            )
            raise exc.PacketInvalid from err

        except NotImplementedError as err:  # parser_unknown (unknown packet code)
            _LOGGER.warning("%s < Unknown packet code (cannot parse)", self._pkt)
            raise exc.PacketInvalid from err

292 

293 

class Message(MessageBase):
    """Extend the Message class, so is useful to a stateful Gateway.

    Adds the _expired attr, which reports whether the message is older than a
    multiple (HAS_EXPIRED) of its lifespan, as measured against the gateway's clock.
    """

    CANT_EXPIRE = -1  # sentinel value for fraction_expired

    HAS_EXPIRED = 2.0  # fraction_expired >= HAS_EXPIRED
    # .HAS_DIED = 1.0  # fraction_expired >= 1.0 (is expected lifespan)
    IS_EXPIRING = 0.8  # fraction_expired >= 0.8 (and < HAS_EXPIRED)

    _gwy: Gateway
    _fraction_expired: float | None = None  # last computed value, if any

    @classmethod
    def _from_cmd(cls, cmd: Command, dtm: dt | None = None) -> Message:
        """Create a Message from a Command (via a Packet)."""
        pkt = Packet._from_cmd(cmd, dtm=dtm)
        return cls(pkt)

    @classmethod
    def _from_pkt(cls, pkt: Packet) -> Message:
        """Create a Message from a Packet."""
        return cls(pkt)

    @property
    def _expired(self) -> bool:
        """Return True if the message is dated (or False otherwise).

        The age of the message (less _TD_SECS_003) is expressed as a fraction of
        its lifespan and stored in `_fraction_expired`; the message is dated once
        that fraction reaches HAS_EXPIRED.

        :raises NotImplementedError: If the packet's lifespan is True.
        """
        # TODO: keep none >7d, even 10E0, etc.

        # 1. A stored fraction settles the matter if it is already conclusive
        previous = self._fraction_expired
        if previous is not None:
            if previous == self.CANT_EXPIRE:
                return False
            if previous >= self.HAS_EXPIRED:
                return True

        # 2. Otherwise, determine the lifespan to measure the age against
        if self.code == Code._1F09 and self.verb != RQ:  # sync_cycle is a special case
            # RQs won't have remaining_seconds, RP/Ws have only partial cycle times
            lifespan = td(seconds=self.payload["remaining_seconds"])
        else:
            lifespan = self._pkt._lifespan
            if lifespan is True:
                raise NotImplementedError

        if lifespan is False:  # such a packet can't expire
            self._fraction_expired = self.CANT_EXPIRE
        else:
            age = self._gwy._dt_now() - self.dtm - _TD_SECS_003
            self._fraction_expired = age / lifespan

        return self._fraction_expired >= self.HAS_EXPIRED

353 

354 

@lru_cache(maxsize=256)
def re_compile_re_match(regex: str, string: str) -> re.Match[str] | None:
    """Match a string against a regex (from its start), caching the result.

    Results are memoised per (regex, string) pair, for up to 256 pairs.

    :param regex: The regular expression pattern
    :param string: The string to test against the pattern
    :return: The match object (truthy) if the string matches, or None otherwise
    """
    # TODO: confirm this does speed things up
    # Python has its own caching of re.compile, _MAXCACHE = 512
    # https://github.com/python/cpython/blob/3.10/Lib/re.py
    return re.compile(regex).match(string)

361 

362 

def _check_msg_payload(msg: MessageBase, payload: str) -> None:
    """Check that a payload is acceptable for the verb/code pair of its message.

    The code must be present in CODES_SCHEMA, the verb must have an entry for that
    code, and the payload must match the regex found there.

    :param msg: The message object being validated
    :type msg: MessageBase
    :param payload: The raw hex payload string
    :type payload: str
    :raises PacketInvalid: If the code is unknown or verb/code pair is invalid.
    :raises PacketPayloadInvalid: If the payload does not match the expected regex.
    """

    # NOTE(review): presumably evaluated only for any exception it may raise - confirm
    _ = repr(msg._pkt)  # HACK: ? raise InvalidPayloadError

    code = msg.code
    if code not in CODES_SCHEMA:
        raise exc.PacketInvalid(f"Unknown code: {code}")

    verb = msg.verb
    try:
        pattern = CODES_SCHEMA[code][verb]
    except KeyError:
        raise exc.PacketInvalid(f"Unknown verb/code pair: {verb}/{code}") from None

    if re_compile_re_match(pattern, payload):
        return
    raise exc.PacketPayloadInvalid(f"Payload doesn't match '{pattern}': {payload}")

387 raise exc.PacketPayloadInvalid(f"Payload doesn't match '{regex}': {payload}")