Coverage for src/ramses_tx/command.py: 24%
636 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""
3RAMSES RF - a RAMSES-II protocol decoder & analyser.
5This module provides the `Command` class for constructing and managing RAMSES-II protocol
6commands (packets) that are to be sent to HVAC devices. It includes methods for creating
7commands to control various aspects of the heating system including zones, DHW, and fan controls.
9"""
11from __future__ import annotations
13import logging
14import math
15from collections.abc import Iterable
16from datetime import datetime as dt, timedelta as td
17from typing import TYPE_CHECKING, Any, TypeVar
19from . import exceptions as exc
20from .address import (
21 ALL_DEV_ADDR,
22 HGI_DEV_ADDR,
23 NON_DEV_ADDR,
24 Address,
25 dev_id_to_hex_id,
26 pkt_addrs,
27)
28from .const import (
29 DEV_TYPE_MAP,
30 DEVICE_ID_REGEX,
31 FAULT_DEVICE_CLASS,
32 FAULT_STATE,
33 FAULT_TYPE,
34 SYS_MODE_MAP,
35 SZ_DHW_IDX,
36 SZ_MAX_RETRIES,
37 SZ_PRIORITY,
38 SZ_TIMEOUT,
39 ZON_MODE_MAP,
40 FaultDeviceClass,
41 FaultState,
42 FaultType,
43 Priority,
44)
45from .frame import Frame, pkt_header
46from .helpers import (
47 air_quality_code,
48 capability_bits,
49 fan_info_flags,
50 fan_info_to_byte,
51 hex_from_bool,
52 hex_from_double,
53 hex_from_dtm,
54 hex_from_dts,
55 hex_from_percent,
56 hex_from_str,
57 hex_from_temp,
58 timestamp,
59)
60from .opentherm import parity
61from .parsers import LOOKUP_PUZZ
62from .ramses import (
63 _2411_PARAMS_SCHEMA,
64 SZ_DATA_TYPE,
65 SZ_MAX_VALUE,
66 SZ_MIN_VALUE,
67 SZ_PRECISION,
68)
69from .version import VERSION
71from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
72 I_,
73 RP,
74 RQ,
75 W_,
76 Code,
77)
78from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
79 F9,
80 FA,
81 FC,
82 FF,
83)
86if TYPE_CHECKING:
87 from .const import VerbT
88 from .frame import HeaderT, PayloadT
89 from .schemas import DeviceIdT
92COMMAND_FORMAT = "{:<2} {} {} {} {} {} {:03d} {}"
95DEV_MODE = False
97_LOGGER = logging.getLogger(__name__)
98if DEV_MODE:
99 _LOGGER.setLevel(logging.DEBUG)
102_ZoneIdxT = TypeVar("_ZoneIdxT", int, str)
class Qos:
    """Quality-of-service settings for a command.

    Holds a priority, a (capped) retry limit, tx/rx timeouts and a flag that
    disables backoff. Build one directly from keyword overrides, or use
    :meth:`verb_code` to start from the per ``verb|code`` defaults.
    """

    # TODO: this needs work

    POLL_INTERVAL = 0.002

    TX_PRIORITY_DEFAULT = Priority.DEFAULT

    # tx (from sent to gwy, to get back from gwy) seems to take approx. 0.025s
    TX_RETRIES_DEFAULT = 2
    TX_RETRIES_MAX = 5
    TX_TIMEOUT_DEFAULT = td(seconds=0.2)  # 0.20 OK, but too high?

    RX_TIMEOUT_DEFAULT = td(seconds=0.50)  # 0.20 seems OK, 0.10 too low sometimes

    TX_BACKOFFS_MAX = 2  # i.e. tx_timeout 2 ** MAX_BACKOFF

    # Keyword names accepted by verb_code(); positions match the first three
    # columns of the tuples below.
    QOS_KEYS = (SZ_PRIORITY, SZ_MAX_RETRIES, SZ_TIMEOUT)
    # columns: priority, max_retries, rx_timeout, backoff
    DEFAULT_QOS = (Priority.DEFAULT, TX_RETRIES_DEFAULT, TX_TIMEOUT_DEFAULT, True)
    DEFAULT_QOS_TABLE = {
        f"{RQ}|{Code._0016}": (Priority.HIGH, 5, None, True),
        f"{RQ}|{Code._0006}": (Priority.HIGH, 5, None, True),
        f"{I_}|{Code._0404}": (Priority.HIGH, 3, td(seconds=0.30), True),
        f"{RQ}|{Code._0404}": (Priority.HIGH, 3, td(seconds=1.00), True),
        f"{W_}|{Code._0404}": (Priority.HIGH, 3, td(seconds=1.00), True),
        f"{RQ}|{Code._0418}": (Priority.LOW, 3, None, None),
        f"{RQ}|{Code._1F09}": (Priority.HIGH, 5, None, True),
        f"{I_}|{Code._1FC9}": (Priority.HIGH, 2, td(seconds=1), False),
        f"{RQ}|{Code._3220}": (Priority.DEFAULT, 1, td(seconds=1.2), False),
        f"{W_}|{Code._3220}": (Priority.HIGH, 3, td(seconds=1.2), False),
    }  # The long timeout for the OTB is for total RTT to slave (boiler)

    def __init__(
        self,
        *,
        priority: Priority | None = None,  # TODO: deprecate
        max_retries: int | None = None,  # TODO: deprecate
        timeout: td | None = None,  # TODO: deprecate
        backoff: bool | None = None,  # TODO: deprecate
    ) -> None:
        """Create a QoS; any argument left as None falls back to DEFAULT_QOS.

        :param priority: The priority, or None for the default
        :param max_retries: The retry limit (capped at TX_RETRIES_MAX), or None
        :param timeout: The rx timeout, or None for the default
        :param backoff: Whether backoff is enabled, or None for the default
        """
        dflt_priority, dflt_retries, dflt_timeout, dflt_backoff = self.DEFAULT_QOS

        self.priority = dflt_priority if priority is None else priority
        self.tx_timeout = self.TX_TIMEOUT_DEFAULT
        self.rx_timeout = dflt_timeout if timeout is None else timeout

        backoff_enabled = dflt_backoff if backoff is None else backoff
        self.disable_backoff = not backoff_enabled

        # never allow more retries than the hard ceiling
        wanted_retries = dflt_retries if max_retries is None else max_retries
        self.retry_limit = min(wanted_retries, Qos.TX_RETRIES_MAX)

    @classmethod  # constructor from verb|code pair
    def verb_code(cls, verb: VerbT, code: str | Code, **kwargs: Any) -> Qos:
        """Create a QoS from the defaults registered for a verb|code pair.

        Values given in ``kwargs`` (under the names in QOS_KEYS) take
        precedence over the table entry; unknown pairs use DEFAULT_QOS.

        :param verb: The packet verb
        :param code: The packet code
        :return: A new QoS instance
        """
        # NOTE(review): QOS_KEYS has three names, so the fourth (backoff)
        # column of the table is not forwarded to the constructor here.
        row = cls.DEFAULT_QOS_TABLE.get(f"{verb}|{code}", cls.DEFAULT_QOS)
        overrides = {
            name: kwargs.get(name, fallback)
            for name, fallback in zip(cls.QOS_KEYS, row)
        }
        return cls(**overrides)
165def _check_idx(zone_idx: int | str) -> str:
166 """Validate and normalize a zone index or DHW index.
168 This helper function validates that a zone index is within the valid range
169 and converts it to a consistent string format.
171 :param zone_idx: The zone index to validate. Can be:
172 - int: 0-15 for zones, 0xFA for DHW
173 - str: String representation of the index (hex or 'HW' for DHW)
174 :type zone_idx: int | str
175 :return: The normalized zone index as a 2-character hex string
176 :rtype: str
177 :raises CommandInvalid: If the zone index is invalid
179 .. note::
180 - For DHW (Domestic Hot Water), use 0xFA or 'HW'
181 - For zones, use 0-15 (or '00'-'0F' as hex strings)
182 """
183 # if zone_idx is None:
184 # return "00"
185 if not isinstance(zone_idx, int | str):
186 raise exc.CommandInvalid(f"Invalid value for zone_idx: {zone_idx}")
187 if isinstance(zone_idx, str):
188 zone_idx = FA if zone_idx == "HW" else zone_idx
189 result: int = zone_idx if isinstance(zone_idx, int) else int(zone_idx, 16)
190 if 0 > result > 15 and result != 0xFA:
191 raise exc.CommandInvalid(f"Invalid value for zone_idx: {result}")
192 return f"{result:02X}"
def _normalise_mode(
    mode: int | str | None,
    target: bool | float | None,
    until: dt | str | None,
    duration: int | None,
) -> str:
    """Return a validated zone/DHW mode as a hex string.

    When ``mode`` is None it is inferred from the timing arguments: TEMPORARY
    if ``until`` is truthy, else COUNTDOWN if ``duration`` is truthy, else
    PERMANENT. An int mode is rendered as 2-digit hex; anything not found in
    ZON_MODE_MAP is passed through ``ZON_MODE_MAP._hex()``.

    :param mode: The mode (int, str, or None to infer it)
    :type mode: int | str | None
    :param target: The setpoint (zones) or active state (DHW)
    :type target: bool | float | None
    :param until: The end time, for temporary modes
    :type until: datetime | str | None
    :param duration: The duration, for countdown modes
    :type duration: int | None
    :return: The normalised mode
    :rtype: str
    :raises CommandInvalid: If mode and target are both None, if until and
        duration are both given, or if target is None for any mode but FOLLOW
    """
    if mode is None and target is None:
        raise exc.CommandInvalid(
            "Invalid args: One of mode or setpoint/active can't be None"
        )
    if until and duration:
        raise exc.CommandInvalid(
            "Invalid args: At least one of until or duration must be None"
        )

    if isinstance(mode, int):
        mode = f"{mode:02X}"
    elif mode is None:  # infer the mode from whichever timing arg was supplied
        mode = (
            ZON_MODE_MAP.TEMPORARY
            if until
            else ZON_MODE_MAP.COUNTDOWN
            if duration
            else ZON_MODE_MAP.PERMANENT  # TODO: advanced_override?
        )

    if mode not in ZON_MODE_MAP:
        mode = ZON_MODE_MAP._hex(mode)  # type: ignore[arg-type]  # may raise KeyError

    assert isinstance(mode, str)  # mypy check

    if target is None and mode != ZON_MODE_MAP.FOLLOW:
        raise exc.CommandInvalid(
            f"Invalid args: For {ZON_MODE_MAP[mode]}, setpoint/active can't be None"
        )

    return mode
def _normalise_until(
    mode: int | str | None,
    _: Any,
    until: dt | str | None,
    duration: int | None,
) -> tuple[Any, Any]:
    """Check that until/duration are consistent with the mode, and return them.

    :param mode: The operating mode (compared against ZON_MODE_MAP values)
    :type mode: int | str | None
    :param _: Unused (keeps the signature parallel to _normalise_mode)
    :type _: Any
    :param until: The end time for temporary modes
    :type until: datetime | str | None
    :param duration: The duration for countdown modes
    :type duration: int | None
    :return: The (until, duration) pair, unchanged
    :rtype: tuple[Any, Any]
    :raises CommandInvalid: If the timing parameters do not suit the mode

    .. note::
        - TEMPORARY: 'duration' must be None ('until' may be None)
        - COUNTDOWN: 'duration' must be provided, 'until' must be None
        - Any other mode: both 'until' and 'duration' must be None
    """
    if mode == ZON_MODE_MAP.TEMPORARY:
        if duration is not None:
            raise exc.CommandInvalid(
                f"Invalid args: For mode={mode}, duration must be None"
            )
        # TODO: with until=None this should arguably become ADVANCED mode (or
        # until = now + 1h), but the mode is not returned to the caller, so
        # reassigning it here would have no effect.

    # Was `mode in ZON_MODE_MAP.COUNTDOWN`: a substring test against a str,
    # which matches fragments of the code and raises TypeError for a None/int
    # mode. An equality test is what is meant.
    elif mode == ZON_MODE_MAP.COUNTDOWN:
        if duration is None:
            raise exc.CommandInvalid(
                f"Invalid args: For mode={mode}, duration can't be None"
            )
        if until is not None:
            raise exc.CommandInvalid(
                f"Invalid args: For mode={mode}, until must be None"
            )

    elif until is not None or duration is not None:
        raise exc.CommandInvalid(
            f"Invalid args: For mode={mode}, until and duration must both be None"
        )

    return until, duration  # TODO return updated mode for ZON_MODE_MAP.TEMPORARY ?
316class Command(Frame):
317 """The Command class (packets to be transmitted).
319 They have QoS and/or callbacks (but no RSSI).
320 """
322 def __init__(self, frame: str) -> None:
323 """Create a command from a string (and its meta-attrs)."""
325 try:
326 super().__init__(frame)
327 except exc.PacketInvalid as err:
328 raise exc.CommandInvalid(err.message) from err
330 try:
331 self._validate(strict_checking=False)
332 except exc.PacketInvalid as err:
333 raise exc.CommandInvalid(err.message) from err
335 try:
336 self._validate(strict_checking=True)
337 except exc.PacketInvalid as err:
338 _LOGGER.warning(f"{self} < Command is potentially invalid: {err}")
340 self._rx_header: str | None = None
341 # self._source_entity: Entity | None = None # TODO: is needed?
343 @classmethod # convenience constructor
344 def from_attrs(
345 cls,
346 verb: VerbT,
347 dest_id: DeviceIdT | str,
348 code: Code,
349 payload: PayloadT,
350 *,
351 from_id: DeviceIdT | str | None = None,
352 seqn: int | str | None = None,
353 ) -> Command:
354 """Create a command from its attrs using a destination device_id."""
356 from_id = from_id or HGI_DEV_ADDR.id
358 addrs: tuple[DeviceIdT | str, DeviceIdT | str, DeviceIdT | str]
360 # if dest_id == NUL_DEV_ADDR.id:
361 # addrs = (from_id, dest_id, NON_DEV_ADDR.id)
362 if dest_id == from_id:
363 addrs = (from_id, NON_DEV_ADDR.id, dest_id)
364 else:
365 addrs = (from_id, dest_id, NON_DEV_ADDR.id)
367 return cls._from_attrs(
368 verb,
369 code,
370 payload,
371 addr0=addrs[0],
372 addr1=addrs[1],
373 addr2=addrs[2],
374 seqn=seqn,
375 )
377 @classmethod # generic constructor
378 def _from_attrs(
379 cls,
380 verb: str | VerbT,
381 code: str | Code,
382 payload: PayloadT,
383 *,
384 addr0: DeviceIdT | str | None = None,
385 addr1: DeviceIdT | str | None = None,
386 addr2: DeviceIdT | str | None = None,
387 seqn: int | str | None = None,
388 ) -> Command:
389 """Create a command from its attrs using an address set."""
391 verb = I_ if verb == "I" else W_ if verb == "W" else verb
393 addr0 = addr0 or NON_DEV_ADDR.id
394 addr1 = addr1 or NON_DEV_ADDR.id
395 addr2 = addr2 or NON_DEV_ADDR.id
397 _, _, *addrs = pkt_addrs(" ".join((addr0, addr1, addr2)))
398 # print(pkt_addrs(" ".join((addr0, addr1, addr2))))
400 if seqn is None or seqn in ("", "-", "--", "---"):
401 seqn = "---"
402 elif isinstance(seqn, int):
403 seqn = f"{int(seqn):03d}"
405 frame = " ".join(
406 (
407 verb,
408 seqn,
409 *(a.id for a in addrs),
410 code,
411 f"{int(len(payload) / 2):03d}",
412 payload,
413 )
414 )
416 return cls(frame)
418 @classmethod # used by CLI for -x switch (NB: no len field)
419 def from_cli(cls, cmd_str: str) -> Command:
420 """Create a command from a CLI string (the -x switch).
422 Examples include (whitespace for readability):
423 'RQ 01:123456 1F09 00'
424 'RQ 01:123456 13:123456 3EF0 00'
425 'RQ 07:045960 01:054173 10A0 00137400031C'
426 ' W 123 30:045960 -:- 32:054173 22F1 001374'
427 """
429 parts = cmd_str.upper().split()
430 if len(parts) < 4:
431 raise exc.CommandInvalid(
432 f"Command string is not parseable: '{cmd_str}'"
433 ", format is: verb [seqn] addr0 [addr1 [addr2]] code payload"
434 )
436 verb = parts.pop(0)
437 seqn = "---" if DEVICE_ID_REGEX.ANY.match(parts[0]) else parts.pop(0)
438 payload = parts.pop()[:48]
439 code = parts.pop()
441 addrs: tuple[DeviceIdT | str, DeviceIdT | str, DeviceIdT | str]
443 if not 0 < len(parts) < 4:
444 raise exc.CommandInvalid(f"Command is invalid: '{cmd_str}'")
445 elif len(parts) == 1 and verb == I_:
446 # drs = (cmd[0], NON_DEV_ADDR.id, cmd[0])
447 addrs = (NON_DEV_ADDR.id, NON_DEV_ADDR.id, parts[0])
448 elif len(parts) == 1:
449 addrs = (HGI_DEV_ADDR.id, parts[0], NON_DEV_ADDR.id)
450 elif len(parts) == 2 and parts[0] == parts[1]:
451 addrs = (parts[0], NON_DEV_ADDR.id, parts[1])
452 elif len(parts) == 2:
453 addrs = (parts[0], parts[1], NON_DEV_ADDR.id)
454 else:
455 addrs = (parts[0], parts[1], parts[2])
457 return cls._from_attrs(
458 verb,
459 code,
460 payload,
461 **{f"addr{k}": v for k, v in enumerate(addrs)},
462 seqn=seqn,
463 )
465 def __repr__(self) -> str:
466 """Return an unambiguous string representation of this object."""
467 # e.g.: RQ --- 18:000730 01:145038 --:------ 000A 002 0800 # 000A|RQ|01:145038|08
468 comment = f" # {self._hdr}{f' ({self._ctx})' if self._ctx else ''}"
469 return f"... {self}{comment}"
471 def __str__(self) -> str:
472 """Return a brief readable string representation of this object."""
473 # e.g.: 000A|RQ|01:145038|08
474 return super().__repr__() # TODO: self._hdr
476 @property
477 def tx_header(self) -> HeaderT:
478 """Return the QoS header of this (request) packet."""
480 return self._hdr
482 @property
483 def rx_header(self) -> HeaderT | None:
484 """Return the QoS header of a corresponding response packet (if any)."""
486 if self.tx_header and self._rx_header is None:
487 self._rx_header = pkt_header(self, rx_header=True)
488 return self._rx_header
490 @classmethod # constructor for I|0002 # TODO: trap corrupt temps?
491 def put_weather_temp(cls, dev_id: DeviceIdT | str, temperature: float) -> Command:
492 """Constructor to announce the current temperature of a weather sensor (0002).
494 This is for use by a faked HB85 or similar.
495 """
497 if dev_id[:2] != DEV_TYPE_MAP.OUT:
498 raise exc.CommandInvalid(
499 f"Faked device {dev_id} has an unsupported device type: "
500 f"device_id should be like {DEV_TYPE_MAP.OUT}:xxxxxx"
501 )
503 payload = f"00{hex_from_temp(temperature)}01"
504 return cls._from_attrs(I_, Code._0002, payload, addr0=dev_id, addr2=dev_id)
506 @classmethod # constructor for RQ|0004
507 def get_zone_name(cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT) -> Command:
508 """Get the name of a zone. (c.f. parser_0004)
510 This method constructs a command to request the name of a specific zone
511 from the controller.
513 :param ctl_id: The device ID of the controller
514 :type ctl_id: DeviceIdT | str
515 :param zone_idx: The index of the zone (00-31)
516 :type zone_idx: _ZoneIdxT
517 :return: A Command object for the RQ|0004 message
518 :rtype: Command
520 .. note::
521 The zone name is typically a user-assigned identifier for the zone,
522 such as "Living Room" or "Bedroom 1".
523 """
524 return cls.from_attrs(RQ, ctl_id, Code._0004, f"{_check_idx(zone_idx)}00")
526 @classmethod # constructor for W|0004
527 def set_zone_name(
528 cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT, name: str
529 ) -> Command:
530 """Set the name of a zone. (c.f. parser_0004)
532 This method constructs a command to set the name of a specific zone
533 on the controller. The name will be truncated to 20 characters (40 hex digits).
535 :param ctl_id: The device ID of the controller
536 :type ctl_id: DeviceIdT | str
537 :param zone_idx: The index of the zone (00-31)
538 :type zone_idx: _ZoneIdxT
539 :param name: The new name for the zone (max 20 characters)
540 :type name: str
541 :return: A Command object for the W|0004 message
542 :rtype: Command
544 .. note::
545 The name will be converted to uppercase and non-ASCII characters
546 will be replaced with '?'. The name is limited to 20 characters.
547 """
548 payload = f"{_check_idx(zone_idx)}00{hex_from_str(name)[:40]:0<40}"
549 return cls.from_attrs(W_, ctl_id, Code._0004, payload)
551 @classmethod # constructor for RQ|0006
552 def get_schedule_version(cls, ctl_id: DeviceIdT | str) -> Command:
553 """Get the current version (change counter) of the schedules.
555 This method retrieves a version number that is incremented whenever any zone's
556 schedule (including the DHW zone) is modified. This allows clients to efficiently
557 check if schedules have changed before downloading them.
559 :param ctl_id: The device ID of the controller
560 :type ctl_id: DeviceIdT | str
561 :return: A Command object for the RQ|0006 message
562 :rtype: Command
564 .. note::
565 The version number is a simple counter that increments with each schedule
566 change. It has no inherent meaning beyond indicating that a change has
567 occurred. The actual value should be compared with a previously stored
568 version to detect changes.
569 """
570 return cls.from_attrs(RQ, ctl_id, Code._0006, "00")
572 @classmethod # constructor for RQ|0008
573 def get_relay_demand(
574 cls, dev_id: DeviceIdT | str, zone_idx: _ZoneIdxT | None = None
575 ) -> Command:
576 """Get the current demand value for a relay or zone. (c.f. parser_0008)
578 This method constructs a command to request the current demand value for a
579 specific relay or zone. The demand value typically represents the requested
580 output level (0-100%) for the relay or zone.
582 :param dev_id: The device ID of the relay or controller
583 :type dev_id: DeviceIdT | str
584 :param zone_idx: The index of the zone (00-31), or None for the relay itself
585 :type zone_idx: _ZoneIdxT | None
586 :return: A Command object for the RQ|0008 message
587 :rtype: Command
589 .. note::
590 - If zone_idx is None, the command requests the relay's overall demand.
591 - If zone_idx is specified, the command requests the demand for that specific zone.
592 - The response will contain the current demand value as a percentage (0-100%).
593 """
594 payload = "00" if zone_idx is None else _check_idx(zone_idx)
595 return cls.from_attrs(RQ, dev_id, Code._0008, payload)
597 @classmethod # constructor for RQ|000A
598 def get_zone_config(cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT) -> Command:
599 """Get the configuration of a specific zone. (c.f. parser_000a)
601 This method constructs a command to request the configuration parameters
602 for a specific zone from the controller. The configuration includes
603 settings related to the zone's operation, such as temperature setpoints,
604 mode, and other zone-specific parameters.
606 :param ctl_id: The device ID of the controller
607 :type ctl_id: DeviceIdT | str
608 :param zone_idx: The index of the zone (00-31)
609 :type zone_idx: _ZoneIdxT
610 :return: A Command object for the RQ|000A message
611 :rtype: Command
613 .. note::
614 The response to this command will include various configuration parameters
615 for the specified zone, such as:
616 - Zone type (radiator, underfloor heating, etc.)
617 - Temperature setpoints
618 - Mode (heating/cooling)
619 - Other zone-specific settings
620 """
621 zon_idx = _check_idx(zone_idx)
622 return cls.from_attrs(RQ, ctl_id, Code._000A, zon_idx)
624 @classmethod # constructor for W|000A
625 def set_zone_config(
626 cls,
627 ctl_id: DeviceIdT | str,
628 zone_idx: _ZoneIdxT,
629 *,
630 min_temp: float = 5,
631 max_temp: float = 35,
632 local_override: bool = False,
633 openwindow_function: bool = False,
634 multiroom_mode: bool = False,
635 ) -> Command:
636 """Set the configuration parameters for a specific zone. (c.f. parser_000a)
638 This method constructs a command to configure various parameters for a zone,
639 including temperature limits and operational modes.
641 :param ctl_id: The device ID of the controller
642 :type ctl_id: DeviceIdT | str
643 :param zone_idx: The index of the zone (00-31)
644 :type zone_idx: _ZoneIdxT
645 :param min_temp: Minimum allowed temperature for the zone (5-21°C)
646 :type min_temp: float
647 :param max_temp: Maximum allowed temperature for the zone (21-35°C)
648 :type max_temp: float
649 :param local_override: If True, allows local temperature override at the device
650 :type local_override: bool
651 :param openwindow_function: If True, enables open window detection function
652 :type openwindow_function: bool
653 :param multiroom_mode: If True, enables multi-room mode for this zone
654 :type multiroom_mode: bool
655 :return: A Command object for the W|000A message
656 :rtype: Command
657 :raises CommandInvalid: If any parameter is out of range or of incorrect type
659 .. note::
660 - The minimum temperature must be between 5°C and 21°C
661 - The maximum temperature must be between 21°C and 35°C
662 - The minimum temperature cannot be higher than the maximum temperature
663 - These settings affect how the zone behaves in different operating modes
664 """
665 zon_idx = _check_idx(zone_idx)
667 if not (5 <= min_temp <= 21):
668 raise exc.CommandInvalid(f"Out of range, min_temp: {min_temp}")
669 if not (21 <= max_temp <= 35):
670 raise exc.CommandInvalid(f"Out of range, max_temp: {max_temp}")
671 if not isinstance(local_override, bool):
672 raise exc.CommandInvalid(f"Invalid arg, local_override: {local_override}")
673 if not isinstance(openwindow_function, bool):
674 raise exc.CommandInvalid(
675 f"Invalid arg, openwindow_function: {openwindow_function}"
676 )
677 if not isinstance(multiroom_mode, bool):
678 raise exc.CommandInvalid(f"Invalid arg, multiroom_mode: {multiroom_mode}")
680 bitmap = 0 if local_override else 1
681 bitmap |= 0 if openwindow_function else 2
682 bitmap |= 0 if multiroom_mode else 16
684 payload = "".join(
685 (zon_idx, f"{bitmap:02X}", hex_from_temp(min_temp), hex_from_temp(max_temp))
686 )
688 return cls.from_attrs(W_, ctl_id, Code._000A, payload)
690 @classmethod # constructor for RQ|0100
691 def get_system_language(cls, ctl_id: DeviceIdT | str, **kwargs: Any) -> Command:
692 """Get the configured language of the system. (c.f. parser_0100)
694 This method constructs a command to request the current language setting
695 from the system controller.
697 :param ctl_id: The device ID of the controller
698 :type ctl_id: DeviceIdT | str
699 :param kwargs: Additional keyword arguments (not used, for compatibility only)
700 :return: A Command object for the RQ|0100 message
701 :rtype: Command
703 .. note::
704 The response will contain a language code that corresponds to the
705 system's configured language setting.
706 """
707 assert not kwargs, kwargs
708 return cls.from_attrs(RQ, ctl_id, Code._0100, "00", **kwargs)
710 @classmethod # constructor for RQ|0404
711 def get_schedule_fragment(
712 cls,
713 ctl_id: DeviceIdT | str,
714 zone_idx: _ZoneIdxT,
715 frag_number: int,
716 total_frags: int | None,
717 **kwargs: Any,
718 ) -> Command:
719 """Get a specific fragment of a schedule. (c.f. parser_0404)
721 This method constructs a command to request a specific fragment of a schedule
722 from the controller. Schedules are typically broken into multiple fragments
723 for efficient transmission.
725 :param ctl_id: The device ID of the controller
726 :type ctl_id: DeviceIdT | str
727 :param zone_idx: The index of the zone (00-31), or 0xFA/'FA'/'HW' for DHW schedule
728 :type zone_idx: _ZoneIdxT
729 :param frag_number: The fragment number to retrieve (0-based)
730 :type frag_number: int
731 :param total_frags: Total number of fragments (optional)
732 :type total_frags: int | None
733 :param kwargs: Additional keyword arguments
734 :return: A Command object for the RQ|0404 message
735 :rtype: Command
737 .. note::
738 - For zone schedules, use a zone index between 00-31
739 - For DHW (Domestic Hot Water) schedule, use 0xFA, 'FA', or 'HW' as zone_idx
740 - The schedule is typically retrieved in multiple fragments to handle
741 the potentially large amount of data
742 """
744 assert not kwargs, kwargs
745 zon_idx = _check_idx(zone_idx)
747 if total_frags is None:
748 total_frags = 0
750 kwargs.pop("frag_length", None) # for pytests?
751 frag_length = "00"
753 # TODO: check the following rules
754 if frag_number == 0:
755 raise exc.CommandInvalid(f"frag_number={frag_number}, but it is 1-indexed")
756 elif frag_number == 1 and total_frags != 0:
757 raise exc.CommandInvalid(
758 f"total_frags={total_frags}, but must be 0 when frag_number=1"
759 )
760 elif frag_number > total_frags and total_frags != 0:
761 raise exc.CommandInvalid(
762 f"frag_number={frag_number}, but must be <= total_frags={total_frags}"
763 )
765 header = "00230008" if zon_idx == FA else f"{zon_idx}200008"
767 payload = f"{header}{frag_length}{frag_number:02X}{total_frags:02X}"
768 return cls.from_attrs(RQ, ctl_id, Code._0404, payload, **kwargs)
770 @classmethod # constructor for W|0404
771 def set_schedule_fragment(
772 cls,
773 ctl_id: DeviceIdT | str,
774 zone_idx: _ZoneIdxT,
775 frag_num: int,
776 frag_cnt: int,
777 fragment: str,
778 ) -> Command:
779 """Set a specific fragment of a schedule. (c.f. parser_0404)
781 This method constructs a command to set a specific fragment of a schedule
782 on the controller. Schedules are typically set in multiple fragments
783 due to their potentially large size.
785 :param ctl_id: The device ID of the controller
786 :type ctl_id: DeviceIdT | str
787 :param zone_idx: The index of the zone (00-31), or 0xFA/'FA'/'HW' for DHW schedule
788 :type zone_idx: _ZoneIdxT
789 :param frag_num: The fragment number being set (1-based index)
790 :type frag_num: int
791 :param frag_cnt: Total number of fragments in the schedule
792 :type frag_cnt: int
793 :param fragment: The schedule fragment data as a hex string
794 :type fragment: str
795 :return: A Command object for the W|0404 message
796 :rtype: Command
797 :raises CommandInvalid: If fragment number is invalid or out of range
799 .. note::
800 - For zone schedules, use a zone index between 00-31
801 - For DHW (Domestic Hot Water) schedule, use 0xFA, 'FA', or 'HW' as zone_idx
802 - The first fragment (frag_num=1) typically contains schedule metadata
803 - Fragment numbers are 1-based (1 to frag_cnt)
804 """
806 zon_idx = _check_idx(zone_idx)
808 # TODO: check the following rules
809 if frag_num == 0:
810 raise exc.CommandInvalid(f"frag_num={frag_num}, but it is 1-indexed")
811 elif frag_num > frag_cnt:
812 raise exc.CommandInvalid(
813 f"frag_num={frag_num}, but must be <= frag_cnt={frag_cnt}"
814 )
816 header = "00230008" if zon_idx == FA else f"{zon_idx}200008"
817 frag_length = int(len(fragment) / 2)
819 payload = f"{header}{frag_length:02X}{frag_num:02X}{frag_cnt:02X}{fragment}"
820 return cls.from_attrs(W_, ctl_id, Code._0404, payload)
822 @classmethod # constructor for RQ|0418
823 def get_system_log_entry(
824 cls, ctl_id: DeviceIdT | str, log_idx: int | str
825 ) -> Command:
826 """Retrieve a specific log entry from the system log. (c.f. parser_0418)
828 This method constructs a command to request a specific log entry from the
829 system's event log. The log contains historical events and fault records.
831 :param ctl_id: The device ID of the controller
832 :type ctl_id: DeviceIdT | str
833 :param log_idx: The index of the log entry to retrieve (0-based)
834 :type log_idx: int | str (hex string)
835 :return: A Command object for the RQ|0418 message
836 :rtype: Command
838 .. note::
839 - The log index is 0-based, where 0 is the most recent entry
840 - The log typically contains system events, faults, and warnings
841 - The response will include details about the log entry
842 """
843 log_idx = log_idx if isinstance(log_idx, int) else int(log_idx, 16)
844 return cls.from_attrs(RQ, ctl_id, Code._0418, f"{log_idx:06X}")
846 @classmethod # constructor for I|0418 (used for testing only)
847 def _put_system_log_entry(
848 cls,
849 ctl_id: DeviceIdT | str,
850 fault_state: FaultState | str,
851 fault_type: FaultType | str,
852 device_class: FaultDeviceClass | str,
853 device_id: DeviceIdT | str | None = None,
854 domain_idx: int | str = "00",
855 _log_idx: int | str | None = None,
856 timestamp: dt | str | None = None,
857 **kwargs: Any,
858 ) -> Command:
859 """Create a log entry in the system log. (c.f. parser_0418)
861 This internal method constructs a command to create a log entry in the system's
862 event log. It's primarily used for testing purposes to simulate log entries.
864 :param ctl_id: The device ID of the controller
865 :type ctl_id: DeviceIdT | str
866 :param fault_state: The state of the fault (e.g., 'on', 'off', 'unknown')
867 :type fault_state: FaultState | str
868 :param fault_type: The type of fault being logged
869 :type fault_type: FaultType | str
870 :param device_class: The class of device associated with the fault
871 :type device_class: FaultDeviceClass | str
872 :param device_id: The ID of the device associated with the fault (optional)
873 :type device_id: DeviceIdT | str | None
874 :param domain_idx: The domain index (default: '00')
875 :type domain_idx: int | str
876 :param _log_idx: The log index (for internal use, optional)
877 :type _log_idx: int | str | None
878 :param timestamp: The timestamp of the log entry (default: current time)
879 :type timestamp: dt | str | None
880 :param kwargs: Additional keyword arguments
881 :return: A Command object for the I|0418 message
882 :rtype: Command
883 :raises AssertionError: If device_class is invalid
885 .. note::
886 - This is an internal method primarily used for testing
887 - The log entry will appear in the system's event log
888 - The fault_state and fault_type should match the expected enums
889 - If timestamp is not provided, the current time will be used
890 """
891 if isinstance(device_class, FaultDeviceClass):
892 device_class = {v: k for k, v in FAULT_DEVICE_CLASS.items()}[device_class]
893 assert device_class in FAULT_DEVICE_CLASS
895 if isinstance(fault_state, FaultState):
896 fault_state = {v: k for k, v in FAULT_STATE.items()}[fault_state]
897 assert fault_state in FAULT_STATE
899 if isinstance(fault_type, FaultType):
900 fault_type = {v: k for k, v in FAULT_TYPE.items()}[fault_type]
901 assert fault_type in FAULT_TYPE
903 assert isinstance(domain_idx, str) and len(domain_idx) == 2
905 if _log_idx is None:
906 _log_idx = 0
907 if not isinstance(_log_idx, str):
908 _log_idx = f"{_log_idx:02X}"
909 assert 0 <= int(_log_idx, 16) <= 0x3F # TODO: is it 0x3E or 0x3F?
911 if timestamp is None:
912 timestamp = dt.now() #
913 timestamp = hex_from_dts(timestamp)
915 dev_id = dev_id_to_hex_id(device_id) if device_id else "000000" # type: ignore[arg-type]
917 payload = "".join(
918 (
919 "00",
920 fault_state,
921 _log_idx,
922 "B0",
923 fault_type,
924 domain_idx,
925 device_class,
926 "0000",
927 timestamp,
928 "FFFF7000",
929 dev_id,
930 )
931 )
933 return cls.from_attrs(I_, ctl_id, Code._0418, payload)
@classmethod  # constructor for RQ|1030
def get_mix_valve_params(
    cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT
) -> Command:
    """Build the RQ|1030 request for a zone's mixing valve parameters.

    The payload consists solely of the zone index, after it has been passed
    through `_check_idx()`.

    :param ctl_id: The device ID the request is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone being queried
    :type zone_idx: _ZoneIdxT
    :return: A Command object for the RQ|1030 message
    :rtype: Command
    """
    # The index check happens before the command is assembled
    return cls.from_attrs(RQ, ctl_id, Code._1030, _check_idx(zone_idx))
@classmethod  # constructor for W|1030 - TODO: sort out kwargs for HVAC
def set_mix_valve_params(
    cls,
    ctl_id: DeviceIdT | str,
    zone_idx: _ZoneIdxT,
    *,
    max_flow_setpoint: int = 55,
    min_flow_setpoint: int = 15,
    valve_run_time: int = 150,
    pump_run_time: int = 15,
    **kwargs: Any,
) -> Command:
    """Build the W|1030 command that writes a zone's mixing valve parameters.

    The payload is the zone index followed by five ``<tag>01<value>`` triplets
    (tags C8, C9, CA, CB and CC), each value rendered as two hex digits.

    :param ctl_id: The device ID the command is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone being configured
    :type zone_idx: _ZoneIdxT
    :param max_flow_setpoint: Maximum flow setpoint, accepted range 0-99
    :type max_flow_setpoint: int
    :param min_flow_setpoint: Minimum flow setpoint, accepted range 0-50
    :type min_flow_setpoint: int
    :param valve_run_time: Valve run time, accepted range 0-240
    :type valve_run_time: int
    :param pump_run_time: Pump run time, accepted range 0-99
    :type pump_run_time: int
    :param kwargs: Only ``boolean_cc`` is expected (default 1); it is written
        unvalidated as the value of the CC triplet
    :return: A Command object for the W|1030 message
    :rtype: Command
    :raises CommandInvalid: If any of the four numeric parameters is out of range
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    boolean_cc = kwargs.pop("boolean_cc", 1)
    assert not kwargs, kwargs

    zon_idx = _check_idx(zone_idx)

    # (name, value, upper bound) - checked in this order, lower bound is always 0
    bounded = (
        ("max_flow_setpoint", max_flow_setpoint, 99),
        ("min_flow_setpoint", min_flow_setpoint, 50),
        ("valve_run_time", valve_run_time, 240),
        ("pump_run_time", pump_run_time, 99),
    )
    for name, value, upper in bounded:
        if not (0 <= value <= upper):
            raise exc.CommandInvalid(f"Out of range, {name}: {value}")

    # (tag, value) pairs, emitted in payload order
    tagged = (
        ("C8", max_flow_setpoint),
        ("C9", min_flow_setpoint),
        ("CA", valve_run_time),
        ("CB", pump_run_time),
        ("CC", boolean_cc),
    )
    payload = "".join((zon_idx, *(f"{tag}01{value:02X}" for tag, value in tagged)))

    # kwargs is empty here unless assertions are disabled
    return cls.from_attrs(W_, ctl_id, Code._1030, payload, **kwargs)
@classmethod  # constructor for RQ|10A0
def get_dhw_params(cls, ctl_id: DeviceIdT | str, **kwargs: Any) -> Command:
    """Build the RQ|10A0 request for the DHW parameters.

    The payload consists solely of the DHW index, after it has been passed
    through `_check_idx()`.

    :param ctl_id: The device ID the request is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the RQ|10A0 message
    :rtype: Command
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    requested_idx = kwargs.pop(SZ_DHW_IDX, 0)  # 00 or 01 (rare)
    dhw_idx = _check_idx(requested_idx)

    # Anything left over is a caller error
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    return cls.from_attrs(RQ, ctl_id, Code._10A0, dhw_idx)
@classmethod  # constructor for W|10A0
def set_dhw_params(
    cls,
    ctl_id: DeviceIdT | str,
    *,
    setpoint: float | None = 50.0,
    overrun: int | None = 5,
    differential: float | None = 1,
    **kwargs: Any,  # only expect "dhw_idx"
) -> Command:
    """Build the W|10A0 command that writes the DHW parameters.

    The payload is the DHW index, the setpoint (via `hex_from_temp()`), the
    overrun as two hex digits, and the differential (via `hex_from_temp()`).

    :param ctl_id: The device ID the command is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param setpoint: DHW setpoint, accepted range 30.0-85.0; None means 50.0
    :type setpoint: float | None
    :param overrun: Overrun, accepted range 0-10; None means 5
    :type overrun: int | None
    :param differential: Differential, accepted range 1-10; None means 1.0
    :type differential: float | None
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the W|10A0 message
    :rtype: Command
    :raises CommandInvalid: If any parameter is out of its accepted range
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    # Defaults for newer evohome colour:
    # Defaults for older evohome colour: ?? (30-85) C, ? (0-10) min, ? (1-10) C
    # Defaults for evohome monochrome:

    # 14:34:26.734 022  W --- 18:013393 01:145038 --:------ 10A0 006 000F6E050064
    # 14:34:26.751 073  I --- 01:145038 --:------ 01:145038 10A0 006 000F6E0003E8
    # 14:34:26.764 074  I --- 01:145038 18:013393 --:------ 10A0 006 000F6E0003E8

    dhw_idx = _check_idx(kwargs.pop(SZ_DHW_IDX, 0))  # 00 or 01 (rare)
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    # An explicit None falls back to the same value as the signature default
    if setpoint is None:
        setpoint = 50.0
    if overrun is None:
        overrun = 5
    if differential is None:
        differential = 1.0

    if not (30.0 <= setpoint <= 85.0):
        raise exc.CommandInvalid(f"Out of range, setpoint: {setpoint}")
    if not (0 <= overrun <= 10):
        raise exc.CommandInvalid(f"Out of range, overrun: {overrun}")
    if not (1 <= differential <= 10):
        raise exc.CommandInvalid(f"Out of range, differential: {differential}")

    setpoint_hex = hex_from_temp(setpoint)
    overrun_hex = f"{overrun:02X}"
    differential_hex = hex_from_temp(differential)
    payload = f"{dhw_idx}{setpoint_hex}{overrun_hex}{differential_hex}"

    return cls.from_attrs(W_, ctl_id, Code._10A0, payload)
@classmethod  # constructor for RQ|1100
def get_tpi_params(
    cls, dev_id: DeviceIdT | str, *, domain_id: int | str | None = None
) -> Command:
    """Build the RQ|1100 request for the TPI parameters of a domain.

    :param dev_id: The device ID the request is addressed to
    :type dev_id: DeviceIdT | str
    :param domain_id: The domain to query; when None, "00" is used if the
        device ID's type prefix equals ``DEV_TYPE_MAP.BDR``, otherwise FC
    :type domain_id: int | str | None
    :return: A Command object for the RQ|1100 message
    :rtype: Command
    """
    if domain_id is None:
        # The default domain depends on the type prefix of the target device
        if dev_id[:2] == DEV_TYPE_MAP.BDR:
            domain_id = "00"
        else:
            domain_id = FC

    payload = _check_idx(domain_id)
    return cls.from_attrs(RQ, dev_id, Code._1100, payload)
@classmethod  # constructor for W|1100
def set_tpi_params(
    cls,
    ctl_id: DeviceIdT | str,
    domain_id: int | str | None,
    *,
    cycle_rate: int = 3,  # TODO: check
    min_on_time: int = 5,  # TODO: check
    min_off_time: int = 5,  # TODO: check
    proportional_band_width: float | None = None,  # TODO: check
) -> Command:
    """Build the W|1100 command that writes the TPI parameters of a domain.

    The payload is the domain index, then ``cycle_rate``, ``min_on_time`` and
    ``min_off_time`` each multiplied by 4 and rendered as two hex digits, a
    literal ``00``, the proportional band width (via `hex_from_temp()`) and a
    literal ``01``.

    :param ctl_id: The device ID the command is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param domain_id: The domain to configure; None means "00"
    :type domain_id: int | str | None
    :param cycle_rate: Cycle rate (range not yet validated, see TODO below)
    :type cycle_rate: int
    :param min_on_time: Minimum on time (range not yet validated)
    :type min_on_time: int
    :param min_off_time: Minimum off time (range not yet validated)
    :type min_off_time: int
    :param proportional_band_width: Proportional band width, passed as-is
        (including None) to `hex_from_temp()`
    :type proportional_band_width: float | None
    :return: A Command object for the W|1100 message
    :rtype: Command

    .. note::
        - No range validation is performed at present: the checks below are
          disabled until the valid ranges are confirmed
        - A scaled value above 0xFF is rendered with more than two hex digits
    """
    if domain_id is None:
        domain_id = "00"

    # TODO: Uncomment and fix these validations once ranges are confirmed
    # assert cycle_rate is None or cycle_rate in (3, 6, 9, 12), cycle_rate
    # assert min_on_time is None or 1 <= min_on_time <= 5, min_on_time
    # assert min_off_time is None or 1 <= min_off_time <= 5, min_off_time
    # assert (
    #     proportional_band_width is None or 1.5 <= proportional_band_width <= 3.0
    # ), proportional_band_width

    payload = "".join(
        (
            _check_idx(domain_id),
            # int() for all three scaled fields: the X format rejects floats
            f"{int(cycle_rate * 4):02X}",
            f"{int(min_on_time * 4):02X}",
            f"{int(min_off_time * 4):02X}00",  # (or: ...FF)
            f"{hex_from_temp(proportional_band_width)}01",
        )
    )

    return cls.from_attrs(W_, ctl_id, Code._1100, payload)
@classmethod  # constructor for RQ|1260
def get_dhw_temp(cls, ctl_id: DeviceIdT | str, **kwargs: Any) -> Command:
    """Build the RQ|1260 request for the DHW temperature.

    The payload consists solely of the DHW index, after it has been passed
    through `_check_idx()`.

    :param ctl_id: The device ID the request is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the RQ|1260 message
    :rtype: Command
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    requested_idx = kwargs.pop(SZ_DHW_IDX, 0)  # 00 or 01 (rare)
    dhw_idx = _check_idx(requested_idx)

    # Anything left over is a caller error
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    return cls.from_attrs(RQ, ctl_id, Code._1260, dhw_idx)
@classmethod  # constructor for I|1260  # TODO: trap corrupt temps?
def put_dhw_temp(
    cls, dev_id: DeviceIdT | str, temperature: float | None, **kwargs: Any
) -> Command:
    """Build the I|1260 announcement of a DHW sensor temperature.

    Intended for faked/emulated DHW sensors. The packet uses ``dev_id`` as
    both addr0 and addr2; the payload is the DHW index followed by the
    temperature as encoded by `hex_from_temp()`.

    :param dev_id: The device ID of the (faked) DHW sensor; its type prefix
        must equal ``DEV_TYPE_MAP.DHW``
    :type dev_id: DeviceIdT | str
    :param temperature: The temperature to announce (None is passed through
        to `hex_from_temp()`)
    :type temperature: float | None
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the I|1260 message
    :rtype: Command
    :raises CommandInvalid: If the device ID does not have the DHW type prefix
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    dhw_idx = _check_idx(kwargs.pop(SZ_DHW_IDX, 0))  # 00 or 01 (rare)
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    dhw_type = DEV_TYPE_MAP.DHW
    if dev_id[:2] != dhw_type:
        raise exc.CommandInvalid(
            f"Faked device {dev_id} has an unsupported device type: "
            f"device_id should be like {dhw_type}:xxxxxx"
        )

    temp_hex = hex_from_temp(temperature)
    payload = f"{dhw_idx}{temp_hex}"

    return cls._from_attrs(I_, Code._1260, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for I|1290  # TODO: trap corrupt temps?
def put_outdoor_temp(
    cls, dev_id: DeviceIdT | str, temperature: float | None
) -> Command:
    """Build the I|1290 announcement of an outdoor temperature.

    Intended for a faked sensor, or similar. The packet uses ``dev_id`` as
    both addr0 and addr2; the payload is a fixed ``00`` index followed by the
    temperature as encoded by `hex_from_temp()`.

    :param dev_id: The device ID of the announcing sensor
    :type dev_id: DeviceIdT | str
    :param temperature: The temperature to announce (None is passed through
        to `hex_from_temp()`)
    :type temperature: float | None
    :return: A Command object for the I|1290 message
    :rtype: Command
    """
    temp_hex = hex_from_temp(temperature)
    payload = f"00{temp_hex}"  # the index is always 00

    return cls._from_attrs(I_, Code._1290, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for I|1298
def put_co2_level(cls, dev_id: DeviceIdT | str, co2_level: float | None) -> Command:
    """Build the I|1298 announcement of a CO₂ level.

    Example packet::

        .I --- 37:039266 --:------ 37:039266 1298 003 000316

    The packet uses ``dev_id`` as both addr0 and addr2; the payload is a fixed
    ``00`` index followed by the level as encoded by `hex_from_double()`.

    :param dev_id: The device ID of the announcing sensor
    :type dev_id: DeviceIdT | str
    :param co2_level: The CO₂ level to announce (None is passed through to
        `hex_from_double()`)
    :type co2_level: float | None
    :return: A Command object for the I|1298 message
    :rtype: Command
    """
    level_hex = hex_from_double(co2_level)
    payload = f"00{level_hex}"  # the index is always 00

    return cls._from_attrs(I_, Code._1298, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for I|12A0
def put_indoor_humidity(
    cls, dev_id: DeviceIdT | str, indoor_humidity: float | None
) -> Command:
    """Build the I|12A0 announcement of an indoor humidity reading.

    The packet uses ``dev_id`` as both addr0 and addr2; the payload is a fixed
    ``00`` index followed by the humidity as encoded by
    `hex_from_percent(..., high_res=False)`.

    :param dev_id: The device ID of the announcing sensor or fan
    :type dev_id: DeviceIdT | str
    :param indoor_humidity: The relative humidity to announce (None is passed
        through to `hex_from_percent()`); NOTE(review): confirm the scale
        that helper expects (fraction vs. percentage) before relying on it
    :type indoor_humidity: float | None
    :return: A Command object for the I|12A0 message
    :rtype: Command
    """
    humidity_hex = hex_from_percent(indoor_humidity, high_res=False)
    payload = "00" + humidity_hex  # the index is always 00

    return cls._from_attrs(I_, Code._12A0, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for RQ|12B0
def get_zone_window_state(
    cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT
) -> Command:
    """Build the RQ|12B0 request for the open-window state of a zone.

    The payload consists solely of the zone index, after it has been passed
    through `_check_idx()`.

    :param ctl_id: The device ID the request is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone being queried
    :type zone_idx: _ZoneIdxT
    :return: A Command object for the RQ|12B0 message
    :rtype: Command
    """
    zon_idx = _check_idx(zone_idx)
    return cls.from_attrs(RQ, ctl_id, Code._12B0, zon_idx)
@classmethod  # constructor for RQ|1F41
def get_dhw_mode(cls, ctl_id: DeviceIdT | str, **kwargs: Any) -> Command:
    """Build the RQ|1F41 request for the DHW mode.

    The payload consists solely of the DHW index, after it has been passed
    through `_check_idx()`.

    :param ctl_id: The device ID the request is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the RQ|1F41 message
    :rtype: Command
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    requested_idx = kwargs.pop(SZ_DHW_IDX, 0)  # 00 or 01 (rare)
    dhw_idx = _check_idx(requested_idx)

    # Anything left over is a caller error
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    return cls.from_attrs(RQ, ctl_id, Code._1F41, dhw_idx)
@classmethod  # constructor for W|1F41
def set_dhw_mode(
    cls,
    ctl_id: DeviceIdT | str,
    *,
    mode: int | str | None = None,
    active: bool | None = None,
    until: dt | str | None = None,
    duration: int | None = None,  # never supplied by DhwZone.set_mode()
    **kwargs: Any,
) -> Command:
    """Build the W|1F41 command that sets (or resets) the DHW mode.

    The arguments are normalised by `_normalise_mode()` and
    `_normalise_until()`. The payload is then, in order: the DHW index, the
    active flag (``FF`` for None, else ``01``/``00``), the mode, the duration
    (``FFFFFF`` for None, else six hex digits) and, if set, the until
    date/time as encoded by `hex_from_dtm()`.

    :param ctl_id: The device ID the command is addressed to (the controller)
    :type ctl_id: DeviceIdT | str
    :param mode: The desired mode, as accepted by `_normalise_mode()`
    :type mode: int | str | None
    :param active: The desired on/off state; it is ignored when the
        normalised mode is ``ZON_MODE_MAP.FOLLOW``
    :type active: bool | None
    :param until: End of a temporary mode, as accepted by `_normalise_until()`
    :type until: dt | str | None
    :param duration: Duration of a temporary mode, as accepted by
        `_normalise_until()`
    :type duration: int | None
    :param kwargs: Only ``dhw_idx`` is expected (defaults to 0)
    :return: A Command object for the W|1F41 message
    :rtype: Command
    :raises AssertionError: If unexpected keyword arguments are provided
    :raises CommandInvalid: If ``active`` is neither None nor a bool/int
    """
    dhw_idx = _check_idx(kwargs.pop(SZ_DHW_IDX, 0))  # 00 or 01 (rare)
    assert not kwargs, f"Unexpected arguments: {kwargs}"

    mode = _normalise_mode(mode, active, until, duration)

    # Whatever the caller passed for active is discarded in follow mode
    if mode == ZON_MODE_MAP.FOLLOW:
        active = None
    if active is not None and not isinstance(active, (bool, int)):
        raise exc.CommandInvalid(
            f"Invalid args: active={active}, but must be a bool"
        )

    until, duration = _normalise_until(mode, active, until, duration)

    if active is None:
        active_hex = "FF"
    elif bool(active):
        active_hex = "01"
    else:
        active_hex = "00"

    duration_hex = "FFFFFF" if duration is None else f"{duration:06X}"
    until_hex = "" if until is None else hex_from_dtm(until)

    payload = "".join((dhw_idx, active_hex, mode, duration_hex, until_hex))

    return cls.from_attrs(W_, ctl_id, Code._1F41, payload)
@classmethod  # constructor for 1FC9 (rf_bind) 3-way handshake
def put_bind(
    cls,
    verb: VerbT,
    src_id: DeviceIdT | str,
    codes: Code | Iterable[Code] | None,
    dst_id: DeviceIdT | str | None = None,
    **kwargs: Any,
) -> Command:
    """Create an RF bind command (1FC9) for one phase of the 3-way handshake.

    The phase is selected from ``verb`` and ``dst_id``:

    - I, with ``dst_id`` being None, ``src_id`` or the all-devices address:
      a bind offer (see `_put_bind_offer()`)
    - W, with ``dst_id`` being neither None nor ``src_id``: a bind accept
      (see `_put_bind_accept()`)
    - I, with any other ``dst_id``: a bind confirm (see `_put_bind_confirm()`)

    :param verb: The verb of the command (I or W)
    :type verb: VerbT
    :param src_id: The device ID of the sender
    :type src_id: DeviceIdT | str
    :param codes: A single code, any iterable of codes (list, tuple, set,
        dict keys, generator, ...), or None/empty for no codes
    :type codes: Code | Iterable[Code] | None
    :param dst_id: The device ID of the recipient, if any
    :type dst_id: DeviceIdT | str | None
    :param kwargs: ``oem_code`` for an offer; ``idx`` for an accept/confirm
    :return: A Command object for the bind operation
    :rtype: Command
    :raises CommandInvalid: If any code does not have the length of a code,
        or the verb/dst_id combination matches no phase of the handshake
    :raises AssertionError: If unexpected keyword arguments are provided
    """
    kodes: list[Code]

    # Normalise by type rather than by indexing codes[0]: iterables such as
    # dict.keys(), sets and generators are not subscriptable
    if not codes:  # None, "", or []
        kodes = []  # used by confirm
    elif isinstance(codes, str):  # a single code (Code is str-like)
        kodes = [codes]  # type: ignore[list-item]
    else:
        kodes = list(codes)

    # Check every code, not only the first
    code_len = len(Code._1FC9)
    if any(len(c) != code_len for c in kodes):
        raise exc.CommandInvalid(f"Invalid codes for a bind command: {codes}")

    if verb == I_ and dst_id in (None, src_id, ALL_DEV_ADDR.id):
        oem_code = kwargs.pop("oem_code", None)
        assert not kwargs, f"Unexpected arguments: {kwargs}"
        return cls._put_bind_offer(src_id, dst_id, kodes, oem_code=oem_code)

    elif verb == W_ and dst_id not in (None, src_id):
        idx = kwargs.pop("idx", None)
        assert not kwargs, kwargs
        return cls._put_bind_accept(src_id, dst_id, kodes, idx=idx)  # type: ignore[arg-type]

    elif verb == I_:
        idx = kwargs.pop("idx", None)
        assert not kwargs, kwargs
        return cls._put_bind_confirm(src_id, dst_id, kodes, idx=idx)  # type: ignore[arg-type]

    raise exc.CommandInvalid(
        f"Invalid verb|dst_id for a bind command: {verb}|{dst_id}"
    )
@classmethod  # constructor for 1FC9 (rf_bind) offer
def _put_bind_offer(
    cls,
    src_id: DeviceIdT | str,
    dst_id: DeviceIdT | str | None,
    codes: list[Code],
    *,
    oem_code: str | None = None,
) -> Command:
    """Create the bind offer (I|1FC9), the first message of the handshake.

    # TODO: should preserve order of codes, else tests may fail

    The payload has one ``00<code><hex_id>`` element per offered code (1FC9
    and 10E0 are dropped from the supplied list, order otherwise kept), then
    an optional ``<oem_code>10E0<hex_id>`` element, and finally a
    ``001FC9<hex_id>`` element.

    :param src_id: The device ID of the device making the offer
    :type src_id: DeviceIdT | str
    :param dst_id: The device ID of the recipient; ``src_id`` is used as the
        destination when this is None (or otherwise falsy)
    :type dst_id: DeviceIdT | str | None
    :param codes: The codes to offer
    :type codes: list[Code]
    :param oem_code: An optional OEM code; no 10E0 element is added if falsy
    :type oem_code: str | None
    :return: A Command object for the bind offer message
    :rtype: Command
    :raises CommandInvalid: If no codes remain after 1FC9 and 10E0 are dropped
    """
    # 1FC9 and 10E0 get elements of their own below, so drop them here
    implicit = (Code._1FC9, Code._10E0)
    offered = [c for c in codes if c not in implicit]
    if not offered:  # might be []
        raise exc.CommandInvalid(f"Invalid codes for a bind offer: {codes}")

    hex_id = Address.convert_to_hex(src_id)  # type: ignore[arg-type]

    elements = [f"00{c}{hex_id}" for c in offered]
    if oem_code:  # 01, 67, 6C
        elements.append(f"{oem_code}{Code._10E0}{hex_id}")
    elements.append(f"00{Code._1FC9}{hex_id}")
    payload = "".join(elements)

    # NOTE: .from_attrs, not ._from_attrs, as dst_id could be NUL_DEV_ID
    return cls.from_attrs(
        I_, dst_id or src_id, Code._1FC9, payload, from_id=src_id
    )
@classmethod  # constructor for 1FC9 (rf_bind) accept - mainly used for test suite
def _put_bind_accept(
    cls,
    src_id: DeviceIdT | str,
    dst_id: DeviceIdT | str,
    codes: list[Code],
    *,
    idx: str | None = "00",
) -> Command:
    """Create the bind accept (W|1FC9), the second message of the handshake.

    The payload has one ``<idx><code><hex_id>`` element per code, where
    ``hex_id`` is derived from ``src_id``.

    :param src_id: The device ID of the device accepting the bind
    :type src_id: DeviceIdT | str
    :param dst_id: The device ID of the device that made the offer
    :type dst_id: DeviceIdT | str
    :param codes: The codes to accept
    :type codes: list[Code]
    :param idx: The index to prefix each element with; "00" if None or empty
    :type idx: str | None
    :return: A Command object for the bind accept message
    :rtype: Command
    :raises CommandInvalid: If no codes are provided
    """
    if not codes:  # might be empty list
        raise exc.CommandInvalid(f"Invalid codes for a bind accept: {codes}")

    hex_id = Address.convert_to_hex(src_id)  # type: ignore[arg-type]
    prefix = idx or "00"
    payload = "".join(f"{prefix}{c}{hex_id}" for c in codes)

    return cls.from_attrs(W_, dst_id, Code._1FC9, payload, from_id=src_id)
@classmethod  # constructor for 1FC9 (rf_bind) confirm
def _put_bind_confirm(
    cls,
    src_id: DeviceIdT | str,
    dst_id: DeviceIdT | str,
    codes: list[Code],
    *,
    idx: str | None = "00",
) -> Command:
    """Create the bind confirm (I|1FC9), the final message of the handshake.

    With codes, the payload is ``<idx><first code><hex_id>`` (only the first
    code is used); without codes, the payload is just the index.

    :param src_id: The device ID of the device confirming the bind
    :type src_id: DeviceIdT | str
    :param dst_id: The device ID of the recipient of the confirmation
    :type dst_id: DeviceIdT | str
    :param codes: The bound codes (may be empty)
    :type codes: list[Code]
    :param idx: The index to use; "00" if None or empty
    :type idx: str | None
    :return: A Command object for the bind confirmation message
    :rtype: Command
    """
    prefix = idx or "00"  # e.g. Nuaire 4-way switch uses 21!

    if codes:
        hex_id = Address.convert_to_hex(src_id)  # type: ignore[arg-type]
        payload = f"{prefix}{codes[0]}{hex_id}"
    else:  # an index-only payload
        payload = prefix

    return cls.from_attrs(I_, dst_id, Code._1FC9, payload, from_id=src_id)
@classmethod  # constructor for I|22F1
def set_fan_mode(
    cls,
    fan_id: DeviceIdT | str,
    fan_mode: int | str | None,
    *,
    seqn: int | str | None = None,
    src_id: DeviceIdT | str | None = None,
    idx: str = "00",  # could be e.g. "63"
) -> Command:
    """Set the operating mode of a ventilation fan.

    Two addressing schemes have been observed for this packet:

    - With sequence number: ``I 018 --:------ --:------ 39:159057 22F1 003 000x04``
    - With source ID: ``I --- 21:039407 28:126495 --:------ 22F1 003 000x07``

    :param fan_id: The device ID of the target fan (e.g., '39:159057')
    :type fan_id: DeviceIdT | str
    :param fan_mode: The desired fan mode, given as:
        - Integer: converted to a 2-digit upper-case hex code
        - String: either a hex code or a mode name known to ``_22F1_MODE_ORCON``
        - None: treated as code "00"
    :type fan_mode: int | str | None
    :param seqn: Optional sequence number (0-255), mutually exclusive with src_id.
        ``0`` is a valid sequence number; ``None`` or ``""`` mean "not supplied".
    :type seqn: int | str | None
    :param src_id: Optional source device ID, mutually exclusive with seqn
    :type src_id: DeviceIdT | str | None
    :param idx: Index identifier, typically '00' but can be other values like '63'
    :type idx: str
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command
    :raises CommandInvalid: If both seqn and src_id are provided, or if fan_mode
        is neither a known code nor a known mode name.

    .. note::
        **Scheme 1 (with sequence number):**
        - Sent as a triplet, 0.1s apart, all with the same seqn (000-255)
        - seqn increases monotonically mod 256 after every triplet
        - Example: ``I 218 --:------ --:------ 39:159057 22F1 003 000204`` (low speed)

        **Scheme 2 (with source ID):**
        - Sent as a triplet, 0.085s apart, without a seqn (i.e. is ---)
        - Example: ``I --- 21:039407 28:126495 --:------ 22F1 003 000507``
    """
    # NOTE: WIP: rate can be int or str

    # Scheme 1: I 218 --:------ --:------ 39:159057
    #  - only payloads seen: '(00|63)0[234]04', may accept '000.'
    # Scheme 1a: I --- --:------ --:------ 21:038634 (less common)
    #  - some systems that accept scheme 2 will accept this scheme
    # Scheme 2: I --- 21:038634 18:126620 --:------ (less common)
    #  - only payloads seen: '000[0-9A]0[5-7A]', may accept '000.'

    from .ramses import _22F1_MODE_ORCON

    if fan_mode is None:
        mode = "00"
    elif isinstance(fan_mode, int):
        mode = f"{fan_mode:02X}"
    else:
        mode = fan_mode

    if mode in _22F1_MODE_ORCON:  # already a known code
        payload = f"{idx}{mode}"
    else:  # otherwise it must be a known mode name
        name_to_code = {v: k for k, v in _22F1_MODE_ORCON.items()}
        if mode not in name_to_code:
            raise exc.CommandInvalid(f"fan_mode is not valid: {fan_mode}")
        payload = f"{idx}{name_to_code[mode]}"

    # A plain truthiness test would discard seqn=0, which is a legitimate
    # sequence number, and silently fall back to the src_id address layout.
    has_seqn = seqn is not None and seqn != ""

    if src_id and has_seqn:
        raise exc.CommandInvalid(
            "seqn and src_id are mutually exclusive (you can have neither)"
        )

    if has_seqn:
        return cls._from_attrs(I_, Code._22F1, payload, addr2=fan_id, seqn=seqn)
    return cls._from_attrs(I_, Code._22F1, payload, addr0=src_id, addr1=fan_id)
@classmethod  # constructor for I|22F7
def set_bypass_position(
    cls,
    fan_id: DeviceIdT | str,
    *,
    bypass_position: float | None = None,
    src_id: DeviceIdT | str | None = None,
    **kwargs: Any,
) -> Command:
    """Set the position or mode of a bypass valve in a ventilation system.

    The bypass can be controlled in one of two ways:

    - Direct position control using `bypass_position` (0.0 to 1.0)
    - Predefined modes using `bypass_mode` ('auto', 'on', 'off')

    If neither is given, the command requests auto mode.

    :param fan_id: The device ID of the target fan/ventilation unit
    :type fan_id: DeviceIdT | str
    :param bypass_position: The desired position between 0.0 (fully closed) and
        1.0 (fully open), encoded in steps of 0.5%. If None, see `bypass_mode`.
    :type bypass_position: float | None
    :param src_id: The source device ID sending the command. If falsy, defaults
        to fan_id.
    :type src_id: DeviceIdT | str | None
    :keyword bypass_mode: Alternative to bypass_position, accepts:
        - 'auto': payload byte FF
        - 'on': payload byte C8 (fully open)
        - 'off': payload byte 00 (fully closed)
    :type bypass_mode: str | None
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command
    :raises CommandInvalid: If both bypass_position and bypass_mode are provided,
        if bypass_position is not a number within 0.0-1.0, or if bypass_mode is
        not one of the supported names.
    """

    # RQ --- 37:155617 32:155617 --:------ 22F7 002 0064  # officially: 00C8EF
    # RP --- 32:155617 37:155617 --:------ 22F7 003 00C8C8

    bypass_mode = kwargs.pop("bypass_mode", None)
    assert not kwargs, kwargs

    src_id = src_id or fan_id  # TODO: src_id should be an arg?

    if bypass_mode and bypass_position is not None:
        raise exc.CommandInvalid(
            "bypass_mode and bypass_position are mutually exclusive, "
            "both cannot be provided, and neither is OK"
        )

    if bypass_position is not None:
        # Without this check an out-of-range value still produces a payload:
        # e.g. 1.275 encodes as FF (auto), and larger or negative values give
        # a field that is not two hex digits.
        if not isinstance(bypass_position, (int, float)) or not (
            0.0 <= bypass_position <= 1.0
        ):
            raise exc.CommandInvalid(
                f"bypass_position is not valid: {bypass_position} "
                "(must be between 0.0 and 1.0)"
            )
        pos = f"{int(bypass_position * 200):02X}"
    elif bypass_mode:
        mode_to_pos = {"auto": "FF", "off": "00", "on": "C8"}
        try:
            pos = mode_to_pos[bypass_mode]
        except KeyError as err:  # report it as the documented exception
            raise exc.CommandInvalid(
                f"bypass_mode is not valid: {bypass_mode} "
                f"(must be one of {', '.join(mode_to_pos)})"
            ) from err
    else:
        pos = "FF"  # auto

    return cls._from_attrs(
        W_, Code._22F7, f"00{pos}", addr0=src_id, addr1=fan_id
    )  # trailing EF not required
@classmethod  # constructor for RQ|2309
def get_zone_setpoint(cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT) -> Command:
    """Build the RQ|2309 request for a zone's temperature setpoint.

    The payload is just the zone index, as normalised by `_check_idx`.

    :param ctl_id: The device ID of the controller (e.g., '01:123456')
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone
    :type zone_idx: _ZoneIdxT
    :return: A configured Command object that can be sent to the device.
    :rtype: Command

    .. note::
        Index validation is delegated to `_check_idx`; any exception it raises
        for an invalid index propagates to the caller.
    """
    payload = _check_idx(zone_idx)
    return cls.from_attrs(RQ, ctl_id, Code._2309, payload)
@classmethod  # constructor for W|2309
def set_zone_setpoint(
    cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT, setpoint: float
) -> Command:
    """Build the W|2309 command that sets a zone's temperature setpoint.

    The payload is the zone index (via `_check_idx`) followed by the setpoint
    encoded by `hex_from_temp`.

    :param ctl_id: The device ID of the controller (e.g., '01:123456')
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone
    :type zone_idx: _ZoneIdxT
    :param setpoint: The desired temperature in °C
    :type setpoint: float
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command

    .. note::
        No range check is performed here; validation is whatever `_check_idx`
        and `hex_from_temp` apply, and their exceptions propagate.
    """
    # Example: .W --- 34:092243 01:145038 --:------ 2309 003 0107D0
    idx_hex = _check_idx(zone_idx)
    temp_hex = hex_from_temp(setpoint)
    return cls.from_attrs(W_, ctl_id, Code._2309, f"{idx_hex}{temp_hex}")
@classmethod  # constructor for RQ|2349
def get_zone_mode(cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT) -> Command:
    """Build the RQ|2349 request for a zone's operating mode.

    The payload is just the zone index, as normalised by `_check_idx`.

    :param ctl_id: The device ID of the controller (e.g., '01:123456')
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone
    :type zone_idx: _ZoneIdxT
    :return: A configured Command object that can be sent to the device.
    :rtype: Command

    :Example:
        >>> # Get mode for zone 0
        >>> cmd = Command.get_zone_mode('01:123456', '00')
    """
    payload = _check_idx(zone_idx)
    return cls.from_attrs(RQ, ctl_id, Code._2349, payload)
@classmethod  # constructor for W|2349
def set_zone_mode(
    cls,
    ctl_id: DeviceIdT | str,
    zone_idx: _ZoneIdxT,
    *,
    mode: int | str | None = None,
    setpoint: float | None = None,
    until: dt | str | None = None,
    duration: int | None = None,  # never supplied by Zone.set_mode()
) -> Command:
    """Build the W|2349 command that sets or resets a zone's operating mode.

    The arguments are first reconciled by `_normalise_mode` and
    `_normalise_until`; the payload is then assembled as zone index, setpoint,
    mode, duration and (optionally) the until timestamp.

    :param ctl_id: The device ID of the controller (e.g., '01:123456')
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone
    :type zone_idx: _ZoneIdxT
    :keyword mode: The desired operating mode, as an integer, string, or None
    :type mode: int | str | None
    :keyword setpoint: The target temperature in °C; None means max, if a temp
        is required
    :type setpoint: float | None
    :keyword until: End time of the override
    :type until: datetime | str | None
    :keyword duration: Duration of the override; encoded as 6 hex digits, or
        FFFFFF when None
    :type duration: int | None
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command
    :raises CommandInvalid: If setpoint is neither None nor a float/int. The
        normalising helpers may reject other argument combinations.
    """

    # .W --- 18:013393 01:145038 --:------ 2349 013 0004E201FFFFFF330B1A0607E4
    # .W --- 22:017139 01:140959 --:------ 2349 007 0801F400FFFFFF

    mode = _normalise_mode(mode, setpoint, until, duration)

    if not (setpoint is None or isinstance(setpoint, (float, int))):
        raise exc.CommandInvalid(
            f"Invalid args: setpoint={setpoint}, but must be a float"
        )

    until, duration = _normalise_until(mode, setpoint, until, duration)

    # The fields are built in payload order
    idx_hex = _check_idx(zone_idx)
    temp_hex = hex_from_temp(setpoint)  # None means max, if a temp is required
    duration_hex = f"{duration:06X}" if duration is not None else "FFFFFF"
    until_hex = hex_from_dtm(until) if until is not None else ""

    payload = "".join((idx_hex, temp_hex, mode, duration_hex, until_hex))

    return cls.from_attrs(W_, ctl_id, Code._2349, payload)
@classmethod  # constructor for W|2411
def set_fan_param(
    cls,
    fan_id: DeviceIdT | str,
    param_id: str,
    value: str | int | float | bool,
    *,
    src_id: DeviceIdT | str | None = None,
) -> Command:
    """Set a configuration parameter for a fan/ventilation device.

    The parameter is looked up in ``_2411_PARAMS_SCHEMA``; the value, its
    min/max bounds and its precision are scaled according to the schema's
    data type and packed into a 23-byte (46 hex digit) W|2411 payload.

    :param fan_id: The device ID of the fan/ventilation unit
    :type fan_id: DeviceIdT | str
    :param param_id: The parameter ID, a 2-character hex string (00-FF).
        Surrounding whitespace is stripped and the ID is upper-cased.
    :type param_id: str
    :param value: The value to set; it is converted with ``float()`` and scaled
        per the parameter's data type.
    :type value: str | int | float | bool
    :param src_id: Optional source device ID; passed as addr0 unchanged.
        NOTE(review): no fallback to fan_id is applied in this method - confirm
        how ``_from_attrs`` treats a None addr0.
    :type src_id: DeviceIdT | str | None
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command
    :raises CommandInvalid: If the parameter ID is malformed or unknown, the
        schema's data type is unsupported, or the value is non-numeric,
        non-finite or outside the schema's min/max range.

    .. note::
        Payload layout (hex digits): leading(2) + param_id(4) + data_type(4) +
        value(8) + min(8) + max(8) + precision(8) + trailer(4) = 46.
    """
    # Validate and normalize parameter ID
    try:
        param_id = param_id.strip().upper()
        if len(param_id) != 2:
            raise ValueError(
                "Parameter ID must be exactly 2 hexadecimal characters"
            )
        int(param_id, 16)  # Validate hex
    except ValueError as err:
        raise exc.CommandInvalid(
            f"Invalid parameter ID: '{param_id}'. Must be a 2-digit hexadecimal value (00-FF)"
        ) from err

    # Get parameter schema
    if (param_schema := _2411_PARAMS_SCHEMA.get(param_id)) is None:
        raise exc.CommandInvalid(
            f"Unknown parameter ID: '{param_id}'. This parameter is not defined in the device schema"
        )

    # Get value constraints with defaults
    min_val = param_schema[SZ_MIN_VALUE]
    max_val = param_schema[SZ_MAX_VALUE]
    precision = param_schema.get(SZ_PRECISION, 1.0)
    data_type = param_schema.get(SZ_DATA_TYPE, "00")
    type_code = str(data_type)

    try:
        # Check for special float values first
        if isinstance(value, float) and not math.isfinite(value):
            raise exc.CommandInvalid(
                f"Parameter {param_id}: Invalid value '{value}'. Must be a finite number"
            )

        # Scaling
        if type_code == "01":  # %
            # Special handling for parameter 52 (Sensor sensitivity)
            value_scaled = int(round(float(value) / precision))
            min_val_scaled = int(round(float(min_val) / precision))
            max_val_scaled = int(round(float(max_val) / precision))
            precision_scaled = int(round(float(precision) * 10))
            trailer = "0032"  # Trailer for percentage parameters

            # For percentage values, validate input is in range
            if not min_val_scaled <= value_scaled <= max_val_scaled:
                raise exc.CommandInvalid(
                    f"Parameter {param_id}: Value {value_scaled / 10}% is out of allowed range ({min_val_scaled / 10}% to {max_val_scaled / 10}%)"
                )
        elif type_code == "0F":  # %
            # For other percentage parameters, use the standard scaling:
            # the caller's value is in percent, the schema bounds are fractions
            value_scaled = int(round((float(value) / 100.0) / float(precision)))
            min_val_scaled = int(round(float(min_val) / float(precision)))
            max_val_scaled = int(round(float(max_val) / float(precision)))
            precision_scaled = int(round(float(precision) * 200))
            trailer = "0032"  # Trailer for percentage parameters

            # For percentage values, validate input is in range
            if not min_val_scaled <= value_scaled <= max_val_scaled:
                raise exc.CommandInvalid(
                    f"Parameter {param_id}: Value {value_scaled / 2}% is out of allowed range ({min_val_scaled / 2}% to {max_val_scaled / 2}%)"
                )
        elif type_code == "92":  # °C
            # Scale temperature values by 100 (21.5°C -> 2150 = 0x0866)
            # Round to 0.1°C precision first, then scale
            value_rounded = round(float(value) * 10) / 10
            value_scaled = int(value_rounded * 100)  # e.g., 21.5 -> 2150
            min_val_scaled = int(float(min_val) * 100)
            max_val_scaled = int(float(max_val) * 100)
            precision_scaled = int(float(precision) * 100)
            # always 4 hex not sure about the value, but seems to work.
            trailer = "0001"

            # For temperature values, validate input is within allowed range
            if not min_val_scaled <= value_scaled <= max_val_scaled:
                raise exc.CommandInvalid(
                    f"Parameter {param_id}: Temperature {value_scaled / 100:.1f}°C is out of allowed range ({min_val_scaled / 100:.1f}°C to {max_val_scaled / 100:.1f}°C)"
                )
        elif type_code in ("00", "10"):
            # numeric (minutes, medium(0)/high(1) or days)
            value_scaled = int(float(value))
            min_val_scaled = int(float(min_val))
            max_val_scaled = int(float(max_val))
            precision = 1
            precision_scaled = int(precision)
            # always 4 hex not sure about the value, but seems to work.
            trailer = "0001"

            # For numeric values, validate input is between min and max
            if not min_val_scaled <= value_scaled <= max_val_scaled:
                unit = "minutes" if data_type == "00" else ""
                raise exc.CommandInvalid(
                    f"Parameter {param_id}: Value {value_scaled}{' ' + unit if unit else ''} is out of allowed range ({min_val_scaled} to {max_val_scaled}{' ' + unit if unit else ''})"
                )
        else:
            raise exc.CommandInvalid(
                f"Parameter {param_id}: Invalid data type '{data_type}'. "
                "Must be one of '00', '01', '0F', '10', or '92'"
            )

        # Assemble payload fields
        leading = "00"  # always 2 hex
        param_id_hex = f"{int(param_id, 16):04X}"  # 4 hex, upper, zero-padded

        # data_type (4 hex): always from schema, prefixed with 00
        data_type_hex = f"00{data_type}"
        value_hex = f"{value_scaled:08X}"
        min_hex = f"{min_val_scaled:08X}"
        max_hex = f"{max_val_scaled:08X}"
        precision_hex = f"{precision_scaled:08X}"

        _LOGGER.debug(
            "set_fan_param: value=%s, min=%s, max=%s, precision=%s"
            "\n Scaled: value=%s (0x%s), min=%s (0x%s), "
            "max=%s (0x%s), precision=%s (0x%s)",
            value,
            min_val,
            max_val,
            precision,
            value_scaled,
            value_hex,
            min_val_scaled,
            min_hex,
            max_val_scaled,
            max_hex,
            precision_scaled,
            precision_hex,
        )

        # Final field order: 2+4+4+8+8+8+8+4 = 46 hex -> 23 bytes
        payload = (
            f"{leading}"
            f"{param_id_hex}"
            f"{data_type_hex}"
            f"{value_hex}"
            f"{min_hex}"
            f"{max_hex}"
            f"{precision_hex}"
            f"{trailer}"
        )
        _LOGGER.debug(
            "set_fan_param: Final frame: %s --- %s %s --:------ 2411 %03d %s",
            W_,
            src_id,
            fan_id,
            len(payload),
            payload,
        )

        # Create the command with exactly 2 addresses: from_id and fan_id
        return cls._from_attrs(
            W_,
            Code._2411,
            payload,
            addr0=src_id,
            addr1=fan_id,
            addr2=NON_DEV_ADDR.id,
        )

    # OverflowError: int()/round() of an infinite float, e.g. value="inf"
    except (ValueError, TypeError, OverflowError) as err:
        raise exc.CommandInvalid(f"Invalid value: {value}") from err
@classmethod  # constructor for RQ|2411
def get_fan_param(
    cls,
    fan_id: DeviceIdT | str,
    param_id: str,
    *,
    src_id: DeviceIdT | str,
) -> Command:
    """Create a command to get a fan parameter value.

    Builds an RQ|2411 packet whose payload is ``0000`` followed by the
    upper-cased parameter ID.

    :param fan_id: The device ID of the target fan (e.g., '01:123456')
    :type fan_id: DeviceIdT | str
    :param param_id: The parameter ID to read (2-character hex string, e.g., '4E')
    :type param_id: str
    :param src_id: The source device ID that will send the command
    :type src_id: DeviceIdT | str
    :return: A Command object for the RQ|2411 message
    :rtype: Command
    :raises CommandInvalid: If the parameter ID is invalid (None, wrong type, wrong format)

    .. warning::
        The parameter ID must be a valid 2-character hexadecimal string (00-FF).
        The following will raise CommandInvalid:
        - None value
        - Non-string types
        - Leading/trailing whitespace
        - Incorrect length (not 2 characters)
        - Non-hexadecimal characters (including signs such as '+' or '-')
    """
    if param_id is None:
        raise exc.CommandInvalid("Parameter ID cannot be None")

    if not isinstance(param_id, str):
        raise exc.CommandInvalid(
            f"Parameter ID must be a string, got {type(param_id).__name__}"
        )

    if param_id != param_id.strip():
        raise exc.CommandInvalid(
            f"Parameter ID cannot have leading or trailing whitespace: '{param_id}'"
        )

    # Check each character explicitly: int(param_id, 16) would also accept
    # signed values ("+A", "-1") and non-ASCII digits, which are not hex bytes
    # and would be copied into the payload as-is.
    hex_digits = "0123456789abcdefABCDEF"
    if len(param_id) != 2 or any(char not in hex_digits for char in param_id):
        raise exc.CommandInvalid(
            f"Invalid parameter ID: '{param_id}'. Must be a 2-character hex string (00-FF)."
        )

    payload = f"0000{param_id.upper()}"  # Convert to uppercase for consistency
    _LOGGER.debug(
        "Created get_fan_param command for %s from %s to %s",
        param_id,
        src_id,
        fan_id,
    )

    return cls._from_attrs(RQ, Code._2411, payload, addr0=src_id, addr1=fan_id)
@classmethod  # constructor for RQ|2E04
def get_system_mode(cls, ctl_id: DeviceIdT | str) -> Command:
    """Build the RQ|2E04 request for a system's mode (c.f. parser_2e04).

    :param ctl_id: The device ID of the controller
    :type ctl_id: DeviceIdT | str
    :return: A Command object for the RQ|2E04 message
    :rtype: Command
    """
    payload = FF  # the request carries the constant FF as its only payload
    return cls.from_attrs(RQ, ctl_id, Code._2E04, payload)
@classmethod  # constructor for W|2E04
def set_system_mode(
    cls,
    ctl_id: DeviceIdT | str,
    system_mode: int | str | None,
    *,
    until: dt | str | None = None,
) -> Command:
    """Build the W|2E04 command that sets the system mode (c.f. parser_2e04).

    :param ctl_id: The device ID of the controller (e.g., '01:123456')
    :type ctl_id: DeviceIdT | str
    :param system_mode: The desired system mode, given as:
        - Integer: converted to a 2-digit upper-case hex code
        - String: a code found in SYS_MODE_MAP, or else a value that
          ``SYS_MODE_MAP._hex`` can translate to one
        - None: replaced by ``SYS_MODE_MAP.AUTO``
    :type system_mode: int | str | None
    :param until: Optional timestamp for the mode. Must be None for the AUTO,
        AUTO_WITH_RESET and HEAT_OFF modes.
    :type until: datetime | str | None
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command
    :raises CommandInvalid: If until is given with a mode that does not allow it.
    :raises KeyError: May be raised by ``SYS_MODE_MAP._hex`` for an unknown mode.

    .. note::
        The payload is the mode code, the ``hex_from_dtm(until)`` timestamp,
        then a final byte: "00" when until is None, otherwise "01".
    """

    mode = SYS_MODE_MAP.AUTO if system_mode is None else system_mode
    if isinstance(mode, int):
        mode = f"{mode:02X}"
    if mode not in SYS_MODE_MAP:
        mode = SYS_MODE_MAP._hex(mode)  # may raise KeyError

    if until is not None and mode in (
        SYS_MODE_MAP.AUTO,
        SYS_MODE_MAP.AUTO_WITH_RESET,
        SYS_MODE_MAP.HEAT_OFF,
    ):
        raise exc.CommandInvalid(
            f"Invalid args: For system_mode={SYS_MODE_MAP[mode]},"
            " until must be None"
        )

    assert isinstance(mode, str)  # mypy hint

    until_hex = hex_from_dtm(until)
    flag = "01" if until is not None else "00"
    payload = "".join([mode, until_hex, flag])

    return cls.from_attrs(W_, ctl_id, Code._2E04, payload)
@classmethod  # constructor for I|2E10
def put_presence_detected(
    cls, dev_id: DeviceIdT | str, presence_detected: bool | None
) -> Command:
    """Build the I|2E10 announcement of a presence state (c.f. parser_2e10).

    The device is used as both addr0 and addr2 of the packet.

    :param dev_id: The device ID of the presence sensor (e.g., '01:123456')
    :type dev_id: DeviceIdT | str
    :param presence_detected: The presence state, encoded by `hex_from_bool`
        (True, False, or None)
    :type presence_detected: bool | None
    :return: A configured Command object ready to be sent to the system.
    :rtype: Command
    """
    # .I --- ...
    state_hex = hex_from_bool(presence_detected)
    return cls._from_attrs(
        I_, Code._2E10, f"00{state_hex}", addr0=dev_id, addr2=dev_id
    )
@classmethod  # constructor for RQ|30C9
def get_zone_temp(cls, ctl_id: DeviceIdT | str, zone_idx: _ZoneIdxT) -> Command:
    """Build the RQ|30C9 request for a zone's temperature (c.f. parser_30c9).

    The payload is just the zone index, as normalised by `_check_idx`.

    :param ctl_id: The device ID of the controller managing the zone
    :type ctl_id: DeviceIdT | str
    :param zone_idx: The index of the zone to query
    :type zone_idx: _ZoneIdxT
    :return: A configured Command object that can be sent to the device.
    :rtype: Command

    .. note::
        Index validation is delegated to `_check_idx`; any exception it raises
        for an invalid index propagates to the caller.
    """
    payload = _check_idx(zone_idx)
    return cls.from_attrs(RQ, ctl_id, Code._30C9, payload)
@classmethod  # constructor for I|30C9  # TODO: trap corrupt temps?
def put_sensor_temp(
    cls, dev_id: DeviceIdT | str, temperature: float | None
) -> Command:
    """Build the I|30C9 announcement of a sensor temperature (c.f. parser_30c9).

    This is for use by a faked DTS92(E) or similar. The device is used as both
    addr0 and addr2 of the packet.

    :param dev_id: The device ID of the thermostat or sensor. Its two-character
        type prefix must be one of the TR0, HCW, TRV, DTS, DT2 or RND types.
    :type dev_id: DeviceIdT | str
    :param temperature: The temperature in °C, encoded by `hex_from_temp`
        (None is passed through to that helper)
    :type temperature: float | None
    :return: A configured Command object ready to be sent to the system.
    :rtype: Command
    :raises CommandInvalid: If the device type prefix is not a supported one.
    """
    # .I --- 34:021943 --:------ 34:021943 30C9 003 000C0D

    fakeable_types = (
        DEV_TYPE_MAP.TR0,  # 00
        DEV_TYPE_MAP.HCW,  # 03
        DEV_TYPE_MAP.TRV,  # 04
        DEV_TYPE_MAP.DTS,  # 12
        DEV_TYPE_MAP.DT2,  # 22
        DEV_TYPE_MAP.RND,  # 34
    )

    if dev_id[:2] in fakeable_types:
        temp_hex = hex_from_temp(temperature)
        return cls._from_attrs(
            I_, Code._30C9, f"00{temp_hex}", addr0=dev_id, addr2=dev_id
        )

    raise exc.CommandInvalid(
        f"Faked device {dev_id} has an unsupported device type: "
        f"device_id should be like {DEV_TYPE_MAP.HCW}:xxxxxx"
    )
@classmethod  # constructor for RQ|313F
def get_system_time(cls, ctl_id: DeviceIdT | str) -> Command:
    """Build the RQ|313F request for a system's datetime (c.f. parser_313f).

    :param ctl_id: The device ID of the controller
    :type ctl_id: DeviceIdT | str
    :return: A Command object for the RQ|313F message
    :rtype: Command
    """
    payload = "00"
    return cls.from_attrs(RQ, ctl_id, Code._313F, payload)
@classmethod  # constructor for W|313F
def set_system_time(
    cls,
    ctl_id: DeviceIdT | str,
    datetime: dt | str,
    is_dst: bool = False,
) -> Command:
    """Build the W|313F command that sets a system's datetime (c.f. parser_313f).

    The payload is the fixed prefix ``0060`` followed by the timestamp as
    encoded by `hex_from_dtm` (with seconds included).

    :param ctl_id: The device ID of the controller
    :type ctl_id: DeviceIdT | str
    :param datetime: The target date and time
    :type datetime: dt | str
    :param is_dst: Whether Daylight Saving Time is active, defaults to False
    :type is_dst: bool
    :return: A Command object for the W|313F message
    :rtype: Command
    """
    # .W --- 30:185469 01:037519 --:------ 313F 009 0060003A0C1B0107E5

    dtm_hex = hex_from_dtm(datetime, is_dst=is_dst, incl_seconds=True)
    payload = f"0060{dtm_hex}"
    return cls.from_attrs(W_, ctl_id, Code._313F, payload)
@classmethod  # constructor for I|31DA
def get_hvac_fan_31da(
    cls,
    dev_id: DeviceIdT | str,
    hvac_id: str,
    bypass_position: float | None,
    air_quality: float | None,
    co2_level: int | None,
    indoor_humidity: float | None,
    outdoor_humidity: float | None,
    exhaust_temp: float | None,
    supply_temp: float | None,
    indoor_temp: float | None,
    outdoor_temp: float | None,
    speed_capabilities: list[str],
    fan_info: str,
    _unknown_fan_info_flags: list[int],  # skip? as starts with _
    exhaust_fan_speed: float | None,
    supply_fan_speed: float | None,
    remaining_mins: int | None,
    post_heat: float | None,
    pre_heat: float | None,
    supply_flow: float | None,
    exhaust_flow: float | None,
    **kwargs: Any,  # option: air_quality_basis: str | None,
) -> Command:
    """Construct an I|31DA command for HVAC fan status updates.

    The payload is the concatenation, in this order, of: hvac_id, air_quality,
    air_quality_basis, co2_level, indoor/outdoor humidity, exhaust/supply/
    indoor/outdoor temperature, speed_capabilities, bypass_position, fan_info,
    exhaust/supply fan speed, remaining_mins, post_heat, pre_heat, supply/
    exhaust flow and, finally, any ``_extra`` hex string.

    Every argument must be supplied, but most may be None, in which case the
    field is filled with its 'not available' marker (``EF``, ``FF`` or
    ``7FFF``, depending on the field).

    :param dev_id: The device ID used as both addr0 and addr2 of the packet
    :type dev_id: DeviceIdT | str
    :param hvac_id: Hex string placed, as is, at the start of the payload
    :type hvac_id: str
    :param bypass_position: Bypass position, encoded by hex_from_percent
        (high resolution); None gives ``EF``
    :type bypass_position: float | None
    :param air_quality: Air quality, scaled by 200 into one byte (so a fraction
        is expected, despite the historical ``int`` annotation); None gives ``EF``
    :type air_quality: float | None
    :param co2_level: CO₂ level, written as a 4-digit hex integer; None gives ``7FFF``
    :type co2_level: int | None
    :param indoor_humidity: Indoor humidity, encoded by hex_from_percent
        (low resolution); None gives ``EF``
    :type indoor_humidity: float | None
    :param outdoor_humidity: Outdoor humidity, encoded as for indoor_humidity
    :type outdoor_humidity: float | None
    :param exhaust_temp: Exhaust air temperature, encoded by hex_from_temp;
        None gives ``7FFF``
    :type exhaust_temp: float | None
    :param supply_temp: Supply air temperature, encoded as for exhaust_temp
    :type supply_temp: float | None
    :param indoor_temp: Indoor temperature, encoded as for exhaust_temp
    :type indoor_temp: float | None
    :param outdoor_temp: Outdoor temperature, encoded as for exhaust_temp
    :type outdoor_temp: float | None
    :param speed_capabilities: Supported speed settings, packed by capability_bits
        into 4 hex digits; None gives ``7FFF``
    :type speed_capabilities: list[str]
    :param fan_info: Fan mode/status; fan_info_to_byte(fan_info) is OR-ed with
        fan_info_flags(_unknown_fan_info_flags) into one byte; None gives ``EF``
    :type fan_info: str
    :param _unknown_fan_info_flags: Flags merged into the fan_info byte
        (only used when fan_info is not None)
    :type _unknown_fan_info_flags: list[int]
    :param exhaust_fan_speed: Exhaust fan speed, encoded by hex_from_percent
        (high resolution); None gives ``FF``
    :type exhaust_fan_speed: float | None
    :param supply_fan_speed: Supply fan speed, encoded as for exhaust_fan_speed
    :type supply_fan_speed: float | None
    :param remaining_mins: Remaining minutes, written as a 4-digit hex integer;
        None gives ``7FFF``
    :type remaining_mins: int | None
    :param post_heat: Post-heat level, scaled by 200 into one byte; None gives ``EF``
    :type post_heat: float | None
    :param pre_heat: Pre-heat level, scaled by 200 into one byte; None gives ``EF``
    :type pre_heat: float | None
    :param supply_flow: Supply air flow, scaled by 100 into 4 hex digits;
        None gives ``7FFF``
    :type supply_flow: float | None
    :param exhaust_flow: Exhaust air flow, scaled by 100 into 4 hex digits;
        None gives ``7FFF``
    :type exhaust_flow: float | None
    :param kwargs: Only ``air_quality_basis`` (default "00", passed through
        air_quality_code; None gives ``00``) and ``_extra`` (hex string appended
        to the payload, default "") are accepted
    :return: A configured Command object for the HVAC fan status update
    :rtype: Command
    :raises AssertionError: If any other keyword argument is supplied.

    .. note::
        Scaled values are rounded to the nearest step rather than truncated, so
        that binary floating-point error cannot lose a step (e.g. 0.57 * 200
        evaluates to 113.99999999999999, which must encode as 0x72, not 0x71).
    """
    # 00 EF00 7FFF 34 33 0898 0898 088A 0882 F800 00 15 14 14 0000 EF EF 05F5 0613:
    # {"hvac_id": '00', 'bypass_position': 0.000, 'air_quality': None,
    #  'co2_level': None, 'indoor_humidity': 0.52, 'outdoor_humidity': 0.51,
    #  'exhaust_temp': 22.0, 'supply_temp': 22.0, 'indoor_temp': 21.86,
    #  'outdoor_temp': 21.78, 'speed_capabilities': ['off', 'low_med_high',
    #  'timer', 'boost', 'auto'], 'fan_info': 'away',
    #  '_unknown_fan_info_flags': [0, 0, 0], 'exhaust_fan_speed': 0.1,
    #  'supply_fan_speed': 0.1, 'remaining_mins': 0, 'post_heat': None,
    #  'pre_heat': None, 'supply_flow': 15.25, 'exhaust_flow': 15.55},

    air_quality_basis: str | None = kwargs.pop("air_quality_basis", "00")
    extra: str = kwargs.pop("_extra", "")
    assert not kwargs, kwargs  # no other keyword arguments are supported

    def _scaled_hex(value: float | None, factor: int, width: int, unset: str) -> str:
        """Return value * factor as zero-padded hex, or the unset marker if None."""
        if value is None:
            return unset
        # round(), not int(): truncation would drop a step on float error
        return f"{round(value * factor):0{width}X}"

    def _percent_hex(value: float | None, high_res: bool, unset: str) -> str:
        """Return hex_from_percent(value), or the unset marker if None."""
        if value is None:
            return unset
        return hex_from_percent(value, high_res=high_res)

    def _temp_hex(value: float | None) -> str:
        """Return hex_from_temp(value), or 7FFF if None."""
        return hex_from_temp(value) if value is not None else "7FFF"

    fields = [
        hvac_id,
        _scaled_hex(air_quality, 200, 2, "EF"),
        (
            f"{air_quality_code(air_quality_basis)}"
            if air_quality_basis is not None
            else "00"
        ),
        f"{co2_level:04X}" if co2_level is not None else "7FFF",
        _percent_hex(indoor_humidity, False, "EF"),
        _percent_hex(outdoor_humidity, False, "EF"),
        _temp_hex(exhaust_temp),
        _temp_hex(supply_temp),
        _temp_hex(indoor_temp),
        _temp_hex(outdoor_temp),
        (
            f"{capability_bits(speed_capabilities):04X}"
            if speed_capabilities is not None
            else "7FFF"
        ),
        _percent_hex(bypass_position, True, "EF"),
        (
            f"{(fan_info_to_byte(fan_info) | fan_info_flags(_unknown_fan_info_flags)):02X}"
            if fan_info is not None
            else "EF"
        ),
        _percent_hex(exhaust_fan_speed, True, "FF"),
        _percent_hex(supply_fan_speed, True, "FF"),
        f"{remaining_mins:04X}" if remaining_mins is not None else "7FFF",
        _scaled_hex(post_heat, 200, 2, "EF"),
        _scaled_hex(pre_heat, 200, 2, "EF"),
        _scaled_hex(supply_flow, 100, 4, "7FFF"),
        _scaled_hex(exhaust_flow, 100, 4, "7FFF"),
        extra,
    ]
    payload = "".join(fields)

    return cls._from_attrs(I_, Code._31DA, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for RQ|3220
def get_opentherm_data(cls, otb_id: DeviceIdT | str, msg_id: int | str) -> Command:
    """Request OpenTherm protocol data from a device. (c.f. parser_3220)

    Builds an RQ|3220 whose payload is ``00``, then ``80`` or ``00`` (chosen by
    the parity of the message ID), then the message ID as two hex digits, then
    ``0000`` as the data value.

    :param otb_id: The device ID of the OpenTherm bridge/controller
    :type otb_id: DeviceIdT | str
    :param msg_id: The OpenTherm message (data) ID to request, given as either:
        - an integer (e.g., 0 for Status), or
        - a hex string (e.g., '00' for Status), which is parsed as base 16
        See the OpenTherm specification for valid message IDs.
    :type msg_id: int | str
    :return: A configured Command object ready to be sent to the device.
    :rtype: Command

    .. note::
        Commonly requested OpenTherm message IDs include:
        - 0x00: Status
        - 0x01: Control setpoint
        - 0x11: Relative modulation level
        - 0x12: CH water pressure
        - 0x19: Boiler water temperature
        - 0x1A: DHW temperature
        - 0x38: DHW setpoint

        The parity bit required by the OpenTherm frame is set automatically;
        no range check is made on msg_id here.
    """
    data_id = msg_id if isinstance(msg_id, int) else int(msg_id, 16)

    # 0x80 is the frame's parity bit: set only when parity(data_id) is truthy
    parity_byte = "80" if parity(data_id) else "00"
    payload = f"00{parity_byte}{data_id:02X}0000"

    return cls.from_attrs(RQ, otb_id, Code._3220, payload)
@classmethod  # constructor for I|3EF0  # TODO: trap corrupt states?
def put_actuator_state(
    cls, dev_id: DeviceIdT | str, modulation_level: float | None
) -> Command:
    """Announce the current modulation level of a heating actuator. (c.f. parser_3ef0)

    This is for use by a faked BDR91A or similar.

    Builds an I|3EF0 with the device as both addr0 and addr2. The payload is
    ``00``, the modulation level scaled by 200 into one byte, then ``FF``; an
    unknown level (None) is sent as ``007FFF``.

    :param dev_id: The device ID of the actuator (e.g., '13:123456'); its
        two-character type prefix must equal DEV_TYPE_MAP.BDR.
    :type dev_id: DeviceIdT | str
    :param modulation_level: The current modulation level, between 0.0 and 1.0:
        - 0.0: Actuator is fully off (payload 0000FF)
        - 1.0: Actuator is fully on (payload 00C8FF)
        - Values in between represent partial modulation (if supported)
        - None: Indicates an error or unknown state (payload 007FFF)
    :type modulation_level: float | None
    :return: A configured Command object ready to be sent to the system.
    :rtype: Command
    :raises CommandInvalid: If the device ID is not a BDR-type device.

    .. note::
        The level is encoded with 0.5% resolution and is rounded to the nearest
        step rather than truncated, so that binary floating-point error cannot
        lose a step (e.g. 0.57 * 200 evaluates to 113.99999999999999, which
        must encode as 0x72, not 0x71).
    """
    # .I --- 13:049798 --:------ 13:049798 3EF0 003 00C8FF
    # .I --- 13:106039 --:------ 13:106039 3EF0 003 0000FF

    if dev_id[:2] != DEV_TYPE_MAP.BDR:
        raise exc.CommandInvalid(
            f"Faked device {dev_id} has an unsupported device type: "
            f"device_id should be like {DEV_TYPE_MAP.BDR}:xxxxxx"
        )

    if modulation_level is None:
        payload = "007FFF"
    else:
        # round(), not int(): truncation would drop a step on float error
        payload = f"00{round(modulation_level * 200):02X}FF"

    return cls._from_attrs(I_, Code._3EF0, payload, addr0=dev_id, addr2=dev_id)
@classmethod  # constructor for RP|3EF1 (I|3EF1?)  # TODO: trap corrupt values?
def put_actuator_cycle(
    cls,
    src_id: DeviceIdT | str,
    dst_id: DeviceIdT | str,
    modulation_level: float,
    actuator_countdown: int,
    *,
    cycle_countdown: int | None = None,
) -> Command:
    """Announce the internal cycling state of a heating actuator. (c.f. parser_3ef1)

    This is for use by a faked BDR91A or similar.

    Builds an RP|3EF1 from src_id (addr0) to dst_id (addr1). The payload is, in
    order: ``00``, the cycle countdown (4 hex digits), the actuator countdown
    (4 hex digits), the modulation level (via hex_from_percent), and ``FF``.

    :param src_id: The device ID of the actuator sending the report
        (e.g., '13:123456'); its two-character type prefix must equal
        DEV_TYPE_MAP.BDR.
    :type src_id: DeviceIdT | str
    :param dst_id: The device ID of the intended recipient of this report.
    :type dst_id: DeviceIdT | str
    :param modulation_level: The current modulation level, between 0.0 and 1.0
        (0.0 is fully off, 1.0 is fully on).
    :type modulation_level: float
    :param actuator_countdown: Countdown until the actuator next changes state
        (presumably in seconds - confirm against parser_3ef1).
    :type actuator_countdown: int
    :param cycle_countdown: Countdown until the end of the current cycle
        (presumably in seconds); None is sent as ``7FFF``.
    :type cycle_countdown: int | None
    :return: A configured Command object ready to be sent to the system.
    :rtype: Command
    :raises CommandInvalid: If the source device ID is not a BDR-type device.
    """
    # RP --- 13:049798 18:006402 --:------ 3EF1 007 00-0126-0126-00-FF

    if src_id[:2] != DEV_TYPE_MAP.BDR:
        raise exc.CommandInvalid(
            f"Faked device {src_id} has an unsupported device type: "
            f"device_id should be like {DEV_TYPE_MAP.BDR}:xxxxxx"
        )

    cycle_hex = "7FFF" if cycle_countdown is None else f"{cycle_countdown:04X}"
    segments = (
        "00",
        cycle_hex,
        f"{actuator_countdown:04X}",
        hex_from_percent(modulation_level),
        "FF",
    )
    return cls._from_attrs(
        RP, Code._3EF1, "".join(segments), addr0=src_id, addr1=dst_id
    )
@classmethod  # constructor for internal use only
def _puzzle(cls, msg_type: str | None = None, message: str = "") -> Command:
    """Construct a puzzle packet (an I|PUZZ sent to ALL_DEV_ADDR).

    The payload is ``00``, the two-character message type, an optional
    timestamp (12 hex digits) and the hex-encoded text; it is cut to at most
    48 characters before the command is built.

    :param msg_type: The puzzle type, as a two-character hex string that must be
        a key of LOOKUP_PUZZ. If None, it is '12' when a message is given and
        '10' otherwise.
    :type msg_type: str | None
    :param message: The text to include; how it is used depends on msg_type:
        - '10': ignored, the text sent is always ``v{VERSION}``
        - '11': the characters at index 4 and 7 are dropped (presumably the
          separators of a 'code|verb|device_id' style header - confirm with
          the callers)
        - any other type: sent unchanged
    :type message: str
    :return: A configured Command object with the puzzle message.
    :rtype: Command
    :raises AssertionError: If msg_type is not in LOOKUP_PUZZ.

    .. note::
        The timestamp depends on the type:
        - '20' and above (compared as hex): ``timestamp() * 1e7``
        - '13': no timestamp at all
        - all other types: ``timestamp() * 1000``
    """
    if msg_type is None:
        msg_type = "12" if message else "10"

    assert msg_type in LOOKUP_PUZZ, f"Invalid/deprecated Puzzle type: {msg_type}"

    # the timestamp (if any) comes straight after the type
    if int(msg_type, 16) >= 0x20:
        stamp = f"{int(timestamp() * 1e7):012X}"
    elif msg_type != "13":
        stamp = f"{int(timestamp() * 1000):012X}"
    else:
        stamp = ""

    # then the text, chosen according to the type
    if msg_type == "10":
        text = f"v{VERSION}"
    elif msg_type == "11":
        text = message[:4] + message[5:7] + message[8:]
    else:
        text = message

    payload = f"00{msg_type}" + stamp + hex_from_str(text)

    return cls.from_attrs(I_, ALL_DEV_ADDR.id, Code._PUZZ, payload[:48])
# A convenience dict: maps a "verb|code" header to the Command constructor for it
CODE_API_MAP = {
    f"{verb}|{code}": constructor
    for verb, code, constructor in (
        (RP, Code._3EF1, Command.put_actuator_cycle),  # . has a test (RP, not I)
        (I_, Code._3EF0, Command.put_actuator_state),
        (I_, Code._1FC9, Command.put_bind),
        (W_, Code._1FC9, Command.put_bind),  # NOTE: same class method as I|1FC9
        (W_, Code._22F7, Command.set_bypass_position),
        (I_, Code._1298, Command.put_co2_level),  # . has a test
        (RQ, Code._1F41, Command.get_dhw_mode),
        (W_, Code._1F41, Command.set_dhw_mode),  # . has a test
        (RQ, Code._10A0, Command.get_dhw_params),
        (W_, Code._10A0, Command.set_dhw_params),  # . has a test
        (RQ, Code._1260, Command.get_dhw_temp),
        (I_, Code._1260, Command.put_dhw_temp),  # . has a test (empty)
        (I_, Code._22F1, Command.set_fan_mode),
        (W_, Code._2411, Command.set_fan_param),
        (I_, Code._12A0, Command.put_indoor_humidity),  # . has a test
        (RQ, Code._1030, Command.get_mix_valve_params),
        (W_, Code._1030, Command.set_mix_valve_params),  # . has a test
        (RQ, Code._3220, Command.get_opentherm_data),
        (I_, Code._1290, Command.put_outdoor_temp),
        (I_, Code._2E10, Command.put_presence_detected),
        (RQ, Code._0008, Command.get_relay_demand),
        (RQ, Code._0404, Command.get_schedule_fragment),  # . has a test
        (W_, Code._0404, Command.set_schedule_fragment),
        (RQ, Code._0006, Command.get_schedule_version),
        (I_, Code._30C9, Command.put_sensor_temp),  # . has a test
        (RQ, Code._0100, Command.get_system_language),
        (RQ, Code._0418, Command.get_system_log_entry),
        (RQ, Code._2E04, Command.get_system_mode),  # . has a test
        (W_, Code._2E04, Command.set_system_mode),
        (RQ, Code._313F, Command.get_system_time),
        (W_, Code._313F, Command.set_system_time),  # . has a test
        (RQ, Code._1100, Command.get_tpi_params),
        (W_, Code._1100, Command.set_tpi_params),  # . has a test
        (I_, Code._0002, Command.put_weather_temp),
        (RQ, Code._000A, Command.get_zone_config),
        (W_, Code._000A, Command.set_zone_config),  # . has a test
        (RQ, Code._2349, Command.get_zone_mode),
        (W_, Code._2349, Command.set_zone_mode),  # . has a test
        (RQ, Code._0004, Command.get_zone_name),
        (W_, Code._0004, Command.set_zone_name),  # . has a test
        (RQ, Code._2309, Command.get_zone_setpoint),
        (W_, Code._2309, Command.set_zone_setpoint),  # . has a test
        (RQ, Code._30C9, Command.get_zone_temp),
        (RQ, Code._12B0, Command.get_zone_window_state),
        (I_, Code._31DA, Command.get_hvac_fan_31da),  # . has a test
    )
}  # TODO: RQ|0404 (Zone & DHW)