Coverage for src/ramses_rf/device/base.py: 41%
224 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.
4Base for all devices.
5"""
7from __future__ import annotations
9import logging
10from collections.abc import Iterable
11from typing import TYPE_CHECKING
13from ramses_rf.binding_fsm import BindContext, Vendor
14from ramses_rf.const import DEV_TYPE_MAP, SZ_OEM_CODE, DevType
15from ramses_rf.entity_base import Child, Entity, class_by_attr
16from ramses_rf.helpers import shrink
17from ramses_rf.schemas import (
18 SCH_TRAITS,
19 SZ_ALIAS,
20 SZ_CLASS,
21 SZ_FAKED,
22 SZ_KNOWN_LIST,
23 SZ_SCHEME,
24)
25from ramses_tx import Command, Packet, Priority, QosParams
26from ramses_tx.ramses import CODES_BY_DEV_SLUG, CODES_ONLY_FROM_CTL
28from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
29 I_,
30 RP,
31 RQ,
32 W_,
33 Code,
34)
36if TYPE_CHECKING:
37 from typing import Any
39 from ramses_rf import Gateway
40 from ramses_rf.system import Zone
41 from ramses_tx import Address, DeviceIdT, IndexT, Message
44BIND_WAITING_TIMEOUT = 300 # how long to wait, listening for an offer
45BIND_REQUEST_TIMEOUT = 5 # how long to wait for an accept after sending an offer
46BIND_CONFIRM_TIMEOUT = 5 # how long to wait for a confirm after sending an accept
49_LOGGER = logging.getLogger(__name__)
52class DeviceBase(Entity):
53 """The Device base class - can also be used for unknown device types."""
55 _SLUG: str = DevType.DEV
56 _STATE_ATTR: str = None
58 _bind_context: BindContext | None = None
60 def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
61 super().__init__(gwy)
63 # FIXME: gwy.msg_db entities must know their parent device ID and their own idx
64 self._z_id = dev_addr.id # the responsible device is itself
65 self._z_idx = None # depends upon its location in the schema
67 self.id: DeviceIdT = dev_addr.id
69 # self.tcs = None # NOTE: Heat (CH/DHW) devices only
70 # self.ctl = None
71 # self._child_id = None # also in Child class
73 self.addr = dev_addr
74 self.type = dev_addr.type # DEX # TODO: remove this attr? use SLUG?
76 self._scheme: Vendor = None
78 def __str__(self) -> str:
79 if self._STATE_ATTR and hasattr(self, self._STATE_ATTR):
80 state: float | None = getattr(self, self._STATE_ATTR)
81 return f"{self.id} ({self._SLUG}): {state}"
82 return f"{self.id} ({self._SLUG})"
84 def __lt__(self, other: object) -> bool:
85 if not hasattr(other, "id"):
86 return NotImplemented
87 return self.id < other.id # type: ignore[no-any-return]
89 def _update_traits(self, **traits: Any) -> None:
90 """Update a device with new schema attributes.
92 :param traits: The traits to apply (e.g., alias, class, faked)
93 :raises TypeError: If the device is not fakeable but 'faked' is set.
94 """
96 traits = shrink(SCH_TRAITS(traits))
98 if traits.get(SZ_FAKED): # class & alias are done elsewhere
99 if not isinstance(self, Fakeable):
100 raise TypeError(f"Device is not fakeable: {self} (traits={traits})")
101 self._make_fake()
103 self._scheme = traits.get(SZ_SCHEME)
105 @classmethod
106 def create_from_schema(
107 cls, gwy: Gateway, dev_addr: Address, **schema: Any
108 ) -> DeviceBase:
109 """Create a device (for a GWY) and set its schema attrs (aka traits).
111 All devices have traits, but also controllers (CTL, UFC) have a system schema.
113 The appropriate Device class should have been determined by a factory.
114 Schema attrs include: class (SLUG), alias & faked.
115 """
117 dev = cls(gwy, dev_addr)
118 dev._update_traits(**schema) # TODO: split traits/schema
119 return dev
121 def _setup_discovery_cmds(self) -> None:
122 # super()._setup_discovery_cmds()
123 # sometimes, battery-powered devices will respond to an RQ (e.g. bind mode)
125 # if discover_flag & Discover.TRAITS:
126 # self._add_discovery_cmd(cmd(RQ, Code._1FC9, "00", self.id), 60 * 60 * 24)
127 # self._add_discovery_cmd(cmd(RQ, Code._0016, "00", self.id), 60 * 60)
129 pass
131 def _send_cmd(self, cmd: Command, **kwargs: Any) -> None:
132 if self.has_battery and not self.is_faked and cmd.dst.id == self.id:
133 _LOGGER.info(f"{cmd} < Sending inadvisable for {self} (it has a battery)")
135 super()._send_cmd(cmd, **kwargs)
137 def _handle_msg(self, msg: Message) -> None:
138 # # assert msg.src is self or (
139 # # msg.code == Code._1FC9 and msg.payload[SZ_PHASE] == SZ_OFFER
140 # # ), f"msg from {msg.src} inappropriately routed to {self}"
142 super()._handle_msg(msg)
144 if self._SLUG in DEV_TYPE_MAP.PROMOTABLE_SLUGS: # HACK: can get precise class?
145 from . import best_dev_role
147 cls = best_dev_role(
148 self.addr, msg=msg, eavesdrop=self._gwy.config.enable_eavesdrop
149 )
151 if cls._SLUG in (DevType.DEV, self._SLUG):
152 return # either a demotion (DEV), or not promotion (HEA/HVC)
154 if self._SLUG == DevType.HEA and cls._SLUG in DEV_TYPE_MAP.HVAC_SLUGS:
155 return # TODO: should raise error if CODES_OF_HVAC_DOMAIN_ONLY?
157 if self._SLUG == DevType.HVC and cls._SLUG not in DEV_TYPE_MAP.HVAC_SLUGS:
158 return # TODO: should raise error if CODES_OF_HEAT_DOMAIN_ONLY?
160 _LOGGER.warning(
161 f"Promoting the device class of {self} to {cls._SLUG}"
162 f" - use a {SZ_KNOWN_LIST} to explicitly set this device's"
163 f" {SZ_CLASS} to '{DEV_TYPE_MAP[cls._SLUG]}'"
164 )
165 self.__class__ = cls
167 @property
168 def has_battery(self) -> None | bool: # 1060
169 """Return True if the device is battery powered (excludes battery-backup)."""
170 if self._gwy.msg_db:
171 code_list = self._msg_dev_qry()
172 return isinstance(self, BatteryState) or (
173 code_list is not None and Code._1060 in code_list
174 ) # TODO(eb): clean up next line Q1 2026
175 return isinstance(self, BatteryState) or Code._1060 in self._msgz
177 @property
178 def is_faked(self) -> bool:
179 """Return True if the device is faked."""
181 return bool(self._bind_context) # isinstance(self, Fakeable) and...
183 @property
184 def _is_binding(self) -> bool:
185 """Return True if the (faked) device is actively binding."""
187 return self._bind_context and self._bind_context.is_binding
189 @property
190 def _is_present(self) -> bool:
191 """Try to exclude ghost devices (as caused by corrupt packet addresses)."""
192 return any(
193 m.src == self for m in self._msgs.values() if not m._expired
194 ) # TODO: needs addressing
196 @property
197 def schema(self) -> dict[str, Any]:
198 """Return the fixed attributes of the device."""
199 return {} # SZ_CLASS: DEV_TYPE_MAP[self._SLUG]}
201 @property
202 def params(self) -> dict[str, Any]:
203 """Return the configurable attributes of the device."""
204 return {}
206 @property
207 def status(self) -> dict[str, Any]:
208 """Return the state attributes of the device."""
209 return {}
211 @property
212 def traits(self) -> dict[str, Any]:
213 """Get the traits of the device."""
215 result = super().traits
217 known_dev = self._gwy._include.get(self.id)
219 result.update(
220 {
221 SZ_CLASS: DEV_TYPE_MAP[self._SLUG],
222 SZ_ALIAS: known_dev.get(SZ_ALIAS) if known_dev else None,
223 SZ_FAKED: self.is_faked,
224 }
225 )
227 return result | {"_bind": self._msg_value(Code._1FC9)}
230class BatteryState(DeviceBase): # 1060
231 BATTERY_LOW = "battery_low" # boolean
232 BATTERY_STATE = "battery_state" # percentage (0.0-1.0)
234 @property
235 def battery_low(self) -> None | bool: # 1060
236 if self.is_faked:
237 return False
238 return self._msg_value(Code._1060, key=self.BATTERY_LOW)
240 @property
241 def battery_state(self) -> dict[str, Any] | None: # 1060
242 if self.is_faked:
243 return None
244 return self._msg_value(Code._1060)
246 @property
247 def status(self) -> dict[str, Any]:
248 return {
249 **super().status,
250 self.BATTERY_STATE: self.battery_state,
251 }
254class DeviceInfo(DeviceBase): # 10E0
255 def _setup_discovery_cmds(self) -> None:
256 super()._setup_discovery_cmds()
258 # if discover_flag & Discover.SCHEMA:
259 if self._SLUG not in CODES_BY_DEV_SLUG or RP in CODES_BY_DEV_SLUG[
260 self._SLUG
261 ].get(Code._10E0, {}):
262 cmd = Command.from_attrs(RQ, self.id, Code._10E0, "00")
263 self._add_discovery_cmd(cmd, 60 * 60 * 24)
265 @property
266 def device_info(self) -> dict | None: # 10E0
267 return self._msg_value(Code._10E0)
269 @property
270 def traits(self) -> dict[str, Any]:
271 """Return the traits of the device."""
273 result = super().traits
275 if Code._10E0 in self._msgs or Code._10E0 in CODES_BY_DEV_SLUG.get(
276 self._SLUG, []
277 ):
278 result.update({"_info": self.device_info})
280 return result
283# NOTE: devices (Thermostat) not attrs (Temperature) are faked
284class Fakeable(DeviceBase):
285 """There are two types of Faking: impersonation (of real devices) and full-faking.
287 Impersonation of physical devices simply means sending packets on their behalf. This
288 is straight-forward for sensors & remotes (they do not usually receive pkts).
290 Faked (virtual) devices must have any packet addressed to them sent to their
291 handle_msg() method by the dispatcher. Impersonated devices will simply pick up
292 such packets via RF.
293 """
295 def __init__(self, gwy: Gateway, *args: Any, **kwargs: Any) -> None:
296 super().__init__(gwy, *args, **kwargs)
298 self._bind_context: BindContext | None = None
300 # TOD: this is messy - device schema vs device traits
301 if self.id in gwy._include and gwy._include[self.id].get(SZ_FAKED):
302 self._make_fake()
304 if kwargs.get(SZ_FAKED):
305 self._make_fake()
307 def _make_fake(self) -> None:
308 if self._bind_context:
309 return
311 self._bind_context = BindContext(self)
312 self._gwy._include[self.id][SZ_FAKED] = True # TODO: remove this
313 _LOGGER.info(f"Faking now enabled for: {self}")
315 async def _async_send_cmd(
316 self,
317 cmd: Command,
318 priority: Priority | None = None,
319 qos: QosParams | None = None,
320 ) -> Packet | None:
321 """Wrapper to CC: any relevant Commands to the binding Context."""
323 if self._bind_context and self._bind_context.is_binding:
324 # cmd.code in (Code._1FC9, Code._10E0)
325 self._bind_context.sent_cmd(cmd) # other codes needed for edge cases
327 return await super()._async_send_cmd(cmd, priority=priority, qos=qos)
329 def _handle_msg(self, msg: Message) -> None:
330 """Wrapper to CC: any relevant Packets to the binding Context."""
332 super()._handle_msg(msg)
334 if self._bind_context and self._bind_context.is_binding:
335 # msg.code in (Code._1FC9, Code._10E0)
336 self._bind_context.rcvd_msg(msg) # maybe other codes needed for edge cases
338 async def _wait_for_binding_request(
339 self,
340 accept_codes: Iterable[Code],
341 /,
342 *,
343 idx: IndexT = "00",
344 require_ratify: bool = False,
345 ) -> tuple[Packet, Packet, Packet, Packet | None]:
346 """Listen for a binding and return the Offer packets.
348 :param accept_codes: The codes allowed for this binding
349 :type accept_codes: Iterable[Code]
350 :param idx: The index to bind to, defaults to "00"
351 :type idx: IndexT
352 :param require_ratify: Whether a ratification step is required, defaults to False
353 :type require_ratify: bool
354 :return: A tuple of the four binding transaction packets
355 :rtype: tuple[Packet, Packet, Packet, Packet | None]
356 """
358 if not self._bind_context:
359 raise TypeError(f"{self}: Faking not enabled")
361 msgs = await self._bind_context.wait_for_binding_request(
362 accept_codes, idx=idx, require_ratify=require_ratify
363 )
364 return msgs
366 async def wait_for_binding_request(
367 self,
368 accept_codes: Iterable[Code],
369 /,
370 *,
371 idx: IndexT = "00",
372 require_ratify: bool = False,
373 ) -> tuple[Packet, Packet, Packet, Packet | None]:
374 raise NotImplementedError
376 async def _initiate_binding_process(
377 self,
378 offer_codes: Code | Iterable[Code],
379 /,
380 *,
381 confirm_code: Code | None = None,
382 ratify_cmd: Command | None = None,
383 ) -> tuple[Packet, Packet, Packet, Packet | None]:
384 """Start a binding and return the Accept, or raise an exception."""
385 # confirm_code can be FFFF.
387 if not self._bind_context:
388 raise TypeError(f"{self}: Faking not enabled")
390 if isinstance(offer_codes, Iterable):
391 codes: tuple[Code] = offer_codes
392 else:
393 codes = tuple([offer_codes])
395 msgs = await self._bind_context.initiate_binding_process(
396 codes, confirm_code=confirm_code, ratify_cmd=ratify_cmd
397 ) # TODO: if successful, re-discover schema?
398 return msgs
400 async def initiate_binding_process(self) -> Packet:
401 raise NotImplementedError
403 @property
404 def oem_code(self) -> str | None:
405 """Return the OEM code (a 2-char ascii str) for this device, if there is one."""
406 # raise NotImplementedError # self.traits is a @property
407 if not self.traits.get(SZ_OEM_CODE):
408 self.traits[SZ_OEM_CODE] = self._msg_value(Code._10E0, key=SZ_OEM_CODE)
409 return self.traits.get(SZ_OEM_CODE)
412class Device(Child, DeviceBase):
413 """The base class for all devices."""
415 def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
416 _LOGGER.debug("Creating a Device: %s (%s)", dev_addr.id, self.__class__)
417 super().__init__(gwy, dev_addr)
419 gwy._add_device(self)
422class HgiGateway(Device): # HGI (18:)
423 """The HGI80 base class."""
425 _SLUG: str = DevType.HGI
427 def __init__(self, *args: Any, **kwargs: Any) -> None:
428 super().__init__(*args, **kwargs)
430 self.ctl = None # FIXME: a mess
431 self._child_id = "gw" # TODO
432 self.tcs = None
434 @property
435 def schema(self) -> dict[str, Any]:
436 return {}
439class DeviceHeat(Device): # Heat domain: Honeywell CH/DHW or compatible
440 """The base class for the heat domain (Honeywell CH/DHW-compatible devices).
442 Includes UFH and heatpumps (which can also cool).
443 """
445 _SLUG: str = DevType.HEA # shouldn't be any of these instantiated
447 def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
448 super().__init__(gwy, dev_addr, **kwargs)
450 self.ctl = None
451 self.tcs = None
452 self._child_id = None # domain_id, or zone_idx
454 self._iz_controller: None | bool | Message = None
456 def _handle_msg(self, msg: Message) -> None:
457 super()._handle_msg(msg)
459 if msg.verb != I_ or self._iz_controller is not None:
460 return
462 if not self._iz_controller and msg.code in CODES_ONLY_FROM_CTL:
463 if self._iz_controller is None:
464 _LOGGER.info(f"{msg!r} # IS_CONTROLLER (00): is TRUE")
465 self._make_tcs_controller(msg=msg)
466 elif self._iz_controller is False: # TODO: raise CorruptStateError
467 _LOGGER.error(f"{msg!r} # IS_CONTROLLER (01): was FALSE, now True")
469 def _make_tcs_controller(
470 self, *, msg: Message | None = None, **schema: Any
471 ) -> None: # CH/DHW
472 """Attach a TCS (create/update as required) after passing it any msg."""
474 if self.type not in DEV_TYPE_MAP.CONTROLLERS: # potentially can be controllers
475 raise TypeError(f"Invalid device type to be a controller: {self}")
477 self._iz_controller = self._iz_controller or msg or True
479 # @property
480 # def controller(self): # -> Optional[Controller]:
481 # """Return the entity's controller, if known."""
483 # return self.ctl # TODO: if the controller is not known, try to find it?
485 @property
486 def _is_controller(self) -> None | bool:
487 if self._iz_controller is not None:
488 return bool(self._iz_controller) # True, False, or msg
490 if self.ctl is not None: # TODO: messy
491 return self.ctl is self
493 return False
495 @property
496 def zone(self) -> Zone | None:
497 """Return the device's parent zone, if known."""
499 return self._parent
502class DeviceHvac(Device): # HVAC domain: ventilation, PIV, MV/HR
503 """The Device base class for the HVAC domain (ventilation, PIV, MV/HR)."""
505 _SLUG: str = DevType.HVC # these may be instantiated, and promoted later on
507 def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
508 super().__init__(gwy, dev_addr, **kwargs)
510 self._child_id = "hv" # TODO: domain_id/deprecate
512 # def _handle_msg(self, msg: Message) -> None:
513 # super()._handle_msg(msg)
515 # # if type(self) is DeviceHvac:
516 # # if self.type == DEV_TYPE_MAP.RFG: # self.__class__ is Device, DEX
517 # # # TODO: the RFG codes need checking
518 # # if msg.code in (Code._31D9, Code._31DA) and msg.verb in (I_, RP):
519 # # self.__class__ = HvacVentilator
520 # # elif msg.code in (Code._0006, Code._0418, Code._3220) and msg.verb == RQ:
521 # # self.__class__ = RfgGateway
522 # # elif msg.code in (Code._313F,) and msg.verb == W_:
523 # # self.__class__ = RfgGateway
524 # # if type(self) is not Device:
525 # # _LOGGER.warning(f"Promoted a device type for: {self}")
527 # if msg.code in (Code._1298, Code._12A0, Code._22F1, Code._22F3):
528 # self._hvac_trick()
531# e.g. {"HGI": HgiGateway}
532BASE_CLASS_BY_SLUG: dict[str, type[Device]] = class_by_attr(__name__, "_SLUG")