Coverage for src/ramses_rf/device/heat.py: 44%
657 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - devices from the CH/DHW (heat) domain."""
4from __future__ import annotations
6import logging
7from collections.abc import Callable
8from typing import TYPE_CHECKING, Any, Final
10from ramses_rf import exceptions as exc
11from ramses_rf.const import (
12 DEV_ROLE_MAP,
13 DEV_TYPE_MAP,
14 DOMAIN_TYPE_MAP,
15 SZ_DEVICES,
16 SZ_DOMAIN_ID,
17 SZ_HEAT_DEMAND,
18 SZ_PRESSURE,
19 SZ_RELAY_DEMAND,
20 SZ_SETPOINT,
21 SZ_TEMPERATURE,
22 SZ_UFH_IDX,
23 SZ_WINDOW_OPEN,
24 SZ_ZONE_IDX,
25 SZ_ZONE_MASK,
26 SZ_ZONE_TYPE,
27 ZON_ROLE_MAP,
28 DevType,
29)
30from ramses_rf.device import Device
31from ramses_rf.entity_base import Child, Entity, Parent, class_by_attr
32from ramses_rf.helpers import shrink
33from ramses_rf.schemas import SCH_TCS, SZ_ACTUATORS, SZ_CIRCUITS
34from ramses_tx import NON_DEV_ADDR, Command, Priority
35from ramses_tx.const import SZ_NUM_REPEATS, SZ_PRIORITY, MsgId
36from ramses_tx.opentherm import (
37 PARAMS_DATA_IDS,
38 SCHEMA_DATA_IDS,
39 STATUS_DATA_IDS,
40 SZ_MSG_ID,
41 SZ_MSG_NAME,
42 SZ_MSG_TYPE,
43 SZ_VALUE,
44 OtMsgType,
45)
46from ramses_tx.ramses import CODES_OF_HEAT_DOMAIN_ONLY, CODES_ONLY_FROM_CTL
47from ramses_tx.typed_dicts import PayDictT
49from .base import BatteryState, DeviceHeat, Fakeable
51from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
52 F9,
53 FA,
54 FC,
55 FF,
56)
58from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
59 I_,
60 RP,
61 RQ,
62 W_,
63 Code,
64)
66from ramses_tx.const import (
67 SZ_BOILER_OUTPUT_TEMP,
68 SZ_BOILER_RETURN_TEMP,
69 SZ_BOILER_SETPOINT,
70 SZ_BURNER_FAILED_STARTS,
71 SZ_BURNER_HOURS,
72 SZ_BURNER_STARTS,
73 SZ_CH_ACTIVE,
74 SZ_CH_ENABLED,
75 SZ_CH_MAX_SETPOINT,
76 SZ_CH_PUMP_HOURS,
77 SZ_CH_PUMP_STARTS,
78 SZ_CH_SETPOINT,
79 SZ_CH_WATER_PRESSURE,
80 SZ_COOLING_ACTIVE,
81 SZ_COOLING_ENABLED,
82 SZ_DHW_ACTIVE,
83 SZ_DHW_BLOCKING,
84 SZ_DHW_BURNER_HOURS,
85 SZ_DHW_BURNER_STARTS,
86 SZ_DHW_ENABLED,
87 SZ_DHW_FLOW_RATE,
88 SZ_DHW_PUMP_HOURS,
89 SZ_DHW_PUMP_STARTS,
90 SZ_DHW_SETPOINT,
91 SZ_DHW_TEMP,
92 SZ_FAULT_PRESENT,
93 SZ_FLAME_ACTIVE,
94 SZ_FLAME_SIGNAL_LOW,
95 SZ_MAX_REL_MODULATION,
96 SZ_OEM_CODE,
97 SZ_OTC_ACTIVE,
98 SZ_OUTSIDE_TEMP,
99 SZ_REL_MODULATION_LEVEL,
100 SZ_SUMMER_MODE,
101)
103if TYPE_CHECKING:
104 from ramses_rf.system import Evohome, Zone
105 from ramses_tx import Address, Message, Packet
106 from ramses_tx.opentherm import OtDataId
# Keyword arguments (priority / repeats) passed when sending commands.
QOS_LOW = {SZ_PRIORITY: Priority.LOW}  # FIXME: deprecate QoS in kwargs
# NOTE(review): QOS_MID uses Priority.HIGH, the same priority as QOS_MAX - confirm
QOS_MID = {SZ_PRIORITY: Priority.HIGH}  # FIXME: deprecate QoS in kwargs
QOS_MAX = {SZ_PRIORITY: Priority.HIGH, SZ_NUM_REPEATS: 3}  # FIXME: deprecate QoS...

#
# NOTE: All debug flags should be False for deployment to end-users
# When True, the OTB handlers in this module call _deprecate_code_ctx()
_DBG_ENABLE_DEPRECATION: Final[bool] = False
# When True, the OTB adds extra (WIP) codes to its discovery commands
_DBG_EXTRA_OTB_DISCOVERY: Final[bool] = False

_LOGGER = logging.getLogger(__name__)
class Actuator(DeviceHeat):  # 3EF0, 3EF1 (for 10:/13:)
    """Heat device exposing an actuator state (3EF0) and actuator cycle (3EF1)."""

    # .I --- 13:109598 --:------ 13:109598 3EF0 003 00C8FF  # event-driven, 00/C8
    # RP --- 13:109598 18:002563 --:------ 0008 002 00C8  # 00/C8, as above
    # RP --- 13:109598 18:002563 --:------ 3EF1 007 0000BF-00BFC8FF  # 00/C8, as above

    # RP --- 10:048122 18:140805 --:------ 3EF1 007 007FFF-003C2A10  # 10:s only RP, always 7FFF
    # RP --- 13:109598 18:199952 --:------ 3EF1 007 0001B8-01B800FF  # 13:s only RP

    # RP --- 10:047707 18:199952 --:------ 3EF0 009 001110-0A00FF-033100  # 10:s only RP
    # RP --- 10:138926 34:010253 --:------ 3EF0 006 002E11-0000FF  # 10:s only RP
    # .I --- 13:209679 --:------ 13:209679 3EF0 003 00C8FF  # 13:s only I

    ACTUATOR_CYCLE: Final = "actuator_cycle"
    ACTUATOR_ENABLED: Final = "actuator_enabled"  # boolean
    ACTUATOR_STATE: Final = "actuator_state"
    MODULATION_LEVEL: Final = "modulation_level"  # percentage (0.0-1.0)

    def _handle_msg(self, msg: Message) -> None:  # NOTE: active
        """Handle a message and, after an I|3EF0, request the actuator cycle.

        Nothing is sent for an OtbGateway, when discovery is disabled, or when
        this device is faked.
        """
        super()._handle_msg(msg)

        if isinstance(self, OtbGateway) or self._gwy.config.disable_discovery:
            return

        # TODO: why are we doing this here? Should simply use discovery poller!
        state_announced = msg.code == Code._3EF0 and msg.verb == I_
        if state_announced and not self.is_faked:
            # self._send_cmd(Command.get_relay_demand(self.id), qos=QOS_LOW)
            cycle_rq = Command.from_attrs(RQ, self.id, Code._3EF1, "00")
            self._send_cmd(cycle_rq, **QOS_LOW)  # actuator cycle

    @property
    def actuator_cycle(self) -> dict | None:  # 3EF1
        """Return the value of the 3EF1 message, if any."""
        return self._msg_value(Code._3EF1)

    @property
    def actuator_state(self) -> dict | None:  # 3EF0
        """Return the value of the 3EF0 message, if any."""
        return self._msg_value(Code._3EF0)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the actuator cycle and state."""
        result = dict(super().status)
        result[self.ACTUATOR_CYCLE] = self.actuator_cycle
        result[self.ACTUATOR_STATE] = self.actuator_state
        return result
class HeatDemand(DeviceHeat):  # 3150
    """Heat device exposing the heat demand taken from 3150 messages."""

    HEAT_DEMAND: Final = SZ_HEAT_DEMAND  # percentage valve open (0.0-1.0)

    @property
    def heat_demand(self) -> float | None:  # 3150
        """Return the heat demand from the 3150 message, if any."""
        return self._msg_value(Code._3150, key=self.HEAT_DEMAND)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the heat demand."""
        result = dict(super().status)
        result[self.HEAT_DEMAND] = self.heat_demand
        return result
class Setpoint(DeviceHeat):  # 2309
    """Heat device exposing the setpoint taken from 2309 messages."""

    SETPOINT: Final = SZ_SETPOINT  # degrees Celsius

    @property
    def setpoint(self) -> float | None:  # 2309
        """Return the setpoint from the 2309 message, if any."""
        return self._msg_value(Code._2309, key=self.SETPOINT)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the setpoint."""
        result = dict(super().status)
        result[self.SETPOINT] = self.setpoint
        return result
class Weather(DeviceHeat):  # 0002
    """Heat device exposing (and optionally faking) the 0002 temperature."""

    TEMPERATURE: Final = SZ_TEMPERATURE  # TODO: deprecate

    @property
    def temperature(self) -> float | None:  # 0002
        """Return the temperature from the 0002 message, if any."""
        return self._msg_value(Code._0002, key=SZ_TEMPERATURE)

    @temperature.setter
    def temperature(self, value: float | None) -> None:
        """Fake the outdoor temperature of the sensor.

        Raises:
            exc.DeviceNotFaked: if faking is not enabled for this device.
        """
        if not self.is_faked:
            raise exc.DeviceNotFaked(f"{self}: Faking is not enabled")

        outdoor_cmd = Command.put_outdoor_temp(self.id, value)
        send_opts = {"num_repeats": 2, "priority": Priority.HIGH}
        self._gwy.send_cmd(outdoor_cmd, **send_opts)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the temperature."""
        result = dict(super().status)
        result[self.TEMPERATURE] = self.temperature
        return result
class RelayDemand(DeviceHeat):  # 0008
    """Heat device exposing the relay demand taken from 0008 messages."""

    # .I --- 01:054173 --:------ 01:054173 1FC9 018 03-0008-04D39D FC-3B00-04D39D 03-1FC9-04D39D
    # .W --- 13:123456 01:054173 --:------ 1FC9 006 00-3EF0-35E240
    # .I --- 01:054173 13:123456 --:------ 1FC9 006 00-FFFF-04D39D

    # Some either 00/C8, others 00-C8
    # .I --- 01:145038 --:------ 01:145038 0008 002 0314  # ZON valve zone (ELE too?)
    # .I --- 01:145038 --:------ 01:145038 0008 002 F914  # HTG valve
    # .I --- 01:054173 --:------ 01:054173 0008 002 FA00  # DHW valve
    # .I --- 01:145038 --:------ 01:145038 0008 002 FC14  # appliance_relay

    # RP --- 13:109598 18:199952 --:------ 0008 002 0000
    # RP --- 13:109598 18:199952 --:------ 0008 002 00C8

    RELAY_DEMAND: Final = SZ_RELAY_DEMAND  # percentage (0.0-1.0)

    def _setup_discovery_cmds(self) -> None:
        """Add a relay-demand request every 15 minutes, unless faked."""
        super()._setup_discovery_cmds()

        if self.is_faked:  # discover_flag & Discover.STATUS and
            return

        poll_interval = 15 * 60  # seconds
        self._add_discovery_cmd(Command.get_relay_demand(self.id), poll_interval)

    @property
    def relay_demand(self) -> float | None:  # 0008
        """Return the relay demand from the 0008 message, if any."""
        return self._msg_value(Code._0008, key=self.RELAY_DEMAND)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the relay demand."""
        result = dict(super().status)
        result[self.RELAY_DEMAND] = self.relay_demand
        return result
class DhwTemperature(DeviceHeat):  # 1260
    """Heat device exposing (and optionally faking) the 1260 temperature."""

    TEMPERATURE: Final = SZ_TEMPERATURE  # TODO: deprecate

    @property
    def temperature(self) -> float | None:  # 1260
        """Return the temperature from the 1260 message, if any."""
        return self._msg_value(Code._1260, key=SZ_TEMPERATURE)

    @temperature.setter
    def temperature(self, value: float | None) -> None:
        """Fake the DHW temperature of the sensor.

        Raises:
            exc.DeviceNotFaked: if faking is not enabled for this device.
        """
        if not self.is_faked:
            raise exc.DeviceNotFaked(f"{self}: Faking is not enabled")

        dhw_cmd = Command.put_dhw_temp(self.id, value)
        send_opts = {"num_repeats": 2, "priority": Priority.HIGH}
        self._gwy.send_cmd(dhw_cmd, **send_opts)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the temperature."""
        result = dict(super().status)
        result[self.TEMPERATURE] = self.temperature
        return result
class Temperature(DeviceHeat):  # 30C9
    """Heat device exposing (and optionally faking) the 30C9 temperature."""

    # .I --- 34:145039 --:------ 34:145039 1FC9 012 00-30C9-8A368F 00-1FC9-8A368F
    # .W --- 01:054173 34:145039 --:------ 1FC9 006 03-2309-04D39D  # real CTL
    # .I --- 34:145039 01:054173 --:------ 1FC9 006 00-30C9-8A368F

    @property
    def temperature(self) -> float | None:  # 30C9
        """Return the temperature from the 30C9 message, if any."""
        return self._msg_value(Code._30C9, key=SZ_TEMPERATURE)

    @temperature.setter
    def temperature(self, value: float | None) -> None:
        """Fake the indoor temperature of the sensor.

        Raises:
            exc.DeviceNotFaked: if faking is not enabled for this device.
        """
        if not self.is_faked:
            raise exc.DeviceNotFaked(f"{self}: Faking is not enabled")

        sensor_cmd = Command.put_sensor_temp(self.id, value)
        send_opts = {"num_repeats": 2, "priority": Priority.HIGH}
        self._gwy.send_cmd(sensor_cmd, **send_opts)

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the temperature."""
        result = dict(super().status)
        result[SZ_TEMPERATURE] = self.temperature
        return result
class RfgGateway(DeviceHeat):  # RFG (30:)
    """Base class for the RFG100; it has no state attribute."""

    _SLUG = DevType.RFG
    _STATE_ATTR = None
class Controller(DeviceHeat):  # CTL (01):
    """The Controller base class; it owns a TCS and forwards messages to it."""

    HEAT_DEMAND: Final = SZ_HEAT_DEMAND

    _SLUG = DevType.CTL
    _STATE_ATTR = HEAT_DEMAND

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the device, then attach its TCS using the given kwargs."""
        super().__init__(*args, **kwargs)

        # self.ctl = None
        self.tcs = None  # TODO: = self?
        self._make_tcs_controller(**kwargs)  # NOTE: must create_from_schema first

    def _handle_msg(self, msg: Message) -> None:
        """Handle the message here, then pass it on to the TCS."""
        super()._handle_msg(msg)

        self.tcs._handle_msg(msg)

    def _make_tcs_controller(
        self, *, msg: Message | None = None, **schema: Any
    ) -> None:  # CH/DHW
        """Attach a TCS (create/update as required) after passing it any msg.

        The schema is used to create the TCS (temperature control system), or to
        update an existing one; any msg is then passed to it for handling.

        TCSs are uniquely identified by a controller ID.
        If a TCS is created, it is attached to this device (which should be a CTL).
        """
        super()._make_tcs_controller(msg=None, **schema)

        from ramses_rf.system import system_factory

        tcs_schema = shrink(SCH_TCS(schema))

        if not self.tcs:
            self.tcs = system_factory(self, msg=msg, **tcs_schema)
        elif tcs_schema:
            self.tcs._update_schema(**tcs_schema)

        if msg:
            self.tcs._handle_msg(msg)
class Programmer(Controller):  # PRG (23):
    """The Programmer class, a Controller subclass with the PRG device slug."""

    _SLUG = DevType.PRG
class UfhController(Parent, DeviceHeat):  # UFC (02):
    """The UFC class, the HCE80 that controls the UFH zones.

    Keeps a map of eight circuits ("00".."07") and the most recent 0008, 22C9
    and 3150 messages, from which the demand/setpoint properties are derived.
    """

    HEAT_DEMAND: Final = SZ_HEAT_DEMAND

    _SLUG = DevType.UFC
    _STATE_ATTR = HEAT_DEMAND

    _child_id = FA
    _iz_controller = True

    childs: list[UfhCircuit]  # TODO: check (code so complex, not sure if this is true)

    # 12:27:24.398 067  I --- 02:000921 --:------ 01:191718 3150 002 0360
    # 12:27:24.546 068  I --- 02:000921 --:------ 01:191718 3150 002 065A
    # 12:27:24.693 067  I --- 02:000921 --:------ 01:191718 3150 002 045C
    # 12:27:24.824 059  I --- 01:191718 --:------ 01:191718 3150 002 FC5C
    # 12:27:24.857 067  I --- 02:000921 --:------ 02:000921 3150 006 0060-015A-025C

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the device with eight empty circuits and no cached msgs."""
        super().__init__(*args, **kwargs)

        self.circuit_by_id = {f"{i:02X}": {} for i in range(8)}

        self._setpoints: Message | None = None
        self._heat_demand: Message | None = None
        self._heat_demands: Message | None = None
        self._relay_demand: Message | None = None
        self._relay_demand_fa: Message | None = None

    def _setup_discovery_cmds(self) -> None:
        """Add the periodic RQs for the zone list, zone config and circuits."""
        super()._setup_discovery_cmds()

        daily = 24 * 60 * 60  # seconds
        six_hourly = 6 * 60 * 60  # seconds

        # Only RPs are: 0001, 0005/000C, 10E0, 000A/2309 & 22D0

        zones_rq = Command.from_attrs(
            RQ, self.id, Code._0005, f"00{DEV_ROLE_MAP.UFH}"
        )
        self._add_discovery_cmd(zones_rq, daily)

        # TODO: this needs work
        # if discover_flag & Discover.PARAMS:  # only 2309 has any potential?
        for cct_id in self.circuit_by_id:
            for getter in (Command.get_zone_config, Command.get_zone_setpoint):
                self._add_discovery_cmd(getter(self.id, cct_id), six_hourly)

        for cct_num in range(8):
            devices_rq = Command.from_attrs(
                RQ, self.id, Code._000C, f"{cct_num:02X}{DEV_ROLE_MAP.UFH}"
            )
            self._add_discovery_cmd(devices_rq, daily)

    def _handle_msg(self, msg: Message) -> None:
        """Handle a message, updating the circuit map or the cached messages."""
        super()._handle_msg(msg)

        # Several assumptions are made, regarding 000C pkts:
        # - UFC bound only to CTL (not, e.g. SEN)
        # - all circuits bound to the same controller

        code = msg.code

        if code == Code._0005:  # system_zones
            # {'zone_type': '09', 'zone_mask': [1, 1, 1, 1, 1, 0, 0, 0], 'zone_class': 'underfloor_heating'}

            # ignoring ZON_ROLE_MAP.SEN for now
            if msg.payload[SZ_ZONE_TYPE] in (ZON_ROLE_MAP.ACT, ZON_ROLE_MAP.UFH):
                for cct_num, in_use in enumerate(msg.payload[SZ_ZONE_MASK]):
                    if not in_use:
                        self.circuit_by_id[f"{cct_num:02X}"] = {SZ_ZONE_IDX: None}
                    # FIXME: this causing tests to fail when read-only protocol
                    # elif SZ_ZONE_IDX not in self.circuit_by_id[ufh_idx]:
                    #     cmd = Command.from_attrs(
                    #         RQ, self.ctl.id, Code._000C, f"{ufh_idx}{DEV_ROLE_MAP.UFH}"
                    #     )
                    #     self._send_cmd(cmd)
            return

        if code == Code._0008:  # relay_demand
            if msg.payload.get(SZ_DOMAIN_ID) == FC:
                self._relay_demand = msg
            else:  # FA
                self._relay_demand_fa = msg
            return

        if code == Code._000C:  # zone_devices
            # {'zone_type': '09', 'ufh_idx': '00', 'zone_idx': '09', 'device_role': 'ufh_actuator', 'devices': ['01:095421']}
            # {'zone_type': '09', 'ufh_idx': '07', 'zone_idx': None, 'device_role': 'ufh_actuator', 'devices': []}

            if msg.payload[SZ_ZONE_TYPE] not in (ZON_ROLE_MAP.ACT, ZON_ROLE_MAP.UFH):
                return  # ignoring ZON_ROLE_MAP.SEN for now

            cct_id = msg.payload[SZ_UFH_IDX]  # circuit idx
            self.circuit_by_id[cct_id] = {SZ_ZONE_IDX: msg.payload[SZ_ZONE_IDX]}

            if msg.payload[SZ_ZONE_IDX] is None:
                return

            # [SZ_DEVICES][0] will be the CTL
            self.set_parent(
                self._gwy.get_device(msg.payload[SZ_DEVICES][0]).tcs,
                # child_id=msg.payload[SZ_ZONE_IDX],
            )
            return

        if code == Code._22C9:  # setpoint_bounds
            # .I --- 02:017205 --:------ 02:017205 22C9 024 00076C0A280101076C0A28010...
            # .I --- 02:017205 --:------ 02:017205 22C9 006 04076C0A2801
            self._setpoints = msg
            return

        if code != Code._3150:  # heat_demands
            # "0008|FA/FC", "22C9|array", "22D0|none", "3150|ZZ/array(/FC?)"
            return

        if isinstance(msg.payload, list):  # the circuit demands
            self._heat_demands = msg
            return

        if msg.payload.get(SZ_DOMAIN_ID) == FC:
            self._heat_demand = msg
            return

        # otherwise, pass the msg to the zone of the destination's TCS, if found
        zone_idx = msg.payload.get(SZ_ZONE_IDX)
        if not zone_idx or not isinstance(msg.dst, Device):
            return

        tcs = msg.dst.tcs
        if not tcs:
            return

        zone = tcs.zone_by_idx.get(zone_idx)
        if zone:
            zone._handle_msg(msg)

    # TODO: should be a private method
    def get_circuit(
        self, cct_idx: str, *, msg: Message | None = None, **schema: Any
    ) -> Any:
        """Return a UFH circuit, create it if required.

        Any msg is passed to the circuit for handling. The given schema is
        currently discarded (it is replaced by an empty dict).

        Circuits are uniquely identified by a UFH controller ID|cct_idx pair.
        If a circuit is created, attach it to this UFC.
        """
        schema = {}  # shrink(SCH_CCT(schema))

        circuit: UfhCircuit = self.child_by_id.get(cct_idx)

        if not circuit:
            circuit = UfhCircuit(self, cct_idx)
            self.child_by_id[cct_idx] = circuit
            self.childs.append(circuit)

        elif schema:
            circuit._update_schema(**schema)

        if msg:
            circuit._handle_msg(msg)

        return circuit

    # @property
    # def circuits(self) -> dict:  # 000C
    #     return self.circuit_by_id

    @property
    def heat_demand(self) -> float | None:  # 3150|FC (there is also 3150|FA)
        """Return the heat demand from the cached 3150|FC message, if any."""
        return self._msg_value_msg(self._heat_demand, key=self.HEAT_DEMAND)

    @property
    def heat_demands(self) -> dict | None:  # 3150|ufh_idx array
        """Return the value of the cached 3150 array message, if any."""
        # return self._heat_demands.payload if self._heat_demands else None
        return self._msg_value_msg(self._heat_demands)

    @property
    def relay_demand(self) -> dict | None:  # 0008|FC
        """Return the relay demand from the cached 0008|FC message, if any."""
        return self._msg_value_msg(self._relay_demand, key=SZ_RELAY_DEMAND)

    @property
    def relay_demand_fa(self) -> dict | None:  # 0008|FA
        """Return the relay demand from the cached non-FC 0008 message, if any."""
        return self._msg_value_msg(self._relay_demand_fa, key=SZ_RELAY_DEMAND)

    @property
    def setpoints(self) -> dict | None:  # 22C9|ufh_idx array
        """Return the low/high temps per circuit from the cached 22C9 message."""
        if self._setpoints is None:
            return None

        wanted_keys = ("temp_low", "temp_high")
        by_circuit = {}
        for circuit in self._setpoints.payload:
            by_circuit[circuit[SZ_UFH_IDX]] = {
                name: temp for name, temp in circuit.items() if name in wanted_keys
            }
        return by_circuit

    @property  # id, type
    def schema(self) -> dict[str, Any]:
        """Return the inherited schema plus the circuit map."""
        result = dict(super().schema)
        result[SZ_CIRCUITS] = self.circuit_by_id
        return result

    @property  # setpoint, config, mode (not schedule)
    def params(self) -> dict[str, Any]:
        """Return the inherited params plus the per-circuit setpoints."""
        result = dict(super().params)
        result[SZ_CIRCUITS] = self.setpoints
        return result

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the heat and relay demands."""
        result = dict(super().status)
        result[SZ_HEAT_DEMAND] = self.heat_demand
        result[SZ_RELAY_DEMAND] = self.relay_demand
        result[f"{SZ_RELAY_DEMAND}_fa"] = self.relay_demand_fa
        return result
class DhwSensor(DhwTemperature, BatteryState, Fakeable):  # DHW (07): 10A0, 1260
    """The DHW class, such as a CS92."""

    DHW_PARAMS: Final = "dhw_params"

    _SLUG: str = DevType.DHW
    _STATE_ATTR = DhwTemperature.TEMPERATURE

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the device and set its child (domain) id to FA."""
        super().__init__(*args, **kwargs)

        self._child_id = FA  # NOTE: domain_id

    def _handle_msg(self, msg: Message) -> None:  # NOTE: active
        """Handle a message and, after a 1260, ask the controller for its temp."""
        super()._handle_msg(msg)

        if self._gwy.config.disable_discovery:
            return

        # TODO: why are we doing this here? Should simply use discovery poller!
        # The following is required, as CTLs don't send spontaneously
        if msg.code != Code._1260 or not self.ctl:
            return

        # update the controller DHW temp
        self._send_cmd(Command.get_dhw_temp(self.ctl.id))

    async def initiate_binding_process(self) -> Packet:
        """Start the binding process for code 1260 and return its result."""
        return await super()._initiate_binding_process(Code._1260)

    @property
    def dhw_params(self) -> PayDictT._10A0 | None:
        """Return the value of the 10A0 message, if any."""
        return self._msg_value(Code._10A0)

    @property
    def params(self) -> dict[str, Any]:
        """Return the inherited params plus the DHW params."""
        result = dict(super().params)
        result[self.DHW_PARAMS] = self.dhw_params
        return result
class OutSensor(Weather, Fakeable):  # OUT: 17
    """The OUT class (external sensor), such as a HB85/HB95.

    Its state attribute is the temperature.
    """

    # LUMINOSITY = "luminosity"  # lux
    # WINDSPEED = "windspeed"  # km/h

    _SLUG = DevType.OUT
    _STATE_ATTR = SZ_TEMPERATURE

    # async def initiate_binding_process(self) -> Packet:
    #     return await super()._initiate_binding_process(...)
633def _to_msg_id(data_id: OtDataId) -> MsgId:
634 return f"{data_id:02X}"
637# NOTE: config.use_native_ot should enforce sends, but not reads from _msgz DB
638class OtbGateway(Actuator, HeatDemand): # OTB (10): 3220 (22D9, others)
639 """The OTB class, specifically an OpenTherm Bridge (R8810A Bridge)."""
641 # see: https://www.opentherm.eu/request-details/?post_ids=2944
642 # see: https://www.automatedhome.co.uk/vbulletin/showthread.php?6400-(New)-cool-mode-in-Evohome
644 _SLUG = DevType.OTB
645 _STATE_ATTR = SZ_REL_MODULATION_LEVEL
647 OT_TO_RAMSES: dict[MsgId, Code] = { # TODO: move to opentherm.py
648 MsgId._00: Code._3EF0, # master/slave status (actuator_state)
649 MsgId._01: Code._22D9, # boiler_setpoint
650 MsgId._0E: Code._3EF0, # max_rel_modulation_level (is a PARAM?)
651 MsgId._11: Code._3EF0, # rel_modulation_level (actuator_state, also Code._3EF1)
652 MsgId._12: Code._1300, # ch_water_pressure
653 MsgId._13: Code._12F0, # dhw_flow_rate
654 MsgId._19: Code._3200, # boiler_output_temp
655 MsgId._1A: Code._1260, # dhw_temp
656 MsgId._1B: Code._1290, # outside_temp
657 MsgId._1C: Code._3210, # boiler_return_temp
658 MsgId._38: Code._10A0, # dhw_setpoint (is a PARAM)
659 MsgId._39: Code._1081, # ch_max_setpoint (is a PARAM)
660 }
661 RAMSES_TO_OT: dict[Code, MsgId] = {
662 v: k for k, v in OT_TO_RAMSES.items() if v != Code._3EF0
663 } # also 10A0?
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialise the device, seed a 3220 entry and an empty OpenTherm cache."""
    super().__init__(*args, **kwargs)

    self._child_id = FC  # NOTE: domain_id

    # TODO(eb): cleanup
    if not self._gwy.msg_db:
        self._msgz[Code._3220] = {RP: {}}  # No ctx! (not None)
    else:
        # adds a "sim" RP opentherm_msg to the SQLite MessageIndex with code _3220
        # causes exc when fetching ALL, when no "real" msg was added to _msgs_. We skip those.
        self._add_record(id=self.id, code=Code._3220, verb="RP")

    # self._use_ot = self._gwy.config.use_native_ot
    self._msgs_ot: dict[MsgId, Message] = {}
    # self._msgs_ot_ctl_polled = {}
def _setup_discovery_cmds(self) -> None:
    """Add the periodic status, schema and params RQs for this OTB.

    Whether an OpenTherm (3220) or a RAMSES request is used for each data id
    depends on `config.use_native_ot`.
    """

    def which_cmd(use_native_ot: str, msg_id: MsgId) -> Command | None:
        """Create a OT cmd, or its RAMSES equivalent, depending."""
        # we know RQ|3220 is an option, question is: use that, or RAMSES or nothing?
        if use_native_ot not in ("always", "prefer"):  # is: in ("avoid", "never")
            ramses_code = self.OT_TO_RAMSES.get(msg_id)
            if ramses_code is not None:
                return Command.from_attrs(RQ, self.id, ramses_code, "00")
            if use_native_ot != "avoid":
                return None  # use_native_ot == "never"
        return Command.get_opentherm_data(self.id, msg_id)

    def ramses_rq(code: Code) -> Command:
        """Create a RQ for the given code with a payload of "00"."""
        return Command.from_attrs(RQ, self.id, code, "00")

    super()._setup_discovery_cmds()

    # always send at least one of RQ|3EF0 or RQ|3220|00 (status)
    if self._gwy.config.use_native_ot != "never":
        self._add_discovery_cmd(Command.get_opentherm_data(self.id, MsgId._00), 60)

    if self._gwy.config.use_native_ot != "always":
        for code in (Code._3EF0, Code._2401):  # NOTE: the 2401 code is a WIP
            self._add_discovery_cmd(ramses_rq(code), 60)

    # (data ids, interval, delay, data ids to skip)
    polling_plan = (
        (SCHEMA_DATA_IDS, 6 * 3600, 180, ()),  # From OT v2.2: version numbers
        (PARAMS_DATA_IDS, 3600, 90, ()),  # params or L/T state
        (STATUS_DATA_IDS, 300, 15, (0x00,)),  # except "00", see above
    )
    for data_ids, interval, delay, skipped in polling_plan:
        for data_id in data_ids:
            if data_id in skipped:
                continue
            cmd = which_cmd(self._gwy.config.use_native_ot, _to_msg_id(data_id))
            if cmd:
                self._add_discovery_cmd(cmd, interval, delay=delay)

    if not _DBG_EXTRA_OTB_DISCOVERY:
        return

    # TODO: these are WIP, but do vary in payload
    varying_codes = (
        Code._2401,  # WIP - modulation_level + flags?
        Code._3221,  # R8810A/20A
        Code._3223,  # R8810A/20A
    )
    # TODO: these are WIP, appear FIXED in payload
    fixed_codes = (
        Code._0150,  # payload always "000000", R8820A only?
        Code._1098,  # payload always "00C8", R8820A only?
        Code._10B0,  # payload always "0000", R8820A only?
        Code._1FD0,  # payload always "0000000000000000"
        Code._2400,  # payload always "0000000F"
        Code._2410,  # payload always "000000000000000000000000010000000100000C"
        Code._2420,  # payload always "0000001000000...
    )  # TODO: to test against BDR91T

    for codes, interval in ((varying_codes, 60), (fixed_codes, 300)):
        for code in codes:
            self._add_discovery_cmd(ramses_rq(code), interval)
def _handle_msg(self, msg: Message) -> None:
    """Handle a message, then dispatch I/RP msgs to the 3220 or code handler."""
    super()._handle_msg(msg)

    if msg.verb in (I_, RP):
        if msg.code == Code._3220:
            self._handle_3220(msg)
        elif msg.code in self.RAMSES_TO_OT:
            self._handle_code(msg)
def _handle_3220(self, msg: Message) -> None:
    """Handle 3220-based messages.

    Messages that are reserved, or have no value, are ignored; all others
    are cached by their msg id.
    """
    payload_type = msg.payload[SZ_MSG_TYPE]

    # NOTE: Reserved msgs have null data, but that msg_id may later be OK!
    # NOTE: Some msgs have invalid data, but that msg_id may later be OK!
    if payload_type == OtMsgType.RESERVED or msg.payload.get(SZ_VALUE) is None:
        return

    # msg_id is int in msg payload/opentherm.py, but MsgId (str) in this module
    msg_id = _to_msg_id(msg.payload[SZ_MSG_ID])
    self._msgs_ot[msg_id] = msg

    if _DBG_ENABLE_DEPRECATION:  # FIXME: data gaps
        unusable_types = (
            OtMsgType.DATA_INVALID,
            OtMsgType.UNKNOWN_DATAID,
            OtMsgType.RESERVED,  # but some are ?always reserved
        )
        reset = msg.payload[SZ_MSG_TYPE] not in unusable_types
        self._deprecate_code_ctx(msg._pkt, ctx=msg_id, reset=reset)
def _handle_code(self, msg: Message) -> None:
    """Handle non-3220-based messages.

    Does nothing unless the deprecation debug flag is set.
    """
    if msg.code == Code._3EF0 and msg.verb == I_:
        # NOTE: this is development/discovery code  # chasing flags
        # self._send_cmd(
        #     Command.get_opentherm_data(self.id, MsgId._00), **QOS_MID
        # )  # FIXME: deprecate QoS in kwargs
        return

    if msg.code in (Code._10A0, Code._3EF1):
        return

    if not _DBG_ENABLE_DEPRECATION:  # FIXME: data gaps
        return

    # TODO: can be temporarily 7FFF?
    is_null_value = msg._pkt.payload[2:] == "7FFF" or (
        msg.code == Code._1300 and msg._pkt.payload[2:] == "09F6"
    )  # latter is CH water pressure

    if is_null_value:
        self._deprecate_code_ctx(msg._pkt)
    else:
        self._deprecate_code_ctx(msg._pkt, reset=True)
804 def _ot_msg_flag(self, msg_id: MsgId, flag_idx: int) -> bool | None:
805 flags: list = self._ot_msg_value(msg_id)
806 return bool(flags[flag_idx]) if flags else None
@staticmethod
def _ot_msg_name(msg: Message) -> str:  # TODO: remove
    """Return the msg name if it is a str, else the msg id as two hex digits."""
    name = msg.payload[SZ_MSG_NAME]
    if isinstance(name, str):
        return name
    return f"{msg.payload[SZ_MSG_ID]:02X}"
def _ot_msg_value(self, msg_id: MsgId) -> int | float | list | None:
    """Return the value of the cached OpenTherm msg, unless absent or expired."""
    # data_id = int(msg_id, 16)
    cached = self._msgs_ot.get(msg_id)
    if not cached or cached._expired:
        return None
    # TODO: value_hb/_lb
    return cached.payload.get(SZ_VALUE)  # type: ignore[no-any-return]
823 def _result_by_callback(
824 self, cbk_ot: Callable | None, cbk_ramses: Callable | None
825 ) -> Any | None:
826 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`."""
828 if self._gwy.config.use_native_ot == "always":
829 return cbk_ot() if cbk_ot else None
830 if self._gwy.config.use_native_ot == "prefer":
831 if cbk_ot and (result := cbk_ot()) is not None:
832 return result
834 result_ramses = cbk_ramses() if cbk_ramses is not None else None
835 if self._gwy.config.use_native_ot == "avoid" and result_ramses is None:
836 return cbk_ot() if cbk_ot else None
837 return result_ramses # incl. use_native_ot == "never"
839 def _result_by_lookup(
840 self,
841 code: Code,
842 /,
843 *,
844 key: str,
845 ) -> Any | None:
846 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`."""
847 # assert code in self.RAMSES_TO_OT and kwargs.get("key"):
849 if self._gwy.config.use_native_ot == "always":
850 return self._ot_msg_value(self.RAMSES_TO_OT[code])
852 if self._gwy.config.use_native_ot == "prefer":
853 if (result_ot := self._ot_msg_value(self.RAMSES_TO_OT[code])) is not None:
854 return result_ot
856 result_ramses = self._msg_value(code, key=key)
857 if self._gwy.config.use_native_ot == "avoid" and result_ramses is None:
858 return self._ot_msg_value(self.RAMSES_TO_OT[code])
860 return result_ramses # incl. use_native_ot == "never"
862 def _result_by_value(
863 self, result_ot: Any | None, result_ramses: Any | None
864 ) -> Any | None:
865 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`."""
866 #
868 if self._gwy.config.use_native_ot == "always":
869 return result_ot
871 if self._gwy.config.use_native_ot == "prefer":
872 if result_ot is not None:
873 return result_ot
875 #
876 elif self._gwy.config.use_native_ot == "avoid" and result_ramses is None:
877 return result_ot
879 return result_ramses # incl. use_native_ot == "never"
@property  # TODO
def bit_2_4(self) -> bool | None:  # 2401 - WIP
    """Return flag 4 of "_flags_2" from the 2401 message, if any."""
    flag_idx = 4
    return self._msg_flag(Code._2401, "_flags_2", flag_idx)
@property  # TODO
def bit_2_5(self) -> bool | None:  # 2401 - WIP
    """Return flag 5 of "_flags_2" from the 2401 message, if any."""
    flag_idx = 5
    return self._msg_flag(Code._2401, "_flags_2", flag_idx)
@property  # TODO
def bit_2_6(self) -> bool | None:  # 2401 - WIP
    """Return flag 6 of "_flags_2" from the 2401 message, if any."""
    flag_idx = 6
    return self._msg_flag(Code._2401, "_flags_2", flag_idx)
@property  # TODO
def bit_2_7(self) -> bool | None:  # 2401 - WIP
    """Return flag 7 of "_flags_2" from the 2401 message, if any."""
    flag_idx = 7
    return self._msg_flag(Code._2401, "_flags_2", flag_idx)
@property  # TODO
def bit_3_7(self) -> bool | None:  # 3EF0 (byte 3, only OTB)
    """Return flag 7 of "_flags_3" from the 3EF0 message, if any."""
    flag_idx = 7
    return self._msg_flag(Code._3EF0, "_flags_3", flag_idx)
@property  # TODO
def bit_6_6(self) -> bool | None:  # 3EF0 ?dhw_enabled (byte 3, only R8820A?)
    """Return flag 6 of "_flags_6" from the 3EF0 message, if any."""
    # NOTE(review): the trailing comment says byte 3, but "_flags_6" is read - confirm
    flag_idx = 6
    return self._msg_flag(Code._3EF0, "_flags_6", flag_idx)
@property  # TODO
def percent(self) -> float | None:  # 2401 - WIP (~3150|FC)
    """Return the heat demand value from the 2401 message, if any."""
    demand = self._msg_value(Code._2401, key=SZ_HEAT_DEMAND)
    return demand
@property  # TODO
def value(self) -> int | None:  # 2401 - WIP
    """Return the "_value_2" entry from the 2401 message, if any."""
    raw_value = self._msg_value(Code._2401, key="_value_2")
    return raw_value
@property
def boiler_output_temp(self) -> float | None:  # 3220|19, or 3200
    """Return the boiler output temp, via OpenTherm or RAMSES (code 3200)."""
    # _LOGGER.warning(
    #     "code=%s, 3220=%s, both=%s",
    #     self._msg_value(Code._3200, key=SZ_TEMPERATURE),
    #     self._ot_msg_value(str(self.RAMSES_TO_OT[Code._3200])),
    #     self._result_by_lookup(Code._3200, key=SZ_TEMPERATURE),
    # )
    output_temp = self._result_by_lookup(Code._3200, key=SZ_TEMPERATURE)
    return output_temp
@property
def boiler_return_temp(self) -> float | None:  # 3220|1C, or 3210
    """Return the boiler return temp, via OpenTherm or RAMSES (code 3210)."""
    return_temp = self._result_by_lookup(Code._3210, key=SZ_TEMPERATURE)
    return return_temp
@property
def boiler_setpoint(self) -> float | None:  # 3220|01, or 22D9
    """Return the boiler setpoint, via OpenTherm or RAMSES (code 22D9)."""
    setpoint = self._result_by_lookup(Code._22D9, key=SZ_SETPOINT)
    return setpoint
@property
def ch_max_setpoint(self) -> float | None:  # 3220|39, or 1081
    """Return the CH max setpoint, via OpenTherm or RAMSES (code 1081)."""
    max_setpoint = self._result_by_lookup(Code._1081, key=SZ_SETPOINT)
    return max_setpoint
@property  # TODO: no OT equivalent
def ch_setpoint(self) -> float | None:  # 3EF0 (byte 7, only R8820A?)
    """Return the CH setpoint from 3EF0; no OpenTherm value is supplied."""
    ramses_value = self._msg_value(Code._3EF0, key=SZ_CH_SETPOINT)
    return self._result_by_value(None, ramses_value)
@property
def ch_water_pressure(self) -> float | None:  # 3220|12, or 1300
    """Return the CH water pressure, via OpenTherm or RAMSES (code 1300)."""
    pressure = self._result_by_lookup(Code._1300, key=SZ_PRESSURE)
    return pressure
@property
def dhw_flow_rate(self) -> float | None:  # 3220|13, or 12F0
    """Return the DHW flow rate, via OpenTherm or RAMSES (code 12F0)."""
    flow_rate = self._result_by_lookup(Code._12F0, key=SZ_DHW_FLOW_RATE)
    return flow_rate
@property
def dhw_setpoint(self) -> float | None:  # 3220|38, or 10A0
    """Return the DHW setpoint, via OpenTherm or RAMSES (code 10A0)."""
    setpoint = self._result_by_lookup(Code._10A0, key=SZ_SETPOINT)
    return setpoint
@property
def dhw_temp(self) -> float | None:  # 3220|1A, or 1260
    """Return the DHW temperature, via OpenTherm or RAMSES (code 1260)."""
    temperature = self._result_by_lookup(Code._1260, key=SZ_TEMPERATURE)
    return temperature
@property  # TODO: no reliable OT equivalent?
def max_rel_modulation(self) -> float | None:  # 3220|0E, or 3EF0 (byte 8)
    """Return the max relative modulation, from 3220|0E or from 3EF0.

    When `use_native_ot` is "prefer", the 3EF0 value is used regardless.
    """
    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(Code._3EF0, key=SZ_MAX_REL_MODULATION)

    ot_value = self._ot_msg_value(MsgId._0E)  # NOTE: not reliable?
    ramses_value = self._msg_value(Code._3EF0, key=SZ_MAX_REL_MODULATION)
    return self._result_by_value(ot_value, ramses_value)
@property
def oem_code(self) -> float | None:  # 3220|73, no known RAMSES equivalent
    """Return the value of the cached OpenTherm msg with id 73, if any."""
    ot_value = self._ot_msg_value(MsgId._73)
    return ot_value
@property
def outside_temp(self) -> float | None:  # 3220|1B, 1290
    """Return the outside temperature, via OpenTherm or RAMSES (code 1290)."""
    temperature = self._result_by_lookup(Code._1290, key=SZ_TEMPERATURE)
    return temperature
@property  # TODO: no reliable OT equivalent?
def rel_modulation_level(self) -> float | None:  # 3220|11, or 3EF0/3EF1
    """Return the relative modulation level, from 3220|11 or from 3EF0/3EF1.

    When `use_native_ot` is "prefer", the 3EF0/3EF1 value is used regardless.
    """
    ramses_codes = (Code._3EF0, Code._3EF1)

    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(ramses_codes, key=self.MODULATION_LEVEL)

    ot_value = self._ot_msg_value(MsgId._11)  # NOTE: not reliable?
    ramses_value = self._msg_value(ramses_codes, key=self.MODULATION_LEVEL)
    return self._result_by_value(ot_value, ramses_value)
@property  # TODO: no reliable OT equivalent?
def ch_active(self) -> bool | None:  # 3220|00, or 3EF0 (byte 3)
    """Return the CH-active state.

    When the gateway's `use_native_ot` setting is "prefer", the 3EF0 value is
    returned directly; otherwise flag `8 + 1` of OpenTherm message 0x00 and the
    3EF0 value are both handed to `_result_by_value`.
    """
    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE)

    ot_flag = self._ot_msg_flag(MsgId._00, 8 + 1)  # NOTE: not reliable?
    ramses_value = self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE)
    return self._result_by_value(ot_flag, ramses_value)
@property  # TODO: no reliable OT equivalent?
def ch_enabled(self) -> bool | None:  # 3220|00, or 3EF0 (byte 6)
    """Return the CH-enabled state.

    When the gateway's `use_native_ot` setting is "prefer", the 3EF0 value is
    returned directly; otherwise flag `0` of OpenTherm message 0x00 and the
    3EF0 value are both handed to `_result_by_value`.
    """
    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(Code._3EF0, key=SZ_CH_ENABLED)

    ot_flag = self._ot_msg_flag(MsgId._00, 0)  # NOTE: not reliable?
    ramses_value = self._msg_value(Code._3EF0, key=SZ_CH_ENABLED)
    return self._result_by_value(ot_flag, ramses_value)
@property
def cooling_active(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `8 + 4` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 8 + 4)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property
def cooling_enabled(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `2` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 2)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property  # TODO: no reliable OT equivalent?
def dhw_active(self) -> bool | None:  # 3220|00, or 3EF0 (byte 3)
    """Return the DHW-active state.

    When the gateway's `use_native_ot` setting is "prefer", the 3EF0 value is
    returned directly; otherwise flag `8 + 2` of OpenTherm message 0x00 and the
    3EF0 value are both handed to `_result_by_value`.
    """
    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE)

    ot_flag = self._ot_msg_flag(MsgId._00, 8 + 2)  # NOTE: not reliable?
    ramses_value = self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE)
    return self._result_by_value(ot_flag, ramses_value)
@property
def dhw_blocking(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `6` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 6)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property
def dhw_enabled(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `1` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 1)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property
def fault_present(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `8` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 8)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property  # TODO: no reliable OT equivalent?
def flame_active(self) -> bool | None:  # 3220|00, or 3EF0 (byte 3)
    """Return the flame-active state.

    When the gateway's `use_native_ot` setting is "prefer", the 3EF0 value is
    returned directly; otherwise flag `8 + 3` of OpenTherm message 0x00 and the
    3EF0 value are both handed to `_result_by_value`.
    """
    # NOTE(review): the 3EF0 key here is the literal "flame_on", whereas
    # ramses_status reads SZ_FLAME_ACTIVE from the same code - confirm which
    # key the 3EF0 payload actually carries
    if self._gwy.config.use_native_ot == "prefer":  # HACK: there'll always be 3EF0
        return self._msg_value(Code._3EF0, key="flame_on")

    ot_flag = self._ot_msg_flag(MsgId._00, 8 + 3)  # NOTE: not reliable?
    ramses_value = self._msg_value(Code._3EF0, key="flame_on")
    return self._result_by_value(ot_flag, ramses_value)
@property
def otc_active(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `3` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 3)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property
def summer_mode(self) -> bool | None:  # 3220|00, TODO: no known RAMSES
    """Return flag `5` of OpenTherm message 0x00, via `_result_by_value`."""
    ot_flag = self._ot_msg_flag(MsgId._00, 5)
    return self._result_by_value(ot_flag, None)  # no RAMSES candidate
@property
def opentherm_schema(self) -> dict[str, Any]:
    """Return the value fields of the supported OpenTherm schema messages.

    A message is included when its id is truthy in `_supported_cmds_ctx` and,
    parsed as hex, is a member of SCHEMA_DATA_IDS.  The result is keyed by
    message name; from each payload only the keys starting with SZ_VALUE are
    kept.
    """
    # pass 1: collect the full payload of every qualifying message
    payloads: dict[str, Any] = {}
    for msg_id, msg in self._msgs_ot.items():
        if self._supported_cmds_ctx.get(msg_id) and int(msg_id, 16) in SCHEMA_DATA_IDS:
            msg_name = self._ot_msg_name(msg)
            payloads[msg_name] = msg.payload

    # pass 2: trim each payload down to its value field(s)
    schema: dict[str, Any] = {}
    for msg_name, payload in payloads.items():
        schema[msg_name] = {
            key: val for key, val in payload.items() if key.startswith(SZ_VALUE)
        }
    return schema
@property
def opentherm_counters(self) -> dict[str, Any]:  # all are U16
    """Return the OpenTherm counter values, keyed by counter name.

    Each value is whatever `_ot_msg_value` yields for the paired message id.
    """
    # 0x73 is not a counter: is OEM diagnostic code...
    counter_ids = (
        (SZ_BURNER_HOURS, MsgId._78),
        (SZ_BURNER_STARTS, MsgId._74),
        (SZ_BURNER_FAILED_STARTS, MsgId._71),
        (SZ_CH_PUMP_HOURS, MsgId._79),
        (SZ_CH_PUMP_STARTS, MsgId._75),
        (SZ_DHW_BURNER_HOURS, MsgId._7B),
        (SZ_DHW_BURNER_STARTS, MsgId._77),
        (SZ_DHW_PUMP_HOURS, MsgId._7A),
        (SZ_DHW_PUMP_STARTS, MsgId._76),
        (SZ_FLAME_SIGNAL_LOW, MsgId._72),
    )
    return {name: self._ot_msg_value(msg_id) for name, msg_id in counter_ids}
@property
def opentherm_params(self) -> dict[str, Any]:  # F8_8, U8, {"hb": S8, "lb": S8}
    """Return the value fields of the supported OpenTherm parameter messages.

    A message is included when its id is truthy in `_supported_cmds_ctx` and,
    parsed as hex, is a member of PARAMS_DATA_IDS.  The result is keyed by
    message name; from each payload only the keys starting with SZ_VALUE are
    kept.
    """
    # pass 1: collect the full payload of every qualifying message
    payloads = {}
    for msg_id, msg in self._msgs_ot.items():
        if self._supported_cmds_ctx.get(msg_id) and int(msg_id, 16) in PARAMS_DATA_IDS:
            msg_name = self._ot_msg_name(msg)
            payloads[msg_name] = msg.payload

    # pass 2: trim each payload down to its value field(s)
    params: dict[str, Any] = {}
    for msg_name, payload in payloads.items():
        params[msg_name] = {
            key: val for key, val in payload.items() if key.startswith(SZ_VALUE)
        }
    return params
@property
def opentherm_status(self) -> dict[str, Any]:  # F8_8, U16 (only OEM_CODE) or bool
    """Return the status as seen via OpenTherm only, keyed by attribute name.

    The first group of entries are `_ot_msg_value` results for individual
    message ids; the second group are `_ot_msg_flag` results for message 0x00.
    """
    # most these are in: STATUS_DATA_IDS
    value_ids = (
        (SZ_BOILER_OUTPUT_TEMP, MsgId._19),
        (SZ_BOILER_RETURN_TEMP, MsgId._1C),
        (SZ_BOILER_SETPOINT, MsgId._01),
        # (SZ_CH_MAX_SETPOINT, MsgId._39),  # in PARAMS_DATA_IDS
        (SZ_CH_WATER_PRESSURE, MsgId._12),
        (SZ_DHW_FLOW_RATE, MsgId._13),
        # (SZ_DHW_SETPOINT, MsgId._38),  # in PARAMS_DATA_IDS
        (SZ_DHW_TEMP, MsgId._1A),
        (SZ_OEM_CODE, MsgId._73),
        (SZ_OUTSIDE_TEMP, MsgId._1B),
        (SZ_REL_MODULATION_LEVEL, MsgId._11),
        #
        # (SZ..., MsgId._05),  # in STATUS_DATA_IDS
        # (SZ..., MsgId._18),  # in STATUS_DATA_IDS
    )
    flag_bits = (
        (SZ_CH_ACTIVE, 8 + 1),
        (SZ_CH_ENABLED, 0),
        (SZ_COOLING_ACTIVE, 8 + 4),
        (SZ_COOLING_ENABLED, 2),
        (SZ_DHW_ACTIVE, 8 + 2),
        (SZ_DHW_BLOCKING, 6),
        (SZ_DHW_ENABLED, 1),
        (SZ_FAULT_PRESENT, 8),
        (SZ_FLAME_ACTIVE, 8 + 3),
        (SZ_SUMMER_MODE, 5),
        (SZ_OTC_ACTIVE, 3),
    )

    status: dict[str, Any] = {}
    for name, msg_id in value_ids:
        status[name] = self._ot_msg_value(msg_id)
    for name, bit in flag_bits:
        status[name] = self._ot_msg_flag(MsgId._00, bit)
    return status
@property
def ramses_schema(self) -> PayDictT.EMPTY:
    """Return the RAMSES-side schema, which is always an empty dict."""
    empty_schema: PayDictT.EMPTY = {}
    return empty_schema
@property
def ramses_params(self) -> dict[str, float | None]:
    """Return the RAMSES-side params: only the max. relative modulation level."""
    max_modulation = self.max_rel_modulation
    return {SZ_MAX_REL_MODULATION: max_modulation}
@property
def ramses_status(self) -> dict[str, Any]:
    """Return the status as seen via RAMSES messages only, keyed by attribute name.

    Every entry is read with `_msg_value` from the RAMSES code noted beside
    the matching property of this class; no OpenTherm (3220) data is used.
    """
    return {
        SZ_BOILER_OUTPUT_TEMP: self._msg_value(Code._3200, key=SZ_TEMPERATURE),
        SZ_BOILER_RETURN_TEMP: self._msg_value(Code._3210, key=SZ_TEMPERATURE),
        SZ_BOILER_SETPOINT: self._msg_value(Code._22D9, key=SZ_SETPOINT),
        SZ_CH_MAX_SETPOINT: self._msg_value(Code._1081, key=SZ_SETPOINT),
        SZ_CH_SETPOINT: self._msg_value(Code._3EF0, key=SZ_CH_SETPOINT),
        SZ_CH_WATER_PRESSURE: self._msg_value(Code._1300, key=SZ_PRESSURE),
        SZ_DHW_FLOW_RATE: self._msg_value(Code._12F0, key=SZ_DHW_FLOW_RATE),
        # the DHW setpoint is carried by 10A0 (as used by the dhw_setpoint
        # property); 1300 is the CH water pressure and has no setpoint field
        SZ_DHW_SETPOINT: self._msg_value(Code._10A0, key=SZ_SETPOINT),
        SZ_DHW_TEMP: self._msg_value(Code._1260, key=SZ_TEMPERATURE),
        SZ_OUTSIDE_TEMP: self._msg_value(Code._1290, key=SZ_TEMPERATURE),
        SZ_REL_MODULATION_LEVEL: self._msg_value(
            (Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL
        ),
        #
        SZ_CH_ACTIVE: self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE),
        SZ_CH_ENABLED: self._msg_value(Code._3EF0, key=SZ_CH_ENABLED),
        SZ_DHW_ACTIVE: self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE),
        # NOTE(review): the flame_active property reads the literal key
        # "flame_on" from 3EF0 - confirm which key the payload carries
        SZ_FLAME_ACTIVE: self._msg_value(Code._3EF0, key=SZ_FLAME_ACTIVE),
    }
@property
def traits(self) -> dict[str, Any]:
    """Return the inherited traits plus the supported OpenTherm/RAMSES commands."""
    inherited = super().traits
    own = {
        "opentherm_traits": self.supported_cmds_ot,
        "ramses_ii_traits": self.supported_cmds,
    }
    return {**inherited, **own}
@property
def schema(self) -> dict[str, Any]:
    """Return the inherited schema plus the OpenTherm and RAMSES schemas."""
    inherited = super().schema
    own = {
        "opentherm_schema": self.opentherm_schema,
        "ramses_ii_schema": self.ramses_schema,
    }
    return {**inherited, **own}
@property
def params(self) -> dict[str, Any]:
    """Return the inherited params plus the OpenTherm and RAMSES params."""
    inherited = super().params
    own = {
        "opentherm_params": self.opentherm_params,
        "ramses_ii_params": self.ramses_params,
    }
    return {**inherited, **own}
@property
def status(self) -> dict[str, Any]:
    """Return the inherited status plus the value of each status property.

    The inherited entries come first; entries added here overwrite any
    inherited entry that has the same key.
    """
    inherited = super().status  # incl. actuator_cycle, actuator_state

    readings = {
        SZ_BOILER_OUTPUT_TEMP: self.boiler_output_temp,
        SZ_BOILER_RETURN_TEMP: self.boiler_return_temp,
        SZ_BOILER_SETPOINT: self.boiler_setpoint,
        SZ_CH_SETPOINT: self.ch_setpoint,
        SZ_CH_MAX_SETPOINT: self.ch_max_setpoint,
        SZ_CH_WATER_PRESSURE: self.ch_water_pressure,
        SZ_DHW_FLOW_RATE: self.dhw_flow_rate,
        SZ_DHW_SETPOINT: self.dhw_setpoint,
        SZ_DHW_TEMP: self.dhw_temp,
        SZ_OEM_CODE: self.oem_code,
        SZ_OUTSIDE_TEMP: self.outside_temp,
        SZ_REL_MODULATION_LEVEL: self.rel_modulation_level,
    }
    flags = {
        SZ_CH_ACTIVE: self.ch_active,
        SZ_CH_ENABLED: self.ch_enabled,
        SZ_COOLING_ACTIVE: self.cooling_active,
        SZ_COOLING_ENABLED: self.cooling_enabled,
        SZ_DHW_ACTIVE: self.dhw_active,
        SZ_DHW_BLOCKING: self.dhw_blocking,
        SZ_DHW_ENABLED: self.dhw_enabled,
        SZ_FAULT_PRESENT: self.fault_present,
        SZ_FLAME_ACTIVE: self.flame_active,
        SZ_SUMMER_MODE: self.summer_mode,
        SZ_OTC_ACTIVE: self.otc_active,
    }
    # not (yet) included:
    # "status_opentherm": self.opentherm_status,
    # "status_ramses_ii": self.ramses_status,
    return {**inherited, **readings, **flags}
class Thermostat(BatteryState, Setpoint, Temperature, Fakeable):  # THM (..):
    """The THM/STA class, such as a TR87RF."""

    _SLUG = DevType.THM
    _STATE_ATTR = SZ_TEMPERATURE

    def _handle_msg(self, msg: Message) -> None:
        """Process a message and, once only, decide if this device is a controller.

        After delegating to the parent handler, only I-verb messages are
        examined, and only while `_iz_controller` is still None.  The decision
        is made from the identity of the three entries in `msg._addrs`.
        """
        super()._handle_msg(msg)

        # nothing more to do once the controller question has been answered
        if msg.verb != I_ or self._iz_controller is not None:
            return

        # NOTE: this has only been tested on a 12:, does it work for a 34: too?
        # pattern: (self, NON_DEV, self) -> not a controller
        if all(
            (
                msg._addrs[0] is self.addr,
                msg._addrs[1] is NON_DEV_ADDR,
                msg._addrs[2] is self.addr,
            )
        ):
            if self._iz_controller is None:
                # _LOGGER.info(f"{msg!r} # IS_CONTROLLER (10): is FALSE")
                self._iz_controller = False
            # NOTE(review): unreachable while the early return above is in place
            elif self._iz_controller:  # TODO: raise CorruptStateError
                _LOGGER.error(f"{msg!r} # IS_CONTROLLER (11): was TRUE, now False")

            # a non-controller should not be sending controller-only codes
            if msg.code in CODES_ONLY_FROM_CTL:  # TODO: raise CorruptPktError
                _LOGGER.error(f"{msg!r} # IS_CONTROLLER (12); is CORRUPT PKT")

        # pattern: (NON_DEV, NON_DEV, self) -> is a controller
        elif all(
            (
                msg._addrs[0] is NON_DEV_ADDR,
                msg._addrs[1] is NON_DEV_ADDR,
                msg._addrs[2] is self.addr,
            )
        ):
            if self._iz_controller is None:
                # _LOGGER.info(f"{msg!r} # IS_CONTROLLER (20): is TRUE")
                self._iz_controller = msg  # the deciding message itself is stored
                self._make_tcs_controller(msg=msg)
            # NOTE(review): unreachable while the early return above is in place
            elif self._iz_controller is False:  # TODO: raise CorruptStateError
                _LOGGER.error(f"{msg!r} # IS_CONTROLLER (21): was FALSE, now True")

    async def initiate_binding_process(self) -> Packet:
        """Start binding, offering the codes 2309, 30C9 and 0008.

        Returns whatever the parent's `_initiate_binding_process` returns.
        """
        return await super()._initiate_binding_process(
            (Code._2309, Code._30C9, Code._0008)
        )
class BdrSwitch(Actuator, RelayDemand):  # BDR (13):
    """The BDR class, such as a BDR91.

    BDR91s can be used in six distinct modes, including:

    - x2 boiler controller (FC/TPI): either traditional, or newer heat pump-aware
    - x1 electric heat zones (0x/ELE)
    - x1 zone valve zones (0x/VAL)
    - x2 DHW thingys (F9/DHW, FA/DHW)
    """

    ACTIVE: Final = "active"
    TPI_PARAMS: Final = "tpi_params"

    _SLUG = DevType.BDR
    _STATE_ATTR = "active"

    # def __init__(self, *args: Any, **kwargs: Any) -> None:
    #     super().__init__(*args, **kwargs)

    #     if kwargs.get(SZ_DOMAIN_ID) == FC:  # TODO: F9/FA/FC, zone_idx
    #         self.ctl._set_app_cntrl(self)

    def _setup_discovery_cmds(self) -> None:
        """Discover BDRs.

        The BDRs have one of six roles:
         - heater relay *or* a heat pump relay (alternative to an OTB)
         - DHW hot water valve *or* DHW heating valve
         - Zones: Electric relay *or* Zone valve relay

        They all seem to respond thus (TODO: heat pump/zone valve relay):
         - all BDR91As will (erratically) RP to these RQs
             0016, 1FC9 & 0008, 1100, 3EF1
         - all BDR91As will *not* RP to these RQs
             0009, 10E0, 3B00, 3EF0
         - a BDR91A will *periodically* send an I/3B00/00C8 if it is the heater relay
        """

        super()._setup_discovery_cmds()

        # a faked device gets no discovery commands beyond the inherited ones
        if not self.is_faked:
            tpi_cmd = Command.get_tpi_params(self.id)
            self._add_discovery_cmd(tpi_cmd, 6 * 3600)  # params

            status_cmd = Command.from_attrs(RQ, self.id, Code._3EF1, "00")
            in_system_domain = self._child_id in (F9, FA, FC)
            self._add_discovery_cmd(
                status_cmd, 60 if in_system_domain else 300
            )  # status

    @property
    def active(self) -> bool | None:  # 3EF0, 3EF1
        """Return the actuator's current state."""
        level = self._msg_value((Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL)
        if level is None:
            return None
        return bool(level)

    @property
    def role(self) -> str | None:
        """Return the role of the BDR91A (there are six possibilities)."""

        # TODO: use self._parent?
        if self._child_id in DOMAIN_TYPE_MAP:
            return DOMAIN_TYPE_MAP[self._child_id]

        # TODO: remove need for isinstance
        if self._parent and isinstance(self._parent, Zone):
            return self._parent.heating_type

        # if Code._3B00 in _msgs and _msgs[Code._3B00].verb == I_:
        #     self._is_tpi = True
        # if Code._1FC9 in _msgs and _msgs[Code._1FC9].verb == RP:
        #     if Code._3B00 in _msgs[Code._1FC9].raw_payload:
        #         self._is_tpi = True

        return None

    @property
    def tpi_params(self) -> PayDictT._10A0 | None:
        """Return the payload of the 1100 message, if any."""
        # NOTE(review): the annotation names _10A0 but the code read is 1100 -
        # confirm the intended payload type
        tpi_payload = self._msg_value(Code._1100)
        return tpi_payload

    @property
    def schema(self) -> dict[str, Any]:
        """Return the inherited schema plus this relay's role."""
        inherited = super().schema
        return {**inherited, "role": self.role}

    @property
    def params(self) -> dict[str, Any]:
        """Return the inherited params plus the TPI params."""
        inherited = super().params
        return {**inherited, self.TPI_PARAMS: self.tpi_params}

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the active state."""
        inherited = super().status
        return {**inherited, self.ACTIVE: self.active}
class TrvActuator(BatteryState, HeatDemand, Setpoint, Temperature):  # TRV (04):
    """The TRV class, such as a HR92."""

    WINDOW_OPEN: Final = SZ_WINDOW_OPEN

    _SLUG = DevType.TRV
    _STATE_ATTR = SZ_HEAT_DEMAND

    @property
    def heat_demand(self) -> float | None:  # 3150
        """Return the heat demand, substituting 0 in one specific case.

        If the inherited value is None, there is no 3150 message value, and
        the setpoint is exactly False, then 0 is returned instead of None.
        """
        demand = super().heat_demand
        if demand is not None:
            return demand

        # no 3150s are sent when setpoint is False: report 0 instead of None
        if self._msg_value(Code._3150) is None and self.setpoint is False:
            return 0
        return None

    @property
    def window_open(self) -> bool | None:  # 12B0
        """Return the window-open state read from the 12B0 message."""
        is_open = self._msg_value(Code._12B0, key=self.WINDOW_OPEN)
        return is_open

    @property
    def status(self) -> dict[str, Any]:
        """Return the inherited status plus the window-open state."""
        inherited = super().status
        return {**inherited, self.WINDOW_OPEN: self.window_open}
class JimDevice(Actuator):  # BDR (08):
    """An Actuator subclass whose slug is DevType.JIM."""

    # NOTE(review): the trailing "BDR (08)" tag on the class line looks like
    # it was copied from another class - confirm
    _SLUG: str = DevType.JIM
    _STATE_ATTR = None  # no state attribute is designated
class JstDevice(RelayDemand):  # BDR (31):
    """A RelayDemand subclass whose slug is DevType.JST."""

    # NOTE(review): the trailing "BDR (31)" tag on the class line looks like
    # it was copied from another class - confirm
    _SLUG: str = DevType.JST
    _STATE_ATTR = None  # no state attribute is designated
class UfhCircuit(Child, Entity):  # FIXME
    """The UFH circuit class (UFC:circuit is much like CTL/TCS:zone).

    NOTE: for circuits, there's a difference between:
    - `self.ctl`: the UFH controller, and
    - `self.tcs.ctl`: the Evohome controller
    """

    # NOTE(review): annotated as str but assigned None - confirm intended type
    _SLUG: str = None
    _STATE_ATTR = None

    def __init__(self, ufc: UfhController, ufh_idx: str) -> None:
        """Create a circuit of the given UFH controller.

        Args:
            ufc: the UFH controller that owns this circuit.
            ufh_idx: the index of this circuit within that controller.
        """
        super().__init__(ufc._gwy)

        # FIXME: gwy.msg_db entities must know their parent device ID and their own idx
        self._z_id = ufc.id
        self._z_idx = ufh_idx

        # the circuit id is the controller id and the circuit index, joined by "_"
        self.id: str = f"{ufc.id}_{ufh_idx}"

        self.ufc: UfhController = ufc
        self._child_id = ufh_idx

        # TODO: _ctl should be: .ufc? .ctl?
        # NOTE(review): annotated as Controller but starts out as None
        self._ctl: Controller = None
        self._zone: Zone | None = None

    # def __str__(self) -> str:
    #     return f"{self.id} ({self._zone and self._zone._child_id})"

    def _update_schema(self, **kwargs: Any) -> None:
        """Not supported for circuits: always raises NotImplementedError."""
        raise NotImplementedError

    def _handle_msg(self, msg: Message) -> None:
        """Process a message, linking this circuit to a controller and zone.

        Only 000C messages with a non-empty device list are acted upon.  The
        single listed device is taken as the controller, and the payload's
        zone index is used to find the heating zone, which then has its schema
        updated if the UFH controller is not yet among its actuators.

        Raises:
            exc.PacketPayloadInvalid: if the device list does not hold exactly
                one device, the controller is missing or differs from the one
                already known, or the zone is missing or differs from the one
                already known.
        """
        super()._handle_msg(msg)

        if msg.code != Code._000C or not msg.payload[SZ_DEVICES]:  # zone_devices
            return

        # FIXME: is messy
        # NOTE(review): this emptiness check repeats the one just above
        if not (dev_ids := msg.payload[SZ_DEVICES]):
            return
        # NOTE(review): reached only with two or more devices, despite the message
        if len(dev_ids) != 1:
            raise exc.PacketPayloadInvalid("No devices")

        # ctl = self._gwy.device_by_id.get(dev_ids[0])
        ctl: Controller = self._gwy.get_device(dev_ids[0])
        # reject a missing controller, or one that differs from the known one
        if not ctl or (self._ctl and self._ctl is not ctl):
            raise exc.PacketPayloadInvalid("No CTL")
        self._ctl = ctl

        ctl._make_tcs_controller()
        # self.set_parent(ctl.tcs)

        zon = ctl.tcs.get_htg_zone(msg.payload[SZ_ZONE_IDX])
        if not zon:
            raise exc.PacketPayloadInvalid("No Zone")
        # a circuit, once linked to a zone, may not move to a different one
        if self._zone and self._zone is not zon:
            raise exc.PacketPayloadInvalid("Wrong Zone")
        self._zone = zon

        # tell the zone about the UFH controller (as actuator) and this circuit
        if self.ufc not in self._zone.actuators:
            schema = {SZ_ACTUATORS: [self.ufc.id], SZ_CIRCUITS: [self.id]}
            self._zone._update_schema(**schema)

    @property
    def ufx_idx(self) -> str:
        """Return this circuit's index within its UFH controller."""
        return self._child_id

    @property
    def zone_idx(self) -> str | None:
        """Return the index of the linked zone, or None if no zone is linked."""
        if self._zone:
            return self._zone._child_id
        return None
# Lookup of device class by slug, e.g. {"CTL": Controller}; built by
# class_by_attr() from this module's name and the "_SLUG" attribute
HEAT_CLASS_BY_SLUG: dict[str, type[DeviceHeat]] = class_by_attr(__name__, "_SLUG")

# (verb, code) pairs, keyed by device type
# NOTE(review): presumably the packets that identify a device as that type -
# confirm against the code that consumes this table
_HEAT_VC_PAIR_BY_CLASS = {
    DevType.DHW: ((I_, Code._1260),),
    DevType.OTB: ((I_, Code._3220), (RP, Code._3220)),
}
def class_dev_heat(
    dev_addr: Address, *, msg: Message | None = None, eavesdrop: bool = False
) -> type[DeviceHeat]:
    """Return a device class, but only if the device must be from the CH/DHW group.

    The class is chosen from the address type where possible.  Failing that,
    and only when eavesdropping, a message whose code is exclusive to the heat
    domain yields the generic DeviceHeat class (which will need promotion).

    Raises:
        TypeError: if no class can be determined for the address.
    """

    # all thermostat-like address types share the one THM class
    if dev_addr.type in DEV_TYPE_MAP.THM_DEVICES:
        return HEAT_CLASS_BY_SLUG[DevType.THM]

    try:
        slug = DEV_TYPE_MAP.slug(dev_addr.type)
    except KeyError:
        pass  # unknown address type: fall through to eavesdropping
    else:
        return HEAT_CLASS_BY_SLUG[slug]

    if eavesdrop:
        if msg and msg.code in CODES_OF_HEAT_DOMAIN_ONLY:
            return DeviceHeat
        raise TypeError(
            f"No CH/DHW class for: {dev_addr} (unknown type: {dev_addr.type})"
        )

    raise TypeError(f"No CH/DHW class for: {dev_addr} (no eavesdropping)")