Coverage for src/ramses_tx/schemas.py: 77%
132 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.
4:term:`Schema` processor for protocol (lower) layer.
5"""
7from __future__ import annotations
9import logging
10from collections.abc import Callable
11from typing import Any, Final, Never, NewType, TypeAlias, TypedDict, TypeVar
13import voluptuous as vol
15from .const import (
16 DEFAULT_ECHO_TIMEOUT,
17 DEFAULT_RPLY_TIMEOUT,
18 DEV_TYPE_MAP,
19 DEVICE_ID_REGEX,
20 MAX_DUTY_CYCLE_RATE,
21 MIN_INTER_WRITE_GAP,
22)
24_LOGGER = logging.getLogger(__name__)
27#
28# 0/5: Packet source configuration
29SZ_COMMS_PARAMS: Final = "comms_params"
30SZ_DUTY_CYCLE_LIMIT: Final = "duty_cycle_limit"
31SZ_GAP_BETWEEN_WRITES: Final = "gap_between_writes"
32SZ_ECHO_TIMEOUT: Final = "echo_timeout"
33SZ_RPLY_TIMEOUT: Final = "reply_timeout"
35SCH_COMMS_PARAMS = vol.Schema(
36 {
37 vol.Required(SZ_DUTY_CYCLE_LIMIT, default=MAX_DUTY_CYCLE_RATE): vol.All(
38 float, vol.Range(min=0.005, max=0.2)
39 ),
40 vol.Required(SZ_GAP_BETWEEN_WRITES, default=MIN_INTER_WRITE_GAP): vol.All(
41 float, vol.Range(min=0.05, max=1.0)
42 ),
43 vol.Required(SZ_ECHO_TIMEOUT, default=DEFAULT_ECHO_TIMEOUT): vol.All(
44 float, vol.Range(min=0.01, max=1.0)
45 ),
46 vol.Required(SZ_RPLY_TIMEOUT, default=DEFAULT_RPLY_TIMEOUT): vol.All(
47 float, vol.Range(min=0.01, max=1.0)
48 ),
49 },
50 extra=vol.PREVENT_EXTRA,
51)
53#
54# 1/5: Packet source configuration
55SZ_INPUT_FILE: Final = "input_file"
56SZ_PACKET_SOURCE: Final = "packet_source"
59#
60# 2/5: Packet log configuration
61SZ_FILE_NAME: Final = "file_name"
62SZ_PACKET_LOG: Final = "packet_log"
63SZ_ROTATE_BACKUPS: Final = "rotate_backups"
64SZ_ROTATE_BYTES: Final = "rotate_bytes"
67class PktLogConfigT(TypedDict):
68 file_name: str
69 rotate_backups: int
70 rotate_bytes: int | None
73def sch_packet_log_dict_factory(
74 default_backups: int = 0,
75) -> dict[vol.Required, vol.Any]:
76 """
77 :return: a packet log dict with a configurable default rotation policy.
79 Usage:
81 .. code-block::
83 SCH_PACKET_LOG_7 = vol.Schema(
84 packet_log_dict_factory(default_backups=7), extra=vol.PREVENT_EXTRA
85 )
87 """
89 SCH_PACKET_LOG_CONFIG = vol.Schema(
90 {
91 vol.Optional(SZ_ROTATE_BACKUPS, default=default_backups): vol.Any(
92 None, int
93 ),
94 vol.Optional(SZ_ROTATE_BYTES): vol.Any(None, int),
95 },
96 extra=vol.PREVENT_EXTRA,
97 )
99 SCH_PACKET_LOG_NAME = str
101 def NormalisePacketLog(rotate_backups: int = 0) -> Callable[..., Any]:
102 def normalise_packet_log(node_value: str | PktLogConfigT) -> PktLogConfigT:
103 if isinstance(node_value, str):
104 return {
105 SZ_FILE_NAME: node_value,
106 SZ_ROTATE_BACKUPS: rotate_backups,
107 SZ_ROTATE_BYTES: None,
108 }
109 return node_value
111 return normalise_packet_log
113 return { # SCH_PACKET_LOG_DICT
114 vol.Required(SZ_PACKET_LOG, default=None): vol.Any(
115 None,
116 vol.All(
117 SCH_PACKET_LOG_NAME,
118 NormalisePacketLog(rotate_backups=default_backups),
119 ),
120 SCH_PACKET_LOG_CONFIG.extend(
121 {vol.Required(SZ_FILE_NAME): SCH_PACKET_LOG_NAME}
122 ),
123 )
124 }
127SCH_PACKET_LOG = vol.Schema(
128 sch_packet_log_dict_factory(default_backups=7), extra=vol.PREVENT_EXTRA
129)
131#
132# 3/5: Serial port configuration
133SZ_PORT_CONFIG: Final = "port_config"
134SZ_PORT_NAME: Final = "port_name"
135SZ_SERIAL_PORT: Final = "serial_port"
137SZ_BAUDRATE: Final = "baudrate"
138SZ_DSRDTR: Final = "dsrdtr"
139SZ_RTSCTS: Final = "rtscts"
140SZ_TIMEOUT: Final = "timeout"
141SZ_XONXOFF: Final = "xonxoff"
144SCH_SERIAL_PORT_CONFIG = vol.Schema(
145 {
146 vol.Optional(SZ_BAUDRATE, default=115200): vol.All(
147 vol.Coerce(int), vol.Any(57600, 115200)
148 ), # NB: HGI80 does not work, except at 115200 - so must be default
149 vol.Optional(SZ_DSRDTR, default=False): bool,
150 vol.Optional(SZ_RTSCTS, default=False): bool,
151 vol.Optional(SZ_TIMEOUT, default=0): vol.Any(None, int), # default None?
152 vol.Optional(SZ_XONXOFF, default=True): bool, # set True to remove \x11
153 },
154 extra=vol.PREVENT_EXTRA,
155)
158class PortConfigT(TypedDict):
159 baudrate: int # 57600, 115200
160 dsrdtr: bool
161 rtscts: bool
162 timeout: int
163 xonxoff: bool
166def sch_serial_port_dict_factory() -> dict[vol.Required, vol.Any]:
167 """Return a serial port dict.
169 Usage:
171 .. code-block::
173 SCH_SERIAL_PORT = vol.Schema(
174 sch_serial_port_dict_factory(), extra=vol.PREVENT_EXTRA
175 )
176 """
178 SCH_SERIAL_PORT_NAME = str
180 def NormaliseSerialPort() -> Callable[[str | PortConfigT], PortConfigT]:
181 def normalise_serial_port(node_value: str | PortConfigT) -> PortConfigT:
182 if isinstance(node_value, str):
183 return {SZ_PORT_NAME: node_value} | SCH_SERIAL_PORT_CONFIG({}) # type: ignore[no-any-return]
184 return node_value
186 return normalise_serial_port
188 return { # SCH_SERIAL_PORT_DICT
189 vol.Required(SZ_SERIAL_PORT): vol.Any(
190 vol.All(
191 SCH_SERIAL_PORT_NAME,
192 NormaliseSerialPort(),
193 ),
194 SCH_SERIAL_PORT_CONFIG.extend(
195 {vol.Required(SZ_PORT_NAME): SCH_SERIAL_PORT_NAME}
196 ),
197 )
198 }
201def extract_serial_port(ser_port_dict: dict[str, Any]) -> tuple[str, PortConfigT]:
202 """Extract a serial port, port_config_dict tuple from a sch_serial_port_dict."""
203 port_name: str = ser_port_dict.get(SZ_PORT_NAME) # type: ignore[assignment]
204 port_config = {k: v for k, v in ser_port_dict.items() if k != SZ_PORT_NAME}
205 return port_name, port_config # type: ignore[return-value]
208#
209# 4/5: Traits (of devices) configuration (basic)
211_T = TypeVar("_T")
214def ConvertNullToDict() -> Callable[[_T | None], _T | dict[Never, Never]]:
215 def convert_null_to_dict(node_value: _T | None) -> _T | dict[Never, Never]:
216 if node_value is None:
217 return {}
218 return node_value
220 return convert_null_to_dict
223SZ_ALIAS: Final = "alias"
224SZ_BOUND_TO: Final = "bound"
225SZ_CLASS: Final = "class"
226SZ_FAKED: Final = "faked"
227SZ_SCHEME: Final = "scheme"
229SZ_BLOCK_LIST: Final = "block_list"
230SZ_KNOWN_LIST: Final = "known_list"
232SCH_DEVICE_ID_ANY = vol.Match(DEVICE_ID_REGEX.ANY)
233SCH_DEVICE_ID_SEN = vol.Match(DEVICE_ID_REGEX.SEN)
234SCH_DEVICE_ID_CTL = vol.Match(DEVICE_ID_REGEX.CTL)
235SCH_DEVICE_ID_DHW = vol.Match(DEVICE_ID_REGEX.DHW)
236SCH_DEVICE_ID_HGI = vol.Match(DEVICE_ID_REGEX.HGI)
237SCH_DEVICE_ID_APP = vol.Match(DEVICE_ID_REGEX.APP)
238SCH_DEVICE_ID_BDR = vol.Match(DEVICE_ID_REGEX.BDR)
239SCH_DEVICE_ID_UFC = vol.Match(DEVICE_ID_REGEX.UFC)
241_SCH_TRAITS_DOMAINS = ("heat", "hvac")
242_SCH_TRAITS_HVAC_SCHEMES = ("itho", "nuaire", "orcon", "vasco", "climarad")
245DeviceTraitsT = TypedDict(
246 "DeviceTraitsT",
247 {
248 "alias": str | None,
249 "faked": bool | None,
250 "class": str | None,
251 },
252)
255def sch_global_traits_dict_factory(
256 heat_traits: dict[vol.Optional, vol.Any] | None = None,
257 hvac_traits: dict[vol.Optional, vol.Any] | None = None,
258) -> tuple[dict[vol.Optional, vol.Any], vol.Any]:
259 """Return a global traits dict with a configurable extra traits.
261 Usage:
263 .. code-block::
265 SCH_GLOBAL_TRAITS = vol.Schema(
266 sch_global_traits_dict(heat=traits), extra=vol.PREVENT_EXTRA
267 )
268 """
270 heat_traits = heat_traits or {}
271 hvac_traits = hvac_traits or {}
273 SCH_TRAITS_BASE = vol.Schema(
274 {
275 vol.Optional(SZ_ALIAS, default=None): vol.Any(None, str),
276 vol.Optional(SZ_FAKED, default=None): vol.Any(None, bool),
277 vol.Optional(vol.Remove("_note")): str, # only for convenience, not used
278 },
279 extra=vol.PREVENT_EXTRA,
280 )
282 # NOTE: voluptuous doesn't like StrEnums, hence str(s)
283 # TIP: the _domain key can be used to force which traits schema to use
284 heat_slugs = list(
285 str(s) for s in DEV_TYPE_MAP.slugs() if s not in DEV_TYPE_MAP.HVAC_SLUGS
286 )
287 SCH_TRAITS_HEAT = SCH_TRAITS_BASE.extend(
288 {
289 vol.Optional("_domain", default="heat"): "heat",
290 vol.Optional(SZ_CLASS): vol.Any(
291 None, *heat_slugs, *(str(DEV_TYPE_MAP[s]) for s in heat_slugs)
292 ),
293 }
294 )
295 SCH_TRAITS_HEAT = SCH_TRAITS_HEAT.extend(
296 heat_traits,
297 extra=vol.PREVENT_EXTRA, # Always prevent extra keys
298 )
300 # NOTE: voluptuous doesn't like StrEnums, hence str(s)
301 hvac_slugs = list(str(s) for s in DEV_TYPE_MAP.HVAC_SLUGS)
302 SCH_TRAITS_HVAC = SCH_TRAITS_BASE.extend(
303 {
304 vol.Optional("_domain", default="hvac"): "hvac",
305 vol.Optional(SZ_CLASS, default="HVC"): vol.Any(
306 None, *hvac_slugs, *(str(DEV_TYPE_MAP[s]) for s in hvac_slugs)
307 ), # TODO: consider removing None
308 # Add 'bound' trait for FAN devices
309 vol.Optional(SZ_BOUND_TO): vol.Any(None, vol.Match(DEVICE_ID_REGEX.ANY)),
310 }
311 )
312 SCH_TRAITS_HVAC = SCH_TRAITS_HVAC.extend(
313 {vol.Optional(SZ_SCHEME): vol.Any(*_SCH_TRAITS_HVAC_SCHEMES)}
314 )
315 SCH_TRAITS_HVAC = SCH_TRAITS_HVAC.extend(
316 hvac_traits,
317 extra=vol.PREVENT_EXTRA, # Always prevent extra keys
318 )
320 SCH_TRAITS = vol.Any(
321 vol.All(None, ConvertNullToDict()),
322 vol.Any(SCH_TRAITS_HEAT, SCH_TRAITS_HVAC),
323 extra=vol.PREVENT_EXTRA,
324 )
325 SCH_DEVICE = vol.Schema(
326 {vol.Optional(SCH_DEVICE_ID_ANY): SCH_TRAITS},
327 extra=vol.PREVENT_EXTRA,
328 )
330 global_traits_dict = { # Filter lists with Device traits...
331 vol.Optional(SZ_KNOWN_LIST, default={}): vol.Any(
332 vol.All(None, ConvertNullToDict()),
333 vol.All(SCH_DEVICE, vol.Length(min=0)),
334 ),
335 vol.Optional(SZ_BLOCK_LIST, default={}): vol.Any(
336 vol.All(None, ConvertNullToDict()),
337 vol.All(SCH_DEVICE, vol.Length(min=0)),
338 ),
339 }
341 return global_traits_dict, SCH_TRAITS
344SCH_GLOBAL_TRAITS_DICT, SCH_TRAITS = sch_global_traits_dict_factory()
346#
347# Device lists (Engine configuration)
349DeviceIdT = NewType("DeviceIdT", str) # TypeVar('DeviceIdT', bound=str) #
350DevIndexT = NewType("DevIndexT", str)
351DeviceListT: TypeAlias = dict[DeviceIdT, DeviceTraitsT]
354def select_device_filter_mode(
355 enforce_known_list: bool,
356 known_list: DeviceListT,
357 block_list: DeviceListT,
358) -> bool:
359 """
360 Determine which device filter to use, if any.
362 Either:
363 - block if device_id in block_list (could be empty), otherwise
364 - allow if device_id in known_list, or
365 """
367 # warn if not has_exactly_one_valid_hgi(known_list)
369 if enforce_known_list and not known_list:
370 _LOGGER.warning(
371 f"Best practice is to enforce a {SZ_KNOWN_LIST} (an allow list), "
372 f"but it is empty, so it can't be used "
373 )
374 enforce_known_list = False
376 if enforce_known_list:
377 _LOGGER.info(
378 f"A valid {SZ_KNOWN_LIST} was provided, "
379 f"and will be enforced as a allow list: length = {len(known_list)}"
380 )
381 _LOGGER.debug(f"known_list = {known_list}")
383 elif block_list:
384 _LOGGER.info(
385 f"A valid {SZ_BLOCK_LIST} was provided, "
386 f"and will be used as a deny list: length = {len(block_list)}"
387 )
388 _LOGGER.debug(f"block_list = {block_list}")
390 elif known_list:
391 _LOGGER.warning(
392 f"Best practice is to enforce the {SZ_KNOWN_LIST} as an allow list, "
393 f"configure: {SZ_ENFORCE_KNOWN_LIST} = True"
394 )
395 _LOGGER.debug(f"known_list = {known_list}")
397 else:
398 _LOGGER.warning(
399 f"Best practice is to provide a {SZ_KNOWN_LIST} and enforce it, "
400 f"configure: {SZ_ENFORCE_KNOWN_LIST} = True"
401 )
403 return enforce_known_list
406#
407# 5/5: Gateway (engine) configuration
409SZ_DISABLE_SENDING: Final = "disable_sending"
410SZ_DISABLE_QOS: Final = "disable_qos"
411SZ_ENFORCE_KNOWN_LIST: Final[str] = f"enforce_{SZ_KNOWN_LIST}"
412SZ_EVOFW_FLAG: Final = "evofw_flag"
413SZ_SQLITE_INDEX: Final = (
414 "sqlite_index" # temporary 0.52.x SQLite dev config option in ramses_cc
415)
416SZ_LOG_ALL_MQTT: Final = "log_all_mqtt"
417SZ_USE_REGEX: Final = "use_regex"
419SCH_ENGINE_DICT = {
420 vol.Optional(SZ_DISABLE_SENDING, default=False): bool,
421 vol.Optional(SZ_DISABLE_QOS, default=None): vol.Any(
422 None, # None is selective QoS (e.g. QoS only for bindings, schedule, etc.)
423 bool,
424 ), # in the long term, this default to be True (not None)
425 vol.Optional(SZ_ENFORCE_KNOWN_LIST, default=False): bool,
426 vol.Optional(SZ_EVOFW_FLAG): vol.Any(None, str),
427 # vol.Optional(SZ_PORT_CONFIG): SCH_SERIAL_PORT_CONFIG,
428 vol.Optional(
429 SZ_SQLITE_INDEX, default=False
430 ): bool, # temporary 0.52.x dev config option
431 vol.Optional(
432 SZ_LOG_ALL_MQTT, default=False
433 ): bool, # log all incoming MQTT traffic config option
434 vol.Optional(SZ_USE_REGEX): dict, # vol.All(ConvertNullToDict(), dict),
435 vol.Optional(SZ_COMMS_PARAMS): SCH_COMMS_PARAMS,
436}
437SCH_ENGINE_CONFIG = vol.Schema(SCH_ENGINE_DICT, extra=vol.REMOVE_EXTRA)
439SZ_INBOUND: Final = "inbound" # for use_regex (intentionally obscured)
440SZ_OUTBOUND: Final = "outbound"