Coverage for src/ramses_rf/device/base.py: 41%

224 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser. 

3 

4Base for all devices. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections.abc import Iterable 

11from typing import TYPE_CHECKING 

12 

13from ramses_rf.binding_fsm import BindContext, Vendor 

14from ramses_rf.const import DEV_TYPE_MAP, SZ_OEM_CODE, DevType 

15from ramses_rf.entity_base import Child, Entity, class_by_attr 

16from ramses_rf.helpers import shrink 

17from ramses_rf.schemas import ( 

18 SCH_TRAITS, 

19 SZ_ALIAS, 

20 SZ_CLASS, 

21 SZ_FAKED, 

22 SZ_KNOWN_LIST, 

23 SZ_SCHEME, 

24) 

25from ramses_tx import Command, Packet, Priority, QosParams 

26from ramses_tx.ramses import CODES_BY_DEV_SLUG, CODES_ONLY_FROM_CTL 

27 

28from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

29 I_, 

30 RP, 

31 RQ, 

32 W_, 

33 Code, 

34) 

35 

36if TYPE_CHECKING: 

37 from typing import Any 

38 

39 from ramses_rf import Gateway 

40 from ramses_rf.system import Zone 

41 from ramses_tx import Address, DeviceIdT, IndexT, Message 

42 

43 

# Time limits for each stage of a binding (units are not stated here; presumably seconds)
BIND_WAITING_TIMEOUT = 5 * 60  # listening for an offer
BIND_REQUEST_TIMEOUT = 5  # awaiting an accept, after having sent an offer
BIND_CONFIRM_TIMEOUT = 5  # awaiting a confirm, after having sent an accept


_LOGGER = logging.getLogger(__name__)

50 

51 

class DeviceBase(Entity):
    """The Device base class - can also be used for unknown device types."""

    _SLUG: str = DevType.DEV
    _STATE_ATTR: str | None = None  # name of the attr that __str__() appends, if any

    _bind_context: BindContext | None = None  # is_faked is simply bool() of this

    def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
        """Initialise the device from its address.

        :param gwy: The gateway, passed on to the parent class
        :param dev_addr: The address of the device; supplies its id and type
        :param kwargs: Accepted, but not used by this constructor
        """
        super().__init__(gwy)

        # FIXME: gwy.msg_db entities must know their parent device ID and their own idx
        self._z_id = dev_addr.id  # the responsible device is itself
        self._z_idx = None  # depends upon its location in the schema

        self.id: DeviceIdT = dev_addr.id

        # self.tcs = None  # NOTE: Heat (CH/DHW) devices only
        # self.ctl = None
        # self._child_id = None  # also in Child class

        self.addr = dev_addr
        self.type = dev_addr.type  # DEX  # TODO: remove this attr? use SLUG?

        self._scheme: Vendor | None = None  # set by _update_traits()

    def __str__(self) -> str:
        """Return the device id and slug, plus the state attr when there is one."""
        if self._STATE_ATTR and hasattr(self, self._STATE_ATTR):
            state: float | None = getattr(self, self._STATE_ATTR)
            return f"{self.id} ({self._SLUG}): {state}"
        return f"{self.id} ({self._SLUG})"

    def __lt__(self, other: object) -> bool:
        """Order devices by their id; defer to other if it has no id attr."""
        if not hasattr(other, "id"):
            return NotImplemented
        return self.id < other.id  # type: ignore[no-any-return]

    def _update_traits(self, **traits: Any) -> None:
        """Update a device with new schema attributes.

        :param traits: The traits to apply (e.g., alias, class, faked)
        :raises TypeError: If the device is not fakeable but 'faked' is set.
        """

        traits = shrink(SCH_TRAITS(traits))

        if traits.get(SZ_FAKED):  # class & alias are done elsewhere
            if not isinstance(self, Fakeable):
                raise TypeError(f"Device is not fakeable: {self} (traits={traits})")
            self._make_fake()

        self._scheme = traits.get(SZ_SCHEME)

    @classmethod
    def create_from_schema(
        cls, gwy: Gateway, dev_addr: Address, **schema: Any
    ) -> DeviceBase:
        """Create a device (for a GWY) and set its schema attrs (aka traits).

        All devices have traits, but also controllers (CTL, UFC) have a system schema.

        The appropriate Device class should have been determined by a factory.
        Schema attrs include: class (SLUG), alias & faked.
        """

        dev = cls(gwy, dev_addr)
        dev._update_traits(**schema)  # TODO: split traits/schema
        return dev

    def _setup_discovery_cmds(self) -> None:
        """Add no discovery commands (subclasses may add their own)."""
        # super()._setup_discovery_cmds()
        # sometimes, battery-powered devices will respond to an RQ (e.g. bind mode)

        # if discover_flag & Discover.TRAITS:
        # self._add_discovery_cmd(cmd(RQ, Code._1FC9, "00", self.id), 60 * 60 * 24)
        # self._add_discovery_cmd(cmd(RQ, Code._0016, "00", self.id), 60 * 60)

        pass

    def _send_cmd(self, cmd: Command, **kwargs: Any) -> None:
        """Send a command, after logging if it targets this (real) battery device."""
        # The command is sent regardless: this is a notification, not a veto
        if self.has_battery and not self.is_faked and cmd.dst.id == self.id:
            _LOGGER.info(f"{cmd} < Sending inadvisable for {self} (it has a battery)")

        super()._send_cmd(cmd, **kwargs)

    def _handle_msg(self, msg: Message) -> None:
        """Process a message and then, if possible, promote the device's class.

        Only a device whose slug is in PROMOTABLE_SLUGS is considered, and its class
        is replaced in-place with the one chosen by best_dev_role().
        """
        # # assert msg.src is self or (
        # #     msg.code == Code._1FC9 and msg.payload[SZ_PHASE] == SZ_OFFER
        # # ), f"msg from {msg.src} inappropriately routed to {self}"

        super()._handle_msg(msg)

        if self._SLUG in DEV_TYPE_MAP.PROMOTABLE_SLUGS:  # HACK: can get precise class?
            from . import best_dev_role

            cls = best_dev_role(
                self.addr, msg=msg, eavesdrop=self._gwy.config.enable_eavesdrop
            )

            if cls._SLUG in (DevType.DEV, self._SLUG):
                return  # either a demotion (DEV), or not promotion (HEA/HVC)

            if self._SLUG == DevType.HEA and cls._SLUG in DEV_TYPE_MAP.HVAC_SLUGS:
                return  # TODO: should raise error if CODES_OF_HVAC_DOMAIN_ONLY?

            if self._SLUG == DevType.HVC and cls._SLUG not in DEV_TYPE_MAP.HVAC_SLUGS:
                return  # TODO: should raise error if CODES_OF_HEAT_DOMAIN_ONLY?

            _LOGGER.warning(
                f"Promoting the device class of {self} to {cls._SLUG}"
                f" - use a {SZ_KNOWN_LIST} to explicitly set this device's"
                f" {SZ_CLASS} to '{DEV_TYPE_MAP[cls._SLUG]}'"
            )
            self.__class__ = cls

    @property
    def has_battery(self) -> None | bool:  # 1060
        """Return True if the device is battery powered (excludes battery-backup)."""
        if self._gwy.msg_db:
            code_list = self._msg_dev_qry()
            return isinstance(self, BatteryState) or (
                code_list is not None and Code._1060 in code_list
            )  # TODO(eb): clean up next line Q1 2026
        return isinstance(self, BatteryState) or Code._1060 in self._msgz

    @property
    def is_faked(self) -> bool:
        """Return True if the device is faked."""

        return bool(self._bind_context)  # isinstance(self, Fakeable) and...

    @property
    def _is_binding(self) -> bool:
        """Return True if the (faked) device is actively binding."""

        # bool() is needed: a bare `and` would return None when there is no context
        return bool(self._bind_context and self._bind_context.is_binding)

    @property
    def _is_present(self) -> bool:
        """Try to exclude ghost devices (as caused by corrupt packet addresses)."""
        return any(
            m.src == self for m in self._msgs.values() if not m._expired
        )  # TODO: needs addressing

    @property
    def schema(self) -> dict[str, Any]:
        """Return the fixed attributes of the device."""
        return {}  # SZ_CLASS: DEV_TYPE_MAP[self._SLUG]}

    @property
    def params(self) -> dict[str, Any]:
        """Return the configurable attributes of the device."""
        return {}

    @property
    def status(self) -> dict[str, Any]:
        """Return the state attributes of the device."""
        return {}

    @property
    def traits(self) -> dict[str, Any]:
        """Get the traits of the device.

        The parent's traits are extended with the class, alias & faked traits, and
        the value of any 1FC9 message. NOTE: a new dict is returned on every access.
        """

        result = super().traits

        known_dev = self._gwy._include.get(self.id)

        result.update(
            {
                SZ_CLASS: DEV_TYPE_MAP[self._SLUG],
                SZ_ALIAS: known_dev.get(SZ_ALIAS) if known_dev else None,
                SZ_FAKED: self.is_faked,
            }
        )

        return result | {"_bind": self._msg_value(Code._1FC9)}

228 

229 

class BatteryState(DeviceBase):  # 1060
    """A device that exposes the battery info of its 1060 messages."""

    BATTERY_LOW = "battery_low"  # boolean
    BATTERY_STATE = "battery_state"  # percentage (0.0-1.0)

    @property
    def battery_low(self) -> None | bool:  # 1060
        """Return the battery_low value from 1060 (always False if faked)."""
        return (
            False
            if self.is_faked
            else self._msg_value(Code._1060, key=self.BATTERY_LOW)
        )

    @property
    def battery_state(self) -> dict[str, Any] | None:  # 1060
        """Return the whole value of 1060 (always None if faked)."""
        return None if self.is_faked else self._msg_value(Code._1060)

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's state attributes, with the battery state added."""
        combined = dict(super().status)
        combined[self.BATTERY_STATE] = self.battery_state
        return combined

252 

253 

class DeviceInfo(DeviceBase):  # 10E0
    """A device that may provide device info via its 10E0 messages."""

    def _setup_discovery_cmds(self) -> None:
        """Add a daily RQ|10E0, unless the slug is known not to RP to that code."""
        super()._setup_discovery_cmds()

        # if discover_flag & Discover.SCHEMA:
        if self._SLUG in CODES_BY_DEV_SLUG:
            known_verbs = CODES_BY_DEV_SLUG[self._SLUG].get(Code._10E0, {})
            if RP not in known_verbs:
                return  # this slug is listed, but without an RP for 10E0

        interval = 60 * 60 * 24
        self._add_discovery_cmd(
            Command.from_attrs(RQ, self.id, Code._10E0, "00"), interval
        )

    @property
    def device_info(self) -> dict | None:  # 10E0
        """Return the value of the 10E0 message."""
        return self._msg_value(Code._10E0)

    @property
    def traits(self) -> dict[str, Any]:
        """Return the traits of the device."""

        result = super().traits

        # Include the info if 10E0 was seen, or is a listed code for this slug
        was_seen = Code._10E0 in self._msgs
        if was_seen or Code._10E0 in CODES_BY_DEV_SLUG.get(self._SLUG, []):
            result["_info"] = self.device_info

        return result

281 

282 

283# NOTE: devices (Thermostat) not attrs (Temperature) are faked 

class Fakeable(DeviceBase):
    """There are two types of Faking: impersonation (of real devices) and full-faking.

    Impersonation of physical devices simply means sending packets on their behalf. This
    is straight-forward for sensors & remotes (they do not usually receive pkts).

    Faked (virtual) devices must have any packet addressed to them sent to their
    handle_msg() method by the dispatcher. Impersonated devices will simply pick up
    such packets via RF.
    """

    def __init__(self, gwy: Gateway, *args: Any, **kwargs: Any) -> None:
        """Initialise the device, enabling faking if so configured.

        Faking is enabled if the device's entry in the gateway's include list has
        faked set, or if a truthy faked kwarg is given.
        """
        super().__init__(gwy, *args, **kwargs)

        self._bind_context: BindContext | None = None

        # TODO: this is messy - device schema vs device traits
        if self.id in gwy._include and gwy._include[self.id].get(SZ_FAKED):
            self._make_fake()

        if kwargs.get(SZ_FAKED):
            self._make_fake()

    def _make_fake(self) -> None:
        """Enable faking by creating the binding context (a no-op if already faked)."""
        if self._bind_context:
            return

        self._bind_context = BindContext(self)
        self._gwy._include[self.id][SZ_FAKED] = True  # TODO: remove this
        _LOGGER.info(f"Faking now enabled for: {self}")

    async def _async_send_cmd(
        self,
        cmd: Command,
        priority: Priority | None = None,
        qos: QosParams | None = None,
    ) -> Packet | None:
        """Wrapper to CC: any relevant Commands to the binding Context."""

        if self._bind_context and self._bind_context.is_binding:
            # cmd.code in (Code._1FC9, Code._10E0)
            self._bind_context.sent_cmd(cmd)  # other codes needed for edge cases

        return await super()._async_send_cmd(cmd, priority=priority, qos=qos)

    def _handle_msg(self, msg: Message) -> None:
        """Wrapper to CC: any relevant Packets to the binding Context."""

        super()._handle_msg(msg)

        if self._bind_context and self._bind_context.is_binding:
            # msg.code in (Code._1FC9, Code._10E0)
            self._bind_context.rcvd_msg(msg)  # maybe other codes needed for edge cases

    async def _wait_for_binding_request(
        self,
        accept_codes: Iterable[Code],
        /,
        *,
        idx: IndexT = "00",
        require_ratify: bool = False,
    ) -> tuple[Packet, Packet, Packet, Packet | None]:
        """Listen for a binding and return the Offer packets.

        :param accept_codes: The codes allowed for this binding
        :type accept_codes: Iterable[Code]
        :param idx: The index to bind to, defaults to "00"
        :type idx: IndexT
        :param require_ratify: Whether a ratification step is required, defaults to False
        :type require_ratify: bool
        :return: A tuple of the four binding transaction packets
        :rtype: tuple[Packet, Packet, Packet, Packet | None]
        :raises TypeError: If faking is not enabled for this device
        """

        if not self._bind_context:
            raise TypeError(f"{self}: Faking not enabled")

        msgs = await self._bind_context.wait_for_binding_request(
            accept_codes, idx=idx, require_ratify=require_ratify
        )
        return msgs

    async def wait_for_binding_request(
        self,
        accept_codes: Iterable[Code],
        /,
        *,
        idx: IndexT = "00",
        require_ratify: bool = False,
    ) -> tuple[Packet, Packet, Packet, Packet | None]:
        """Not implemented in this class (always raises NotImplementedError)."""
        raise NotImplementedError

    async def _initiate_binding_process(
        self,
        offer_codes: Code | Iterable[Code],
        /,
        *,
        confirm_code: Code | None = None,
        ratify_cmd: Command | None = None,
    ) -> tuple[Packet, Packet, Packet, Packet | None]:
        """Start a binding and return the Accept, or raise an exception.

        :param offer_codes: The code, or codes, to offer
        :param confirm_code: Passed on, unchanged, to the binding context
        :param ratify_cmd: Passed on, unchanged, to the binding context
        :return: The result of the binding context's initiate_binding_process()
        :raises TypeError: If faking is not enabled for this device
        """
        # confirm_code can be FFFF.

        if not self._bind_context:
            raise TypeError(f"{self}: Faking not enabled")

        # Always hand over a tuple of codes. A str is tested for explicitly, as a str
        # is itself an Iterable and so a lone str-based code would not be wrapped.
        # NOTE(review): Code looks to be a str-based enum - confirm
        codes: tuple[Code, ...]
        if isinstance(offer_codes, str) or not isinstance(offer_codes, Iterable):
            codes = (offer_codes,)
        else:
            codes = tuple(offer_codes)

        msgs = await self._bind_context.initiate_binding_process(
            codes, confirm_code=confirm_code, ratify_cmd=ratify_cmd
        )  # TODO: if successful, re-discover schema?
        return msgs

    async def initiate_binding_process(self) -> Packet:
        """Not implemented in this class (always raises NotImplementedError)."""
        raise NotImplementedError

    @property
    def oem_code(self) -> str | None:
        """Return the OEM code (a 2-char ascii str) for this device, if there is one.

        The code in the device's traits is preferred, falling back to that from any
        10E0 message.
        """
        # self.traits is a @property that builds a new dict on each access, so a
        # value written into it would be discarded: return the fallback directly
        oem_code = self.traits.get(SZ_OEM_CODE)
        if not oem_code:
            oem_code = self._msg_value(Code._10E0, key=SZ_OEM_CODE)
        return oem_code

410 

411 

class Device(Child, DeviceBase):
    """The base class for all devices."""

    def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
        """Initialise the device, and then add it to the gateway.

        NOTE: the kwargs are accepted, but are not passed on to the parent classes.
        """
        _LOGGER.debug(
            "Creating a Device: %s (%s)",
            dev_addr.id,
            self.__class__,
        )
        super().__init__(gwy, dev_addr)

        gwy._add_device(self)  # done last, after the parents' initialisation

420 

421 

class HgiGateway(Device):  # HGI (18:)
    """The HGI80 base class."""

    _SLUG: str = DevType.HGI

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the gateway device: no ctl, no tcs and a child id of 'gw'."""
        super().__init__(*args, **kwargs)

        # FIXME: a mess
        self.ctl = None
        self._child_id = "gw"  # TODO
        self.tcs = None

    @property
    def schema(self) -> dict[str, Any]:
        """Return the fixed attributes of the device (always an empty dict)."""
        return {}

437 

438 

class DeviceHeat(Device):  # Heat domain: Honeywell CH/DHW or compatible
    """The base class for the heat domain (Honeywell CH/DHW-compatible devices).

    Includes UFH and heatpumps (which can also cool).
    """

    _SLUG: str = DevType.HEA  # shouldn't be any of these instantiated

    def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
        """Initialise the device with no controller, TCS or child id."""
        super().__init__(gwy, dev_addr, **kwargs)

        self.ctl = None
        self.tcs = None
        self._child_id = None  # domain_id, or zone_idx

        # None until known; thereafter True, or the message that settled it
        self._iz_controller: None | bool | Message = None

    def _handle_msg(self, msg: Message) -> None:
        """Process a message, and check if it shows the device to be a controller.

        The check is made only for an I_ message, and only while it is unknown
        whether the device is a controller.
        """
        super()._handle_msg(msg)

        if msg.verb != I_ or self._iz_controller is not None:
            return

        # From here on, self._iz_controller is None. The former "was FALSE, now
        # True" error branch could thus never run, and so has been removed.
        # TODO: raise CorruptStateError if a non-controller sends such a code
        if msg.code in CODES_ONLY_FROM_CTL:
            _LOGGER.info(f"{msg!r} # IS_CONTROLLER (00): is TRUE")
            self._make_tcs_controller(msg=msg)

    def _make_tcs_controller(
        self, *, msg: Message | None = None, **schema: Any
    ) -> None:  # CH/DHW
        """Attach a TCS (create/update as required) after passing it any msg.

        In this class, the device is only recorded as being a controller.

        :param msg: The message (if any) showing the device to be a controller
        :param schema: Accepted, but not used here
        :raises TypeError: If the device's type is not in DEV_TYPE_MAP.CONTROLLERS
        """

        if self.type not in DEV_TYPE_MAP.CONTROLLERS:  # potentially can be controllers
            raise TypeError(f"Invalid device type to be a controller: {self}")

        # Keep any earlier (truthy) value, else the msg, else simply True
        self._iz_controller = self._iz_controller or msg or True

    # @property
    # def controller(self):  # -> Optional[Controller]:
    #     """Return the entity's controller, if known."""

    #     return self.ctl  # TODO: if the controller is not known, try to find it?

    @property
    def _is_controller(self) -> None | bool:
        """Return True if the device is known to be a controller.

        If that was never established, then the device is a controller only if it
        is its own ctl.
        """
        if self._iz_controller is not None:
            return bool(self._iz_controller)  # True, False, or msg

        if self.ctl is not None:  # TODO: messy
            return self.ctl is self

        return False

    @property
    def zone(self) -> Zone | None:
        """Return the device's parent zone, if known."""

        return self._parent

500 

501 

class DeviceHvac(Device):  # HVAC domain: ventilation, PIV, MV/HR
    """The Device base class for the HVAC domain (ventilation, PIV, MV/HR)."""

    _SLUG: str = DevType.HVC  # these may be instantiated, and promoted later on

    def __init__(self, gwy: Gateway, dev_addr: Address, **kwargs: Any) -> None:
        """Initialise the device, giving it a child id of 'hv'."""
        super().__init__(gwy, dev_addr, **kwargs)

        # TODO: domain_id/deprecate
        self._child_id = "hv"

    # def _handle_msg(self, msg: Message) -> None:
    #     super()._handle_msg(msg)

    #     # if type(self) is DeviceHvac:
    #     #     if self.type == DEV_TYPE_MAP.RFG:  # self.__class__ is Device, DEX
    #     #         # TODO: the RFG codes need checking
    #     #         if msg.code in (Code._31D9, Code._31DA) and msg.verb in (I_, RP):
    #     #             self.__class__ = HvacVentilator
    #     #         elif msg.code in (Code._0006, Code._0418, Code._3220) and msg.verb == RQ:
    #     #             self.__class__ = RfgGateway
    #     #         elif msg.code in (Code._313F,) and msg.verb == W_:
    #     #             self.__class__ = RfgGateway
    #     #     if type(self) is not Device:
    #     #         _LOGGER.warning(f"Promoted a device type for: {self}")

    #     if msg.code in (Code._1298, Code._12A0, Code._22F1, Code._22F3):
    #         self._hvac_trick()

529 

530 

# The classes of this module, keyed by their _SLUG attr, e.g. {"HGI": HgiGateway}
# NOTE(review): the above is presumed from the call's arguments - confirm
BASE_CLASS_BY_SLUG: dict[str, type[Device]] = class_by_attr(
    __name__,
    "_SLUG",
)