Coverage for src/ramses_cli/discovery.py: 29%

185 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:44 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - discovery scripts.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7import functools 

8import json 

9import logging 

10import re 

11from collections.abc import Callable 

12from typing import TYPE_CHECKING, Any, Final 

13 

14from ramses_rf import exceptions as exc 

15from ramses_rf.const import SZ_SCHEDULE, SZ_ZONE_IDX 

16from ramses_rf.device import Fakeable 

17from ramses_tx import CODES_SCHEMA, Command, DeviceIdT, Priority 

18from ramses_tx.opentherm import OTB_DATA_IDS 

19 

20# Beware, none of this is reliable - it is all subject to random change 

21# However, these serve as examples of how to use the other modules

22 

23 

24from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

25 I_, 

26 RP, 

27 RQ, 

28 W_, 

29 Code, 

30) 

31 

32if TYPE_CHECKING: 

33 from ramses_rf import Gateway, IndexT 

34 

35 

# Keys that spawn_scripts() looks up in its keyword arguments to decide which
# task(s) to create.
EXEC_CMD: Final = "exec_cmd"
GET_FAULTS: Final = "get_faults"
GET_SCHED: Final = "get_schedule"
SET_SCHED: Final = "set_schedule"

# Also read by spawn_scripts(): element 0 of the value names the script to run,
# element 1 is passed to that script as its argument.
EXEC_SCR: Final = "exec_scr"
# NOTE(review): the SCAN_* names are not referenced by the code visible in this
# module - presumably they are used by the CLI elsewhere; TODO confirm.
SCAN_DISC: Final = "scan_disc"
SCAN_FULL: Final = "scan_full"
SCAN_HARD: Final = "scan_hard"
SCAN_XXXX: Final = "scan_xxxx"

# DEVICE_ID_REGEX = re.compile(DEVICE_ID_REGEX.ANY)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

51 

52 

def script_decorator(fnc: Callable[..., Any]) -> Callable[..., Any]:
    """Bracket a script with "Script begins:" / "Script done." puzzle packets.

    The decorated script is invoked as ``fnc(gwy, *args, **kwargs)``.

    The returned wrapper is a coroutine function: calling it yields a coroutine
    that can be awaited, or handed to ``asyncio.create_task()``. The "begins"
    packet is sent at the highest priority and the "done" packet at the lowest,
    each with ``num_repeats=3``.
    """

    @functools.wraps(fnc)
    async def wrapper(gwy: Gateway, *args: Any, **kwargs: Any) -> None:
        gwy.send_cmd(
            Command._puzzle(message="Script begins:"),
            priority=Priority.HIGHEST,
            num_repeats=3,
        )

        # Calling a coroutine function only creates a coroutine object; it has to
        # be awaited for the script's body to run at all. A plain (synchronous)
        # script has already run by the time its call returns.
        result = fnc(gwy, *args, **kwargs)
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            await result

        gwy.send_cmd(
            Command._puzzle(message="Script done."),
            priority=Priority.LOWEST,
            num_repeats=3,
        )

    return wrapper

73 

74 

def spawn_scripts(gwy: Gateway, **kwargs: Any) -> list[asyncio.Task[None]]:
    """Create the tasks requested via kwargs, and register them with the gateway.

    An EXEC_CMD task is created whenever that key has a truthy value. In addition,
    at most one of GET_FAULTS, GET_SCHED, SET_SCHED or EXEC_SCR is acted upon (the
    first, in that order, with a truthy value).

    For EXEC_SCR, element 0 of the value is the script's name (a key of SCRIPTS)
    and element 1 is passed to the script; an unknown name is logged and ignored.

    Returns the tasks, all of which will be present in ``gwy._tasks``.
    """
    tasks: list[asyncio.Task[None]] = []

    if kwargs.get(EXEC_CMD):
        tasks.append(asyncio.create_task(exec_cmd(gwy, **kwargs)))

    if kwargs.get(GET_FAULTS):
        tasks.append(asyncio.create_task(get_faults(gwy, kwargs[GET_FAULTS])))

    elif kwargs.get(GET_SCHED) and kwargs[GET_SCHED][0]:
        tasks.append(asyncio.create_task(get_schedule(gwy, *kwargs[GET_SCHED])))

    elif kwargs.get(SET_SCHED) and kwargs[SET_SCHED][0]:
        tasks.append(asyncio.create_task(set_schedule(gwy, *kwargs[SET_SCHED])))

    elif kwargs.get(EXEC_SCR):
        script = SCRIPTS.get(f"{kwargs[EXEC_SCR][0]}")
        if script is None:
            _LOGGER.warning(f"Script: {kwargs[EXEC_SCR][0]}() - unknown script")
        else:
            _LOGGER.info(f"Script: {kwargs[EXEC_SCR][0]}().- starts...")
            result = script(gwy, kwargs[EXEC_SCR][1])

            # asyncio.create_task() accepts only a coroutine (anything else is a
            # TypeError), but not every script returns one: script_poll_device()
            # is a plain function that returns the tasks it has already created.
            if asyncio.iscoroutine(result):
                tasks.append(asyncio.create_task(result))
            elif asyncio.isfuture(result):
                tasks.append(result)
            elif result:
                tasks.extend(result)

    # A script may have registered its own tasks already: don't add them twice.
    unregistered = [t for t in tasks if t not in gwy._tasks]
    gwy._tasks.extend(unregistered)
    return tasks

100 

101 

async def exec_cmd(gwy: Gateway, **kwargs: Any) -> None:
    """Build a command from the EXEC_CMD keyword argument, and send it.

    The command is sent at high priority, with wait_for_reply=True.
    """
    await gwy.async_send_cmd(
        Command.from_cli(kwargs[EXEC_CMD]),
        priority=Priority.HIGH,
        wait_for_reply=True,
    )

105 

106 

107# @script_decorator 

108# async def script_scan_001(gwy: Gateway, dev_id: DeviceIdT): 

109# _LOGGER.warning("scan_001() invoked - expect a lot of nonsense") 

110# for idx in range(0x10): 

111# gwy.send_cmd(Command.from_attrs(W_, dev_id, Code._000E, f"{idx:02X}0050")) 

112# gwy.send_cmd(Command.from_attrs(RQ, dev_id, Code._000E, f"{idx:02X}00C8")) 

113 

114 

async def get_faults(
    gwy: Gateway, ctl_id: DeviceIdT, start: int = 0, limit: int = 0x3F
) -> None:
    """Request the fault log (0418) from the TCS of the device with ctl_id.

    The start and limit arguments are passed on to get_faultlog(). An
    ExpiredCallbackError is not propagated: it is logged as an error instead.
    """
    controller = gwy.get_device(ctl_id)

    try:
        await controller.tcs.get_faultlog(start=start, limit=limit)  # 0418
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("get_faults(): Function timed out: %s", err)

124 

125 

async def get_schedule(gwy: Gateway, ctl_id: DeviceIdT, zone_idx: str) -> None:
    """Request the schedule of a heating zone of the TCS of the device with ctl_id.

    An ExpiredCallbackError is not propagated: it is logged as an error instead.
    """
    tcs = gwy.get_device(ctl_id).tcs
    htg_zone = tcs.get_htg_zone(zone_idx)

    try:
        await htg_zone.get_schedule()
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("get_schedule(): Function timed out: %s", err)

133 

134 

async def set_schedule(gwy: Gateway, ctl_id: DeviceIdT, schedule: str) -> None:
    """Set the schedule (0404) of a heating zone from a JSON string.

    The decoded JSON must have an SZ_ZONE_IDX entry (which selects the zone) and
    an SZ_SCHEDULE entry (which is passed to the zone's set_schedule()).

    An ExpiredCallbackError is not propagated: it is logged as an error instead.
    """
    decoded = json.loads(schedule)

    tcs = gwy.get_device(ctl_id).tcs
    htg_zone = tcs.get_htg_zone(decoded[SZ_ZONE_IDX])

    try:
        await htg_zone.set_schedule(decoded[SZ_SCHEDULE])  # 0404
    except exc.ExpiredCallbackError as err:
        _LOGGER.error("set_schedule(): Function timed out: %s", err)

145 

146 

async def script_bind_req(
    gwy: Gateway, dev_id: DeviceIdT, code: Code = Code._2309
) -> None:
    """Make a device fake, and have it initiate the binding process for a code."""
    device = gwy.get_device(dev_id)
    assert isinstance(device, Fakeable)  # mypy

    device._make_fake()
    codes = [code]
    await device._initiate_binding_process(codes)

154 

155 

async def script_bind_wait(
    gwy: Gateway, dev_id: DeviceIdT, code: Code = Code._2309, idx: IndexT = "00"
) -> None:
    """Make a device fake, and have it wait for a binding request for a code."""
    device = gwy.get_device(dev_id)
    assert isinstance(device, Fakeable)  # mypy

    device._make_fake()
    codes = [code]
    await device._wait_for_binding_request(codes, idx=idx)

163 

164 

def script_poll_device(gwy: Gateway, dev_id: DeviceIdT) -> list[asyncio.Task[None]]:
    """Start polling a device, and return the tasks that do the polling.

    One task is created for each of the 0016 and 1FC9 codes. Each task repeats,
    without end: sleep for 60 seconds, then send a low-priority RQ (payload "00").

    The tasks are added to gwy._tasks before they are returned.
    """

    async def periodic_send(
        gwy: Gateway,
        cmd: Command,
        count: int = 1,
        interval: float | None = None,
    ) -> None:
        """Sleep for interval, then send cmd: count times (forever, if count <= 0).

        If no interval is given it is 60 seconds, except a single send is immediate.
        """
        delay = interval
        if delay is None:
            delay = 60 if count != 1 else 0

        num_sent = 0
        while count <= 0 or num_sent < count:
            await asyncio.sleep(delay)
            gwy.send_cmd(cmd, priority=Priority.LOW)
            num_sent += 1

    _LOGGER.warning("poll_device() invoked...")

    pollers = [
        asyncio.create_task(
            periodic_send(gwy, Command.from_attrs(RQ, dev_id, code, "00"), count=0)
        )
        for code in (Code._0016, Code._1FC9)
    ]

    gwy._tasks.extend(pollers)
    return pollers

196 

197 

@script_decorator
async def script_scan_disc(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Run the discover() method of the device with the given id."""
    _LOGGER.warning("scan_disc() invoked...")

    device = gwy.get_device(dev_id)
    await device.discover()  # discover_flag=Discover.DEFAULT)

203 

204 

@script_decorator
async def script_scan_full(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Send a device requests for (nearly) every code in CODES_SCHEMA.

    A 0016 RQ is sent first (with repeats). The codes are then walked in sorted
    order: several codes get one or more purpose-built commands, the PUZZ code is
    skipped, and every other code gets a plain RQ with a payload of "00" or "0000".
    Finally, RQs are sent for two codes (0150, 2389) described as difficult.
    """

    def send(verb: str, code: Any, payload: str) -> None:
        """Queue a command with the given verb, code and payload for the device."""
        gwy.send_cmd(Command.from_attrs(verb, dev_id, code, payload))

    _LOGGER.warning("scan_full() invoked - expect a lot of Warnings")

    gwy.send_cmd(Command.from_attrs(RQ, dev_id, Code._0016, "0000"), num_repeats=3)

    for code in sorted(CODES_SCHEMA):
        if code in (Code._0016, Code._PUZZ):  # 0016 has been sent, above
            continue

        if code == Code._0005:
            for zone_type in range(20):  # known up to 18
                send(RQ, code, f"00{zone_type:02X}")

        elif code == Code._000C:
            for zone_num in range(16):  # also: FA-FF?
                send(RQ, code, f"{zone_num:02X}00")

        elif code in (Code._01D0, Code._01E9):
            for zone_idx in ("00", "01", "FC"):
                for suffix in ("00", "03"):
                    send(W_, code, f"{zone_idx}{suffix}")

        elif code == Code._0404:  # FIXME
            for zone_idx in ("HW", "00"):
                gwy.send_cmd(Command.get_schedule_fragment(dev_id, zone_idx, 1, 0))

        elif code == Code._0418:
            for log_idx in range(2):
                gwy.send_cmd(Command.get_system_log_entry(dev_id, log_idx))

        elif code == Code._1100:
            gwy.send_cmd(Command.get_tpi_params(dev_id))

        elif code == Code._2E04:
            gwy.send_cmd(Command.get_system_mode(dev_id))

        elif code == Code._3220:
            for data_id in (0, 3):  # these are mandatory READ_DATA data_ids
                gwy.send_cmd(Command.get_opentherm_data(dev_id, data_id))

        else:
            # use the short payload only if the schema's RQ regex matches it
            schema = CODES_SCHEMA[code]
            short_ok = RQ in schema and re.match(schema[RQ], "00")
            send(RQ, code, "00" if short_ok else "0000")

    # these are possible/difficult codes
    for code in (Code._0150, Code._2389):
        send(RQ, code, "0000")

262 

263 

@script_decorator
async def script_scan_hard(
    gwy: Gateway, dev_id: DeviceIdT, *, start_code: None | int = None
) -> None:
    """Send a device an RQ for every code number from start_code up to 0x4FFF.

    The scan starts at 0 if start_code is not given. The commands are sent one
    at a time, at low priority, each with a payload of "0000".
    """
    _LOGGER.warning("scan_hard() invoked - expect some Warnings")

    first_code = start_code if start_code else 0

    for code_num in range(first_code, 0x5000):
        cmd = Command.from_attrs(RQ, dev_id, f"{code_num:04X}", "0000")  # type:ignore[arg-type]
        await gwy.async_send_cmd(cmd, priority=Priority.LOW)

277 

278 

@script_decorator
async def script_scan_fan(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Send a device an RQ (payload "00") for each of a set of HVAC codes.

    First, every code found in _DEV_KLASSES_HVAC is requested (once each, in
    order of first appearance), and then any code of a further, fixed list that
    has not been requested already. The 0016 and 1470 codes are never requested.
    """
    _LOGGER.warning("scan_fan() invoked - expect a lot of nonsense")

    from ramses_tx.ramses import _DEV_KLASSES_HVAC

    excluded = (
        Code._0016,
        Code._1470,
    )

    # a dict is used as an ordered set: no duplicates, insertion order is kept
    known_codes: dict[Any, None] = {}
    for klass_codes in _DEV_KLASSES_HVAC.values():
        for klass_code in klass_codes:
            if klass_code not in excluded:
                known_codes[klass_code] = None

    for code in known_codes:
        gwy.send_cmd(Command.from_attrs(RQ, dev_id, code, "00"))

    extra_codes = (
        Code._0150,
        Code._042F,
        Code._1030,
        Code._10D0,
        Code._10E1,
        Code._2210,
        Code._22B0,
        Code._22E0,
        Code._22E5,
        Code._22E9,
        Code._22F1,
        Code._22F2,
        Code._22F3,
        Code._22F4,
        Code._22F7,
        Code._22F8,
        Code._2400,
        Code._2410,
        Code._2420,
        Code._313E,
        Code._3221,
        Code._3222,
    )

    for code in extra_codes:
        if code in known_codes or code in excluded:
            continue
        gwy.send_cmd(Command.from_attrs(RQ, dev_id, code, "00"))

324 

325 

@script_decorator
async def script_scan_otb(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Send a device an OpenTherm data request for every id in OTB_DATA_IDS."""
    _LOGGER.warning("script_scan_otb_full invoked - expect a lot of nonsense")

    for data_id in OTB_DATA_IDS:
        cmd = Command.get_opentherm_data(dev_id, data_id)
        gwy.send_cmd(cmd)

332 

333 

@script_decorator
async def script_scan_otb_hard(gwy: Gateway, dev_id: DeviceIdT) -> None:
    """Send a device a low-priority OpenTherm data request for ids 0x00 to 0x7F."""
    _LOGGER.warning("script_scan_otb_hard invoked - expect a lot of nonsense")

    for data_id in range(0x80):
        cmd = Command.get_opentherm_data(dev_id, data_id)
        gwy.send_cmd(cmd, priority=Priority.LOW)

340 

341 

@script_decorator
async def script_scan_otb_map(
    gwy: Gateway, dev_id: DeviceIdT
) -> None:  # Tested only upon a R8820A
    """Request each of a set of RAMSES codes along with its OpenTherm counterpart.

    For every pair, an RQ for the RAMSES code (payload "00") is sent, followed by
    an OpenTherm data request for the paired id - both at low priority.
    """
    _LOGGER.warning("script_scan_otb_map invoked - expect a lot of nonsense")

    # pairs of (RAMSES code, OpenTherm msg_id), requested in this order
    ramses_to_opentherm = (
        (Code._22D9, "01"),  # boiler setpoint        / ControlSetpoint
        (Code._3EF1, "11"),  # rel. modulation level  / RelativeModulationLevel
        (Code._1300, "12"),  # cv water pressure      / CHWaterPressure
        (Code._12F0, "13"),  # dhw_flow_rate          / DHWFlowRate
        (Code._3200, "19"),  # boiler output temp     / BoilerWaterTemperature
        (Code._1260, "1A"),  # dhw temp               / DHWTemperature
        (Code._1290, "1B"),  # outdoor temp           / OutsideTemperature
        (Code._3210, "1C"),  # boiler return temp     / ReturnWaterTemperature
        (Code._10A0, "38"),  # dhw params[SZ_SETPOINT] / DHWSetpoint
        (Code._1081, "39"),  # max ch setpoint        / MaxCHWaterSetpoint
    )

    for ramses_code, ot_msg_id in ramses_to_opentherm:
        ramses_cmd = Command.from_attrs(RQ, dev_id, ramses_code, "00")
        gwy.send_cmd(ramses_cmd, priority=Priority.LOW)

        ot_cmd = Command.get_opentherm_data(dev_id, ot_msg_id)
        gwy.send_cmd(ot_cmd, priority=Priority.LOW)

364 

365 

@script_decorator
async def script_scan_otb_ramses(
    gwy: Gateway, dev_id: DeviceIdT
) -> None:  # Tested only upon a R8820A
    """Send a device a low-priority RQ (payload "00") for a fixed list of codes."""
    _LOGGER.warning("script_scan_otb_ramses invoked - expect a lot of nonsense")

    codes_to_request = (  # excl. 3150, 3220
        Code._042F,
        Code._10E0,  # device_info
        Code._10E1,  # device_id
        Code._1FD0,
        Code._2400,
        Code._2401,
        Code._2410,
        Code._2420,
        Code._1300,  # cv water pressure      / CHWaterPressure
        Code._1081,  # max ch setpoint        / MaxCHWaterSetpoint
        Code._10A0,  # dhw params[SZ_SETPOINT] / DHWSetpoint
        Code._22D9,  # boiler setpoint        / ControlSetpoint
        Code._1260,  # dhw temp               / DHWTemperature
        Code._1290,  # outdoor temp           / OutsideTemperature
        Code._3200,  # boiler output temp     / BoilerWaterTemperature
        Code._3210,  # boiler return temp     / ReturnWaterTemperature
        Code._0150,
        Code._12F0,  # dhw flow rate          / DHWFlowRate
        Code._1098,
        Code._10B0,
        Code._3221,
        Code._3223,
        Code._3EF0,  # rel. modulation level  / RelativeModulationLevel (also, below)
        Code._3EF1,  # rel. modulation level  / RelativeModulationLevel
    )

    for code in codes_to_request:
        cmd = Command.from_attrs(RQ, dev_id, code, "00")
        gwy.send_cmd(cmd, priority=Priority.LOW)

401 

402 

# The registry of scripts: every callable defined above whose name starts with
# "script_", keyed by its name without that (7-character) prefix.
SCRIPTS = {
    name[7:]: obj
    for name, obj in locals().items()
    if name.startswith("script_") and callable(obj)
}