Coverage for src/ramses_rf/system/schedule.py: 34%
237 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Expose an 0404 schedule (is a stateful process)."""
4# TODO: use schemas from evohome_async
6from __future__ import annotations
8import asyncio
9import logging
10import struct
11import zlib
12from collections.abc import Iterable
13from datetime import timedelta as td
14from typing import TYPE_CHECKING, Any, Final, NotRequired, TypeAlias, TypedDict
16import voluptuous as vol # type: ignore[import, unused-ignore]
18from ramses_rf.const import (
19 SZ_FRAG_NUMBER,
20 SZ_FRAGMENT,
21 SZ_SCHEDULE,
22 SZ_TOTAL_FRAGS,
23 SZ_ZONE_IDX,
24)
25from ramses_tx.command import Command
26from ramses_tx.const import SZ_CHANGE_COUNTER, Priority
27from ramses_tx.message import Message
28from ramses_tx.packet import Packet
30from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
31 I_,
32 RP,
33 RQ,
34 W_,
35 Code,
36)
38if TYPE_CHECKING:
39 from ramses_rf.system.zones import DhwZone, Zone
class EmptyDictT(TypedDict):
    """A TypedDict with no keys: the type of an empty dict (`{}`)."""

    pass
class SwitchPointDhw(TypedDict):
    """A switchpoint of a DHW schedule: a time of day and an on/off state."""

    time_of_day: str  # "HH:MM", validated against REGEX_TIME_OF_DAY
    enabled: bool
class SwitchPointZon(TypedDict):
    """A switchpoint of a zone schedule: a time of day and a heat setpoint."""

    time_of_day: str  # "HH:MM", validated against REGEX_TIME_OF_DAY
    heat_setpoint: float  # SCH_SWITCHPOINT_ZON restricts this to 5..35
# Either kind of switchpoint, and a list holding switchpoints of one single kind
SwitchPointT: TypeAlias = SwitchPointDhw | SwitchPointZon
SwitchPointsT: TypeAlias = list[SwitchPointDhw] | list[SwitchPointZon]
class DayOfWeek(TypedDict):
    """The switchpoints of one day of the week."""

    day_of_week: int  # the day's index, as carried in each schedule record
    switchpoints: SwitchPointsT
# The 'inner' schedule is the list of days, without the zone_idx wrapper
DayOfWeekT: TypeAlias = DayOfWeek
InnerScheduleT: TypeAlias = list[DayOfWeek]
class _OuterSchedule(TypedDict):
    """A full ('outer') schedule: the inner schedule plus the zone it belongs to."""

    zone_idx: str  # a hex zone index, or "HW" for DHW (see the *_OUTER schemas)
    schedule: InnerScheduleT
class _EmptySchedule(TypedDict):
    """The full schedule of a zone that has no schedule: only zone_idx is required."""

    zone_idx: str
    schedule: NotRequired[EmptyDictT | None]
# A full schedule, whether or not the zone actually has switchpoints
OuterScheduleT: TypeAlias = _OuterSchedule | _EmptySchedule
_LOGGER = logging.getLogger(__name__)  # module-level logger


FIVE_MINS = td(minutes=5)

SZ_MSG: Final = "msg"

# Keys of the schedule dicts (see the TypedDicts and the schemas of this module)
SZ_DAY_OF_WEEK: Final = "day_of_week"
SZ_HEAT_SETPOINT: Final = "heat_setpoint"
SZ_SWITCHPOINTS: Final = "switchpoints"
SZ_TIME_OF_DAY: Final = "time_of_day"
SZ_ENABLED: Final = "enabled"

# A 24-hour "HH:MM" time whose minutes end in 0 or 5 (i.e. a multiple of 5 minutes)
REGEX_TIME_OF_DAY: Final = r"^([0-1][0-9]|2[0-3]):[0-5][05]$"
def schema_sched(schema_switchpoint: vol.Schema) -> vol.Schema:
    """Return the schema of an inner schedule, given the schema of its switchpoints.

    An inner schedule is a list of 0 to 7 days. Each day has exactly two keys: an
    integer day of week, and a list of at least one switchpoint. Extra keys are
    rejected.
    """

    # a day must have at least one switchpoint
    switchpoints_of_day = vol.All([schema_switchpoint], vol.Length(min=1))

    day_fields = {
        vol.Required(SZ_DAY_OF_WEEK): int,
        vol.Required(SZ_SWITCHPOINTS): switchpoints_of_day,
    }
    schema_day = vol.Schema(day_fields, extra=vol.PREVENT_EXTRA)

    # a week may be empty, but cannot have more than seven days
    days_of_week = vol.All([schema_day], vol.Length(min=0, max=7))
    return vol.Schema(days_of_week, extra=vol.PREVENT_EXTRA)
# A DHW switchpoint: a time of day and a boolean 'enabled' state (no other keys)
SCH_SWITCHPOINT_DHW = vol.Schema(
    {
        vol.Required(SZ_TIME_OF_DAY): vol.Match(REGEX_TIME_OF_DAY),
        vol.Required(SZ_ENABLED): bool,
    },
    extra=vol.PREVENT_EXTRA,
)
# A zone switchpoint: a time of day and a heat setpoint (no other keys)
SCH_SWITCHPOINT_ZON = vol.Schema(
    {
        vol.Required(SZ_TIME_OF_DAY): vol.Match(REGEX_TIME_OF_DAY),
        # the setpoint is coerced to a float, and then must be within 5..35
        vol.Required(SZ_HEAT_SETPOINT): vol.All(
            vol.Coerce(float), vol.Range(min=5, max=35)
        ),
    },
    extra=vol.PREVENT_EXTRA,
)
# The inner, and the full (outer), schedule of the DHW
SCH_SCHEDULE_DHW = schema_sched(SCH_SWITCHPOINT_DHW)
SCH_SCHEDULE_DHW_OUTER = vol.Schema(
    {
        vol.Required(SZ_ZONE_IDX): "HW",  # only this literal value is accepted
        vol.Required(SZ_SCHEDULE): SCH_SCHEDULE_DHW,
    },
    extra=vol.PREVENT_EXTRA,
)
# The inner, and the full (outer), schedule of a heating zone
SCH_SCHEDULE_ZON = schema_sched(SCH_SWITCHPOINT_ZON)
SCH_SCHEDULE_ZON_OUTER = vol.Schema(
    {
        # Exactly "00".."0F". The range is spelt out as [0-9A-F] because [0-F] also
        # spans the ASCII characters between '9' and 'A' (':', ';', ..., '@'), and
        # the pattern is anchored because vol.Match() only anchors at the start.
        vol.Required(SZ_ZONE_IDX): vol.Match(r"^0[0-9A-F]$"),
        vol.Required(SZ_SCHEDULE): SCH_SCHEDULE_ZON,
    },
    extra=vol.PREVENT_EXTRA,
)
# A full schedule is valid if it is either a valid DHW or a valid zone schedule
SCH_FULL_SCHEDULE = vol.Schema(
    vol.Any(SCH_SCHEDULE_DHW_OUTER, SCH_SCHEDULE_ZON_OUTER),
    extra=vol.PREVENT_EXTRA,
)
_PayloadT: TypeAlias = dict[str, Any]  # Message payload
_PayloadSetT: TypeAlias = list[_PayloadT | None]  # a slot per fragment, None if not held

_FragmentT: TypeAlias = str  # a run of hex characters (see fragz_to_full_sched)
_FragmentSetT: TypeAlias = list[_FragmentT]

# The payload set of a zone that has no schedule (a single, empty, slot).
# NOTE: this list is mutable; code that takes it as a starting payload set should
# copy it, rather than write to it in place.
EMPTY_PAYLOAD_SET: _PayloadSetT = [None]
165# TODO: make stateful (a la binding)
class Schedule:  # 0404
    """The schedule of a zone, as carried by 0404 packets.

    A schedule is transferred as a set of fragments. Received fragments are kept in
    a payload set and, once the set is complete, decoded into a full schedule. A
    schedule to be sent is validated, encoded and split into fragments.

    The global change counter (from 0006 packets) is tracked to tell whether the
    cached schedule may be out of date.
    """

    def __init__(self, zone: DhwZone | Zone) -> None:
        _LOGGER.debug("Schedule(zon=%s).__init__()", zone)

        self.id = zone.id
        self._zone = zone
        self.idx = zone.idx

        self.ctl = zone.ctl
        self.tcs = zone.tcs
        self._gwy = zone._gwy

        self._full_schedule: OuterScheduleT | EmptyDictT = {}

        # A copy, not the constant itself: the payload set is written to in place,
        # and the module-level list is shared by every Schedule.
        self._payload_set: _PayloadSetT = list(EMPTY_PAYLOAD_SET)  # Rx'd
        self._fragments: _FragmentSetT = []  # to Tx

        self._global_ver = 0  # a falsy value means 'dont know'
        self._sched_ver = 0  # the global_ver when this schedule was retrieved

    def __str__(self) -> str:
        return f"{self._zone} (schedule)"

    def _handle_msg(self, msg: Message) -> None:
        """Process a schedule packet: if possible, create the corresponding schedule.

        A 0006 message updates the cached global version. A 0404 message adds its
        fragment to the payload set. Other messages are ignored.
        """

        if msg.code == Code._0006:  # keep up, in case it is useful to know in future
            self._global_ver = msg.payload[SZ_CHANGE_COUNTER]
            return

        if msg.code != Code._0404:
            return

        # can do via here, or via gwy.async_send_cmd(cmd)
        # next line also in self._get_schedule(), so protected here with a lock
        if msg.payload[SZ_TOTAL_FRAGS] != 0xFF and self.tcs.zone_lock_idx != self.idx:
            self._payload_set = self._update_payload_set(self._payload_set, msg.payload)

    async def _is_dated(self, *, force_io: bool = False) -> tuple[bool, bool]:
        """Indicate if it is possible that a more recent schedule is available.

        If required, retrieve the latest global version (change counter) from the
        TCS.

        There may be a false positive if another zone's schedule is changed when
        this zone's schedule has not. There may be a false negative if this zone's
        schedule was changed only very recently and a cached global version was
        used.

        If `force_io`, then a true negative is guaranteed (it forces an RQ|0006 unless
        self._global_ver > self._sched_ver).

        Returns a tuple of (is_dated, did_io).
        """

        # this will not cause an I/O...
        # NOTE: the parentheses only make the existing precedence explicit
        if (not force_io and not self._sched_ver) or (
            self._global_ver and self._global_ver > self._sched_ver
        ):
            return True, False  # is_dated, did_io

        # this may cause an I/O...
        self._global_ver, did_io = await self.tcs._schedule_version()
        if did_io or self._global_ver > self._sched_ver:
            return self._global_ver > self._sched_ver, did_io  # is_dated, did_io

        if force_io:  # this will cause an I/O...
            self._global_ver, did_io = await self.tcs._schedule_version(
                force_io=force_io
            )

        return self._global_ver > self._sched_ver, did_io  # is_dated, did_io

    async def get_schedule(
        self, *, force_io: bool = False, timeout: float = 15
    ) -> InnerScheduleT | None:
        """Retrieve/return the brief schedule of a zone.

        Return the cached schedule (which may have been eavesdropped) only if the
        global change counter has not increased.
        Otherwise, RQ the latest schedule from the controller and return that.

        If `force_io`, then the latest schedule is guaranteed (it forces an RQ|0006).

        Raises TimeoutError if the schedule is not obtained within `timeout` secs.
        """

        try:
            await asyncio.wait_for(
                self._get_schedule(force_io=force_io), timeout=timeout
            )
        except TimeoutError as err:
            raise TimeoutError(
                f"Failed to obtain schedule within {timeout} secs"
            ) from err
        # TODO: raise a more parochial exception
        return self.schedule

    async def _get_schedule(self, *, force_io: bool = False) -> None:
        """Retrieve the schedule of a zone (sets self._full_schedule).

        Does nothing if the cached schedule is not dated. Otherwise, the TCS lock is
        held while the missing fragments are requested; it is always released, even
        if a request fails or this coroutine is cancelled (as the timeout in
        `get_schedule()` will do).
        """

        async def get_fragment(frag_num: int) -> _PayloadT:  # may: TimeoutError?
            """Retrieve a schedule fragment from the controller."""

            # the size of the set is not known when asking for the first fragment
            frag_set_size = 0 if frag_num == 1 else _len(self._payload_set)
            cmd = Command.get_schedule_fragment(
                self.ctl.id, self.idx, frag_num, frag_set_size
            )
            pkt: Packet = await self._gwy.async_send_cmd(
                cmd, wait_for_reply=True, priority=Priority.HIGH
            )
            msg = Message(pkt)
            assert isinstance(msg.payload, dict)  # mypy check
            return msg.payload  # may: TimeoutError?

        is_dated, did_io = await self._is_dated(force_io=force_io)
        if is_dated:
            self._full_schedule = {}  # keep frags, maybe only other scheds have changed
        if self._full_schedule:
            return

        await self.tcs._obtain_lock(self.idx)  # maybe raise TimeOutError

        try:
            if not did_io:  # must know the version of the schedule about to be RQ'd
                self._global_ver, _ = await self.tcs._schedule_version(force_io=True)

            # if 1st frag valid: schedule very likely unchanged
            self._payload_set[0] = None

            # The default (0) ends the loop if no slot is empty and yet there is no
            # schedule (a full, but undecodable, set), rather than letting next()
            # raise StopIteration inside this coroutine.
            while frag_num := next(
                (i for i, f in enumerate(self._payload_set, 1) if f is None), 0
            ):
                fragment = await get_fragment(frag_num)
                # next line also in self._handle_msg(), so protected there with a lock
                self._payload_set = self._update_payload_set(
                    self._payload_set, fragment
                )
                if self._full_schedule:  # TODO: potential for infinite loop?
                    self._sched_ver = self._global_ver  # type: ignore[unreachable]
                    break
        finally:
            self.tcs._release_lock()

    def _proc_payload_set(self, payload_set: _PayloadSetT) -> OuterScheduleT | None:
        """Process a payload set and return the full schedule (sets `self._full_schedule`).

        If the schedule is for DHW, set the `zone_idx` key to 'HW' (to avoid confusing
        with zone '00').

        Return None, leaving `self._full_schedule` unchanged, if the fragments do not
        decompress.
        """

        # TODO: relying upon caller to ensure set is only empty or full

        if payload_set == EMPTY_PAYLOAD_SET:  # compared by value, not identity
            self._full_schedule = {SZ_ZONE_IDX: self.idx}
            return self._full_schedule

        try:
            schedule = fragz_to_full_sched(
                payload[SZ_FRAGMENT] for payload in payload_set if payload
            )  # TODO: messy - what is set not full
        except zlib.error:
            return None  # TODO: raise a more parochial exception

        if self.idx == "HW":
            schedule[SZ_ZONE_IDX] = "HW"
        self._full_schedule = schedule

        return self._full_schedule  # NOTE: not self.schedule

    def _update_payload_set(
        self, payload_set: _PayloadSetT, payload: _PayloadT
    ) -> _PayloadSetT:
        """Add a fragment to a frag set and process/return the new set.

        If the frag set is complete, check for a schedule (sets `self._full_schedule`).

        If required, start a new frag set with the fragment.
        """

        def init_payload_set(payload: _PayloadT) -> _PayloadSetT:
            """Return a new set, sized for this fragment, that holds only it."""
            new_set: _PayloadSetT = [None] * payload[SZ_TOTAL_FRAGS]
            new_set[payload[SZ_FRAG_NUMBER] - 1] = payload
            return new_set

        if payload[SZ_TOTAL_FRAGS] is None:  # zone has no schedule
            payload_set = list(EMPTY_PAYLOAD_SET)  # a copy: never share the constant
            self._proc_payload_set(payload_set)
            return payload_set

        if payload[SZ_TOTAL_FRAGS] != _len(payload_set):  # sched has changed
            payload_set = init_payload_set(payload)
        else:
            payload_set[payload[SZ_FRAG_NUMBER] - 1] = payload

        # A set with no empty slot is decoded, even if it was only just started (a
        # one-fragment schedule is complete as soon as it is started).
        if None in payload_set or self._proc_payload_set(
            payload_set
        ):  # sets self._full_schedule
            return payload_set

        # the set was full, but did not decode: start again with this fragment
        return init_payload_set(payload)

    async def set_schedule(
        self, schedule: InnerScheduleT, force_refresh: bool = False
    ) -> InnerScheduleT | None:
        """Set the schedule of a zone.

        The schedule is validated, encoded and sent to the controller one fragment at
        a time, whilst holding the TCS lock.

        If `force_refresh`, the schedule is then retrieved from the controller;
        otherwise the schedule as sent is cached.

        Raises TypeError if the schedule is invalid, and TimeoutError if a fragment
        could not be sent.
        """

        async def put_fragment(frag_num: int, frag_cnt: int, fragment: str) -> None:
            """Send a schedule fragment to the controller."""

            cmd = Command.set_schedule_fragment(
                self.ctl.id, self.idx, frag_num, frag_cnt, fragment
            )
            await self._gwy.async_send_cmd(
                cmd, wait_for_reply=True, priority=Priority.HIGH
            )

        def normalise_validate(schedule: InnerScheduleT) -> _OuterSchedule:
            """Wrap the schedule with its zone_idx, and validate the result."""
            full_schedule: _OuterSchedule

            if self.idx == "HW":
                full_schedule = {SZ_ZONE_IDX: "HW", SZ_SCHEDULE: schedule}
                schedule_schema = SCH_SCHEDULE_DHW_OUTER
            else:
                full_schedule = {SZ_ZONE_IDX: self.idx, SZ_SCHEDULE: schedule}
                schedule_schema = SCH_SCHEDULE_ZON_OUTER

            try:
                full_schedule = schedule_schema(full_schedule)
            except vol.MultipleInvalid as err:
                raise TypeError(f"failed to set schedule: {err}") from err

            if self.idx == "HW":  # HACK: to avoid confusing dhw with zone '00'
                full_schedule[SZ_ZONE_IDX] = "00"

            return full_schedule

        full_schedule: _OuterSchedule = normalise_validate(schedule)
        self._fragments = full_sched_to_fragz(full_schedule)

        await self.tcs._obtain_lock(self.idx)  # maybe raise TimeOutError

        try:
            for num, frag in enumerate(self._fragments, 1):
                await put_fragment(num, len(self._fragments), frag)
        except TimeoutError as err:
            raise TimeoutError(f"failed to set schedule: {err}") from err
        else:
            if not force_refresh:
                self._global_ver, _ = await self.tcs._schedule_version(force_io=True)
                # assert self._global_ver > self._sched_ver
                self._sched_ver = self._global_ver
        finally:
            self.tcs._release_lock()

        if force_refresh:
            await self.get_schedule(force_io=True)  # sets self._full_schedule
        else:
            # The '00' was only needed to encode the fragments: cache the schedule
            # under the zone's own idx ('HW' for DHW), as _proc_payload_set() does.
            full_schedule[SZ_ZONE_IDX] = self.idx
            self._full_schedule = full_schedule

        return self.schedule

    @property
    def schedule(self) -> InnerScheduleT | None:
        """Return the current (not full) schedule, if any."""
        if not self._full_schedule:  # can be {}
            return None
        result: InnerScheduleT = self._full_schedule.get(SZ_SCHEDULE)  # type: ignore[assignment]
        return result

    @property
    def version(self) -> int | None:
        """Return the version associated with the current schedule, if any."""
        return self._sched_ver if self._full_schedule else None
436# TODO: deprecate in favour of len(payload_set)
437def _len(payload_set: _PayloadSetT) -> int:
438 """Return the total number of fragments in the complete frag set.
440 Return 0 if the expected set size is unknown (sentinel value as per RAMSES II).
442 Uses frag_set[i][SZ_TOTAL_FRAGS] instead of `len(frag_set)` (is necessary?).
443 """
444 # for frag in (f for f in payload_set if f is not None): # they will all match
445 # assert len(payload_set) == frag[SZ_TOTAL_FRAGS] # TODO: remove
446 # assert isinstance(frag[SZ_TOTAL_FRAGS], int) # mypy check
447 # result: int = frag[SZ_TOTAL_FRAGS]
448 # return result
450 # assert payload_set == EMPTY_PAYLOAD_SET # TODO: remove
451 # return 0 # sentinel value as per RAMSES protocol
452 return len(payload_set)
def fragz_to_full_sched(fragments: Iterable[_FragmentT]) -> _OuterSchedule:
    """Convert a set of fragment strs (a hex blob) into a full schedule.

    The joined fragments are hex-decoded and decompressed into a series of 20-byte
    records, one per switchpoint, which are then grouped by day of week. Only days
    that have a switchpoint are included in the result.

    Raises:
        zlib.error: the blob is not a valid compressed stream.
        ValueError: the fragments are not valid hex, or the blob has no records.
        struct.error: the blob is not a whole number of records.
    """

    def setpoint(value: int) -> dict[str, bool | float]:
        # values of 0/1 are taken to be a DHW state, all others a setpoint (x100)
        if value in (0, 1):
            return {SZ_ENABLED: bool(value)}
        return {SZ_HEAT_SETPOINT: value / 100}

    raw_schedule = zlib.decompress(bytearray.fromhex("".join(fragments)))
    if not raw_schedule:  # otherwise there is no zone idx to return
        raise ValueError("the schedule blob is empty (it has no switchpoints)")

    records = [
        _struct_unpack(raw_schedule[i : i + 20])
        for i in range(0, len(raw_schedule), 20)
    ]

    schedule: InnerScheduleT = []
    switchpoints: SwitchPointsT = []  # type: ignore[assignment, unused-ignore]

    # Start with the day of the first record, so that a schedule that does not
    # begin on day 0 is not given a day 0 that has no switchpoints.
    idx, old_day = records[0][0], records[0][1]

    for idx, dow, tod, val in records:
        if dow > old_day:  # the previous day is complete
            schedule.append({SZ_DAY_OF_WEEK: old_day, SZ_SWITCHPOINTS: switchpoints})
            old_day, switchpoints = dow, []  # type: ignore[assignment, unused-ignore]

        hours, minutes = divmod(tod, 60)  # tod is in minutes since midnight
        switchpoint: SwitchPointDhw | SwitchPointZon = {
            SZ_TIME_OF_DAY: f"{hours:02d}:{minutes:02d}"
        } | setpoint(val)  # type: ignore[assignment]
        switchpoints.append(switchpoint)  # type: ignore[arg-type]

    schedule.append({SZ_DAY_OF_WEEK: old_day, SZ_SWITCHPOINTS: switchpoints})

    # the zone idx is that of the last record (all are expected to be the same)
    return {SZ_ZONE_IDX: f"{idx:02X}", SZ_SCHEDULE: schedule}
def full_sched_to_fragz(full_schedule: _OuterSchedule) -> list[_FragmentT]:
    """Convert a full schedule into a list of fragments (a hex blob, in pieces).

    Each switchpoint is packed into a record, the records are compressed as one
    stream, and the upper-case hex of that stream is cut into pieces of at most
    82 characters.

    May raise `KeyError`, `zlib.error` exceptions.
    """

    compressor = zlib.compressobj(level=9, wbits=14)

    records: list[bytes] = [
        _struct_pack(full_schedule, day, point)
        for day in full_schedule[SZ_SCHEDULE]
        for point in day[SZ_SWITCHPOINTS]
    ]

    # feed the records one at a time, and only then flush the compressor
    pieces = [compressor.compress(record) for record in records]
    pieces.append(compressor.flush())
    hex_blob = b"".join(pieces).hex().upper()

    return [hex_blob[pos : pos + 82] for pos in range(0, len(hex_blob), 82)]
def _struct_pack(
    full_schedule: OuterScheduleT,
    week_day: DayOfWeekT,
    switchpoint: SwitchPointDhw | SwitchPointZon,
) -> bytes:
    """Return a switchpoint packed as a 20-byte record of a schedule blob.

    The record holds the zone idx (parsed as hex), the day of week, the time of day
    (as minutes since midnight) and a value: the heat setpoint x100 or, if there is
    no setpoint, the enabled state as 0/1.

    May raise a `KeyError`, `ValueError` or `struct.error` if a field is missing,
    cannot be parsed, or does not fit its slot in the record.
    """
    idx_: str = full_schedule[SZ_ZONE_IDX]
    dow_: int = week_day[SZ_DAY_OF_WEEK]
    tod_: str = switchpoint[SZ_TIME_OF_DAY]

    idx = int(idx_, 16)
    dow = int(dow_)
    tod = int(tod_[:2]) * 60 + int(tod_[3:])  # "HH:MM" -> minutes

    if SZ_HEAT_SETPOINT in switchpoint:
        # round(), not int(): float error must not be truncated, e.g. 8.2 * 100 is
        # 819.9999999999999, which int() would turn into 819 rather than 820
        val = round(switchpoint[SZ_HEAT_SETPOINT] * 100)  # type: ignore[typeddict-item]
    else:
        val = int(bool(switchpoint[SZ_ENABLED]))

    return struct.pack("<xxxxBxxxBxxxHxxHxx", idx, dow, tod, val)
535def _struct_unpack(raw_schedule: bytes) -> tuple[int, int, int, int]:
536 idx, dow, tod, val, _ = struct.unpack("<xxxxBxxxBxxxHxxHH", raw_schedule)
537 return idx, dow, tod, val
540# 16:27:56.942 000 RQ --- 18:006402 01:145038 --:------ 0006 001 00
541# 16:27:56.958 038 RP --- 01:145038 18:006402 --:------ 0006 004 00050009
543# 16:27:57.005 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0100
544# 16:27:57.068 037 RP --- 01:145038 18:006402 --:------ 0404 048 0120000829-0103-68816DCFCB0980301045D1994C3E624916660956604596600516E1D285094112F566F5B80C072222A2
545# 16:27:57.114 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0203
546# 16:27:57.161 038 RP --- 01:145038 18:006402 --:------ 0404 048 0120000829-0203-52DF92C79CEA7EDA91C7F06997FDEFC620B287D6143C054FC153F01C780E3C079E03CFC033F00C3C03
547# 16:27:57.202 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0303
548# 16:27:57.245 038 RP --- 01:145038 18:006402 --:------ 0404 045 0120000826-0303-CF83E7C1F3E079F0CADC3E5E696BFECC944EED5BF5DEAD7AAD45F0227811BCD87937936E24CF