Coverage for src/ramses_cli/discovery.py: 29%
185 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:44 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:44 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - discovery scripts."""
4from __future__ import annotations
6import asyncio
7import functools
8import json
9import logging
10import re
11from collections.abc import Callable
12from typing import TYPE_CHECKING, Any, Final
14from ramses_rf import exceptions as exc
15from ramses_rf.const import SZ_SCHEDULE, SZ_ZONE_IDX
16from ramses_rf.device import Fakeable
17from ramses_tx import CODES_SCHEMA, Command, DeviceIdT, Priority
18from ramses_tx.opentherm import OTB_DATA_IDS
20# Beware, none of this is reliable - it is all subject to random change
21# However, these serve as examples of how to use the other modules
24from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import
25 I_,
26 RP,
27 RQ,
28 W_,
29 Code,
30)
32if TYPE_CHECKING:
33 from ramses_rf import Gateway, IndexT
# Keyword-argument keys that spawn_scripts() inspects to pick an action.
EXEC_CMD: Final[str] = "exec_cmd"
GET_FAULTS: Final[str] = "get_faults"
GET_SCHED: Final[str] = "get_schedule"
SET_SCHED: Final[str] = "set_schedule"
EXEC_SCR: Final[str] = "exec_scr"

# Scan-related keys (not referenced by the code visible in this module).
SCAN_DISC: Final[str] = "scan_disc"
SCAN_FULL: Final[str] = "scan_full"
SCAN_HARD: Final[str] = "scan_hard"
SCAN_XXXX: Final[str] = "scan_xxxx"

# DEVICE_ID_REGEX = re.compile(DEVICE_ID_REGEX.ANY)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
def script_decorator(fnc: Callable[..., Any]) -> Callable[..., Any]:
    """Bracket a script with "Script begins:" / "Script done." puzzle commands.

    The returned wrapper is a coroutine function: it queues the opening puzzle
    command, runs the wrapped script to completion, and only then queues the
    closing puzzle command.

    Args:
        fnc: The script to wrap; called as ``fnc(gwy, *args, **kwargs)``. It is
            normally a coroutine function, but a plain function is tolerated.

    Returns:
        An ``async`` wrapper with the same call signature as ``fnc``.
    """

    @functools.wraps(fnc)
    async def wrapper(gwy: Gateway, *args: Any, **kwargs: Any) -> None:
        gwy.send_cmd(
            Command._puzzle(message="Script begins:"),
            priority=Priority.HIGHEST,
            num_repeats=3,
        )

        # The scripts are coroutine functions: calling one only creates a
        # coroutine object, which must be awaited for its body to execute.
        # (Previously the coroutine was dropped, so the script never ran.)
        result = fnc(gwy, *args, **kwargs)
        if asyncio.iscoroutine(result):
            await result

        gwy.send_cmd(
            Command._puzzle(message="Script done."),
            priority=Priority.LOWEST,
            num_repeats=3,
        )

        return None

    return wrapper
def spawn_scripts(gwy: Gateway, **kwargs: Any) -> list[asyncio.Task[None]]:
    """Start the tasks requested via the CLI keyword arguments.

    An EXEC_CMD request is handled independently. After that, at most one of
    GET_FAULTS, GET_SCHED, SET_SCHED or EXEC_SCR is acted upon (the first one
    that is truthy, in that order).

    Args:
        gwy: The gateway the commands/scripts are run against.
        **kwargs: CLI options, keyed by the module-level option-name constants.

    Returns:
        The tasks that were started. Each is also present in ``gwy._tasks``.
    """
    tasks: list[asyncio.Task[None]] = []

    if kwargs.get(EXEC_CMD):
        tasks.append(asyncio.create_task(exec_cmd(gwy, **kwargs)))

    if kwargs.get(GET_FAULTS):
        tasks.append(asyncio.create_task(get_faults(gwy, kwargs[GET_FAULTS])))

    elif kwargs.get(GET_SCHED) and kwargs[GET_SCHED][0]:
        tasks.append(asyncio.create_task(get_schedule(gwy, *kwargs[GET_SCHED])))

    elif kwargs.get(SET_SCHED) and kwargs[SET_SCHED][0]:
        tasks.append(asyncio.create_task(set_schedule(gwy, *kwargs[SET_SCHED])))

    elif kwargs.get(EXEC_SCR):
        script_name = kwargs[EXEC_SCR][0]
        script = SCRIPTS.get(f"{script_name}")
        if script is None:
            _LOGGER.warning("Script: %s() - unknown script", script_name)
        else:
            _LOGGER.info("Script: %s().- starts...", script_name)
            result = script(gwy, kwargs[EXEC_SCR][1])
            if asyncio.iscoroutine(result):
                tasks.append(asyncio.create_task(result))
            elif isinstance(result, list):
                # e.g. script_poll_device() is a plain function that creates
                # its own tasks and returns them; create_task() would reject
                # a list with a TypeError.
                tasks.extend(result)
            else:
                _LOGGER.warning(
                    "Script: %s() - returned nothing that can be scheduled",
                    script_name,
                )

    # Tasks handed back by a script may already have been registered by that
    # script, so only register the ones the gateway does not yet hold.
    unregistered = [t for t in tasks if t not in gwy._tasks]
    gwy._tasks.extend(unregistered)
    return tasks
async def exec_cmd(gwy: Gateway, **kwargs: Any) -> None:
    """Build a command from the EXEC_CMD option and send it, waiting for a reply."""
    cli_text = kwargs[EXEC_CMD]
    await gwy.async_send_cmd(
        Command.from_cli(cli_text),
        priority=Priority.HIGH,
        wait_for_reply=True,
    )
107# @script_decorator
108# async def script_scan_001(gwy: Gateway, dev_id: DeviceIdT):
109# _LOGGER.warning("scan_001() invoked - expect a lot of nonsense")
110# for idx in range(0x10):
111# gwy.send_cmd(Command.from_attrs(W_, dev_id, Code._000E, f"{idx:02X}0050"))
112# gwy.send_cmd(Command.from_attrs(RQ, dev_id, Code._000E, f"{idx:02X}00C8"))
async def get_faults(
    gwy: Gateway, ctl_id: DeviceIdT, start: int = 0, limit: int = 0x3F
) -> None:
    """Request the fault log of the controller's TCS; log an error on timeout.

    Args:
        gwy: The gateway used to look up the controller.
        ctl_id: Device id of the controller.
        start: Passed through to ``get_faultlog()`` as ``start``.
        limit: Passed through to ``get_faultlog()`` as ``limit``.
    """
    controller = gwy.get_device(ctl_id)

    try:
        fault_log = controller.tcs.get_faultlog(start=start, limit=limit)  # 0418
        await fault_log
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("get_faults(): Function timed out: %s", err)
async def get_schedule(gwy: Gateway, ctl_id: DeviceIdT, zone_idx: str) -> None:
    """Request the schedule of one heating zone; log an error on timeout.

    Args:
        gwy: The gateway used to look up the controller.
        ctl_id: Device id of the controller.
        zone_idx: Index of the heating zone, as passed to ``get_htg_zone()``.
    """
    controller = gwy.get_device(ctl_id)
    zone = controller.tcs.get_htg_zone(zone_idx)

    try:
        await zone.get_schedule()
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("get_schedule(): Function timed out: %s", err)
async def set_schedule(gwy: Gateway, ctl_id: DeviceIdT, schedule: str) -> None:
    """Apply a JSON-encoded schedule to a heating zone; log an error on timeout.

    Args:
        gwy: The gateway used to look up the controller.
        ctl_id: Device id of the controller.
        schedule: JSON text; the decoded object must provide the SZ_ZONE_IDX
            and SZ_SCHEDULE keys.
    """
    decoded = json.loads(schedule)
    target_idx = decoded[SZ_ZONE_IDX]  # read before the device lookup, as before

    controller = gwy.get_device(ctl_id)
    zone = controller.tcs.get_htg_zone(target_idx)

    try:
        await zone.set_schedule(decoded[SZ_SCHEDULE])  # 0404
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("set_schedule(): Function timed out: %s", err)
async def script_bind_req(
    gwy: Gateway, dev_id: DeviceIdT, code: Code = Code._2309
) -> None:
    """Make the device fake, then have it initiate a binding process for ``code``."""
    device = gwy.get_device(dev_id)
    assert isinstance(device, Fakeable)  # mypy

    device._make_fake()
    codes = [code]
    await device._initiate_binding_process(codes)
async def script_bind_wait(
    gwy: Gateway, dev_id: DeviceIdT, code: Code = Code._2309, idx: IndexT = "00"
) -> None:
    """Make the device fake, then have it wait for a binding request for ``code``."""
    device = gwy.get_device(dev_id)
    assert isinstance(device, Fakeable)  # mypy

    device._make_fake()
    codes = [code]
    await device._wait_for_binding_request(codes, idx=idx)
def script_poll_device(gwy: Gateway, dev_id: DeviceIdT) -> list[asyncio.Task[None]]:
    """Start background tasks that repeatedly send RQs (0016, 1FC9) to a device.

    Unlike the other scripts this is a plain function: it creates the tasks
    itself, registers them in ``gwy._tasks`` and returns them.
    """

    async def periodic_send(
        gwy: Gateway,
        cmd: Command,
        count: int = 1,
        interval: float | None = None,
    ) -> None:
        """Send ``cmd`` ``count`` times (forever if ``count <= 0``), sleeping first."""
        if interval is None:
            # default: no delay for a one-off send, otherwise once a minute
            delay = 0 if count == 1 else 60
        else:
            delay = interval

        remaining = count
        while count <= 0 or remaining > 0:
            await asyncio.sleep(delay)
            gwy.send_cmd(cmd, priority=Priority.LOW)
            remaining -= 1

    _LOGGER.warning("poll_device() invoked...")

    polled_codes = (Code._0016, Code._1FC9)
    tasks = [
        asyncio.create_task(
            periodic_send(gwy, Command.from_attrs(RQ, dev_id, code, "00"), count=0)
        )
        for code in polled_codes
    ]

    gwy._tasks.extend(tasks)
    return tasks
@script_decorator
async def script_scan_disc(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Look up the device and run its ``discover()`` coroutine."""
    _LOGGER.warning("scan_disc() invoked...")

    device = gwy.get_device(dev_id)
    await device.discover()  # discover_flag=Discover.DEFAULT)
@script_decorator
async def script_scan_full(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Queue a request for every code in CODES_SCHEMA, with per-code payloads.

    A 0016 RQ is sent first (three repeats); 0016 and PUZZ are then skipped
    while walking the sorted schema. Two further codes (0150, 2389) are
    requested at the end.
    """
    _LOGGER.warning("scan_full() invoked - expect a lot of Warnings")

    def send_rq(code: Code, payload: str) -> None:
        """Queue a single RQ for ``code`` with the given payload."""
        gwy.send_cmd(Command.from_attrs(RQ, dev_id, code, payload))

    gwy.send_cmd(Command.from_attrs(RQ, dev_id, Code._0016, "0000"), num_repeats=3)

    for code in sorted(CODES_SCHEMA):
        # 0016 has already been sent above; PUZZ is never requested
        if code in (Code._0016, Code._PUZZ):
            continue

        if code == Code._0005:
            for zone_type in range(20):  # known up to 18
                send_rq(code, f"00{zone_type:02X}")
            continue

        if code == Code._000C:
            for zone_num in range(16):  # also: FA-FF?
                send_rq(code, f"{zone_num:02X}00")
            continue

        if code in (Code._01D0, Code._01E9):
            # NB: these two codes are sent as W (write) packets, not RQs
            for zone_id in ("00", "01", "FC"):
                for suffix in ("00", "03"):
                    gwy.send_cmd(
                        Command.from_attrs(W_, dev_id, code, f"{zone_id}{suffix}")
                    )
            continue

        if code == Code._0404:  # FIXME
            for schedule_idx in ("HW", "00"):
                gwy.send_cmd(Command.get_schedule_fragment(dev_id, schedule_idx, 1, 0))
            continue

        if code == Code._0418:
            for log_idx in range(2):
                gwy.send_cmd(Command.get_system_log_entry(dev_id, log_idx))
            continue

        if code == Code._1100:
            gwy.send_cmd(Command.get_tpi_params(dev_id))
            continue

        if code == Code._2E04:
            gwy.send_cmd(Command.get_system_mode(dev_id))
            continue

        if code == Code._3220:
            for data_id in (0, 3):  # these are mandatory READ_DATA data_ids
                gwy.send_cmd(Command.get_opentherm_data(dev_id, data_id))
            continue

        # Generic case: use the short payload only if the schema's RQ regex
        # matches it, otherwise fall back to the two-byte payload.
        short_payload_ok = (
            code in CODES_SCHEMA
            and RQ in CODES_SCHEMA[code]
            and re.match(CODES_SCHEMA[code][RQ], "00")
        )
        send_rq(code, "00" if short_payload_ok else "0000")

    # these are possible/difficult codes
    for extra_code in (Code._0150, Code._2389):
        send_rq(extra_code, "0000")
@script_decorator
async def script_scan_hard(
    gwy: Gateway, dev_id: DeviceIdT, *, start_code: None | int = None
) -> None:
    """Send an RQ for every code number from ``start_code`` (default 0) up to 0x4FFF.

    Each request is awaited via ``async_send_cmd()`` before the next is sent.
    """
    _LOGGER.warning("scan_hard() invoked - expect some Warnings")

    first_code = start_code or 0

    for code_num in range(first_code, 0x5000):
        cmd = Command.from_attrs(
            RQ, dev_id, f"{code_num:04X}", "0000"  # type:ignore[arg-type]
        )
        await gwy.async_send_cmd(cmd, priority=Priority.LOW)
@script_decorator
async def script_scan_fan(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Queue RQs for the codes of the HVAC device classes, then for extra codes.

    Codes 0016 and 1470 are never requested; no code is requested twice.
    """
    _LOGGER.warning("scan_fan() invoked - expect a lot of nonsense")

    from ramses_tx.ramses import _DEV_KLASSES_HVAC

    excluded = (
        Code._0016,
        Code._1470,
    )

    # Ordered, de-duplicated collection of the codes found in the HVAC classes
    known_codes: dict[Any, None] = {}
    for klass in _DEV_KLASSES_HVAC.values():
        for klass_code in klass:
            if klass_code not in excluded:
                known_codes.setdefault(klass_code, None)

    for code in known_codes:
        gwy.send_cmd(Command.from_attrs(RQ, dev_id, code, "00"))

    extra_codes = (
        Code._0150,
        Code._042F,
        Code._1030,
        Code._10D0,
        Code._10E1,
        Code._2210,
        Code._22B0,
        Code._22E0,
        Code._22E5,
        Code._22E9,
        Code._22F1,
        Code._22F2,
        Code._22F3,
        Code._22F4,
        Code._22F7,
        Code._22F8,
        Code._2400,
        Code._2410,
        Code._2420,
        Code._313E,
        Code._3221,
        Code._3222,
    )

    for code in extra_codes:
        if code in known_codes or code in excluded:
            continue  # already requested above, or deliberately left out
        gwy.send_cmd(Command.from_attrs(RQ, dev_id, code, "00"))
@script_decorator
async def script_scan_otb(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Queue an OpenTherm data request for every id in OTB_DATA_IDS."""
    _LOGGER.warning("script_scan_otb_full invoked - expect a lot of nonsense")

    for data_id in OTB_DATA_IDS:
        request = Command.get_opentherm_data(dev_id, data_id)
        gwy.send_cmd(request)
@script_decorator
async def script_scan_otb_hard(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Queue a low-priority OpenTherm data request for each id in 0x00-0x7F."""
    _LOGGER.warning("script_scan_otb_hard invoked - expect a lot of nonsense")

    for data_id in range(0x80):
        request = Command.get_opentherm_data(dev_id, data_id)
        gwy.send_cmd(request, priority=Priority.LOW)
@script_decorator
async def script_scan_otb_map(
    gwy: Gateway, dev_id: DeviceIdT
) -> None:  # Tested only upon a R8820A
    """For each paired code, queue the RAMSES RQ followed by its OpenTherm request."""
    _LOGGER.warning("script_scan_otb_map invoked - expect a lot of nonsense")

    # (RAMSES code, OpenTherm msg_id) pairs, in the order they are requested
    code_pairs = (
        (Code._22D9, "01"),  # boiler setpoint / ControlSetpoint
        (Code._3EF1, "11"),  # rel. modulation level / RelativeModulationLevel
        (Code._1300, "12"),  # cv water pressure / CHWaterPressure
        (Code._12F0, "13"),  # dhw_flow_rate / DHWFlowRate
        (Code._3200, "19"),  # boiler output temp / BoilerWaterTemperature
        (Code._1260, "1A"),  # dhw temp / DHWTemperature
        (Code._1290, "1B"),  # outdoor temp / OutsideTemperature
        (Code._3210, "1C"),  # boiler return temp / ReturnWaterTemperature
        (Code._10A0, "38"),  # dhw params[SZ_SETPOINT] / DHWSetpoint
        (Code._1081, "39"),  # max ch setpoint / MaxCHWaterSetpoint
    )

    for ramses_code, opentherm_id in code_pairs:
        ramses_rq = Command.from_attrs(RQ, dev_id, ramses_code, "00")
        gwy.send_cmd(ramses_rq, priority=Priority.LOW)

        opentherm_rq = Command.get_opentherm_data(dev_id, opentherm_id)
        gwy.send_cmd(opentherm_rq, priority=Priority.LOW)
@script_decorator
async def script_scan_otb_ramses(
    gwy: Gateway, dev_id: DeviceIdT
) -> None:  # Tested only upon a R8820A
    """Queue a low-priority RQ (payload "00") for a fixed list of RAMSES codes."""
    _LOGGER.warning("script_scan_otb_ramses invoked - expect a lot of nonsense")

    codes_to_query = (
        Code._042F,
        Code._10E0,  # device_info
        Code._10E1,  # device_id
        Code._1FD0,
        Code._2400,
        Code._2401,
        Code._2410,
        Code._2420,
        Code._1300,  # cv water pressure / CHWaterPressure
        Code._1081,  # max ch setpoint / MaxCHWaterSetpoint
        Code._10A0,  # dhw params[SZ_SETPOINT] / DHWSetpoint
        Code._22D9,  # boiler setpoint / ControlSetpoint
        Code._1260,  # dhw temp / DHWTemperature
        Code._1290,  # outdoor temp / OutsideTemperature
        Code._3200,  # boiler output temp / BoilerWaterTemperature
        Code._3210,  # boiler return temp / ReturnWaterTemperature
        Code._0150,
        Code._12F0,  # dhw flow rate / DHWFlowRate
        Code._1098,
        Code._10B0,
        Code._3221,
        Code._3223,
        Code._3EF0,  # rel. modulation level / RelativeModulationLevel (also, below)
        Code._3EF1,  # rel. modulation level / RelativeModulationLevel
    )  # excl. 3150, 3220

    for code in codes_to_query:
        request = Command.from_attrs(RQ, dev_id, code, "00")
        gwy.send_cmd(request, priority=Priority.LOW)
# Registry of runnable scripts: every module-level callable named "script_<x>"
# is exposed under the key "<x>".
#  - The namespace is snapshotted with list() before iterating: iterating the
#    live locals() mapping inside a comprehension can raise "RuntimeError:
#    dictionary changed size during iteration" when run under tracing (e.g.
#    coverage measurement) on Python 3.12+.
#  - script_decorator shares the prefix but is a helper, not a script, so it is
#    kept out of the registry.
SCRIPTS: dict[str, Callable[..., Any]] = {
    name[len("script_") :]: obj
    for name, obj in list(locals().items())
    if name.startswith("script_") and name != "script_decorator" and callable(obj)
}