Coverage for src/ramses_rf/system/faultlog.py: 31%
158 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Expose an 0418 fault log (is a stateful process)."""
4from __future__ import annotations
6import dataclasses
7import logging
8from collections import OrderedDict
9from typing import TYPE_CHECKING, NewType, TypeAlias
11from ramses_tx import Command, Message, Packet
12from ramses_tx.const import (
13 SZ_LOG_ENTRY,
14 SZ_LOG_IDX,
15 FaultDeviceClass,
16 FaultState,
17 FaultType,
18)
19from ramses_tx.helpers import parse_fault_log_entry
20from ramses_tx.schemas import DeviceIdT
22from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
23 I_,
24 RP,
25 RQ,
26 W_,
27 Code,
28)
30if TYPE_CHECKING:
31 from ramses_rf.system.heat import _LogbookT
34FaultTupleT: TypeAlias = tuple[FaultType, FaultDeviceClass, DeviceIdT | None, str]
37DEFAULT_GET_LIMIT = 6
40_LOGGER = logging.getLogger(__name__)
43# {'log_idx': '00', 'log_entry': ('21-12-23T11:59:35', 'restore', 'battery_low', 'actuator', '00', '04:164787', 'B0', '0000', 'FFFF7000')}
46@dataclasses.dataclass(frozen=True, kw_only=True, order=True)
47class FaultLogEntry:
48 """A fault log entry of an evohome fault log.
50 Fault log entries do have a log_idx attr, but this is merely their current location
51 in the system's fault log.
52 """
54 timestamp: str # # # 21-12-23T11:59:35 - assume is unique
55 fault_state: FaultState # # # fault, restore, unknown_c0
56 fault_type: FaultType # # # system_fault, battery_low, sensor_fault, etc.
57 domain_idx: str # # # 00-0F, FC, etc. ? only if dev_class is/not CTL?
58 device_class: FaultDeviceClass # # controller, actuator, sensor, etc.
59 device_id: DeviceIdT | None # # # 04:164787
61 # def __post_init__(self):
62 # def modify(device_id: DeviceIdT) -> DeviceIdT:
63 # object.__setattr__(self, "device_id", modify(self.device_id))
65 def __str__(self) -> str:
66 return (
67 f"{self.timestamp}, {(self.fault_state + ','):<8} {self.fault_type}, "
68 f"{self.device_id}, {self.domain_idx}, {self.device_class}"
69 )
71 def _is_matching_pair(self, other: object) -> bool:
72 """Return True if the other entry could be a matching pair (fault/restore)."""
74 if not isinstance(other, FaultLogEntry): # TODO: make a parochial exception
75 raise TypeError(f"{other} is not not a FaultLogEntry")
77 if self.fault_state == FaultState.FAULT:
78 return (
79 other.fault_state == FaultState.RESTORE
80 and self._as_tuple() == other._as_tuple()
81 and other.timestamp > self.timestamp
82 )
84 if self.fault_state == FaultState.RESTORE:
85 return (
86 other.fault_state == FaultState.FAULT
87 and self._as_tuple() == other._as_tuple()
88 and other.timestamp < self.timestamp
89 )
91 return False
93 def _as_tuple(self) -> FaultTupleT: # only for use within this class
94 """Return the log entry as a tuple, excluding dtm & state (fault/restore)."""
96 return (
97 self.fault_type,
98 self.device_class,
99 self.device_id,
100 self.domain_idx,
101 )
103 @classmethod
104 def from_msg(cls, msg: Message) -> FaultLogEntry:
105 """Create a fault log entry from a message's packet."""
106 return cls.from_pkt(msg._pkt)
108 @classmethod
109 def from_pkt(cls, pkt: Packet) -> FaultLogEntry:
110 """Create a fault log entry from a packet's payload."""
112 log_entry = parse_fault_log_entry(pkt.payload)
113 if log_entry is None: # TODO: make a parochial exception
114 raise TypeError("Null fault log entry")
116 return cls(**{k: v for k, v in log_entry.items() if k[:1] != "_"}) # type: ignore[arg-type]
119FaultDtmT = NewType("FaultDtmT", str)
120FaultIdxT = NewType("FaultIdxT", int)
122FaultLogT: TypeAlias = dict[FaultDtmT, FaultLogEntry]
123FaultMapT: TypeAlias = OrderedDict[FaultIdxT, FaultDtmT]
126class FaultLog: # 0418 # TODO: use a NamedTuple
127 """The fault log of an evohome system.
129 This code assumes that the `timestamp` attr of each log entry is a unique identifier.
131 Null entries do not have a timestamp. All subsequent entries will also be null.
133 The `log_idx` is not an identifier: it is merely the current position of a log entry
134 in the system log.
136 New entries are added to the top of the log (log_idx=0), and the log_idx is
137 incremented for all existing log enties.
138 """
140 _MAX_LOG_IDX = 0x3F # evohome controller only keeps most recent 64 entries
142 def __init__(self, tcs: _LogbookT) -> None:
143 self._tcs: _LogbookT = tcs
144 self.id = tcs.id
145 self._gwy = tcs._gwy
147 self._log: FaultLogT = dict()
148 self._map: FaultMapT = OrderedDict()
149 self._log_done: bool | None = None
151 self._is_current: bool = False # if we now our log is out of date
152 self._is_getting: bool = False
154 def _insert_into_map(self, idx: FaultIdxT, dtm: FaultDtmT | None) -> FaultMapT:
155 """Rebuild the map (as best as possible), given the a log entry."""
157 new_map: FaultMapT = OrderedDict()
159 # usu. idx == 0, but could be > 0
160 new_map |= {
161 k: v for k, v in self._map.items() if k < idx and (dtm is None or v > dtm)
162 }
164 if dtm is None: # there are no subsequent log entries
165 return new_map
167 new_map |= {idx: dtm}
169 if not (idxs := [k for k, v in self._map.items() if v < dtm]):
170 return new_map
172 if (next_idx := min(idxs)) > idx:
173 diff = 0
174 elif next_idx == idx:
175 diff = 1 # next - idx + 1
176 else:
177 diff = idx + 1 # 1 if self._map.get(idx) else 0
179 new_map |= {
180 k + diff: v # type: ignore[misc]
181 for k, v in self._map.items()
182 if (k >= idx or v < dtm) and k + diff <= self._MAX_LOG_IDX
183 }
185 return new_map
187 def handle_msg(self, msg: Message) -> None:
188 """Handle a fault log message (some valid payloads should be ignored)."""
190 assert msg.code == Code._0418 and msg.verb in (I_, RP), "Coding error"
192 if msg.verb == RP and msg.payload[SZ_LOG_ENTRY] is None:
193 # such payloads have idx == "00" (is sentinel for null), so can't know the
194 # corresponding RQ's log_idx, but if verb == I_, safely assume log_idx is 0
195 return
197 self._process_msg(msg)
199 def _process_msg(self, msg: Message) -> None:
200 """Handle a processable fault log message."""
202 if msg.verb == I_:
203 self._is_current = False
205 if SZ_LOG_IDX not in msg.payload:
206 return # we can't do anything useful with this message
208 idx: FaultIdxT = int(msg.payload[SZ_LOG_IDX], 16) # type: ignore[assignment]
210 if msg.payload[SZ_LOG_ENTRY] is None: # NOTE: Subsequent entries will be empty
211 self._map = self._insert_into_map(idx, None)
212 self._log = {k: v for k, v in self._log.items() if k in self._map.values()}
213 return # If idx != 0, should we also check from idx = 0?
215 entry = FaultLogEntry.from_msg(msg) # if msg.payload[SZ_LOG_ENTRY] else None
216 dtm: FaultDtmT = entry.timestamp # type: ignore[assignment]
218 if self._map.get(idx) == dtm:
219 return # i.e. No evidence anything has changed
221 if dtm not in self._log:
222 self._log |= {dtm: entry} # must add entry before _insert_into_map()
223 self._map = self._insert_into_map(idx, dtm) # updates self._map
224 self._log = {k: v for k, v in self._log.items() if k in self._map.values()}
226 # if idx != 0: # there's other (new/changed) entries above this one?
227 # pass
229 def _hack_pkt_idx(self, pkt: Packet, cmd: Command) -> Message:
230 """Modify the Packet so that it has the log index of its corresponding Command.
232 If there is no log entry for log_idx=<idx>, then the headers won't match:
233 - cmd rx_hdr is 0418|RP|<ctl_id>|<idx> (expected)
234 - pkt hdr will 0418|RP|<ctl_id>|00 (response from controller)
236 We can only assume that the Pkt is the reply to the Cmd, which is why using
237 QoS with wait_for_reply=True is vital when getting fault log entries.
239 We can assume 0418| I|<ctl_id>|00 is only for log_idx=00 (I|0418s are stateless)
240 """
242 assert pkt.verb == RP and pkt.code == Code._0418 and pkt._idx == "00"
243 assert pkt.payload == "000000B0000000000000000000007FFFFF7000000000"
245 assert cmd.verb == RQ and pkt.code == Code._0418
246 assert cmd.rx_header and cmd.rx_header[:-2] == pkt._hdr[:-2] # reply to this RQ
248 if cmd._idx == "00": # no need to hack
249 return Message(pkt)
251 idx = cmd.rx_header[-2:] # cmd._idx could be bool/None?
252 pkt.payload = f"0000{idx}B0000000000000000000007FFFFF7000000000"
254 # NOTE: must now reset pkt payload, and its header
255 pkt._repr = pkt._hdr_ = pkt._ctx_ = pkt._idx_ = None # type: ignore[assignment]
256 pkt._frame = pkt._frame[:50] + idx + pkt._frame[52:]
258 assert pkt._hdr == cmd.rx_header, f"{self}: Coding error"
259 assert str(pkt) == pkt._frame[:50] + idx + pkt._frame[52:], (
260 f"{self}: Coding error"
261 )
263 msg = Message(pkt)
264 msg._payload = {SZ_LOG_IDX: idx, SZ_LOG_ENTRY: None} # PayDictT._0418_NULL
266 return msg
268 async def get_faultlog(
269 self,
270 /,
271 *,
272 start: int = 0,
273 limit: int | None = DEFAULT_GET_LIMIT,
274 force_refresh: bool = False,
275 ) -> dict[FaultIdxT, FaultLogEntry]:
276 """Retrieve the fault log from the controller."""
278 if limit is None:
279 limit = DEFAULT_GET_LIMIT
281 self._is_getting = True # TODO: semaphore?
283 # TODO: handle exc.RamsesException (RQ retries exceeded)
284 for idx in range(start, min(start + limit, self._MAX_LOG_IDX + 1)):
285 cmd = Command.get_system_log_entry(self.id, idx)
286 pkt = await self._gwy.async_send_cmd(cmd, wait_for_reply=True)
288 if pkt.payload == "000000B0000000000000000000007FFFFF7000000000":
289 msg = self._hack_pkt_idx(pkt, cmd) # RPs for null entries have idx==00
290 self._process_msg(msg) # since pkt via dispatcher aint got idx
291 break
292 self._process_msg(Message(pkt)) # JIC dispatcher doesn't do this for us
294 self._is_getting = False
295 self._is_current = True
297 return self.faultlog
299 @property
300 def faultlog(self) -> dict[FaultIdxT, FaultLogEntry]:
301 """Return the fault log of a system."""
303 # if self._faultlog:
304 # return self._faultlog
306 return {idx: self._log[dtm] for idx, dtm in self._map.items()}
308 async def is_current(self, force_io: bool = False) -> bool:
309 """Return True if the local fault log is identical to the controllers.
311 If force_io, retrieve the 0th log entry and check it is identical to the local
312 copy.
313 """
315 # if not self._is_current or not force_io: # TODO
316 return self._is_current
318 @property
319 def latest_event(self) -> FaultLogEntry | None:
320 """Return the most recently logged event (fault or restore), if any."""
322 if not self._log: # TODO: raise exception or retrieve log (make function)?
323 return None
325 return self._log[max(k for k in self._log)]
327 @property
328 def latest_fault(self) -> FaultLogEntry | None:
329 """Return the most recently logged fault, if any."""
331 if not self._log: # TODO: raise exception or retrieve log (make function)?
332 return None
334 faults = [k for k, v in self._log.items() if v.fault_state == FaultState.FAULT]
336 if not faults:
337 return None
339 return self._log[max(faults)]
341 @property
342 def active_faults(self) -> tuple[FaultLogEntry, ...] | None:
343 """Return a list of all faults outstanding (i.e. no corresponding restore)."""
345 if not self._log: # TODO: raise exception or retrieve log (make function)?
346 return None
348 restores = {}
349 faults = {}
351 for entry in sorted(self._log.values(), reverse=True):
352 if entry.fault_state == FaultState.RESTORE:
353 # keep to match against upcoming faults
354 restores[entry._as_tuple()] = entry
356 if entry.fault_state == FaultState.FAULT:
357 # look for (existing) matching restore, otherwise keep
358 if entry._as_tuple() in restores:
359 del restores[entry._as_tuple()]
360 else:
361 faults[entry._as_tuple()] = entry
363 return tuple(faults.values())