Coverage for src/ramses_rf/system/heat.py: 32%
513 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - The evohome-compatible system."""
4from __future__ import annotations
6import asyncio
7import logging
8from datetime import datetime as dt, timedelta as td
9from threading import Lock
10from types import SimpleNamespace
11from typing import TYPE_CHECKING, Any, NoReturn, TypeVar
13from ramses_rf.const import (
14 SYS_MODE_MAP,
15 SZ_ACTUATORS,
16 SZ_CHANGE_COUNTER,
17 SZ_DATETIME,
18 SZ_DEVICES,
19 SZ_DHW_IDX,
20 SZ_DOMAIN_ID,
21 SZ_HEAT_DEMAND,
22 SZ_LANGUAGE,
23 SZ_SENSOR,
24 SZ_SYSTEM_MODE,
25 SZ_TEMPERATURE,
26 SZ_ZONE_IDX,
27 SZ_ZONE_MASK,
28 SZ_ZONE_TYPE,
29 SZ_ZONES,
30)
31from ramses_rf.device import (
32 BdrSwitch,
33 Controller,
34 Device,
35 OtbGateway,
36 Temperature,
37 UfhController,
38)
39from ramses_rf.entity_base import Entity, Parent, class_by_attr
40from ramses_rf.helpers import shrink
41from ramses_rf.schemas import (
42 DEFAULT_MAX_ZONES,
43 SCH_TCS,
44 SCH_TCS_DHW,
45 SCH_TCS_ZONES_ZON,
46 SZ_APPLIANCE_CONTROL,
47 SZ_CLASS,
48 SZ_DHW_SYSTEM,
49 SZ_MAX_ZONES,
50 SZ_ORPHANS,
51 SZ_SYSTEM,
52 SZ_UFH_SYSTEM,
53)
54from ramses_tx import (
55 DEV_ROLE_MAP,
56 DEV_TYPE_MAP,
57 ZON_ROLE_MAP,
58 Command,
59 DeviceIdT,
60 Message,
61 Priority,
62)
63from ramses_tx.typed_dicts import PayDictT
65from .faultlog import FaultLog
66from .zones import zone_factory
68if TYPE_CHECKING:
69 from ramses_tx import Address, Packet
71 from .faultlog import FaultIdxT, FaultLogEntry
72 from .zones import DhwZone, Zone
75# TODO: refactor packet routing (filter *before* routing)
78from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
79 F9,
80 FA,
81 FC,
82 FF,
83)
85from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
86 I_,
87 RP,
88 RQ,
89 W_,
90 Code,
91)
94_LOGGER = logging.getLogger(__name__)
97_SystemT = TypeVar("_SystemT", bound="Evohome")
99_StoredHwT = TypeVar("_StoredHwT", bound="StoredHw")
100_LogbookT = TypeVar("_LogbookT", bound="Logbook")
101_MultiZoneT = TypeVar("_MultiZoneT", bound="MultiZone")
104SYS_KLASS = SimpleNamespace(
105 SYS="system", # Generic (promotable?) system
106 TCS="evohome",
107 PRG="programmer",
108)
111class SystemBase(Parent, Entity): # 3B00 (multi-relay)
112 """The TCS base class."""
114 _SLUG: str = None # type: ignore[assignment]
116 # TODO: check (code so complex, not sure if this is true)
117 childs: list[Device] # type: ignore[assignment]
119 def __init__(self, ctl: Controller) -> None:
120 _LOGGER.debug("Creating a TCS for CTL: %s (%s)", ctl.id, self.__class__)
122 if ctl.id in ctl._gwy.system_by_id:
123 raise LookupError(f"Duplicate TCS for CTL: {ctl.id}")
124 if not isinstance(ctl, Controller): # TODO
125 raise ValueError(f"Invalid CTL: {ctl} (is not a controller)")
127 super().__init__(ctl._gwy)
129 # FIXME: ZZZ entities must know their parent device ID and their own idx
130 self._z_id = ctl.id # the responsible device is the controller
131 self._z_idx = None # ? True (sentinel value to pick up arrays?)
133 self.id: DeviceIdT = ctl.id
135 self.ctl: Controller = ctl
136 self.tcs: Evohome = self # type: ignore[assignment]
137 self._child_id = FF # NOTE: domain_id
139 self._app_cntrl: BdrSwitch | OtbGateway | None = None
140 self._heat_demand = None
142 def __repr__(self) -> str:
143 return f"{self.ctl.id} ({self._SLUG})"
145 def _setup_discovery_cmds(self) -> None:
146 # super()._setup_discovery_cmds()
148 for payload in (
149 f"00{DEV_ROLE_MAP.APP}", # appliance_control
150 f"00{DEV_ROLE_MAP.HTG}", # hotwater_valve
151 f"01{DEV_ROLE_MAP.HTG}", # heating_valve
152 ):
153 cmd = Command.from_attrs(RQ, self.ctl.id, Code._000C, payload)
154 self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=0)
156 cmd = Command.get_tpi_params(self.id)
157 self._add_discovery_cmd(cmd, 60 * 60 * 6, delay=5)
159 def _handle_msg(self, msg: Message) -> None:
160 def eavesdrop_appliance_control(
161 this: Message, *, prev: Message | None = None
162 ) -> None:
163 """Discover the heat relay (10: or 13:) for this system.
165 There's' 3 ways to find a controller's heat relay (in order of reliability):
166 1. The 3220 RQ/RP *to/from a 10:* (1x/5min)
167 2a. The 3EF0 RQ/RP *to/from a 10:* (1x/1min)
168 2b. The 3EF0 RQ (no RP) *to a 13:* (3x/60min)
169 3. The 3B00 I/I exchange between a CTL & a 13: (TPI cycle rate, usu. 6x/hr)
171 Data from the CTL is considered 'authoritative'. The 1FC9 RQ/RP exchange
172 to/from a CTL is too rare to be useful.
173 """
175 # 18:14:14.025 066 RQ --- 01:078710 10:067219 --:------ 3220 005 0000050000
176 # 18:14:14.446 065 RP --- 10:067219 01:078710 --:------ 3220 005 00C00500FF
177 # 14:41:46.599 064 RQ --- 01:078710 10:067219 --:------ 3EF0 001 00
178 # 14:41:46.631 063 RP --- 10:067219 01:078710 --:------ 3EF0 006 0000100000FF
180 # 06:49:03.465 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
181 # 06:49:05.467 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
182 # 06:49:07.468 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
183 # 09:03:59.693 051 I --- 13:237335 --:------ 13:237335 3B00 002 00C8
184 # 09:04:02.667 045 I --- 01:145038 --:------ 01:145038 3B00 002 FCC8
186 if this.code not in (Code._22D9, Code._3220, Code._3B00, Code._3EF0):
187 return
189 # note the order: most to least reliable
190 app_cntrl = None
192 if (
193 this.code in (Code._22D9, Code._3220) and this.verb == RQ
194 ): # TODO: RPs too?
195 # dst could be an Address...
196 if this.src == self.ctl and isinstance(this.dst, OtbGateway): # type: ignore[unreachable]
197 app_cntrl = this.dst # type: ignore[unreachable]
199 elif this.code == Code._3EF0 and this.verb == RQ:
200 # dst could be an Address...
201 if this.src == self.ctl and isinstance(
202 this.dst, # type: ignore[unreachable]
203 BdrSwitch | OtbGateway,
204 ):
205 app_cntrl = this.dst # type: ignore[unreachable]
207 elif this.code == Code._3B00 and this.verb == I_ and prev is not None:
208 if this.src == self.ctl and isinstance(prev.src, BdrSwitch): # type: ignore[unreachable]
209 if prev.code == this.code and prev.verb == this.verb: # type: ignore[unreachable]
210 app_cntrl = prev.src
212 if app_cntrl is not None:
213 app_cntrl.set_parent(self, child_id=FC) # type: ignore[unreachable]
215 # # assert msg.src is self.ctl, f"msg inappropriately routed to {self}"
217 super()._handle_msg(msg)
219 if msg.code == Code._000C:
220 if msg.payload[SZ_ZONE_TYPE] == DEV_ROLE_MAP.APP and msg.payload.get(
221 SZ_DEVICES
222 ):
223 self._gwy.get_device(
224 msg.payload[SZ_DEVICES][0], parent=self, child_id=FC
225 ) # sets self._app_cntrl
226 return
228 if msg.code == Code._3150:
229 if msg.payload.get(SZ_DOMAIN_ID) == FC and msg.verb in (I_, RP):
230 self._heat_demand = msg.payload
232 if self._gwy.config.enable_eavesdrop and not self.appliance_control:
233 eavesdrop_appliance_control(msg)
235 @property
236 def appliance_control(self) -> BdrSwitch | OtbGateway | None:
237 """The TCS relay, aka 'appliance control' (BDR or OTB)."""
238 if self._app_cntrl:
239 return self._app_cntrl
240 app_cntrl = [d for d in self.childs if d._child_id == FC]
241 return app_cntrl[0] if len(app_cntrl) == 1 else None # type: ignore[return-value]
243 @property
244 def tpi_params(self) -> PayDictT._1100 | None: # 1100
245 return self._msg_value(Code._1100) # type: ignore[return-value]
247 @property
248 def heat_demand(self) -> float | None: # 3150/FC
249 return self._msg_value(Code._3150, domain_id=FC, key=SZ_HEAT_DEMAND) # type: ignore[return-value]
251 @property
252 def is_calling_for_heat(self) -> NoReturn:
253 raise NotImplementedError(
254 f"{self}: is_calling_for_heat attr is deprecated, use bool(heat_demand)"
255 )
257 @property
258 def schema(self) -> dict[str, Any]:
259 """Return the system's schema."""
261 schema: dict[str, Any] = {SZ_SYSTEM: {}}
263 schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL] = (
264 self.appliance_control.id if self.appliance_control else None
265 )
267 schema[SZ_ORPHANS] = sorted(
268 [
269 d.id
270 for d in self.childs # HACK: UFC
271 if not d._child_id and d._is_present # TODO: and d is not self.ctl
272 ] # and not isinstance(d, UfhController)
273 ) # devices without a parent zone, NB: CTL can be a sensor for a zone
275 return schema
277 @property
278 def _schema_min(self) -> dict[str, Any]:
279 """Return the system's minimal-alised schema."""
281 schema: dict[str, Any] = self.schema
282 result: dict[str, Any] = {}
284 try:
285 if schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL][:2] == DEV_TYPE_MAP.OTB: # DEX
286 result[SZ_SYSTEM] = {
287 SZ_APPLIANCE_CONTROL: schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL]
288 }
289 except (IndexError, TypeError):
290 result[SZ_SYSTEM] = {SZ_APPLIANCE_CONTROL: None}
292 zones = {}
293 for idx, zone in schema[SZ_ZONES].items():
294 _zone = {}
295 if zone[SZ_SENSOR] and zone[SZ_SENSOR][:2] == DEV_TYPE_MAP.CTL: # DEX
296 _zone = {SZ_SENSOR: zone[SZ_SENSOR]}
297 if devices := [
298 d for d in zone[SZ_ACTUATORS] if d[:2] == DEV_TYPE_MAP.TR0
299 ]: # DEX
300 _zone.update({SZ_ACTUATORS: devices})
301 if _zone:
302 zones[idx] = _zone
303 if zones:
304 result[SZ_ZONES] = zones
306 result |= {
307 k: v
308 for k, v in schema.items()
309 if k in ("orphans",) and v # add UFH?
310 }
312 return result # TODO: check against vol schema
314 @property
315 def params(self) -> dict[str, Any]:
316 """Return the system's configuration."""
318 params: dict[str, Any] = {SZ_SYSTEM: {}}
319 params[SZ_SYSTEM]["tpi_params"] = self._msg_value(Code._1100)
320 return params
322 @property
323 def status(self) -> dict[str, Any]:
324 """Return the system's current state."""
326 status: dict[str, Any] = {SZ_SYSTEM: {}}
327 status[SZ_SYSTEM]["heat_demand"] = self.heat_demand
329 status[SZ_DEVICES] = {
330 d.id: d.status for d in sorted(self.childs, key=lambda x: x.id)
331 }
333 return status
336class MultiZone(SystemBase): # 0005 (+/- 000C?)
337 def __init__(self, *args: Any, **kwargs: Any) -> None:
338 super().__init__(*args, **kwargs)
340 self.zones: list[Zone] = []
341 self.zone_by_idx: dict[str, Zone] = {} # should not include HW
342 self._max_zones: int = getattr(
343 self._gwy.config, SZ_MAX_ZONES, DEFAULT_MAX_ZONES
344 )
346 self._prev_30c9: Message | None = None # used to eavesdrop zone sensors
348 def _setup_discovery_cmds(self) -> None:
349 super()._setup_discovery_cmds()
351 for zone_type in list(ZON_ROLE_MAP.HEAT_ZONES) + [ZON_ROLE_MAP.SEN]:
352 cmd = Command.from_attrs(RQ, self.id, Code._0005, f"00{zone_type}")
353 self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=0)
355 def _handle_msg(self, msg: Message) -> None:
356 """Process any relevant message.
358 If `zone_idx` in payload, route any messages to the corresponding zone.
359 """
361 def eavesdrop_zones(this: Message, *, prev: Message | None = None) -> None:
362 [
363 self.get_htg_zone(v)
364 for d in msg.payload
365 for k, v in d.items()
366 if k == SZ_ZONE_IDX
367 ]
369 def eavesdrop_zone_sensors(
370 this: Message, *, prev: Message | None = None
371 ) -> None:
372 """Determine each zone's sensor by matching zone/sensor temperatures."""
374 def _testable_zones(changed_zones: dict[str, float]) -> dict[float, str]:
375 return {
376 t1: i1
377 for i1, t1 in changed_zones.items()
378 if self.zone_by_idx[i1].sensor is None
379 and t1 not in [t2 for i2, t2 in changed_zones.items() if i2 != i1]
380 }
382 self._prev_30c9, prev = this, self._prev_30c9
383 if prev is None:
384 return # type: ignore[unreachable]
386 # TODO: use msgz/I, not RP
387 secs: int = self._msg_value(Code._1F09, key="remaining_seconds") # type: ignore[assignment]
388 if secs is None or this.dtm > prev.dtm + td(seconds=secs + 5):
389 return # can only compare against 30C9 pkt from the last cycle
391 # _LOGGER.warning("System state (before): %s", self.schema)
393 changed_zones: dict[str, float] = {
394 z[SZ_ZONE_IDX]: z[SZ_TEMPERATURE]
395 for z in this.payload
396 if z not in prev.payload and z[SZ_TEMPERATURE] is not None
397 } # zones with changed temps
398 if not changed_zones:
399 return # ctl's 30C9 says no zones have changed temps during this cycle
401 testable_zones = _testable_zones(changed_zones)
402 if not testable_zones:
403 return # no testable zones
405 testable_sensors = {
406 d.temperature: d
407 for d in self._gwy.devices # NOTE: *not* self.childs
408 if isinstance(d, Temperature) # d.addr.type in DEVICE_HAS_ZONE_SENSOR
409 and d.ctl in (self.ctl, None)
410 and d.temperature is not None
411 and d._msgs[Code._30C9].dtm > prev.dtm # changed during last cycle
412 }
413 if not testable_sensors:
414 return # no testable sensors
416 matched_pairs = {
417 sensor: zone_idx
418 for temp_z, zone_idx in testable_zones.items()
419 for temp_s, sensor in testable_sensors.items()
420 if temp_z == temp_s
421 }
423 for sensor, zone_idx in matched_pairs.items():
424 zone = self.zone_by_idx[zone_idx]
425 self._gwy.get_device(sensor.id, parent=zone, is_sensor=True)
427 # _LOGGER.warning("System state (after): %s", self.schema)
429 # now see if we can allocate the controller as a sensor...
430 if any(z for z in self.zones if z.sensor is self.ctl):
431 return # the controller is already a sensor
433 remaining_zones = _testable_zones(changed_zones)
434 if len(remaining_zones) != 1:
435 return # no testable zones
437 temp, zone_idx = tuple(remaining_zones.items())[0]
439 # can safely(?) assume this zone is using the CTL as a sensor...
440 if not [s for s in testable_sensors if s == temp]:
441 zone = self.zone_by_idx[zone_idx]
442 self._gwy.get_device(self.ctl.id, parent=zone, is_sensor=True)
444 # _LOGGER.warning("System state (finally): %s", self.schema)
446 def handle_msg_by_zone_idx(zone_idx: str, msg: Message) -> None:
447 if zone := self.zone_by_idx.get(zone_idx):
448 zone._handle_msg(msg)
449 # elif self._gwy.config.enable_eavesdrop:
450 # self.get_htg_zone(zone_idx)._handle_msg(msg)
452 super()._handle_msg(msg)
454 if msg.code not in (Code._0005, Code._000A, Code._2309, Code._30C9) and (
455 SZ_ZONE_IDX not in msg.payload # 0004,0008,0009,000C,0404,12B0,2349,3150
456 ):
457 return
459 # TODO: a I/0005 may have changed: del or add zones
460 if msg.code == Code._0005:
461 if (zone_type := msg.payload[SZ_ZONE_TYPE]) in ZON_ROLE_MAP.HEAT_ZONES:
462 [
463 self.get_htg_zone(
464 f"{idx:02X}", **{SZ_CLASS: ZON_ROLE_MAP[zone_type]}
465 )
466 for idx, flag in enumerate(msg.payload[SZ_ZONE_MASK])
467 if flag == 1
468 ]
469 elif zone_type in DEV_ROLE_MAP.HEAT_DEVICES:
470 [
471 self.get_htg_zone(f"{idx:02X}", msg=msg)
472 for idx, flag in enumerate(msg.payload[SZ_ZONE_MASK])
473 if flag == 1
474 ]
475 return
477 # TODO: a I/000C may have changed: del or add devices
478 if msg.code == Code._000C:
479 if msg.payload[SZ_ZONE_TYPE] not in DEV_ROLE_MAP.HEAT_DEVICES:
480 return
481 if msg.payload[SZ_DEVICES]:
482 self.get_htg_zone(msg.payload[SZ_ZONE_IDX], msg=msg)
483 elif zon := self.zone_by_idx.get(msg.payload[SZ_ZONE_IDX]):
484 zon._handle_msg(msg) # tell existing zone: no device
485 return
487 # the CTL knows, but does not announce temps for multiroom_mode zones
488 if msg.code == Code._30C9 and msg._has_array:
489 for z in self.zones:
490 if z.idx not in (x[SZ_ZONE_IDX] for x in msg.payload):
491 z._get_temp()
493 # If some zones still don't have a sensor, maybe eavesdrop?
494 if self._gwy.config.enable_eavesdrop and (
495 msg.code in (Code._000A, Code._2309, Code._30C9) and msg._has_array
496 ): # could do Code._000A, but only 1/hr
497 eavesdrop_zones(msg)
499 # Route all messages to their zones, incl. 000C, 0404, others
500 if isinstance(msg.payload, dict):
501 if zone_idx := msg.payload.get(SZ_ZONE_IDX):
502 handle_msg_by_zone_idx(zone_idx, msg)
503 # TODO: elif msg.payload.get(SZ_DOMAIN_ID) == FA: # DHW
505 elif isinstance(msg.payload, list) and len(msg.payload):
506 # TODO: elif msg.payload.get(SZ_DOMAIN_ID) == FA: # DHW
507 if isinstance(msg.payload[0], dict): # e.g. 1FC9 is a list of lists:
508 for z_dict in msg.payload:
509 handle_msg_by_zone_idx(z_dict.get(SZ_ZONE_IDX), msg)
511 # If some zones still don't have a sensor, maybe eavesdrop?
512 if ( # TODO: edge case: 1 zone with CTL as SEN
513 self._gwy.config.enable_eavesdrop
514 and msg.code == Code._30C9
515 and (msg._has_array or len(self.zones) == 1)
516 and any(z for z in self.zones if not z.sensor)
517 ):
518 eavesdrop_zone_sensors(msg)
520 # TODO: should be a private method
521 def get_htg_zone(
522 self, zone_idx: str, *, msg: Message | None = None, **schema: Any
523 ) -> Zone:
524 """Return a heating zone, create it if required.
526 First, use the schema to create/update it, then pass it any msg to handle.
528 Heating zones are uniquely identified by a tcs_id|zone_idx pair.
529 If a zone is created, attach it to this TCS.
530 """
532 schema = shrink(SCH_TCS_ZONES_ZON(schema))
534 zon: Zone = self.zone_by_idx.get(zone_idx) # type: ignore[assignment]
535 if zon is None: # not found in tcs, create it
536 zon = zone_factory(self, zone_idx, msg=msg, **schema) # type: ignore[unreachable]
537 self.zone_by_idx[zon.idx] = zon
538 self.zones.append(zon)
540 elif schema:
541 zon._update_schema(**schema)
543 if msg:
544 zon._handle_msg(msg)
545 return zon
547 @property
548 def schema(self) -> dict[str, Any]:
549 return {
550 **super().schema,
551 SZ_ZONES: {z.idx: z.schema for z in sorted(self.zones)},
552 }
554 @property
555 def params(self) -> dict[str, Any]:
556 return {
557 **super().params,
558 SZ_ZONES: {z.idx: z.params for z in sorted(self.zones)},
559 }
561 @property
562 def status(self) -> dict[str, Any]:
563 return {
564 **super().status,
565 SZ_ZONES: {z.idx: z.status for z in sorted(self.zones)},
566 }
569class ScheduleSync(SystemBase): # 0006 (+/- 0404?)
570 def __init__(self, *args: Any, **kwargs: Any) -> None:
571 super().__init__(*args, **kwargs)
573 self._msg_0006: Message = None # type: ignore[assignment]
575 # used to stop concurrent get_schedules
576 self.zone_lock = Lock() # FIXME: threading lock, or asyncio lock?
577 self.zone_lock_idx: str | None = None
579 def _setup_discovery_cmds(self) -> None:
580 super()._setup_discovery_cmds()
582 cmd = Command.get_schedule_version(self.id)
583 self._add_discovery_cmd(cmd, 60 * 5, delay=5)
585 def _handle_msg(self, msg: Message) -> None: # NOTE: active
586 """Periodically retrieve the latest global change counter."""
588 super()._handle_msg(msg)
590 if msg.code == Code._0006:
591 self._msg_0006 = msg
593 async def _schedule_version(self, *, force_io: bool = False) -> tuple[int, bool]:
594 """Return the global schedule version number, and an indication if I/O was done.
596 If `force_io`, then RQ the latest change counter from the TCS rather than
597 rely upon a recent (cached) value.
599 Cached values are only used if less than 3 minutes old.
600 """
602 # RQ --- 30:185469 01:037519 --:------ 0006 001 00
603 # RP --- 01:037519 30:185469 --:------ 0006 004 000500E6
605 if (
606 not force_io
607 and self._msg_0006
608 and self._msg_0006.dtm > dt.now() - td(minutes=3)
609 ):
610 return (
611 self._msg_0006.payload[SZ_CHANGE_COUNTER],
612 False,
613 ) # global_ver, did_io
615 cmd = Command.get_schedule_version(self.ctl.id)
616 pkt = await self._gwy.async_send_cmd(
617 cmd, wait_for_reply=True, priority=Priority.HIGH
618 )
619 if pkt:
620 self._msg_0006 = Message(pkt)
622 return self._msg_0006.payload[SZ_CHANGE_COUNTER], True # global_ver, did_io
624 def _refresh_schedules(self) -> None:
625 zone: Zone
627 for zone in getattr(self, SZ_ZONES, []):
628 self._gwy._loop.create_task(zone.get_schedule(force_io=True))
629 if isinstance(self, StoredHw) and self.dhw:
630 self._gwy._loop.create_task(self.dhw.get_schedule(force_io=True))
632 async def _obtain_lock(self, zone_idx: str) -> None:
633 timeout_dtm = dt.now() + td(minutes=3)
634 while dt.now() < timeout_dtm:
635 self.zone_lock.acquire()
636 if self.zone_lock_idx is None:
637 self.zone_lock_idx = zone_idx
638 self.zone_lock.release()
640 if self.zone_lock_idx == zone_idx:
641 break
642 await asyncio.sleep(0.005) # gives the other zone enough time
644 else:
645 raise TimeoutError(
646 f"Unable to obtain lock for {zone_idx} (used by {self.zone_lock_idx})"
647 )
649 def _release_lock(self) -> None:
650 self.zone_lock.acquire()
651 self.zone_lock_idx = None
652 self.zone_lock.release()
654 @property
655 def schedule_version(self) -> int | None:
656 return self._msg_value(Code._0006, key=SZ_CHANGE_COUNTER) # type: ignore[return-value]
658 @property
659 def status(self) -> dict[str, Any]:
660 return {
661 **super().status,
662 "schedule_version": self.schedule_version,
663 }
666class Language(SystemBase): # 0100
667 def _setup_discovery_cmds(self) -> None:
668 super()._setup_discovery_cmds()
670 cmd = Command.get_system_language(self.id)
671 self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=60 * 15)
673 @property
674 def language(self) -> str | None:
675 return self._msg_value(Code._0100, key=SZ_LANGUAGE) # type: ignore[return-value]
677 @property
678 def params(self) -> dict[str, Any]:
679 params = super().params
680 params[SZ_SYSTEM][SZ_LANGUAGE] = self.language
681 return params
684class Logbook(SystemBase): # 0418
685 def __init__(self, *args: Any, **kwargs: Any) -> None:
686 super().__init__(*args, **kwargs)
688 self._prev_event: Message = None # type: ignore[assignment]
689 self._this_event: Message = None # type: ignore[assignment]
691 self._prev_fault: Message = None # type: ignore[assignment]
692 self._this_fault: Message = None # type: ignore[assignment]
694 self._faultlog: FaultLog = FaultLog(self)
696 def _setup_discovery_cmds(self) -> None:
697 super()._setup_discovery_cmds()
699 cmd = Command.get_system_log_entry(self.id, 0)
700 self._add_discovery_cmd(cmd, 60 * 5, delay=5)
701 # self._gwy.add_task(
702 # self._gwy._loop.create_task(self.get_faultlog())
703 # )
705 def _handle_msg(self, msg: Message) -> None: # NOTE: active
706 super()._handle_msg(msg)
708 if msg.code == Code._0418: # and msg.verb in (I_, RP):
709 self._faultlog.handle_msg(msg)
711 async def get_faultlog(
712 self,
713 /,
714 *,
715 start: int = 0,
716 limit: int | None = None,
717 force_refresh: bool = False,
718 ) -> dict[FaultIdxT, FaultLogEntry] | None:
719 return await self._faultlog.get_faultlog(
720 start=start, limit=limit, force_refresh=force_refresh
721 )
723 @property
724 def active_faults(self) -> tuple[str, ...] | None:
725 """Return the most recently logged faults that are not restored."""
726 if self._faultlog.active_faults is None:
727 return None
728 return tuple(str(f) for f in self._faultlog.active_faults)
730 @property
731 def latest_event(self) -> str | None:
732 """Return the most recently logged event (fault or restore), if any."""
733 if not self._faultlog.latest_event:
734 return None
735 return str(self._faultlog.latest_event)
737 @property
738 def latest_fault(self) -> str | None:
739 """Return the most recently logged fault, if any."""
740 if not self._faultlog.latest_fault:
741 return None
742 return str(self._faultlog.latest_fault)
744 @property
745 def status(self) -> dict[str, Any]:
746 return {
747 **super().status,
748 "active_faults": self.active_faults,
749 "latest_event": self.latest_event,
750 "latest_fault": self.latest_fault,
751 }
754class StoredHw(SystemBase): # 10A0, 1260, 1F41
755 MIN_SETPOINT = 30.0 # NOTE: these may be removed
756 MAX_SETPOINT = 85.0
757 DEFAULT_SETPOINT = 50.0
759 def __init__(self, *args: Any, **kwargs: Any) -> None:
760 super().__init__(*args, **kwargs)
761 self._dhw: DhwZone = None # type: ignore[assignment]
763 def _setup_discovery_cmds(self) -> None:
764 super()._setup_discovery_cmds()
766 for payload in (
767 f"00{DEV_ROLE_MAP.DHW}", # dhw_sensor
768 # f"00{DEV_ROLE_MAP.HTG}", # hotwater_valve
769 # f"01{DEV_ROLE_MAP.HTG}", # heating_valve
770 ):
771 cmd = Command.from_attrs(RQ, self.id, Code._000C, payload)
772 self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=0)
774 def _handle_msg(self, msg: Message) -> None:
775 super()._handle_msg(msg)
777 if (
778 not isinstance(msg.payload, dict)
779 or msg.payload.get(SZ_DHW_IDX) is None
780 and msg.payload.get(SZ_DOMAIN_ID) not in (F9, FA)
781 and msg.payload.get(SZ_ZONE_IDX) != "HW"
782 ): # Code._0008, Code._000C, Code._0404, Code._10A0, Code._1260, Code._1F41
783 return
785 # TODO: a I/0005 may have changed zones & may need a restart (del) or not (add)
786 if (
787 msg.code == Code._000C
788 and msg.payload[SZ_ZONE_TYPE] in DEV_ROLE_MAP.DHW_DEVICES
789 ):
790 if msg.payload[SZ_DEVICES]:
791 self.get_dhw_zone(msg=msg) # create DHW zone if required
792 elif self._dhw:
793 self._dhw._handle_msg(msg) # tell existing DHW zone: no device
794 return
796 # RQ --- 18:002563 01:078710 --:------ 10A0 001 00 # every 4h
797 # RP --- 01:078710 18:002563 --:------ 10A0 006 00157C0003E8
799 # Route all messages to their zones, incl. 000C, 0404, others
800 self.get_dhw_zone(msg=msg)
802 # TODO: should be a private method
803 def get_dhw_zone(self, *, msg: Message | None = None, **schema: Any) -> DhwZone:
804 """Return a DHW zone, create it if required.
806 First, use the schema to create/update it, then pass it any msg to handle.
808 DHW zones are uniquely identified by a controller ID.
809 If a DHW zone is created, attach it to this TCS.
810 """
812 schema = shrink(SCH_TCS_DHW(schema))
814 if not self._dhw:
815 self._dhw = zone_factory(self, "HW", msg=msg, **schema) # type: ignore[assignment]
817 elif schema:
818 self._dhw._update_schema(**schema)
820 if msg:
821 self._dhw._handle_msg(msg)
822 return self._dhw
824 @property
825 def dhw(self) -> DhwZone | None:
826 return self._dhw
828 @property
829 def dhw_sensor(self) -> Device | None:
830 return self._dhw.sensor if self._dhw else None
832 @property
833 def hotwater_valve(self) -> Device | None:
834 return self._dhw.hotwater_valve if self._dhw else None
836 @property
837 def heating_valve(self) -> Device | None:
838 return self._dhw.heating_valve if self._dhw else None
840 @property
841 def schema(self) -> dict[str, Any]:
842 return {
843 **super().schema,
844 SZ_DHW_SYSTEM: self._dhw.schema if self._dhw else {},
845 }
847 @property
848 def params(self) -> dict[str, Any]:
849 return {
850 **super().params,
851 SZ_DHW_SYSTEM: self._dhw.params if self._dhw else {},
852 }
854 @property
855 def status(self) -> dict[str, Any]:
856 return {
857 **super().status,
858 SZ_DHW_SYSTEM: self._dhw.status if self._dhw else {},
859 }
862class SysMode(SystemBase): # 2E04
863 def _setup_discovery_cmds(self) -> None:
864 super()._setup_discovery_cmds()
866 cmd = Command.get_system_mode(self.id)
867 self._add_discovery_cmd(cmd, 60 * 5, delay=5)
869 @property
870 def system_mode(self) -> dict[str, Any] | None: # 2E04
871 return self._msg_value(Code._2E04) # type: ignore[return-value]
873 def set_mode(
874 self, system_mode: int | str | None, *, until: dt | str | None = None
875 ) -> asyncio.Task[Packet]:
876 """
877 Set a system mode for a specified duration, or indefinitely.
879 :param system_mode: 2-digit item from SYS_MODE_MAP, positional
880 :param until: optional: end of set period
881 :return:
882 """
883 cmd = Command.set_system_mode(self.id, system_mode, until=until)
884 return self._gwy.send_cmd(cmd, priority=Priority.HIGH, wait_for_reply=True)
886 def set_auto(self) -> asyncio.Task[Packet]:
887 """Revert system to Auto, set non-PermanentOverride zones to FollowSchedule."""
888 return self.set_mode(SYS_MODE_MAP.AUTO)
890 def reset_mode(self) -> asyncio.Task[Packet]:
891 """Revert system to Auto, force *all* zones to FollowSchedule."""
892 return self.set_mode(SYS_MODE_MAP.AUTO_WITH_RESET)
894 @property
895 def params(self) -> dict[str, Any]:
896 params = super().params
897 params[SZ_SYSTEM][SZ_SYSTEM_MODE] = self.system_mode
898 return params
901class Datetime(SystemBase): # 313F
902 def _setup_discovery_cmds(self) -> None:
903 super()._setup_discovery_cmds()
905 cmd = Command.get_system_time(self.id)
906 self._add_discovery_cmd(cmd, 60 * 60, delay=0)
908 def _handle_msg(self, msg: Message) -> None:
909 super()._handle_msg(msg)
911 # FIXME: refactoring protocol stack
912 if msg.code == Code._313F and msg.verb in (I_, RP) and self._gwy._transport:
913 diff = abs(dt.fromisoformat(msg.payload[SZ_DATETIME]) - self._gwy._dt_now())
914 if diff > td(minutes=5):
915 _LOGGER.warning(f"{msg!r} < excessive datetime difference: {diff}")
917 async def get_datetime(self) -> dt | None:
918 cmd = Command.get_system_time(self.id)
919 pkt = await self._gwy.async_send_cmd(cmd, wait_for_reply=True)
920 msg = Message._from_pkt(pkt)
921 return dt.fromisoformat(msg.payload[SZ_DATETIME])
923 async def set_datetime(self, dtm: dt) -> Packet:
924 """Set the date and time of the system."""
926 cmd = Command.set_system_time(self.id, dtm)
927 return await self._gwy.async_send_cmd(cmd, priority=Priority.HIGH)
930class UfHeating(SystemBase):
931 def _ufh_ctls(self) -> list[UfhController]:
932 return sorted([d for d in self.childs if isinstance(d, UfhController)])
934 @property
935 def schema(self) -> dict[str, Any]:
936 return {
937 **super().schema,
938 SZ_UFH_SYSTEM: {d.id: d.schema for d in self._ufh_ctls()},
939 }
941 @property
942 def params(self) -> dict[str, Any]:
943 return {
944 **super().params,
945 SZ_UFH_SYSTEM: {d.id: d.params for d in self._ufh_ctls()},
946 }
948 @property
949 def status(self) -> dict[str, Any]:
950 return {
951 **super().status,
952 SZ_UFH_SYSTEM: {d.id: d.status for d in self._ufh_ctls()},
953 }
956class System(StoredHw, Datetime, Logbook, SystemBase):
957 """The Temperature Control System class."""
959 _SLUG: str = SYS_KLASS.SYS
961 def __init__(self, ctl: Controller, **kwargs: Any) -> None:
962 super().__init__(ctl, **kwargs)
964 self._heat_demands: dict[str, Any] = {}
965 self._relay_demands: dict[str, Any] = {}
966 self._relay_failsafes: dict[str, Any] = {}
968 def _update_schema(self, **schema: Any) -> None:
969 """Update a CH/DHW system with new schema attrs.
971 Raise an exception if the new schema is not a superset of the existing schema.
972 """
974 _schema: dict[str, Any]
975 schema = shrink(SCH_TCS(schema))
977 if schema.get(SZ_SYSTEM) and (
978 dev_id := schema[SZ_SYSTEM].get(SZ_APPLIANCE_CONTROL)
979 ):
980 self._app_cntrl = self._gwy.get_device(dev_id, parent=self, child_id=FC) # type: ignore[assignment]
982 if _schema := (schema.get(SZ_DHW_SYSTEM)): # type: ignore[assignment]
983 self.get_dhw_zone(**_schema) # self._dhw = ...
985 if not isinstance(self, MultiZone):
986 return
988 if _schema := (schema.get(SZ_ZONES)): # type: ignore[assignment]
989 [self.get_htg_zone(idx, **s) for idx, s in _schema.items()]
991 @classmethod
992 def create_from_schema(cls, ctl: Controller, **schema: Any) -> System:
993 """Create a CH/DHW system for a CTL and set its schema attrs.
995 The appropriate System class should have been determined by a factory.
996 Schema attrs include: class (klass) & others.
997 """
999 tcs = cls(ctl)
1000 tcs._update_schema(**schema)
1001 return tcs
1003 def _handle_msg(self, msg: Message) -> None:
1004 super()._handle_msg(msg)
1006 if not isinstance(msg.payload, dict):
1007 return
1009 if (idx := msg.payload.get(SZ_DOMAIN_ID)) and msg.verb in (I_, RP):
1010 idx = msg.payload[SZ_DOMAIN_ID]
1011 if msg.code == Code._0008:
1012 self._relay_demands[idx] = msg
1013 elif msg.code == Code._0009:
1014 self._relay_failsafes[idx] = msg
1015 elif msg.code == Code._3150:
1016 self._heat_demands[idx] = msg
1017 elif msg.code not in (
1018 Code._0001,
1019 Code._000C,
1020 Code._0404,
1021 Code._0418,
1022 Code._1100,
1023 Code._3B00,
1024 ):
1025 assert False, f"Unexpected code with a domain_id: {msg.code}"
1027 @property
1028 def heat_demands(self) -> dict[str, Any] | None: # 3150
1029 # FC: 00-C8 (no F9, FA), TODO: deprecate as FC only?
1030 if not self._heat_demands:
1031 return None
1032 return {k: v.payload["heat_demand"] for k, v in self._heat_demands.items()}
1034 @property
1035 def relay_demands(self) -> dict[str, Any] | None: # 0008
1036 # FC: 00-C8, F9: 00-C8, FA: 00 or C8 only (01: all 3, 02: FC/FA only)
1037 if not self._relay_demands:
1038 return None
1039 return {k: v.payload["relay_demand"] for k, v in self._relay_demands.items()}
1041 @property
1042 def relay_failsafes(self) -> dict[str, Any] | None: # 0009
1043 if not self._relay_failsafes:
1044 return None
1045 return {} # FIXME: failsafe_enabled
1047 @property
1048 def status(self) -> dict[str, Any]:
1049 """Return the system's current state."""
1051 status = super().status
1052 # assert SZ_SYSTEM in status # TODO: removeme
1054 status[SZ_SYSTEM]["heat_demands"] = self.heat_demands
1055 status[SZ_SYSTEM]["relay_demands"] = self.relay_demands
1056 status[SZ_SYSTEM]["relay_failsafes"] = self.relay_failsafes
1058 return status
1061class Evohome(ScheduleSync, Language, SysMode, MultiZone, UfHeating, System):
1062 _SLUG: str = SYS_KLASS.TCS # evohome
1064 # older evohome don't have zone_type=ELE
1067class Chronotherm(Evohome):
1068 _SLUG: str = SYS_KLASS.SYS
1071class Hometronics(System):
1072 _SLUG: str = SYS_KLASS.SYS
1074 # These are only ever been seen from a Hometronics controller
1075 # .I --- 01:023389 --:------ 01:023389 2D49 003 00C800
1076 # .I --- 01:023389 --:------ 01:023389 2D49 003 01C800
1077 # .I --- 01:023389 --:------ 01:023389 2D49 003 880000
1078 # .I --- 01:023389 --:------ 01:023389 2D49 003 FD0000
1080 # Hometronic does not react to W/2349 but rather requires W/2309
1082 #
1083 # def _setup_discovery_cmds(self) -> None:
1084 # # super()._setup_discovery_cmds()
1086 # # will RP to: 0005/configured_zones_alt, but not: configured_zones
1087 # # will RP to: 0004
1089 RQ_SUPPORTED = (Code._0004, Code._000C, Code._2E04, Code._313F) # TODO: WIP
1090 RQ_UNSUPPORTED = ("xxxx",) # 10E0?
1093class Programmer(Evohome):
1094 _SLUG: str = SYS_KLASS.PRG
1097class Sundial(Evohome):
1098 _SLUG: str = SYS_KLASS.SYS
1101# e.g. {"evohome": Evohome}
1102SYS_CLASS_BY_SLUG: dict[str, type[System]] = class_by_attr(__name__, "_SLUG")
1105def system_factory(
1106 ctl: Controller, *, msg: Message | None = None, **schema: Any
1107) -> System:
1108 """Return the system class for a given controller/schema (defaults to evohome)."""
1110 def best_tcs_class(
1111 ctl_addr: Address,
1112 *,
1113 msg: Message | None = None,
1114 eavesdrop: bool = False,
1115 **schema: Any,
1116 ) -> type[System]:
1117 """Return the system class for a given CTL/schema (defaults to evohome)."""
1119 klass: str = schema.get(SZ_CLASS) # type: ignore[assignment]
1121 # a specified system class always takes precedence (even if it is wrong)...
1122 if klass and (cls := SYS_CLASS_BY_SLUG.get(klass)):
1123 _LOGGER.debug(
1124 f"Using an explicitly-defined system class for: {ctl_addr} ({cls._SLUG})"
1125 )
1126 return cls
1128 # otherwise, use the default system class...
1129 _LOGGER.debug(f"Using a generic system class for: {ctl_addr} ({Device._SLUG})")
1130 return Evohome
1132 return best_tcs_class(
1133 ctl.addr,
1134 msg=msg,
1135 eavesdrop=ctl._gwy.config.enable_eavesdrop,
1136 **schema,
1137 ).create_from_schema(ctl, **schema)