Coverage for src/ramses_rf/system/faultlog.py: 31%

158 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Expose an 0418 fault log (is a stateful process).""" 

3 

4from __future__ import annotations 

5 

6import dataclasses 

7import logging 

8from collections import OrderedDict 

9from typing import TYPE_CHECKING, NewType, TypeAlias 

10 

11from ramses_tx import Command, Message, Packet 

12from ramses_tx.const import ( 

13 SZ_LOG_ENTRY, 

14 SZ_LOG_IDX, 

15 FaultDeviceClass, 

16 FaultState, 

17 FaultType, 

18) 

19from ramses_tx.helpers import parse_fault_log_entry 

20from ramses_tx.schemas import DeviceIdT 

21 

22from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

23 I_, 

24 RP, 

25 RQ, 

26 W_, 

27 Code, 

28) 

29 

30if TYPE_CHECKING: 

31 from ramses_rf.system.heat import _LogbookT 

32 

33 

34FaultTupleT: TypeAlias = tuple[FaultType, FaultDeviceClass, DeviceIdT | None, str] 

35 

36 

37DEFAULT_GET_LIMIT = 6 

38 

39 

40_LOGGER = logging.getLogger(__name__) 

41 

42 

43# {'log_idx': '00', 'log_entry': ('21-12-23T11:59:35', 'restore', 'battery_low', 'actuator', '00', '04:164787', 'B0', '0000', 'FFFF7000')} 

44 

45 

46@dataclasses.dataclass(frozen=True, kw_only=True, order=True) 

47class FaultLogEntry: 

48 """A fault log entry of an evohome fault log. 

49 

50 Fault log entries do have a log_idx attr, but this is merely their current location 

51 in the system's fault log. 

52 """ 

53 

54 timestamp: str # # # 21-12-23T11:59:35 - assume is unique 

55 fault_state: FaultState # # # fault, restore, unknown_c0 

56 fault_type: FaultType # # # system_fault, battery_low, sensor_fault, etc. 

57 domain_idx: str # # # 00-0F, FC, etc. ? only if dev_class is/not CTL? 

58 device_class: FaultDeviceClass # # controller, actuator, sensor, etc. 

59 device_id: DeviceIdT | None # # # 04:164787 

60 

61 # def __post_init__(self): 

62 # def modify(device_id: DeviceIdT) -> DeviceIdT: 

63 # object.__setattr__(self, "device_id", modify(self.device_id)) 

64 

65 def __str__(self) -> str: 

66 return ( 

67 f"{self.timestamp}, {(self.fault_state + ','):<8} {self.fault_type}, " 

68 f"{self.device_id}, {self.domain_idx}, {self.device_class}" 

69 ) 

70 

71 def _is_matching_pair(self, other: object) -> bool: 

72 """Return True if the other entry could be a matching pair (fault/restore).""" 

73 

74 if not isinstance(other, FaultLogEntry): # TODO: make a parochial exception 

75 raise TypeError(f"{other} is not not a FaultLogEntry") 

76 

77 if self.fault_state == FaultState.FAULT: 

78 return ( 

79 other.fault_state == FaultState.RESTORE 

80 and self._as_tuple() == other._as_tuple() 

81 and other.timestamp > self.timestamp 

82 ) 

83 

84 if self.fault_state == FaultState.RESTORE: 

85 return ( 

86 other.fault_state == FaultState.FAULT 

87 and self._as_tuple() == other._as_tuple() 

88 and other.timestamp < self.timestamp 

89 ) 

90 

91 return False 

92 

93 def _as_tuple(self) -> FaultTupleT: # only for use within this class 

94 """Return the log entry as a tuple, excluding dtm & state (fault/restore).""" 

95 

96 return ( 

97 self.fault_type, 

98 self.device_class, 

99 self.device_id, 

100 self.domain_idx, 

101 ) 

102 

103 @classmethod 

104 def from_msg(cls, msg: Message) -> FaultLogEntry: 

105 """Create a fault log entry from a message's packet.""" 

106 return cls.from_pkt(msg._pkt) 

107 

108 @classmethod 

109 def from_pkt(cls, pkt: Packet) -> FaultLogEntry: 

110 """Create a fault log entry from a packet's payload.""" 

111 

112 log_entry = parse_fault_log_entry(pkt.payload) 

113 if log_entry is None: # TODO: make a parochial exception 

114 raise TypeError("Null fault log entry") 

115 

116 return cls(**{k: v for k, v in log_entry.items() if k[:1] != "_"}) # type: ignore[arg-type] 

117 

118 

119FaultDtmT = NewType("FaultDtmT", str) 

120FaultIdxT = NewType("FaultIdxT", int) 

121 

122FaultLogT: TypeAlias = dict[FaultDtmT, FaultLogEntry] 

123FaultMapT: TypeAlias = OrderedDict[FaultIdxT, FaultDtmT] 

124 

125 

126class FaultLog: # 0418 # TODO: use a NamedTuple 

127 """The fault log of an evohome system. 

128 

129 This code assumes that the `timestamp` attr of each log entry is a unique identifier. 

130 

131 Null entries do not have a timestamp. All subsequent entries will also be null. 

132 

133 The `log_idx` is not an identifier: it is merely the current position of a log entry 

134 in the system log. 

135 

136 New entries are added to the top of the log (log_idx=0), and the log_idx is 

137 incremented for all existing log enties. 

138 """ 

139 

140 _MAX_LOG_IDX = 0x3F # evohome controller only keeps most recent 64 entries 

141 

142 def __init__(self, tcs: _LogbookT) -> None: 

143 self._tcs: _LogbookT = tcs 

144 self.id = tcs.id 

145 self._gwy = tcs._gwy 

146 

147 self._log: FaultLogT = dict() 

148 self._map: FaultMapT = OrderedDict() 

149 self._log_done: bool | None = None 

150 

151 self._is_current: bool = False # if we now our log is out of date 

152 self._is_getting: bool = False 

153 

154 def _insert_into_map(self, idx: FaultIdxT, dtm: FaultDtmT | None) -> FaultMapT: 

155 """Rebuild the map (as best as possible), given the a log entry.""" 

156 

157 new_map: FaultMapT = OrderedDict() 

158 

159 # usu. idx == 0, but could be > 0 

160 new_map |= { 

161 k: v for k, v in self._map.items() if k < idx and (dtm is None or v > dtm) 

162 } 

163 

164 if dtm is None: # there are no subsequent log entries 

165 return new_map 

166 

167 new_map |= {idx: dtm} 

168 

169 if not (idxs := [k for k, v in self._map.items() if v < dtm]): 

170 return new_map 

171 

172 if (next_idx := min(idxs)) > idx: 

173 diff = 0 

174 elif next_idx == idx: 

175 diff = 1 # next - idx + 1 

176 else: 

177 diff = idx + 1 # 1 if self._map.get(idx) else 0 

178 

179 new_map |= { 

180 k + diff: v # type: ignore[misc] 

181 for k, v in self._map.items() 

182 if (k >= idx or v < dtm) and k + diff <= self._MAX_LOG_IDX 

183 } 

184 

185 return new_map 

186 

187 def handle_msg(self, msg: Message) -> None: 

188 """Handle a fault log message (some valid payloads should be ignored).""" 

189 

190 assert msg.code == Code._0418 and msg.verb in (I_, RP), "Coding error" 

191 

192 if msg.verb == RP and msg.payload[SZ_LOG_ENTRY] is None: 

193 # such payloads have idx == "00" (is sentinel for null), so can't know the 

194 # corresponding RQ's log_idx, but if verb == I_, safely assume log_idx is 0 

195 return 

196 

197 self._process_msg(msg) 

198 

199 def _process_msg(self, msg: Message) -> None: 

200 """Handle a processable fault log message.""" 

201 

202 if msg.verb == I_: 

203 self._is_current = False 

204 

205 if SZ_LOG_IDX not in msg.payload: 

206 return # we can't do anything useful with this message 

207 

208 idx: FaultIdxT = int(msg.payload[SZ_LOG_IDX], 16) # type: ignore[assignment] 

209 

210 if msg.payload[SZ_LOG_ENTRY] is None: # NOTE: Subsequent entries will be empty 

211 self._map = self._insert_into_map(idx, None) 

212 self._log = {k: v for k, v in self._log.items() if k in self._map.values()} 

213 return # If idx != 0, should we also check from idx = 0? 

214 

215 entry = FaultLogEntry.from_msg(msg) # if msg.payload[SZ_LOG_ENTRY] else None 

216 dtm: FaultDtmT = entry.timestamp # type: ignore[assignment] 

217 

218 if self._map.get(idx) == dtm: 

219 return # i.e. No evidence anything has changed 

220 

221 if dtm not in self._log: 

222 self._log |= {dtm: entry} # must add entry before _insert_into_map() 

223 self._map = self._insert_into_map(idx, dtm) # updates self._map 

224 self._log = {k: v for k, v in self._log.items() if k in self._map.values()} 

225 

226 # if idx != 0: # there's other (new/changed) entries above this one? 

227 # pass 

228 

229 def _hack_pkt_idx(self, pkt: Packet, cmd: Command) -> Message: 

230 """Modify the Packet so that it has the log index of its corresponding Command. 

231 

232 If there is no log entry for log_idx=<idx>, then the headers won't match: 

233 - cmd rx_hdr is 0418|RP|<ctl_id>|<idx> (expected) 

234 - pkt hdr will 0418|RP|<ctl_id>|00 (response from controller) 

235 

236 We can only assume that the Pkt is the reply to the Cmd, which is why using 

237 QoS with wait_for_reply=True is vital when getting fault log entries. 

238 

239 We can assume 0418| I|<ctl_id>|00 is only for log_idx=00 (I|0418s are stateless) 

240 """ 

241 

242 assert pkt.verb == RP and pkt.code == Code._0418 and pkt._idx == "00" 

243 assert pkt.payload == "000000B0000000000000000000007FFFFF7000000000" 

244 

245 assert cmd.verb == RQ and pkt.code == Code._0418 

246 assert cmd.rx_header and cmd.rx_header[:-2] == pkt._hdr[:-2] # reply to this RQ 

247 

248 if cmd._idx == "00": # no need to hack 

249 return Message(pkt) 

250 

251 idx = cmd.rx_header[-2:] # cmd._idx could be bool/None? 

252 pkt.payload = f"0000{idx}B0000000000000000000007FFFFF7000000000" 

253 

254 # NOTE: must now reset pkt payload, and its header 

255 pkt._repr = pkt._hdr_ = pkt._ctx_ = pkt._idx_ = None # type: ignore[assignment] 

256 pkt._frame = pkt._frame[:50] + idx + pkt._frame[52:] 

257 

258 assert pkt._hdr == cmd.rx_header, f"{self}: Coding error" 

259 assert str(pkt) == pkt._frame[:50] + idx + pkt._frame[52:], ( 

260 f"{self}: Coding error" 

261 ) 

262 

263 msg = Message(pkt) 

264 msg._payload = {SZ_LOG_IDX: idx, SZ_LOG_ENTRY: None} # PayDictT._0418_NULL 

265 

266 return msg 

267 

268 async def get_faultlog( 

269 self, 

270 /, 

271 *, 

272 start: int = 0, 

273 limit: int | None = DEFAULT_GET_LIMIT, 

274 force_refresh: bool = False, 

275 ) -> dict[FaultIdxT, FaultLogEntry]: 

276 """Retrieve the fault log from the controller.""" 

277 

278 if limit is None: 

279 limit = DEFAULT_GET_LIMIT 

280 

281 self._is_getting = True # TODO: semaphore? 

282 

283 # TODO: handle exc.RamsesException (RQ retries exceeded) 

284 for idx in range(start, min(start + limit, self._MAX_LOG_IDX + 1)): 

285 cmd = Command.get_system_log_entry(self.id, idx) 

286 pkt = await self._gwy.async_send_cmd(cmd, wait_for_reply=True) 

287 

288 if pkt.payload == "000000B0000000000000000000007FFFFF7000000000": 

289 msg = self._hack_pkt_idx(pkt, cmd) # RPs for null entries have idx==00 

290 self._process_msg(msg) # since pkt via dispatcher aint got idx 

291 break 

292 self._process_msg(Message(pkt)) # JIC dispatcher doesn't do this for us 

293 

294 self._is_getting = False 

295 self._is_current = True 

296 

297 return self.faultlog 

298 

299 @property 

300 def faultlog(self) -> dict[FaultIdxT, FaultLogEntry]: 

301 """Return the fault log of a system.""" 

302 

303 # if self._faultlog: 

304 # return self._faultlog 

305 

306 return {idx: self._log[dtm] for idx, dtm in self._map.items()} 

307 

308 async def is_current(self, force_io: bool = False) -> bool: 

309 """Return True if the local fault log is identical to the controllers. 

310 

311 If force_io, retrieve the 0th log entry and check it is identical to the local 

312 copy. 

313 """ 

314 

315 # if not self._is_current or not force_io: # TODO 

316 return self._is_current 

317 

318 @property 

319 def latest_event(self) -> FaultLogEntry | None: 

320 """Return the most recently logged event (fault or restore), if any.""" 

321 

322 if not self._log: # TODO: raise exception or retrieve log (make function)? 

323 return None 

324 

325 return self._log[max(k for k in self._log)] 

326 

327 @property 

328 def latest_fault(self) -> FaultLogEntry | None: 

329 """Return the most recently logged fault, if any.""" 

330 

331 if not self._log: # TODO: raise exception or retrieve log (make function)? 

332 return None 

333 

334 faults = [k for k, v in self._log.items() if v.fault_state == FaultState.FAULT] 

335 

336 if not faults: 

337 return None 

338 

339 return self._log[max(faults)] 

340 

341 @property 

342 def active_faults(self) -> tuple[FaultLogEntry, ...] | None: 

343 """Return a list of all faults outstanding (i.e. no corresponding restore).""" 

344 

345 if not self._log: # TODO: raise exception or retrieve log (make function)? 

346 return None 

347 

348 restores = {} 

349 faults = {} 

350 

351 for entry in sorted(self._log.values(), reverse=True): 

352 if entry.fault_state == FaultState.RESTORE: 

353 # keep to match against upcoming faults 

354 restores[entry._as_tuple()] = entry 

355 

356 if entry.fault_state == FaultState.FAULT: 

357 # look for (existing) matching restore, otherwise keep 

358 if entry._as_tuple() in restores: 

359 del restores[entry._as_tuple()] 

360 else: 

361 faults[entry._as_tuple()] = entry 

362 

363 return tuple(faults.values())