Coverage for src/ramses_rf/entity_base.py: 19%
593 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Base class for all RAMSES-II objects: devices and constructs."""
4from __future__ import annotations
6import asyncio
7import contextlib
8import logging
9import random
10from collections.abc import Iterable
11from datetime import datetime as dt, timedelta as td
12from inspect import getmembers, isclass
13from sys import modules
14from types import ModuleType
15from typing import TYPE_CHECKING, Any, Final
17from ramses_rf.helpers import schedule_task
18from ramses_tx import Priority, QosParams
19from ramses_tx.address import ALL_DEVICE_ID
20from ramses_tx.const import MsgId
21from ramses_tx.opentherm import OPENTHERM_MESSAGES
22from ramses_tx.ramses import CODES_SCHEMA
24from . import exceptions as exc
25from .const import (
26 DEV_TYPE_MAP,
27 SZ_ACTUATORS,
28 SZ_DOMAIN_ID,
29 SZ_NAME,
30 SZ_SENSOR,
31 SZ_ZONE_IDX,
32)
33from .schemas import SZ_CIRCUITS
35from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
36 I_,
37 RP,
38 RQ,
39 W_,
40 Code,
41 VerbT,
42)
44from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
45 F9,
46 FA,
47 FC,
48 FF,
49)
51if TYPE_CHECKING:
52 from ramses_tx import Command, Message, Packet, VerbT
53 from ramses_tx.frame import HeaderT
54 from ramses_tx.opentherm import OtDataId
55 from ramses_tx.schemas import DeviceIdT, DevIndexT
57 from .device import (
58 BdrSwitch,
59 Controller,
60 DhwSensor,
61 OtbGateway,
62 TrvActuator,
63 UfhCircuit,
64 )
65 from .gateway import Gateway
66 from .system import Evohome
# An entity whose _qos_tx_count exceeds this limit is no longer sent commands
# (see _Entity._send_cmd / _Entity._async_send_cmd).
_QOS_TX_LIMIT = 12  # TODO: needs work

# Entity ids are sliced as id[:_ID_SLICE] to get the part used as src/dst in
# MessageIndex queries; anything after it (e.g. "_HW") is a suffix.
_ID_SLICE = 9

# Keys of the per-command task dict built by _Discovery._add_discovery_cmd()
_SZ_LAST_PKT: Final = "last_msg"
_SZ_NEXT_DUE: Final = "next_due"
_SZ_TIMEOUT: Final = "timeout"
_SZ_FAILURES: Final = "failures"
_SZ_INTERVAL: Final = "interval"
_SZ_COMMAND: Final = "command"

#
# NOTE: All debug flags should be False for deployment to end-users
# When False, discovery's backoff() returns the configured interval unchanged.
_DBG_ENABLE_DISCOVERY_BACKOFF: Final[bool] = False

_LOGGER = logging.getLogger(__name__)
def class_by_attr(name: str, attr: str) -> dict[str, Any]:  # TODO: change to __module__
    """Map the value of `attr` to the class carrying it, for classes of one module.

    Only classes whose ``__module__`` equals `name` (i.e. not merely imported
    into that module) and whose `attr` value is truthy are included.

    :param name: the key of the module in ``sys.modules``
    :param attr: the name of the (unique) class attribute to key on
    :return: a dict of attribute value -> class
    :raises KeyError: if no module called `name` has been imported
    """
    module = modules[name]

    def is_local_class_with_attr(member: Any) -> bool:
        # getmembers() offers every member of the module, not only classes
        if not isclass(member) or member.__module__ != name:
            return False
        return bool(getattr(member, attr, None))

    return {
        getattr(klass, attr): klass
        for _, klass in getmembers(module, is_local_class_with_attr)
    }
class _Entity:
    """The ultimate base class for Devices/Zones/Systems.

    This class is mainly concerned with:
     - if the entity can Rx packets (e.g. can the HGI send it an RQ?)

    It counts the packets sent to the entity that had no matching reply, and
    stops sending once that count exceeds _QOS_TX_LIMIT.
    """

    _SLUG: str = None  # type: ignore[assignment]

    def __init__(self, gwy: Gateway) -> None:
        self._gwy = gwy
        self.id: DeviceIdT = None  # type: ignore[assignment]

        self._qos_tx_count = 0  # the number of pkts Tx'd with no matching Rx

    def __repr__(self) -> str:
        return f"{self.id} ({self._SLUG})"

    # TODO: should be a private method
    def deprecate_device(self, pkt: Packet, reset: bool = False) -> None:
        """If an entity is deprecated enough times, stop sending to it.

        :param pkt: the packet that triggered the call (only used for logging)
        :param reset: if True, clear the counter instead of incrementing it
        """

        if reset:
            self._qos_tx_count = 0
            return

        self._qos_tx_count += 1
        if self._qos_tx_count == _QOS_TX_LIMIT:  # warn once, when the limit is reached
            _LOGGER.warning(
                f"{pkt} < Sending now deprecated for {self} "
                "(consider adjusting device_id filters)"
            )  # TODO: take whitelist into account

    def _handle_msg(self, msg: Message) -> None:
        """Store a msg in the DBs."""

        raise NotImplementedError  # to be handled by implementing classes

    # FIXME: this is a mess - to deprecate for async version?
    def _send_cmd(self, cmd: Command, **kwargs: Any) -> asyncio.Task | None:
        """Send a Command & return the corresponding Task.

        :param cmd: the command to send
        :param kwargs: only `priority` and `num_repeats` are accepted
        :return: the gateway's Task, or None if sending to this entity is deprecated
        :raises RuntimeError: if any other keyword argument is supplied
        """

        # Don't poll this device if it is not responding
        if self._qos_tx_count > _QOS_TX_LIMIT:
            _LOGGER.info(f"{cmd} < Sending was deprecated for {self}")
            return None  # TODO: raise Exception (should be handled before now)

        # TODO: remove this
        # FIXME: deprecate QoS in kwargs, should be qos=QosParams(...)
        if any(k not in ("priority", "num_repeats") for k in kwargs):
            # exceptions do not %-format their args: build the message explicitly
            raise RuntimeError(f"Deprecated kwargs: {kwargs}")

        # cmd._source_entity = self  # TODO: is needed?
        return self._gwy.send_cmd(cmd, wait_for_reply=False, **kwargs)

    # FIXME: this is a mess
    async def _async_send_cmd(
        self,
        cmd: Command,
        priority: Priority | None = None,
        qos: QosParams | None = None,  # FIXME: deprecate QoS in kwargs?
    ) -> Packet | None:
        """Send a Command & return the response Packet, or the echo Packet otherwise.

        :param cmd: the command to send
        :param priority: passed through to the gateway
        :param qos: if given, supplies max_retries, timeout and wait_for_reply
        :return: the gateway's result, or None if sending to this entity is deprecated
        """

        # Don't poll this device if it is not responding
        if self._qos_tx_count > _QOS_TX_LIMIT:
            _LOGGER.warning(f"{cmd} < Sending was deprecated for {self}")
            return None  # FIXME: raise Exception (should be handled before now)

        # cmd._source_entity = self  # TODO: is needed?
        return await self._gwy.async_send_cmd(
            cmd,
            max_retries=qos.max_retries if qos else None,
            priority=priority,
            timeout=qos.timeout if qos else None,
            wait_for_reply=qos.wait_for_reply if qos else None,
        )
173class _MessageDB(_Entity):
174 """Maintain/utilize an entity's state database.
176 EntityBase msg_db query methods
178 (ix = database.py.MessageIndex method)
180 .. table:: Database Query Methods
181 :widths: auto
183 ==== ====================== ==================== ============ ========== ==========
184 e. method name args returns uses used by
185 ==== ====================== ==================== ============ ========== ==========
186 e1 _get_msg_by_hdr hdr Message i3 discover
187 e2 _msg_value code(s), Msg, args dict[k,v] e3,e4
188 e3 _msg_value_code code, verb, key dict[k,v] e4,e5,e6 e6
189 e4 _msg_value_msg Msg, (code) dict[k,v] e2,e3
190 e5 _msg_qry_by_code_key code, key, (verb=) e6,
191 e6 _msg_value_qry_by_code key code, key str/float e3,e5
192 e7 _msg_qry sql e8
193 e8 _msg_count sql e7
194 e9 supported_cmds list(Codes) i7
195 e10 _msgs() i5
196 ==== ====================== ==================== ============ ========== ==========
198 """
200 _gwy: Gateway
201 ctl: Controller
202 tcs: Evohome
204 # These attr used must be in this class
205 _z_id: DeviceIdT
206 _z_idx: DevIndexT | None # e.g. 03, HW. Is None for CTL, TCS.
207 # idx is one of:
208 # - a simple index (e.g. zone_idx, domain_id, aka child_id)
209 # - a compound ctx (e.g. 0005/000C/0418)
210 # - True (an array of elements, each with its own idx),
211 # - False (no idx, is usu. 00),
212 # - None (not determinable, rare)
def __init__(self, gwy: Gateway) -> None:
    """Set up the entity and its legacy (pre-MessageIndex) message stores."""
    super().__init__(gwy)

    # flat store, by code
    # TODO(eb): deprecated, used in test, remove Q1 2026
    self._msgs_: dict[Code, Message] = {}

    # nested store, by code/verb/ctx; only created when there is no MessageIndex
    # TODO(eb): deprecated since 0.52.1, remove Q1 2026
    if not self._gwy.msg_db:
        self._msgz_: dict[Code, dict[VerbT, dict[bool | str | None, Message]]] = {}

    # As of 0.52.1 we use SQLite MessageIndex, see ramses_rf/database.py
    # _msgz_ (nested) was only used in this module. Note:
    # _msgz (now rebuilt from _msgs) is also used in:
    # - client.py: for code in device._msgz.values()
    # - base.py: Code._1060 in self._msgz
    # [x] device.heat (no longer used)
def _handle_msg(self, msg: Message) -> None:
    """Store a msg in the DBs.

    Uses SQLite MessageIndex since 0.52.1
    """
    own_id = self.id[:_ID_SLICE]

    # A msg is kept only if one of these holds (all others are dropped):
    #  - this entity is the source of the msg
    #  - this entity is the destination, and the msg is not an RQ
    #  - it is a 1FC9 (rf_bind) addressed to ALL_DEVICE_ID
    keep = (
        msg.src.id == own_id
        or (msg.dst.id == own_id and msg.verb != RQ)
        or (msg.dst.id == ALL_DEVICE_ID and msg.code == Code._1FC9)
    )
    if not keep:
        return

    if self._gwy.msg_db:  # central SQLite MessageIndex
        _LOGGER.debug(
            "For %s (_z_id %s) add to msg_db: %s, src %s, dst %s",
            self.id,
            self._z_id,
            msg,
            msg.src,
            msg.dst,
        )
        debug_code: Code = Code._3150  # for debugging only log these, pick your own
        if msg.code == debug_code and msg.src.id.startswith("01:"):
            _LOGGER.debug(
                "Added msg from %s with code %s to _gwy.msg_db. hdr=%s",
                msg.src,
                msg.code,
                msg._pkt._hdr,
            )
        # any replaced message that add() might return is ignored
        self._gwy.msg_db.add(msg)

    else:  # TODO(eb): remove Q1 2026 (deprecated since 0.52.1)
        # file the msg under code -> verb -> ctx, creating levels as needed and
        # replacing any previous msg with the same code/verb/ctx
        by_verb = self._msgz_.setdefault(msg.code, {})
        by_verb.setdefault(msg.verb, {})[msg._pkt._ctx] = msg

    # Also keep the latest I/RP msg per code in the flat self._msgs_ dict
    # TODO(eb): deprecated since 0.52.1, remove next block _msgs_ Q1 2026
    if msg.verb in (I_, RP):  # drop RQ's
        self._msgs_[msg.code] = msg
@property
def _msg_list(self) -> list[Message]:
    """Return a flattened list of all messages logged on this device.

    With a MessageIndex, one message is returned per code reported by
    _msg_dev_qry() (codes that cannot be found in _msgs are logged and skipped).
    Without one, the legacy nested dict is flattened.
    """
    # (only) used in gateway.py#get_state() and in tests/tests/test_eavesdrop_schema.py
    # TODO remove _msg_list Q1 2026
    if not self._gwy.msg_db:
        # create from legacy nested dict
        return [
            msg
            for code in self._msgz.values()
            for ctx in code.values()
            for msg in ctx.values()
        ]

    codes = self._msg_dev_qry()
    if not codes:
        return []

    # _msgs is a property that queries the MessageIndex on every access, so
    # read it once rather than twice per code
    msgs = self._msgs

    found: list[Message] = []
    for code in codes:
        if code in msgs:
            found.append(msgs[code])
        else:
            # safeguard against lookup failures ("sim" packets?)
            # evohome has these errors
            # _msg_list could not fetch self._msgs[7FFF] for 18:072981 (_z_id 18:072981)
            _LOGGER.debug(
                "_msg_list could not fetch self._msgs[%s] for %s (_z_id %s)",
                code,
                self.id,
                self._z_id,
            )
    return found
def _add_record(
    self, id: DeviceIdT, code: Code | None = None, verb: str = " I"
) -> None:
    """Add a (dummy) record to the central SQLite MessageIndex.

    Does nothing when the gateway has no MessageIndex.

    :param id: the id to add the record for
    :param code: the record's code; it is passed on as ``str(code)``
    :param verb: the record's verb
    """
    # used by heat.py init
    msg_db = self._gwy.msg_db
    if not msg_db:
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError
        return
    msg_db.add_record(id, code=str(code), verb=verb)
def _delete_msg(self, msg: Message) -> None:  # FIXME: this is a mess
    """Remove the msg from all state databases. Used for expired msgs.

    The msg is removed from the central MessageIndex (if any) and from the
    legacy stores of its source device and, when that device has a `tcs`, of
    that tcs, its dhw (if any) and its zones.
    """

    from .device import Device

    obj: _MessageDB

    # delete from the central SQLite MessageIndex
    msg_db = self._gwy.msg_db
    if msg_db:
        msg_db.rem(msg)

    # collect every entity that may hold a reference to the msg
    entities: list[_MessageDB] = []
    src = msg.src
    if isinstance(src, Device):
        entities.append(src)
        tcs = getattr(src, "tcs", None)
        if tcs:
            entities.append(tcs)
            if tcs.dhw:
                entities.append(tcs.dhw)
            entities.extend(tcs.zones)

    # remove the msg from all the state DBs
    # TODO(eb): remove Q1 2026
    for obj in entities:
        if msg in obj._msgs_.values():
            del obj._msgs_[msg.code]
        if self._gwy.msg_db:
            continue
        # _msgz_ is deprecated, only used during migration
        with contextlib.suppress(KeyError):
            del obj._msgz_[msg.code][msg.verb][msg._pkt._ctx]
366 ### entity_base query methods
def _get_msg_by_hdr(self, hdr: HeaderT) -> Message | None:
    """Return a msg, if any, that matches a given header.

    :param hdr: a header of the form ``code|verb|device_id[|ctx]``
    :return: the matching Message, or None if there is none
    :raises LookupError: if the msg found does not carry the requested header
    """
    found: Message

    if self._gwy.msg_db:
        # use central SQLite MessageIndex
        # only 1 result expected since hdr is a unique key in _gwy.msg_db
        candidates = self._gwy.msg_db.get(hdr=hdr)
        if not candidates:
            return None
        found = candidates[0]

    else:
        code: Code
        verb: VerbT

        # the third field (device_id) is not needed for the lookup
        code, verb, _, *rest = hdr.split("|")  # type: ignore[assignment]

        try:
            by_ctx = self._msgz[code][verb]
            if rest and rest[0]:  # ctx may == True
                found = by_ctx[rest[0]]
            elif False in by_ctx:
                found = by_ctx[False]
            elif None in by_ctx:
                found = by_ctx[None]
            else:
                return None
        except KeyError:  # unknown code, verb or ctx
            return None

    if found._pkt._hdr != hdr:
        raise LookupError
    return found
def _msg_flag(self, code: Code, key: str, idx: int) -> bool | None:
    """Return the truth value of one flag of a code's `key` value.

    :param code: the code whose latest value is looked up via _msg_value()
    :param key: the payload key that holds the flags
    :param idx: the position of the wanted flag
    :return: the flag as a bool, or None if there is no (non-empty) value
    """
    flags = self._msg_value(code, key=key)
    return bool(flags[idx]) if flags else None
409 def _msg_value(
410 self, code: Code | Iterable[Code], *args: Any, **kwargs: Any
411 ) -> dict | list | None:
412 """
413 Get the value for a Code from the database or from a Message object provided.
415 :param code: filter messages by Code or a tuple of codes (optional)
416 :param args: Message (optional)
417 :param kwargs: zone to filter on (optional)
418 :return: a dict containing key: value pairs, or a list of those
419 """
420 if isinstance(code, str | tuple): # a code or a tuple of codes
421 return self._msg_value_code(code, *args, **kwargs)
423 assert isinstance(code, Message), (
424 f"Invalid format: _msg_value({code})"
425 ) # catch invalidly formatted code, only handle Message from here
426 return self._msg_value_msg(code, *args, **kwargs)
def _msg_value_code(
    self,
    code: Code,
    verb: VerbT | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> dict | list | None:
    """
    Query the _msgz message dict or the SQLite MessageIndex for the most recent
    key: value pairs(s) for a given code.

    :param code: filter messages by Code or a tuple of Codes, optional
    :param verb: filter on I, RQ, RP, optional, only with a single Code
    :param key: value keyword to retrieve; it is ignored when verb is RQ
    :param kwargs: extra filter, e.g. zone_idx='01'
    :return: a dict containing key: value pairs, or a list of those
    """
    assert not isinstance(code, tuple) or verb is None, (
        f"Unsupported: using a tuple ({code}) with a verb ({verb})"
    )

    msg: Message | None

    if verb:  # so code is a single Code (see the assert above)
        if verb == VerbT("RQ"):
            key = None  # a keyword is not supported together with RQ: ignore it
        try:
            if self._gwy.msg_db:  # central SQLite MessageIndex, use verb= kwarg
                found = self._msg_qry_by_code_key(code, key, **kwargs, verb=verb)
                # the query returns None when nothing matched: don't wrap that in Code()
                msg = None if found is None else self._msgs.get(Code(found))
            else:  # deprecated lookup in nested _msgz
                msgs = self._msgz[code][verb]
                msg = max(msgs.values()) if msgs else None
        except KeyError:
            msg = None

    elif isinstance(code, tuple):
        # take the highest (= latest?) msg among all the given codes
        msgs = [m for m in self._msgs.values() if m.code in code]
        msg = max(msgs) if msgs else None

    else:  # single Code
        # for Zones, this doesn't work, returns first result = often wrong
        # TODO fix in _msg_qry_by_code_key()
        msg = self._msgs.get(code)

    return self._msg_value_msg(msg, key=key, **kwargs)
def _msg_value_msg(
    self,
    msg: Message | None,
    key: str = "*",
    zone_idx: str | None = None,
    domain_id: str | None = None,
) -> dict | list | None:
    """
    Get from a Message all or a specific key with its value(s),
    optionally filtering for a zone or a domain

    :param msg: a Message to inspect
    :param key: the key to fetch; "*" (or a falsy key) selects all public keys
    :param zone_idx: the zone to filter on
    :param domain_id: the domain to filter on (takes precedence over zone_idx)
    :return: a dict containing key: value pairs, or a list of those
    """
    if msg is None:
        return None

    if msg._expired:
        # deletion is deferred; the expired msg is still used for this call
        self._gwy._loop.call_soon(self._delete_msg, msg)  # HA bugs without defer

    if msg.code == Code._1FC9:  # NOTE: list of lists/tuples
        return [x[1] for x in msg.payload]

    # idx is the payload key to match on, val the value it is expected to have
    idx: str | None
    val: str | None
    if domain_id:
        idx, val = SZ_DOMAIN_ID, domain_id
    elif zone_idx:
        idx, val = SZ_ZONE_IDX, zone_idx
    else:
        idx, val = None, None

    if isinstance(msg.payload, dict):
        msg_dict = msg.payload  # could be a mismatch on idx, accept
    elif idx:  # a list of dicts, e.g. SZ_DOMAIN_ID=FC
        # merge the items of every element whose idx equals val
        msg_dict = {
            k: v for d in msg.payload for k, v in d.items() if d[idx] == val
        }
    else:  # a list without idx
        # TODO: this isn't ideal: e.g. a controller is being treated like a 'stat
        # .I 101 --:------ --:------ 12:126457 2309 006 0107D0-0207D0  # is a CTL
        msg_dict = msg.payload[0]  # we pick the first

    assert (
        (not domain_id and not zone_idx)
        or (msg_dict.get(idx) == val)
        or (idx == SZ_DOMAIN_ID)
    ), (
        f"full dict:{msg_dict}, payload:{msg.payload} < Coding error: key='{idx}', val='{val}'"
    )  # should not be there

    if key and key != "*":
        return msg_dict.get(key)

    # wildcard (e.g. from a SQLite wildcard query): everything except the
    # index keys and keys starting with an underscore
    hidden = ("dhw_idx", SZ_DOMAIN_ID, SZ_ZONE_IDX)
    return {
        k: v for k, v in msg_dict.items() if k not in hidden and k[:1] != "_"
    }
541 # SQLite methods, since 0.52.0
def _msg_dev_qry(self) -> list[Code] | None:
    """
    Retrieve from the MessageIndex a list of Code keys involving this device.

    The rows considered depend on the form of this entity's id: a bare id (9
    chars) matches any ctx, an id ending in "_HW" matches the DHW contexts, and
    any other id matches on the text that follows the separator.

    :return: list of Codes or an empty list when the query returned empty
    :raises NotImplementedError: if the gateway has no MessageIndex
    """

    if not self._gwy.msg_db:
        _LOGGER.warning("Missing MessageIndex")
        raise NotImplementedError

    # used for both a ctl and a zone; only the ctx pattern differs
    sql_by_ctx = """
        SELECT code from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND ctx LIKE ?
    """

    if len(self.id) == 9:
        # fetch a ctl's message codes (add all its children?)
        sql, _ctx_qry = sql_by_ctx, "%"

    elif self.id[_ID_SLICE:] == "_HW":
        # fetch a DHW entity's message codes
        sql = """
        SELECT code from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND (ctx IN ('FC', 'FA', 'F9', 'FA') OR plk LIKE ?)
    """
        _ctx_qry = "%dhw_idx%"

    else:
        # fetch a zone's message codes
        sql, _ctx_qry = sql_by_ctx, f"%{self.id[_ID_SLICE + 1 :]}%"

    dev_id = self.id[:_ID_SLICE]
    res: list[Code] = []
    for rec in self._gwy.msg_db.qry_field(sql, (dev_id, dev_id, _ctx_qry)):
        _LOGGER.debug(
            "Fetched from index: %s for %s (_z_id %s)",
            rec[0],
            self.id,
            self._z_id,
        )
        # Example: "Fetched from index: code 1FD4 for 01:123456 (_z_id 01)"
        res.append(Code(str(rec[0])))
    return res
def _msg_qry_by_code_key(
    self,
    code: Code | tuple[Code] | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> Code | None:
    """
    Retrieve from the MessageIndex the most current Code for a code(s) &
    keyword combination involving this device.

    :param code: (optional) a message Code to use, e.g. Code._31DA or a tuple of Codes;
        None matches any code
    :param key: (optional) message keyword to fetch, e.g. SZ_HUMIDITY
    :param kwargs: optional filters, all may be omitted:
        verb=' I' or 'RP' (any other value selects both),
        zone_idx=... or dhw_idx=... (matched against the ctx column)
    :return: Code of most recent query result message or None when query returned empty
    :raises NotImplementedError: if the gateway has no MessageIndex
    """
    if not self._gwy.msg_db:
        _LOGGER.warning("Missing MessageIndex")
        raise NotImplementedError

    # verb filter: one of I/RP if asked for, otherwise both
    verb = kwargs.get("verb")
    verbs: tuple[str, ...] = (str(verb),) if verb in (" I", "RP") else (" I", "RP")

    # code filter: None means no filter at all
    codes: tuple[str, ...] | None
    if code is None:
        codes = None
    elif isinstance(code, tuple):
        codes = tuple(str(cd) for cd in code)
    else:
        codes = (str(code),)
    if codes is not None and not codes:
        return None  # an empty tuple of codes cannot match anything

    # ctx filter: zone_idx takes precedence over dhw_idx
    ctx = kwargs.get("zone_idx") or kwargs.get("dhw_idx")
    ctx_qry = f"%{ctx}%" if ctx else "%"
    key_qry = "%" if key is None else f"%{key}%"

    # Placeholders can only stand for values, never for operators or lists, so
    # the SQL text gets one '?' per value and all values are passed as parameters
    dev_id = self.id[:_ID_SLICE]
    clauses = [
        f"verb IN ({', '.join('?' * len(verbs))})",
        "(src = ? OR dst = ?)",
    ]
    params: list[str] = [*verbs, dev_id, dev_id]
    if codes is not None:
        clauses.append(f"code IN ({', '.join('?' * len(codes))})")
        params.extend(codes)
    clauses.extend(("ctx LIKE ?", "plk LIKE ?"))
    params.extend((ctx_qry, key_qry))

    # SQLite query on MessageIndex
    sql = "SELECT dtm, code from messages WHERE " + " AND ".join(clauses)

    latest: Any = None  # dtm of the most recent record so far
    res: Code | None = None
    for rec in self._gwy.msg_db.qry_field(sql, tuple(params)):
        _LOGGER.debug(
            "_msg_qry_by_code_key fetched rec: %s, code: %s", rec, codes
        )
        if latest is None or rec[0] > latest:  # dtm, only use most recent
            latest = rec[0]
            res = Code(rec[1])
    return res
def _msg_value_qry_by_code_key(
    self,
    code: Code | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> str | float | None:
    """
    Retrieve from the _msgs dict the most current value of a specific code & keyword combination
    or the first key's value when no key is specified.

    :param code: (optional) a single message Code to use, e.g. 31DA
    :param key: (optional) message keyword to fetch the value for, e.g. SZ_HUMIDITY or * (wildcard)
    :param kwargs: not used as of 0.52.1
    :return: a single string or float value or None when qry returned empty
    """
    # The filter keywords are always supplied, so that the lookup does not
    # depend on how the query method treats missing ones
    cd: Code | None = self._msg_qry_by_code_key(
        code, key, verb=None, zone_idx=None, dhw_idx=None
    )

    # _msgs queries the MessageIndex on every access: read it (at most) once
    msgs = self._msgs if cd is not None else {}
    if cd is None or cd not in msgs:
        _LOGGER.warning("Code %s not in device %s's messages", cd, self.id)
        return None

    found = self._msg_value_msg(msgs[cd], key=key)  # key can be wildcard *

    val: object
    if isinstance(found, dict):  # no key / wildcard: use the first key's value
        val = next(iter(found.values()), None)
    elif isinstance(found, list):
        val = found[0] if found else None
    else:  # the value of the requested key (or None if the key is absent)
        val = found
    _LOGGER.debug("Extracted val %s for code %s, key %s", val, code, key)

    if val is None:
        return None
    if isinstance(val, float):
        return val
    return str(val)
def _msg_qry(self, sql: str) -> list[dict]:
    """
    SQLite custom query for an entity's stored payloads using the full MessageIndex.
    See ramses_rf/database.py

    The query is given two parameters, both this entity's device id, and must
    select the message code as its first column.

    :param sql: custom SQLite query on MessageIndex. Can include multiple CODEs in SELECT.
    :return: list of payload dicts from the selected messages, or an empty list
    :raises KeyError: if a selected code is not present in _msgs
    """

    res: list[dict] = []
    if not sql or not self._gwy.msg_db:
        return res

    # example query:
    # """SELECT code from messages WHERE verb in (' I', 'RP') AND (src = ? OR dst = ?)
    # AND (code = '31DA' OR ...) AND (plk LIKE '%{SZ_FAN_INFO}%' OR ...)""" = 2 params
    dev_id = self.id[:_ID_SLICE]

    # _msgs queries the MessageIndex on every access: read it once, not per row
    msgs = self._msgs

    for rec in self._gwy.msg_db.qry_field(sql, (dev_id, dev_id)):
        # add payload dict to res(ults); only if newer, handled by MessageIndex
        res.append(msgs[Code(rec[0])].payload)
    return res
def _msg_count(self, sql: str) -> int:
    """
    Count the results that _msg_qry() returns for a query.

    :param sql: custom SQLite query on MessageIndex.
    :return: amount of messages in entity's database, 0 for no results
    """
    matches = self._msg_qry(sql)
    return len(matches)
@property
def traits(self) -> dict[str, Any]:
    """Get the codes sent by the entity.

    :return: a dict with one key, "_sent": the sorted codes of the messages in
        _msgs whose source is this entity (if it has an `addr` attribute) or
        otherwise its controller
    """

    # _msgs queries the MessageIndex on every access: read it once, not per code
    msgs = self._msgs
    if not msgs:
        return {"_sent": []}

    sender = self if hasattr(self, "addr") else self.ctl
    return {"_sent": [code for code in sorted(msgs) if msgs[code].src == sender]}
@property
def _msgs(self) -> dict[Code, Message]:
    """
    Get a flat dict of all I/RP messages logged with this device as src or dst.

    Without a MessageIndex the legacy _msgs_ dict is returned. Otherwise the
    dict is built from a MessageIndex query on every access; if the query
    yields several messages with the same code, the last one wins.

    :return: flat dict of messages by Code
    """
    if not self._gwy.msg_db:
        return self._msgs_
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError

    # used for both a ctl and a zone; only the ctx pattern differs
    sql_by_ctx = """
        SELECT dtm from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND ctx LIKE ?
    """

    if len(self.id) == 9:
        # fetch a ctl's message dtms (add all its children?)
        sql, _ctx_qry = sql_by_ctx, "%"

    elif self.id[_ID_SLICE:] == "_HW":
        # fetch a DHW entity's message dtms
        # TODO add Children messages? self.ctl.dhw
        sql = """
        SELECT dtm from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND (ctx IN ('FC', 'FA', 'F9', 'FA') OR plk LIKE ?)
    """
        _ctx_qry = "%dhw_idx%"

    else:
        # fetch a zone's message dtms
        sql, _ctx_qry = sql_by_ctx, f"%{self.id[_ID_SLICE + 1 :]}%"

    # since 0.52.3 use ctx (context) instead of just the address,
    # e.g. 01:123456_HW, 01:123456_02 (Zone)
    dev_id = self.id[:_ID_SLICE]
    _msg_dict: dict[Code, Message] = {}
    for m in self._gwy.msg_db.qry(sql, (dev_id, dev_id, _ctx_qry)):
        _msg_dict[m.code] = m

    # NOTE: a CTL can send a 3150 (see heat_ufc_00), so 3150/3220 are not
    # stripped from a CTL's dict: prefer to have 2 extra instead of missing 1
    return _msg_dict
@property
def _msgz(self) -> dict[Code, dict[VerbT, dict[bool | str | None, Message]]]:
    """
    Get a nested dict of all I/RP messages logged with this device as either src or dst.

    Without a MessageIndex the legacy _msgz_ dict is returned; otherwise the
    nested dict is rebuilt from _msgs on every access.

    :return: dict of messages involving this device, nested by Code, Verb, Context
    """
    if not self._gwy.msg_db:
        return self._msgz_  # TODO(eb): remove and uncomment next Q1 2026
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError

    # build _msgz from MessageIndex/_msgs:
    nested: dict[Code, dict[VerbT, dict[bool | str | None, Message]]] = {}
    for msg in self._msgs.values():  # contains only verbs I, RP
        by_verb = nested.setdefault(msg.code, {})
        by_verb.setdefault(msg.verb, {})[msg._pkt._ctx] = msg
    return nested
845class _Discovery(_MessageDB):
846 MAX_CYCLE_SECS = 30
847 MIN_CYCLE_SECS = 3
def __init__(self, gwy: Gateway) -> None:
    """Set up the discovery state and, unless disabled, schedule the poller."""
    super().__init__(gwy)

    # None until first read via the discovery_cmds property
    self._discovery_cmds: dict[HeaderT, dict] = None  # type: ignore[assignment]
    self._discovery_poller: asyncio.Task | None = None

    # keyed by code, and by code|ctx, respectively; False means deprecated
    self._supported_cmds: dict[str, bool | None] = {}
    self._supported_cmds_ctx: dict[str, bool | None] = {}

    if gwy.config.disable_discovery:
        return

    # Can't start the poller directly: derived classes don't exist yet
    gwy._loop.call_soon(self._start_discovery_poller)
@property  # TODO: needs tidy up
def discovery_cmds(self) -> dict[HeaderT, dict]:
    """Return the pollable commands.

    On first access the dict is created empty and _setup_discovery_cmds() is
    called to populate it.
    """
    if self._discovery_cmds is not None:
        return self._discovery_cmds

    # assign before the setup call: the setup reads the dict via this property
    self._discovery_cmds = {}
    self._setup_discovery_cmds()
    return self._discovery_cmds
@property
def supported_cmds(self) -> dict[Code, Any]:
    """Return the current list of pollable command codes.

    :return: a dict of code -> name from CODES_SCHEMA (None for a code that is
        not in the schema), for the codes that have had an RP and are not
        deprecated
    """
    if self._gwy.msg_db:
        dev_id = self.id[:_ID_SLICE]
        rp_codes = self._gwy.msg_db.get_rp_codes((dev_id, dev_id))
        return {
            # guard the schema lookup, as the legacy branch below does
            code: (CODES_SCHEMA[code][SZ_NAME] if code in CODES_SCHEMA else None)
            for code in sorted(rp_codes)
            if self._is_not_deprecated_cmd(code)
        }

    # TODO(eb): deprecated since 0.52.1, remove Q1 2026
    msgz = self._msgz  # read the property once, not per code
    return {
        code: (CODES_SCHEMA[code][SZ_NAME] if code in CODES_SCHEMA else None)
        for code in sorted(msgz)
        if msgz[code].get(RP) and self._is_not_deprecated_cmd(code)
    }
@property
def supported_cmds_ot(self) -> dict[MsgId, Any]:
    """Return the current list of pollable OT msg_ids.

    :return: a dict of "0x<msg_id>" -> the "en" entry of OPENTHERM_MESSAGES, for
        the ctx of each 3220 RP that is not deprecated and is a known data id;
        empty if there are no such messages
    """

    def _to_data_id(msg_id: MsgId | str) -> OtDataId:
        return int(msg_id, 16)  # type: ignore[return-value]

    # def _to_msg_id(data_id: OtDataId | int) -> MsgId:  # not used
    #     return f"{data_id:02X}"  # type: ignore[return-value]

    res: list[str] = []
    if self._gwy.msg_db:
        # SQLite query for ctx field on MessageIndex
        sql = """
        SELECT ctx from messages WHERE
        verb = 'RP'
        AND code = '3220'
        AND (src = ? OR dst = ?)
    """
        dev_id = self.id[:_ID_SLICE]
        for rec in self._gwy.msg_db.qry_field(sql, (dev_id, dev_id)):
            _LOGGER.debug("Fetched OT ctx from index: %s", rec[0])
            res.append(rec[0])
    else:  # TODO(eb): remove next Q1 2026
        # no 3220 msgs, or no RPs among them, simply means no msg_ids
        res = list(self._msgz.get(Code._3220, {}).get(RP, {}))
        # raise NotImplementedError

    supported: dict[MsgId, Any] = {}
    for msg_id in sorted(res):
        if not self._is_not_deprecated_cmd(Code._3220, ctx=msg_id):
            continue
        data_id = _to_data_id(msg_id)  # convert once, use for test and lookup
        if data_id in OPENTHERM_MESSAGES:
            supported[f"0x{msg_id}"] = OPENTHERM_MESSAGES[data_id].get("en")  # type: ignore[index]
    return supported
def _is_not_deprecated_cmd(self, code: Code, ctx: str | None = None) -> bool:
    """Return True if the code|ctx pair is not deprecated.

    A command counts as deprecated only if its entry is exactly False; an
    unknown command, or an entry of None or True, is not deprecated.

    :param code: the command code
    :param ctx: if given, the code|ctx pair is looked up instead of the code alone
    """
    if ctx is not None:
        return self._supported_cmds_ctx.get(f"{code}|{ctx}") is not False
    return self._supported_cmds.get(str(code)) is not False
def _setup_discovery_cmds(self) -> None:
    """Populate the discovery commands; called on first read of discovery_cmds.

    :raises NotImplementedError: always; this implementation is a placeholder
    """
    raise NotImplementedError
def _add_discovery_cmd(
    self,
    cmd: Command,
    interval: float,
    *,
    delay: float = 0,
    timeout: float | None = None,
) -> None:
    """Schedule a command to run periodically.

    Both `timeout` and `delay` are in seconds.

    The command is keyed by its rx_header; one with no rx_header, or with an
    rx_header that is already scheduled, is logged and not added.

    :param cmd: the command to schedule
    :param interval: the polling interval; values below MAX_CYCLE_SECS are raised to it
    :param delay: time until the command is first due; a non-zero delay gets a
        small random amount added
    :param timeout: stored with the command
    """
    header = cmd.rx_header

    if header is None:  # TODO: raise TypeError
        _LOGGER.warning(f"cmd({cmd}): invalid (null) header not added to discovery")
        return

    cmds = self.discovery_cmds
    if header in cmds:
        _LOGGER.info(f"cmd({cmd}): duplicate header not added to discovery")
        return

    if delay:
        delay += random.uniform(0.05, 0.45)

    effective_interval = max(interval, self.MAX_CYCLE_SECS)
    cmds[header] = {
        _SZ_COMMAND: cmd,
        _SZ_INTERVAL: td(seconds=effective_interval),
        _SZ_LAST_PKT: None,
        _SZ_NEXT_DUE: dt.now() + td(seconds=delay),
        _SZ_TIMEOUT: timeout,
        _SZ_FAILURES: 0,
    }
def _start_discovery_poller(self) -> None:
    """Start the discovery poller (if it is not already running).

    The new task is named after this entity's id and handed to the gateway
    via add_task().
    """
    current = self._discovery_poller
    if current and not current.done():
        return  # still running

    self._discovery_poller = poller = schedule_task(self._poll_discovery_cmds)
    poller.set_name(f"{self.id}_discovery_poller")
    self._gwy.add_task(poller)
async def _stop_discovery_poller(self) -> None:
    """Stop the discovery poller (only if it is running).

    The task is cancelled and then awaited; the resulting CancelledError is
    suppressed.
    """
    poller = self._discovery_poller
    if poller and not poller.done():
        poller.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await poller
async def _poll_discovery_cmds(self) -> None:
    """Send any outstanding commands that are past due.

    If a relevant message was received recently enough, reschedule the corresponding
    command for later.

    Loops forever: calls discover(), then sleeps until the earliest command is
    next due, but for no less than MIN_CYCLE_SECS and no more than
    MAX_CYCLE_SECS.
    """

    while True:
        await self.discover()

        cmds = self.discovery_cmds
        if not cmds:
            delay = self.MAX_CYCLE_SECS
        else:
            next_due = min(task[_SZ_NEXT_DUE] for task in cmds.values())
            until_due = (next_due - dt.now()).total_seconds()
            delay = max(until_due, self.MIN_CYCLE_SECS)

        await asyncio.sleep(min(delay, self.MAX_CYCLE_SECS))
async def discover(self) -> None:
    """Make one pass over the scheduled discovery commands, sending any that are due.

    For each command, if a matching message was received recently enough, the
    command is rescheduled relative to that message. Otherwise, once it is due
    (and its code|ctx is not deprecated), the command is sent and the outcome is
    recorded: a reply resets the failure count, no reply increments it.
    """

    def find_latest_msg(hdr: HeaderT, task: dict) -> Message | None:
        """Return the latest message for a header from any source (not just RPs)."""

        # chars [5:7] of the header are swapped for each verb: try both I and RP
        candidates = (self._get_msg_by_hdr(hdr[:5] + v + hdr[7:]) for v in (I_, RP))
        msgs: list[Message] = [m for m in candidates if m is not None]

        try:
            code = task[_SZ_COMMAND].code
            # Both lookups below go via the TCS, which might be None during early
            # discovery: guard both (not only the legacy one).
            if code in (Code._000A, Code._30C9) and self.tcs:
                if self._gwy.msg_db:  # use bespoke MessageIndex qry
                    sql = """
                        SELECT dtm from messages WHERE
                        code = ?
                        AND verb in (' I', 'RP')
                        AND ctx = 'True'
                        AND (src = ? OR dst = ?)
                    """
                    tcs_id = self.tcs.id[:_ID_SLICE]
                    res = self._gwy.msg_db.qry(sql, (code, tcs_id, tcs_id))
                    if len(res) > 0:
                        # expect 1 Message in returned tuple: append it (using +=
                        # would try to iterate over the Message itself)
                        msgs.append(res[0])
                    else:
                        _LOGGER.debug(
                            f"No msg found for hdr {hdr}, task code {task[_SZ_COMMAND].code}"
                        )
                else:  # TODO(eb) remove next Q1 2026
                    msgs.append(self.tcs._msgz[code][I_][True])
        except KeyError:  # best effort: no such message is held
            pass

        return max(msgs) if msgs else None

    def backoff(hdr: HeaderT, failures: int) -> td:
        """Backoff the interval if there are/were any failures."""

        if not _DBG_ENABLE_DISCOVERY_BACKOFF:  # FIXME: data gaps
            return self.discovery_cmds[hdr][_SZ_INTERVAL]  # type: ignore[no-any-return]

        if failures > 5:
            secs = 60 * 60 * 6
            _LOGGER.error(
                f"No response for {hdr} ({failures}/5): throttling to 1/6h"
            )
        elif failures > 2:
            _LOGGER.warning(
                f"No response for {hdr} ({failures}/5): retrying in {self.MAX_CYCLE_SECS}s"
            )
            secs = self.MAX_CYCLE_SECS
        else:
            _LOGGER.info(
                f"No response for {hdr} ({failures}/5): retrying in {self.MIN_CYCLE_SECS}s"
            )
            secs = self.MIN_CYCLE_SECS

        return td(seconds=secs)

    async def send_disc_cmd(
        hdr: HeaderT, task: dict, timeout: float = 15
    ) -> Packet | None:  # TODO: use constant instead of 15
        """Send a scheduled command and wait for/return the response.

        Returns None (after logging a warning) on a protocol error or a timeout.
        """

        try:
            pkt: Packet | None = await asyncio.wait_for(
                self._gwy.async_send_cmd(task[_SZ_COMMAND]),
                timeout=timeout,  # self.MAX_CYCLE_SECS?
            )

        # TODO: except: handle no QoS

        except exc.ProtocolError as err:  # InvalidStateError, SendTimeoutError
            _LOGGER.warning(f"{self}: Failed to send discovery cmd: {hdr}: {err}")

        except TimeoutError as err:  # safety valve timeout
            _LOGGER.warning(
                f"{self}: Failed to send discovery cmd: {hdr} within {timeout} secs: {err}"
            )

        else:
            return pkt

        return None

    for hdr, task in self.discovery_cmds.items():
        dt_now = dt.now()

        if (msg := find_latest_msg(hdr, task)) and (
            task[_SZ_NEXT_DUE] < msg.dtm + task[_SZ_INTERVAL]
        ):  # if a newer message is available, take it
            task[_SZ_FAILURES] = 0  # only if task[_SZ_LAST_PKT].verb == RP?
            task[_SZ_LAST_PKT] = msg._pkt
            task[_SZ_NEXT_DUE] = msg.dtm + task[_SZ_INTERVAL]

        if task[_SZ_NEXT_DUE] > dt_now:
            continue  # if (most recent) last_msg is not yet due...

        # since we may do I/O, check if the code|msg_id is deprecated
        task[_SZ_NEXT_DUE] = dt_now + task[_SZ_INTERVAL]  # might undeprecate later

        if not self._is_not_deprecated_cmd(task[_SZ_COMMAND].code):
            continue
        if not self._is_not_deprecated_cmd(
            task[_SZ_COMMAND].code, ctx=task[_SZ_COMMAND].payload[4:6]
        ):  # only for Code._3220
            continue

        # we'll have to do I/O...
        task[_SZ_NEXT_DUE] = dt_now + backoff(hdr, task[_SZ_FAILURES])  # JIC

        if pkt := await send_disc_cmd(hdr, task):  # TODO: OK 4 some exceptions
            task[_SZ_FAILURES] = 0  # only if task[_SZ_LAST_PKT].verb == RP?
            task[_SZ_LAST_PKT] = pkt
            task[_SZ_NEXT_DUE] = pkt.dtm + task[_SZ_INTERVAL]
        else:
            task[_SZ_FAILURES] += 1
            task[_SZ_LAST_PKT] = None
            task[_SZ_NEXT_DUE] = dt_now + backoff(hdr, task[_SZ_FAILURES])
1145 def _deprecate_code_ctx(
1146 self, pkt: Packet, ctx: str = None, reset: bool = False
1147 ) -> None:
1148 """If a code|ctx is deprecated twice, stop polling for it."""
1150 def deprecate(supported_dict: dict[str, bool | None], idx: str) -> None:
1151 if idx not in supported_dict:
1152 supported_dict[idx] = None
1153 elif supported_dict[idx] is None:
1154 _LOGGER.info(
1155 f"{pkt} < Polling now deprecated for code|ctx={idx}: "
1156 "it appears to be unsupported"
1157 )
1158 supported_dict[idx] = False
1160 def reinstate(supported_dict: dict[str, bool | None], idx: str) -> None:
1161 if self._is_not_deprecated_cmd(idx, None) is False:
1162 _LOGGER.info(
1163 f"{pkt} < Polling now reinstated for code|ctx={idx}: "
1164 "it now appears supported"
1165 )
1166 if idx in supported_dict:
1167 supported_dict.pop(idx)
1169 if ctx is None:
1170 supported_cmds = self._supported_cmds
1171 idx: str = pkt.code
1172 else:
1173 supported_cmds = self._supported_cmds_ctx
1174 idx = f"{pkt.code}|{ctx}"
1176 (reinstate if reset else deprecate)(supported_cmds, idx)
class Entity(_Discovery):
    """The base class for Devices/Zones/Systems.

    It defines no members of its own: everything is inherited from `_Discovery`.
    """
class Parent(Entity):  # A System, Zone, DhwZone or a UfhController
    """A Parent can be a System (TCS), a heating Zone, a DHW Zone, or a UfhController.

    For a System, children include the appliance controller, the children of all Zones
    (incl. the DHW Zone), and also any UFH controllers.

    For a heating Zone, children are limited to a sensor, and a number of actuators.

    For the DHW Zone, the children are limited to a sensor, a DHW valve, and/or a
    heating valve.

    There is a `set_parent` method, but no `set_child` method.
    """

    # only declared here: these attrs are not assigned anywhere in this class
    actuator_by_id: dict[DeviceIdT, BdrSwitch | UfhCircuit | TrvActuator]
    actuators: list[BdrSwitch | UfhCircuit | TrvActuator]

    circuit_by_id: dict[str, Any]

    _app_cntrl: BdrSwitch | OtbGateway | None
    _dhw_sensor: DhwSensor | None
    _dhw_valve: BdrSwitch | None
    _htg_valve: BdrSwitch | None

    def __init__(self, *args: Any, child_id: str | None = None, **kwargs: Any) -> None:
        """Initialise the parent with an (optional) domain id and no children."""
        super().__init__(*args, **kwargs)

        self._child_id: str = child_id  # type: ignore[assignment]

        # self._sensor: Child = None
        self.child_by_id: dict[str, Child] = {}
        self.childs: list[Child] = []

    @property
    def zone_idx(self) -> str:
        """Return the domain id.

        For zones and circuits, the domain id is an idx, e.g.: '00', '01', '02'...
        For systems, it is 'FF', otherwise it is one of 'F9', 'FA' or 'FC'.
        """
        return self._child_id

    @zone_idx.setter  # TODO: should be a private setter
    def zone_idx(self, value: str) -> None:
        """Set the domain id (NOTE: the value is stored as-is, without validation)."""
        self._child_id = value

    def _add_child(
        self, child: Any, *, child_id: str | None = None, is_sensor: bool | None = None
    ) -> None:
        """Add a child device to this Parent, after validating the association.

        Also sets various other parent-specific object references (e.g. parent._sensor).

        This method should be invoked by the child's corresponding `set_parent` method.

        :raises exc.SystemSchemaInconsistent: if the child would replace a different,
            already-assigned sensor/valve/appliance controller.
        :raises TypeError: if the child/child_id/is_sensor combination is not valid
            for this kind of parent.
        """

        # NOTE: here to prevent circular references
        from .device import (
            BdrSwitch,
            DhwSensor,
            OtbGateway,
            OutSensor,
            TrvActuator,
            UfhCircuit,
            UfhController,
        )
        from .system import DhwZone, System, Zone

        if hasattr(self, "childs") and child not in self.childs:  # Any parent
            assert isinstance(
                self, System | Zone | DhwZone | UfhController
            )  # TODO: remove me

        if is_sensor and child_id == FA:  # DHW zone (sensor)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, DhwSensor)
            if self._dhw_sensor and self._dhw_sensor is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed dhw_sensor (from {self._dhw_sensor} to {child})"
                )
            self._dhw_sensor = child

        elif is_sensor and hasattr(self, SZ_SENSOR):  # HTG zone
            assert isinstance(self, Zone)  # TODO: remove me
            if self.sensor and self.sensor is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed zone sensor (from {self.sensor} to {child})"
                )
            self._sensor = child

        elif is_sensor:
            raise TypeError(
                f"not a valid combination for {self}: {child}|{child_id}|{is_sensor}"
            )

        elif hasattr(self, SZ_CIRCUITS):  # UFH circuit
            assert isinstance(self, UfhController)  # TODO: remove me
            if child.id not in self.circuit_by_id:  # keyed by id, not by object
                self.circuit_by_id[child.id] = child

        elif hasattr(self, SZ_ACTUATORS):  # HTG zone
            assert isinstance(self, Zone)  # TODO: remove me
            assert isinstance(child, BdrSwitch | UfhCircuit | TrvActuator)
            if child not in self.actuators:
                self.actuators.append(child)
                self.actuator_by_id[child.id] = child  # type: ignore[assignment,index]

        elif child_id == F9:  # DHW zone (HTG valve)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, BdrSwitch)
            if self._htg_valve and self._htg_valve is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed htg_valve (from {self._htg_valve} to {child})"
                )
            self._htg_valve = child

        elif child_id == FA:  # DHW zone (DHW valve)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, BdrSwitch)
            if self._dhw_valve and self._dhw_valve is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed dhw_valve (from {self._dhw_valve} to {child})"
                )
            self._dhw_valve = child

        elif child_id == FC:  # Appliance Controller
            assert isinstance(self, System)  # TODO: remove me
            assert isinstance(child, BdrSwitch | OtbGateway)
            if self._app_cntrl and self._app_cntrl is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed app_cntrl (from {self._app_cntrl} to {child})"
                )
            self._app_cntrl = child

        elif child_id == FF:  # System
            assert isinstance(self, System)  # TODO: remove me?
            assert isinstance(child, UfhController | OutSensor)

        else:
            raise TypeError(
                f"not a valid combination for {self}: {child}|{child_id}|{is_sensor}"
            )

        # be idempotent: re-adding a known child must not duplicate it in the list
        if child not in self.childs:
            self.childs.append(child)
        self.child_by_id[child.id] = child
1332class Child(Entity): # A Zone, Device or a UfhCircuit
1333 """A Device can be the Child of a Parent (a System, a heating Zone, or a DHW Zone).
1335 A Device may/may not have a Parent, but all devices will have the gateway as a
1336 parent, so that they can always be found via `gwy.child_by_id[device_id]`.
1338 In addition, the gateway has `system_by_id`, the Systems have `zone_by_id`, and the
1339 heating Zones have `actuator_by_id` dicts.
1341 There is a `set_parent` method, but no `set_child` method.
1342 """
def __init__(
    self,
    *args: Any,
    parent: Parent = None,
    is_sensor: bool | None = None,
    **kwargs: Any,
) -> None:
    """Initialise the child, recording its parent (if any) and its sensor flag.

    The child id starts out unset (None).
    """
    super().__init__(*args, **kwargs)

    self._child_id: str | None = None  # TODO: should be: str?
    self._is_sensor = is_sensor
    self._parent = parent
def _handle_msg(self, msg: Message) -> None:
    """Process a message and then, if enabled, eavesdrop the parent zone from it.

    Eavesdropping is attempted only for messages sent to a Controller (by another
    device), and only while the parent or the child id is still unknown.
    """
    from .device import Controller, Device, UfhController

    super()._handle_msg(msg)

    # guard clauses: bail out unless eavesdropping is both enabled and applicable
    if not self._gwy.config.enable_eavesdrop:
        return
    if msg.src is msg.dst or not isinstance(msg.dst, Controller):  # UfhController))
        return
    if self._parent and self._child_id:
        return  # nothing left to learn

    if isinstance(msg.src, UfhController) or SZ_ZONE_IDX not in msg.payload:
        return

    # FIXME: a mess... see issue ramses_cc #249
    # the following is a mess - may just be better off deprecating it
    if not isinstance(self, Device):
        return

    if self.type in DEV_TYPE_MAP.HEAT_ZONE_ACTUATORS:
        self.set_parent(msg.dst, child_id=msg.payload[SZ_ZONE_IDX])
    elif self.type in DEV_TYPE_MAP.THM_DEVICES:
        self.set_parent(msg.dst, child_id=msg.payload[SZ_ZONE_IDX], is_sensor=True)
def _get_parent(
    self,
    parent: Parent,
    *,
    child_id: str | None = None,
    is_sensor: bool | None = None,
) -> tuple[Parent, str | None]:
    """Get the device's parent, after validating it.

    The supplied parent may be resolved to a more specific one: a Controller is
    replaced by its TCS, and an Evohome TCS by its DHW zone (child_id F9/FA) or by
    one of its heating zones (child_id below the system's max zones).

    :return: the resolved parent and the (possibly inferred) child_id.
    :raises TypeError: if the parent is not a valid kind of parent, if this device is
        not a valid sensor/actuator for it, or if the child_id is not valid for it.
    :raises exc.SystemSchemaInconsistent: if the device already has another parent.
    """

    # NOTE: here to prevent circular references
    from .device import (
        BdrSwitch,
        Controller,
        DhwSensor,
        OtbGateway,
        OutSensor,
        Thermostat,
        TrvActuator,
        UfhCircuit,
        UfhController,
    )
    from .system import DhwZone, Evohome, System, Zone

    if isinstance(self, UfhController):
        child_id = FF

    if isinstance(parent, Controller):  # A controller can't be a Parent
        parent = parent.tcs

    if isinstance(parent, Evohome) and child_id:
        if child_id in (F9, FA):
            parent = parent.get_dhw_zone()
        elif int(child_id, 16) < parent._max_zones:
            parent = parent.get_htg_zone(child_id)

    elif isinstance(parent, Zone) and not child_id:
        child_id = parent.idx  # infer the child_id from the zone

    elif isinstance(parent, UfhController) and not child_id:
        raise TypeError(
            f"{self}: can't set child_id to: {child_id} "
            f"(for Circuits, it must be a circuit_idx)"
        )

    if self._parent and self._parent != parent:
        raise exc.SystemSchemaInconsistent(
            f"{self} can't change parent "
            f"({self._parent}_{self._child_id} to {parent}_{child_id})"
        )

    # for each kind of parent: the device classes permitted as actuators/sensors
    PARENT_RULES: dict[Any, dict] = {
        DhwZone: {SZ_ACTUATORS: (BdrSwitch,), SZ_SENSOR: (DhwSensor,)},
        System: {
            SZ_ACTUATORS: (BdrSwitch, OtbGateway, UfhController),
            SZ_SENSOR: (OutSensor,),
        },
        UfhController: {SZ_ACTUATORS: (UfhCircuit,), SZ_SENSOR: ()},
        Zone: {
            SZ_ACTUATORS: (BdrSwitch, TrvActuator, UfhCircuit),
            SZ_SENSOR: (Controller, Thermostat, TrvActuator),
        },
    }

    for k, v in PARENT_RULES.items():
        if isinstance(parent, k):
            rules = v
            break
    else:
        raise TypeError(
            f"for Parent {parent}: not a valid parent "
            f"(it must be {tuple(PARENT_RULES.keys())})"
        )

    if is_sensor and not isinstance(self, rules[SZ_SENSOR]):
        raise TypeError(
            f"for Parent {parent}: Sensor {self} must be {rules[SZ_SENSOR]}"
        )
    if not is_sensor and not isinstance(self, rules[SZ_ACTUATORS]):
        raise TypeError(
            f"for Parent {parent}: Actuator {self} must be {rules[SZ_ACTUATORS]}"
        )

    if isinstance(parent, Zone):
        if child_id != parent.idx:
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(it must match its parent's zone idx, {parent.idx})"
            )

    elif isinstance(parent, DhwZone):  # usu. FA (HW), could be F9
        if child_id not in (F9, FA):  # may not be known if eavesdrop'd
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(for DHW, it must be F9 or FA)"
            )

    elif isinstance(parent, System):  # usu. FC
        if child_id not in (FC, FF):  # was: not in (F9, FA, FC, "HW"):
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(for TCS, it must be FC or FF)"
            )

    # defensive only: the PARENT_RULES check above has already rejected anything
    # that is not a Zone, DhwZone, System or UfhController
    elif not isinstance(parent, UfhController):  # is like CTL/TCS combined
        raise TypeError(
            f"{self}: can't set Parent to: {parent} "
            f"(it must be System, DHW, Zone, or UfhController)"
        )

    return parent, child_id
1515 # TODO: should be a private method
def set_parent(
    self,
    parent: Parent | None,
    *,
    child_id: str | None = None,
    is_sensor: bool | None = None,
) -> Parent:
    """Set the device's parent, after validating it.

    This method will then invoke the parent's corresponding `_add_child` method.

    Devices don't have parents, rather: parents have children; a mis-configured
    system could easily leave a device as a child of multiple parents (or bound
    to multiple controllers).

    It is assumed that a device is only bound to one controller, either a (Evohome)
    controller, or an UFH controller.

    :return: the resolved parent (which may differ from the one passed in).
    :raises exc.SystemSchemaInconsistent: if the device is already bound to a
        different controller.
    """

    from .device import (  # NOTE: here to prevent circular references
        Controller,
        UfhController,
    )

    parent, child_id = self._get_parent(
        parent, child_id=child_id, is_sensor=is_sensor
    )
    ctl = parent if isinstance(parent, UfhController) else parent.ctl

    if self.ctl and self.ctl is not ctl:
        # NOTE: assume a device is bound to only one CTL (usu. best practice)
        raise exc.SystemSchemaInconsistent(
            f"{self} can't change controller: {self.ctl} to {ctl} "
            "(or perhaps the device has multiple controllers?)"
        )

    # register with the parent first: if it rejects the child, our state is untouched
    parent._add_child(self, child_id=child_id, is_sensor=is_sensor)

    self._child_id = child_id
    self._parent = parent

    assert isinstance(ctl, Controller)  # mypy hint

    self.ctl: Controller = ctl
    self.tcs: Evohome = ctl.tcs

    return parent