Coverage for src/ramses_rf/entity_base.py: 19%

593 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Base class for all RAMSES-II objects: devices and constructs.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7import contextlib 

8import logging 

9import random 

10from collections.abc import Iterable 

11from datetime import datetime as dt, timedelta as td 

12from inspect import getmembers, isclass 

13from sys import modules 

14from types import ModuleType 

15from typing import TYPE_CHECKING, Any, Final 

16 

17from ramses_rf.helpers import schedule_task 

18from ramses_tx import Priority, QosParams 

19from ramses_tx.address import ALL_DEVICE_ID 

20from ramses_tx.const import MsgId 

21from ramses_tx.opentherm import OPENTHERM_MESSAGES 

22from ramses_tx.ramses import CODES_SCHEMA 

23 

24from . import exceptions as exc 

25from .const import ( 

26 DEV_TYPE_MAP, 

27 SZ_ACTUATORS, 

28 SZ_DOMAIN_ID, 

29 SZ_NAME, 

30 SZ_SENSOR, 

31 SZ_ZONE_IDX, 

32) 

33from .schemas import SZ_CIRCUITS 

34 

35from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

36 I_, 

37 RP, 

38 RQ, 

39 W_, 

40 Code, 

41 VerbT, 

42) 

43 

44from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

45 F9, 

46 FA, 

47 FC, 

48 FF, 

49) 

50 

51if TYPE_CHECKING: 

52 from ramses_tx import Command, Message, Packet, VerbT 

53 from ramses_tx.frame import HeaderT 

54 from ramses_tx.opentherm import OtDataId 

55 from ramses_tx.schemas import DeviceIdT, DevIndexT 

56 

57 from .device import ( 

58 BdrSwitch, 

59 Controller, 

60 DhwSensor, 

61 OtbGateway, 

62 TrvActuator, 

63 UfhCircuit, 

64 ) 

65 from .gateway import Gateway 

66 from .system import Evohome 

67 

68 

# Once an entity's count of unanswered transmissions exceeds this value,
# _send_cmd()/_async_send_cmd() stop sending to it.
_QOS_TX_LIMIT = 12  # TODO: needs work

# Entity ids are sliced as id[:_ID_SLICE] to get the device-id part; any suffix
# (e.g. "_HW", or "_02" for a zone) follows it.
_ID_SLICE = 9

# Keys of the per-command dict built by _Discovery._add_discovery_cmd()
_SZ_LAST_PKT: Final = "last_msg"
_SZ_NEXT_DUE: Final = "next_due"
_SZ_TIMEOUT: Final = "timeout"
_SZ_FAILURES: Final = "failures"
_SZ_INTERVAL: Final = "interval"
_SZ_COMMAND: Final = "command"

#
# NOTE: All debug flags should be False for deployment to end-users
_DBG_ENABLE_DISCOVERY_BACKOFF: Final[bool] = False

_LOGGER = logging.getLogger(__name__)

83 

84 

def class_by_attr(name: str, attr: str) -> dict[str, Any]:  # TODO: change to __module__
    """Return a mapping of a (unique) attr of classes in a module to that class.

    Only classes *defined* in the module called ``name`` (i.e. whose
    ``__module__`` equals ``name``) and that have a truthy ``attr`` are included.
    The module is looked up in ``sys.modules``, so it must already be imported.
    If two classes share the same attr value, the later one (in name order) wins.
    """

    def _is_candidate(member: Any) -> bool:
        if not isclass(member):
            return False
        if member.__module__ != name:
            return False  # merely imported into the module, not defined there
        return bool(getattr(member, attr, None))

    candidates = getmembers(modules[name], _is_candidate)
    return {getattr(klass, attr): klass for _, klass in candidates}

92 

93 

class _Entity:
    """The ultimate base class for Devices/Zones/Systems.

    This class is mainly concerned with:
     - if the entity can Rx packets (e.g. can the HGI send it an RQ?)

    It keeps a count of transmissions that had no matching reply and refuses to
    send further commands once that count exceeds ``_QOS_TX_LIMIT``.
    """

    _SLUG: str = None  # type: ignore[assignment]

    def __init__(self, gwy: Gateway) -> None:
        self._gwy = gwy
        self.id: DeviceIdT = None  # type: ignore[assignment]

        self._qos_tx_count = 0  # the number of pkts Tx'd with no matching Rx

    def __repr__(self) -> str:
        return f"{self.id} ({self._SLUG})"

    # TODO: should be a private method
    def deprecate_device(self, pkt: Packet, reset: bool = False) -> None:
        """If an entity is deprecated enough times, stop sending to it.

        :param pkt: the packet that triggered the call (only used for logging)
        :param reset: when True, clear the failure counter instead of bumping it
        """

        if reset:
            self._qos_tx_count = 0
            return

        self._qos_tx_count += 1
        # NOTE(review): the warning fires when the count *equals* the limit, but
        # the send methods only refuse once the count *exceeds* it - confirm
        # whether this one-off difference is intended.
        if self._qos_tx_count == _QOS_TX_LIMIT:
            _LOGGER.warning(
                f"{pkt} < Sending now deprecated for {self} "
                "(consider adjusting device_id filters)"
            )  # TODO: take whitelist into account

    def _handle_msg(self, msg: Message) -> None:
        """Store a msg in the DBs."""

        raise NotImplementedError  # to be handled by implementing classes

    # FIXME: this is a mess - to deprecate for async version?
    def _send_cmd(self, cmd: Command, **kwargs: Any) -> asyncio.Task | None:
        """Send a Command & return the corresponding Task.

        :param cmd: the command to send
        :param kwargs: only ``priority`` and ``num_repeats`` are accepted
        :return: whatever the gateway's send_cmd() returns, or None if sending
            to this entity has been deprecated
        :raises RuntimeError: if any other keyword argument is passed
        """

        # Don't poll this device if it is not responding
        if self._qos_tx_count > _QOS_TX_LIMIT:
            _LOGGER.info(f"{cmd} < Sending was deprecated for {self}")
            return None  # TODO: raise Exception (should be handled before now)

        # FIXME: deprecate QoS in kwargs, should be qos=QosParams(...)
        unsupported = [k for k in kwargs if k not in ("priority", "num_repeats")]
        if unsupported:  # TODO: remove this
            # the message used to be passed logging-style ("%s", kwargs) and so
            # was never interpolated; format it explicitly
            raise RuntimeError(f"Deprecated kwargs: {kwargs}")

        # cmd._source_entity = self  # TODO: is needed?
        return self._gwy.send_cmd(cmd, wait_for_reply=False, **kwargs)

    # FIXME: this is a mess
    async def _async_send_cmd(
        self,
        cmd: Command,
        priority: Priority | None = None,
        qos: QosParams | None = None,  # FIXME: deprecate QoS in kwargs?
    ) -> Packet | None:
        """Send a Command & return the response Packet, or the echo Packet otherwise.

        :param cmd: the command to send
        :param priority: passed through to the gateway
        :param qos: optional QoS settings; when omitted, max_retries, timeout and
            wait_for_reply are passed to the gateway as None
        :return: the gateway's result, or None if sending to this entity has
            been deprecated
        """

        # Don't poll this device if it is not responding
        if self._qos_tx_count > _QOS_TX_LIMIT:
            _LOGGER.warning(f"{cmd} < Sending was deprecated for {self}")
            return None  # FIXME: raise Exception (should be handled before now)

        # cmd._source_entity = self  # TODO: is needed?
        return await self._gwy.async_send_cmd(
            cmd,
            max_retries=qos.max_retries if qos else None,
            priority=priority,
            timeout=qos.timeout if qos else None,
            wait_for_reply=qos.wait_for_reply if qos else None,
        )

171 

172 

173class _MessageDB(_Entity): 

174 """Maintain/utilize an entity's state database. 

175 

176 EntityBase msg_db query methods 

177 

178 (ix = database.py.MessageIndex method) 

179 

180 .. table:: Database Query Methods 

181 :widths: auto 

182 

183 ==== ====================== ==================== ============ ========== ========== 

184 e. method name args returns uses used by 

185 ==== ====================== ==================== ============ ========== ========== 

186 e1 _get_msg_by_hdr hdr Message i3 discover 

187 e2 _msg_value code(s), Msg, args dict[k,v] e3,e4 

188 e3 _msg_value_code code, verb, key dict[k,v] e4,e5,e6 e6 

189 e4 _msg_value_msg Msg, (code) dict[k,v] e2,e3 

190 e5 _msg_qry_by_code_key code, key, (verb=) e6, 

191 e6 _msg_value_qry_by_code key code, key str/float e3,e5 

192 e7 _msg_qry sql e8 

193 e8 _msg_count sql e7 

194 e9 supported_cmds list(Codes) i7 

195 e10 _msgs() i5 

196 ==== ====================== ==================== ============ ========== ========== 

197 

198 """ 

199 

200 _gwy: Gateway 

201 ctl: Controller 

202 tcs: Evohome 

203 

204 # These attr used must be in this class 

205 _z_id: DeviceIdT 

206 _z_idx: DevIndexT | None # e.g. 03, HW. Is None for CTL, TCS. 

207 # idx is one of: 

208 # - a simple index (e.g. zone_idx, domain_id, aka child_id) 

209 # - a compound ctx (e.g. 0005/000C/0418) 

210 # - True (an array of elements, each with its own idx), 

211 # - False (no idx, is usu. 00), 

212 # - None (not determinable, rare) 

213 

def __init__(self, gwy: Gateway) -> None:
    """Set up the (deprecated) per-entity message stores."""
    super().__init__(gwy)

    # flat store: latest I/RP msg by code
    # TODO(eb): deprecated, used in test, remove Q1 2026
    self._msgs_: dict[Code, Message] = {}

    # nested store (code/verb/ctx), only created when there is no MessageIndex
    # TODO(eb): deprecated since 0.52.1, remove Q1 2026
    if self._gwy.msg_db:
        return
    self._msgz_: dict[Code, dict[VerbT, dict[bool | str | None, Message]]] = {}

    # As of 0.52.1 we use SQLite MessageIndex, see ramses_rf/database.py
    # _msgz_ (nested) was only used in this module. Note:
    # _msgz (now rebuilt from _msgs) is also used in:
    # - client.py: for code in device._msgz.values()
    # - base.py: Code._1060 in self._msgz
    # [x] device.heat (no longer used)

231 

def _handle_msg(self, msg: Message) -> None:
    """Store a msg in the DBs.

    Uses SQLite MessageIndex since 0.52.1

    A msg is kept only when this entity sent it, when it was sent to this
    entity and is not an RQ, or when it is a 1FC9 addressed to ALL_DEVICE_ID.
    """

    own_id = self.id[:_ID_SLICE]
    relevant = (
        msg.src.id == own_id  # sent by this entity
        or (msg.dst.id == own_id and msg.verb != RQ)  # sent to it, except RQs
        or (msg.dst.id == ALL_DEVICE_ID and msg.code == Code._1FC9)  # rf_bind
    )
    if not relevant:
        return  # don't store the rest

    index = self._gwy.msg_db
    if index:  # central SQLite MessageIndex
        _LOGGER.debug(
            "For %s (_z_id %s) add to msg_db: %s, src %s, dst %s",
            self.id,
            self._z_id,
            msg,
            msg.src,
            msg.dst,
        )
        debug_code: Code = Code._3150  # for debugging only log these, pick your own
        if msg.code == debug_code and msg.src.id.startswith("01:"):
            _LOGGER.debug(
                "Added msg from %s with code %s to _gwy.msg_db. hdr=%s",
                msg.src,
                msg.code,
                msg._pkt._hdr,
            )
        # any replaced message that add() might return is ignored
        index.add(msg)

    else:  # TODO(eb): remove Q1 2026
        # deprecated since 0.52.1: nested store by code / verb / ctx, where a
        # newer msg replaces any previous one with the same code/verb/ctx
        by_verb = self._msgz_.setdefault(msg.code, {})
        by_ctx = by_verb.setdefault(msg.verb, {})
        by_ctx[msg._pkt._ctx] = msg

    # Also keep the latest I/RP msg per code in the flat self._msgs_ dict
    # TODO(eb): deprecated since 0.52.1, remove next block _msgs_ Q1 2026
    if msg.verb not in (I_, RP):  # drop RQ's
        return
    self._msgs_[msg.code] = msg

294 

@property
def _msg_list(self) -> list[Message]:
    """Return a flattened list of all messages logged on this device.

    With a MessageIndex, the codes returned by _msg_dev_qry() are looked up in
    self._msgs; codes that cannot be found there are logged and skipped.
    Without one, the legacy nested dict is flattened.
    """
    # (only) used in gateway.py#get_state() and in tests/tests/test_eavesdrop_schema.py
    # TODO remove _msg_list Q1 2026
    if not self._gwy.msg_db:
        # create from legacy nested dict (code -> verb -> ctx -> msg)
        return [
            msg
            for by_verb in self._msgz.values()
            for by_ctx in by_verb.values()
            for msg in by_ctx.values()
        ]

    found: list[Message] = []
    codes = self._msg_dev_qry()
    if not codes:
        return found

    # self._msgs is a property that queries the index on every access, so take
    # one snapshot rather than re-querying twice per code
    msgs = self._msgs
    for code in codes:
        if code in msgs:
            # safeguard against lookup failures ("sim" packets?)
            found.append(msgs[code])
        else:
            # evohome has these errors
            # _msg_list could not fetch self._msgs[7FFF] for 18:072981 (_z_id 18:072981)
            _LOGGER.debug(
                "_msg_list could not fetch self._msgs[%s] for %s (_z_id %s)",
                code,
                self.id,
                self._z_id,
            )
    return found

325 

def _add_record(
    self, id: DeviceIdT, code: Code | None = None, verb: str = " I"
) -> None:
    """Add a (dummy) record to the central SQLite MessageIndex.

    Does nothing when the gateway has no MessageIndex.
    """
    # used by heat.py init
    index = self._gwy.msg_db
    if not index:
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError
        return
    # NOTE(review): code is passed as str(code), so the default of None is
    # stored as the string "None" - confirm this is intended
    index.add_record(id, code=str(code), verb=verb)

336 

def _delete_msg(self, msg: Message) -> None:  # FIXME: this is a mess
    """Remove the msg from all state databases. Used for expired msgs.

    The msg is removed from the central MessageIndex (if any) and from the
    deprecated per-entity stores of its source device and, when that device
    has a tcs, of the tcs, its dhw and its zones.
    """

    from .device import Device

    # delete from the central SQLite MessageIndex
    if self._gwy.msg_db:
        self._gwy.msg_db.rem(msg)

    # collect every entity that may hold a copy of the msg
    src = msg.src
    holders: list[_MessageDB] = []
    if isinstance(src, Device):
        holders.append(src)
        if getattr(src, "tcs", None):
            tcs = src.tcs
            holders.append(tcs)
            if tcs.dhw:
                holders.append(tcs.dhw)
            holders.extend(tcs.zones)

    # remove the msg from all the state DBs
    # TODO(eb): remove Q1 2026
    for holder in holders:
        if msg in holder._msgs_.values():
            del holder._msgs_[msg.code]
        if self._gwy.msg_db:
            continue
        # _msgz_ is deprecated, only used during migration
        with contextlib.suppress(KeyError):
            del holder._msgz_[msg.code][msg.verb][msg._pkt._ctx]

365 

366 ### entity_base query methods 

367 

def _get_msg_by_hdr(self, hdr: HeaderT) -> Message | None:
    """Return a msg, if any, that matches a given header.

    :param hdr: a header of the form 'code|verb|device_id[|ctx]'
    :return: the matching msg, or None if there is none
    :raises LookupError: if the msg that was found has a different header
    """

    if self._gwy.msg_db:
        # use central SQLite MessageIndex
        # only 1 result expected since hdr is a unique key in _gwy.msg_db
        hits = self._gwy.msg_db.get(hdr=hdr)
        if not hits:
            return None
        first = hits[0]
        if first._pkt._hdr != hdr:
            raise LookupError
        return first

    # legacy lookup in the nested dict
    code: Code
    verb: VerbT
    code, verb, _, *rest = hdr.split("|")  # type: ignore[assignment]  # _ is device_id

    try:
        by_ctx = self._msgz[code][verb]
    except KeyError:
        return None

    # pick the context key: an explicit ctx wins, then False, then None
    ctx_key: bool | str | None
    if rest and rest[0]:  # ctx may == True
        ctx_key = rest[0]
    elif False in by_ctx:
        ctx_key = False
    elif None in by_ctx:
        ctx_key = None
    else:
        return None

    if ctx_key not in by_ctx:
        return None
    msg: Message = by_ctx[ctx_key]

    if msg._pkt._hdr != hdr:
        raise LookupError
    return msg

403 

def _msg_flag(self, code: Code, key: str, idx: int) -> bool | None:
    """Return element idx of the value stored under key for code, as a bool.

    :return: the flag, or None if _msg_value() yields nothing (or an empty value)
    """
    flags = self._msg_value(code, key=key)
    if not flags:
        return None
    return bool(flags[idx])

408 

409 def _msg_value( 

410 self, code: Code | Iterable[Code], *args: Any, **kwargs: Any 

411 ) -> dict | list | None: 

412 """ 

413 Get the value for a Code from the database or from a Message object provided. 

414 

415 :param code: filter messages by Code or a tuple of codes (optional) 

416 :param args: Message (optional) 

417 :param kwargs: zone to filter on (optional) 

418 :return: a dict containing key: value pairs, or a list of those 

419 """ 

420 if isinstance(code, str | tuple): # a code or a tuple of codes 

421 return self._msg_value_code(code, *args, **kwargs) 

422 

423 assert isinstance(code, Message), ( 

424 f"Invalid format: _msg_value({code})" 

425 ) # catch invalidly formatted code, only handle Message from here 

426 return self._msg_value_msg(code, *args, **kwargs) 

427 

def _msg_value_code(
    self,
    code: Code,
    verb: VerbT | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> dict | list | None:
    """
    Query the _msgz message dict or the SQLite MessageIndex for the most recent
    key: value pairs(s) for a given code.

    :param code: filter messages by Code or a tuple of Codes, optional
    :param verb: filter on I, RQ, RP, optional, only with a single Code
    :param key: value keyword to retrieve; ignored when verb is RQ
    :param kwargs: extra filter, e.g. zone_idx='01'
    :return: a dict containing key: value pairs, or a list of those
    """
    assert not isinstance(code, tuple) or verb is None, (
        f"Unsupported: using a tuple ({code}) with a verb ({verb})"
    )

    msg: Message | None
    if verb:
        if verb == VerbT("RQ"):
            # a key cannot be combined with verb RQ: ignore it
            # (the assert that used to sit here repeated the one above and
            # could never fail at this point)
            key = None
        try:
            if self._gwy.msg_db:  # central SQLite MessageIndex, use verb= kwarg
                found = self._msg_qry_by_code_key(code, key, **kwargs, verb=verb)
                # an empty query result must not be fed to Code()
                msg = None if found is None else self._msgs.get(Code(found))
            else:  # deprecated lookup in nested _msgz
                msgs = self._msgz[code][verb]
                msg = max(msgs.values()) if msgs else None
        except KeyError:
            msg = None

    elif isinstance(code, tuple):
        # take the highest (= latest?) of the msgs with any of these codes
        candidates = [m for m in self._msgs.values() if m.code in code]
        msg = max(candidates) if candidates else None
    else:  # single Code
        # for Zones, this doesn't work, returns first result = often wrong
        # TODO fix in _msg_qry_by_code_key()
        msg = self._msgs.get(code)

    return self._msg_value_msg(msg, key=key, **kwargs)

478 

def _msg_value_msg(
    self,
    msg: Message | None,
    key: str = "*",
    zone_idx: str | None = None,
    domain_id: str | None = None,
) -> dict | list | None:
    """
    Get from a Message all or a specific key with its value(s),
    optionally filtering for a zone or a domain

    :param msg: a Message to inspect
    :param key: the key to fetch; "*" or a falsy key returns all public keys
    :param zone_idx: the zone to filter on (ignored when domain_id is given)
    :param domain_id: the domain to filter on
    :return: a dict containing key: value pairs, or a list of those
    """
    if msg is None:
        return None
    if msg._expired:
        # schedule the removal, but still answer from this msg
        self._gwy._loop.call_soon(self._delete_msg, msg)  # HA bugs without defer

    if msg.code == Code._1FC9:  # NOTE: list of lists/tuples
        return [entry[1] for entry in msg.payload]

    # idx: the payload key to match on; val: the expected matching id value
    idx: str | None
    val: str | None
    if domain_id:
        idx, val = SZ_DOMAIN_ID, domain_id
    elif zone_idx:
        idx, val = SZ_ZONE_IDX, zone_idx
    else:
        idx, val = None, None

    if isinstance(msg.payload, dict):
        msg_dict = msg.payload  # could be a mismatch on idx, accept
    elif idx:  # a list of dicts, e.g. SZ_DOMAIN_ID=FC
        # merge the elements whose idx matches
        msg_dict = {
            k: v for d in msg.payload for k, v in d.items() if d[idx] == val
        }
    else:  # a list without idx
        # TODO: this isn't ideal: e.g. a controller is being treated like a 'stat
        # .I 101 --:------ --:------ 12:126457 2309 006 0107D0-0207D0  # is a CTL
        msg_dict = msg.payload[0]  # we pick the first

    assert (
        (not domain_id and not zone_idx)
        or (msg_dict.get(idx) == val)
        or (idx == SZ_DOMAIN_ID)
    ), (
        f"full dict:{msg_dict}, payload:{msg.payload} < Coding error: key='{idx}', val='{val}'"
    )  # should not be there

    if key and key != "*":
        return msg_dict.get(key)

    # wildcard (e.g. from a SQLite wildcard query): return all keys, except the
    # index keys and those starting with an underscore
    hidden = ("dhw_idx", SZ_DOMAIN_ID, SZ_ZONE_IDX)
    return {
        k: v for k, v in msg_dict.items() if k not in hidden and k[:1] != "_"
    }

540 

541 # SQLite methods, since 0.52.0 

542 

def _msg_dev_qry(self) -> list[Code] | None:
    """
    Retrieve from the MessageIndex a list of Code keys involving this device.

    :return: list of Codes or an empty list when the query returned empty
    :raises NotImplementedError: when the gateway has no MessageIndex
    """

    index = self._gwy.msg_db
    if not index:
        _LOGGER.warning("Missing MessageIndex")
        raise NotImplementedError

    # the same query serves a ctl (any ctx) and a zone (ctx holds the zone idx)
    sql_by_ctx = """
        SELECT code from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND ctx LIKE ?
    """

    if len(self.id) == 9:
        # fetch a ctl's message codes (add all its children?)
        sql, ctx_qry = sql_by_ctx, "%"

    elif self.id[_ID_SLICE:] == "_HW":
        # fetch a DHW entity's message codes
        sql = """
            SELECT code from messages WHERE
            verb in (' I', 'RP')
            AND (src = ? OR dst = ?)
            AND (ctx IN ('FC', 'FA', 'F9', 'FA') OR plk LIKE ?)
        """
        ctx_qry = "%dhw_idx%"

    else:
        # fetch a zone's message codes
        sql, ctx_qry = sql_by_ctx, f"%{self.id[_ID_SLICE + 1 :]}%"

    dev_id = self.id[:_ID_SLICE]
    codes: list[Code] = []
    for row in index.qry_field(sql, (dev_id, dev_id, ctx_qry)):
        _LOGGER.debug(
            "Fetched from index: %s for %s (_z_id %s)",
            row[0],
            self.id,
            self._z_id,
        )
        # Example: "Fetched from index: code 1FD4 for 01:123456 (_z_id 01)"
        codes.append(Code(str(row[0])))
    return codes

599 

def _msg_qry_by_code_key(
    self,
    code: Code | tuple[Code] | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> Code | None:
    """
    Retrieve from the MessageIndex the most current Code for a code(s) &
    keyword combination involving this device.

    :param code: (optional) a message Code to use, e.g. Code._31DA or a tuple of Codes
    :param key: (optional) message keyword to fetch, e.g. SZ_HUMIDITY
    :param kwargs: all optional: verb=' I' or 'RP' (any other value means both),
        zone_idx='01' or dhw_idx=... to filter on context (zone_idx wins)
    :return: Code of most recent query result message or None when query returned empty
    :raises NotImplementedError: when the gateway has no MessageIndex
    """
    if not self._gwy.msg_db:
        _LOGGER.warning("Missing MessageIndex")
        raise NotImplementedError

    # The optional filters are read with .get(): indexing kwargs directly
    # raised KeyError whenever a caller left one of them out.
    verb = kwargs.get("verb")
    verbs: tuple[str, ...] = (str(verb),) if verb in (" I", "RP") else (" I", "RP")

    codes: tuple[str, ...]
    if code is None:
        codes = ()  # wildcard: no code filter at all
    elif isinstance(code, tuple):
        if not code:
            return None  # an empty tuple of codes cannot match anything
        codes = tuple(str(cd) for cd in code)
    else:
        codes = (str(code),)

    ctx = kwargs.get("zone_idx") or kwargs.get("dhw_idx")  # dhw_idx: DHW
    ctx_qry = f"%{ctx}%" if ctx else "%"
    key_qry = "%" if key is None else f"%{key}%"

    # SQLite parameters can only stand in for single values, never for a list
    # or for an operator plus operand, so one '?' is generated per verb/code.
    # Only placeholders are interpolated into the SQL text, never values.
    verb_marks = ", ".join("?" * len(verbs))
    code_clause = f"AND code in ({', '.join('?' * len(codes))})" if codes else ""
    sql = f"""
        SELECT dtm, code from messages WHERE
        verb in ({verb_marks})
        AND (src = ? OR dst = ?)
        {code_clause}
        AND (ctx LIKE ?)
        AND (plk LIKE ?)
    """
    dev_id = self.id[:_ID_SLICE]
    params = (*verbs, dev_id, dev_id, *codes, ctx_qry, key_qry)
    # NOTE(review): assumes qry_field() passes sql/params straight to SQLite and
    # accepts a parameter tuple of any length - confirm in database.py

    # no sentinel datetime is used: dt(0, 0, 0) is not a valid date
    latest: dt | None = None
    res: Code | None = None

    for rec in self._gwy.msg_db.qry_field(sql, params):
        _LOGGER.debug("_msg_qry_by_code_key fetched rec: %s, code: %s", rec, codes)
        dtm = rec[0]
        assert isinstance(dtm, dt)  # mypy hint
        if latest is None or dtm > latest:  # only use most recent
            res = Code(rec[1])
            latest = dtm
    return res

670 

def _msg_value_qry_by_code_key(
    self,
    code: Code | None = None,
    key: str | None = None,
    **kwargs: Any,
) -> str | float | None:
    """
    Retrieve from the _msgs dict the most current value of a specific code & keyword combination
    or the first key's value when no key is specified.

    :param code: (optional) a single message Code to use, e.g. 31DA
    :param key: (optional) message keyword to fetch the value for, e.g. SZ_HUMIDITY or * (wildcard)
    :param kwargs: not used as of 0.52.1
    :return: a float for float values, otherwise the value as a string, or None
        when nothing was found
    """
    cd: Code | None = self._msg_qry_by_code_key(code, key)
    # self._msgs queries the index on every access: evaluate it at most once
    msgs = self._msgs if cd is not None else {}
    if cd is None or cd not in msgs:
        _LOGGER.warning("Code %s not in device %s's messages", cd, self.id)
        return None

    found = self._msg_value_msg(
        msgs[cd],
        key=key,  # key can be wildcard *
    )

    # _msg_value_msg() returns a dict for a wildcard/absent key, the bare value
    # for a specific key, or a list (1FC9); indexing all of these with [0] was
    # wrong for everything but the list
    val: object
    if isinstance(found, dict):
        val = next(iter(found.values()), None)  # the first key's value
    elif isinstance(found, list):
        val = found[0] if found else None
    else:
        val = found
    _LOGGER.debug("Extracted val %s for code %s, key %s", val, code, key)

    if val is None:
        return None  # not the string "None"
    if isinstance(val, float):
        return float(val)
    return str(val)

704 

def _msg_qry(self, sql: str) -> list[dict]:
    """
    SQLite custom query for an entity's stored payloads using the full MessageIndex.
    See ramses_rf/database.py

    :param sql: custom SQLite query on MessageIndex. Can include multiple CODEs in SELECT.
        It must select the code as first field and take exactly 2 parameters
        (both are filled with this entity's device id).
    :return: list of payload dicts from the selected messages, or an empty list
    :raises KeyError: if a selected code is not in this entity's _msgs
    """

    res: list[dict] = []
    if not (sql and self._gwy.msg_db):
        return res

    # example query:
    # """SELECT code from messages WHERE verb in (' I', 'RP') AND (src = ? OR dst = ?)
    # AND (code = '31DA' OR ...) AND (plk LIKE '%{SZ_FAN_INFO}%' OR ...)""" = 2 params
    dev_id = self.id[:_ID_SLICE]
    msgs: dict[Code, Message] | None = None
    for rec in self._gwy.msg_db.qry_field(sql, (dev_id, dev_id)):
        if msgs is None:
            # self._msgs queries the index on every access: fetch it once, and
            # only when there is at least one record
            msgs = self._msgs
        # add payload dict to res(ults)
        res.append(msgs[Code(rec[0])].payload)  # only if newer, handled by MessageIndex
    return res

726 

def _msg_count(self, sql: str) -> int:
    """
    Get the number of messages in a query result.

    :param sql: custom SQLite query on MessageIndex, passed to _msg_qry().
    :return: number of payloads that query returned, 0 for no results
    """
    payloads = self._msg_qry(sql)
    return len(payloads)

735 

@property
def traits(self) -> dict[str, Any]:
    """Get the codes seen by the entity.

    :return: {"_sent": [...]}, the sorted codes of the msgs in self._msgs whose
        src is this entity (if it has an ``addr`` attribute) or else its ctl
    """

    # self._msgs queries the index on every access: take a single snapshot
    msgs = self._msgs

    # only the codes are returned, so no schema names need to be looked up
    sent = [
        code
        for code in sorted(msgs)
        if msgs[code].src == (self if hasattr(self, "addr") else self.ctl)
    ]

    return {"_sent": sent}

747 

@property
def _msgs(self) -> dict[Code, Message]:
    """
    Get a flat dict of all I/RP messages logged with this device as src or dst.

    Without a MessageIndex the (deprecated) self._msgs_ dict is returned as-is.
    Otherwise the dict is rebuilt from an index query on every access.

    :return: flat dict of messages by Code
    """
    index = self._gwy.msg_db
    if not index:
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError
        return self._msgs_

    # the same query serves a ctl (any ctx) and a zone (ctx holds the zone idx)
    sql_by_ctx = """
        SELECT dtm from messages WHERE
        verb in (' I', 'RP')
        AND (src = ? OR dst = ?)
        AND ctx LIKE ?
    """

    if len(self.id) == 9:
        # fetch a ctl's message dtms (add all its children?)
        sql, ctx_qry = sql_by_ctx, "%"

    elif self.id[_ID_SLICE:] == "_HW":
        # fetch a DHW entity's message dtms
        sql = """
            SELECT dtm from messages WHERE
            verb in (' I', 'RP')
            AND (src = ? OR dst = ?)
            AND (ctx IN ('FC', 'FA', 'F9', 'FA') OR plk LIKE ?)
        """
        ctx_qry = "%dhw_idx%"
        # TODO add Children messages? self.ctl.dhw

    else:
        # fetch a zone's message dtms
        sql, ctx_qry = sql_by_ctx, f"%{self.id[_ID_SLICE + 1 :]}%"

    # since 0.52.3 use ctx (context) instead of just the address,
    # e.g. 01:123456_HW, 01:123456_02 (Zone)
    dev_id = self.id[:_ID_SLICE]
    found = index.qry(sql, (dev_id, dev_id, ctx_qry))

    # HACK (not active): a CTL's 3150/3220 (heat_demand) are only stored on its
    # children, but they are kept here: better 2 extra than 1 missing.
    # Note: a CTL can send a 3150 (see heat_ufc_00)
    return {m.code: m for m in found}

816 

@property
def _msgz(self) -> dict[Code, dict[VerbT, dict[bool | str | None, Message]]]:
    """
    Get a nested dict of all I/RP messages logged with this device as either src or dst.
    Based on SQL query on MessageIndex with device as src or dst.

    Without a MessageIndex the (deprecated) self._msgz_ dict is returned as-is.

    :return: dict of messages involving this device, nested by Code, Verb, Context
    """
    if not self._gwy.msg_db:
        # _LOGGER.warning("Missing MessageIndex")
        # raise NotImplementedError
        return self._msgz_  # TODO(eb): remove and uncomment next Q1 2026

    # build _msgz from MessageIndex/_msgs:
    nested: dict[Code, dict[VerbT, dict[bool | str | None, Message]]] = {}
    for msg in self._msgs.values():  # contains only verbs I, RP
        by_verb = nested.setdefault(msg.code, {})
        by_ctx = by_verb.setdefault(msg.verb, {})
        by_ctx[msg._pkt._ctx] = msg
    return nested

843 

844 

845class _Discovery(_MessageDB): 

846 MAX_CYCLE_SECS = 30 

847 MIN_CYCLE_SECS = 3 

848 

def __init__(self, gwy: Gateway) -> None:
    """Initialise the discovery state and, unless disabled, schedule the poller."""
    super().__init__(gwy)

    # populated lazily by the discovery_cmds property
    self._discovery_cmds: dict[HeaderT, dict] = None  # type: ignore[assignment]
    self._discovery_poller: asyncio.Task | None = None

    # looked up by code, resp. by "code|ctx"; a value of False means deprecated
    self._supported_cmds: dict[str, bool | None] = {}
    self._supported_cmds_ctx: dict[str, bool | None] = {}

    if gwy.config.disable_discovery:
        return
    # self._start_discovery_poller()  # Can't use derived classes don't exist yet
    gwy._loop.call_soon(self._start_discovery_poller)

861 

@property  # TODO: needs tidy up
def discovery_cmds(self) -> dict[HeaderT, dict]:
    """Return the pollable commands.

    On first access the dict is created and _setup_discovery_cmds() is called.
    """
    if self._discovery_cmds is not None:
        return self._discovery_cmds

    self._discovery_cmds = {}
    self._setup_discovery_cmds()
    return self._discovery_cmds

869 

@property
def supported_cmds(self) -> dict[Code, Any]:
    """Return the current list of pollable command codes.

    :return: a dict of code: name for every code for which an RP is known and
        that is not deprecated; the name is None for codes not in CODES_SCHEMA
    """
    if self._gwy.msg_db:
        dev_id = self.id[:_ID_SLICE]
        codes = self._gwy.msg_db.get_rp_codes((dev_id, dev_id))
    else:  # TODO(eb): deprecated since 0.52.1, remove Q1 2026
        msgz = self._msgz
        codes = [code for code in msgz if msgz[code].get(RP)]

    # Both branches guard the schema lookup (the index branch used to raise
    # KeyError for a code that is not in CODES_SCHEMA).
    return {
        code: (CODES_SCHEMA[code][SZ_NAME] if code in CODES_SCHEMA else None)
        for code in sorted(codes)
        if self._is_not_deprecated_cmd(code)
    }

888 

@property
def supported_cmds_ot(self) -> dict[MsgId, Any]:
    """Return the current list of pollable OT msg_ids.

    :return: a dict of '0x<msg_id>': English description, for every msg_id for
        which a 3220 RP is known, that is not deprecated and that is in
        OPENTHERM_MESSAGES; empty if there is none
    """

    def _to_data_id(msg_id: MsgId | str) -> OtDataId:
        return int(msg_id, 16)  # type: ignore[return-value]

    # def _to_msg_id(data_id: OtDataId | int) -> MsgId:  # not used
    #     return f"{data_id:02X}"  # type: ignore[return-value]

    res: list[str] = []
    if self._gwy.msg_db:
        # SQLite query for ctx field on MessageIndex
        sql = """
            SELECT ctx from messages WHERE
            verb = 'RP'
            AND code = '3220'
            AND (src = ? OR dst = ?)
        """
        dev_id = self.id[:_ID_SLICE]
        for rec in self._gwy.msg_db.qry_field(sql, (dev_id, dev_id)):
            _LOGGER.debug("Fetched OT ctx from index: %s", rec[0])
            res.append(rec[0])
    else:  # TODO(eb): remove next Q1 2026
        # Tolerate a device without any 3220 msgs, or without 3220 RPs: this
        # used to raise KeyError, resp. AssertionError (the default was a list).
        by_verb = self._msgz.get(Code._3220, {})
        res = list(by_verb.get(RP, {}))
        # raise NotImplementedError

    return {
        f"0x{msg_id}": OPENTHERM_MESSAGES[_to_data_id(msg_id)].get("en")  # type: ignore[misc]
        for msg_id in sorted(res)
        if (
            self._is_not_deprecated_cmd(Code._3220, ctx=msg_id)
            and _to_data_id(msg_id) in OPENTHERM_MESSAGES
        )
    }

929 

def _is_not_deprecated_cmd(self, code: Code, ctx: str | None = None) -> bool:
    """Return True if the code|ctx pair is not deprecated.

    Without a ctx the code is looked up in self._supported_cmds, otherwise
    'code|ctx' is looked up in self._supported_cmds_ctx. Only an explicit
    False marks a command as deprecated; unknown commands and None do not.
    """

    if ctx is None:
        table, idx = self._supported_cmds, str(code)
    else:
        table, idx = self._supported_cmds_ctx, f"{code}|{ctx}"

    return table.get(idx) is not False

941 

942 def _setup_discovery_cmds(self) -> None: 

943 raise NotImplementedError 

944 

def _add_discovery_cmd(
    self,
    cmd: Command,
    interval: float,
    *,
    delay: float = 0,
    timeout: float | None = None,
) -> None:
    """Register a command for periodic polling, keyed by its rx_header.

    A command with no rx_header, or with an rx_header that is already registered,
    is logged and otherwise ignored.

    The stored interval is never less than MAX_CYCLE_SECS. A non-zero delay has a
    small random offset added to it before the first due time is calculated.

    `interval`, `delay` and `timeout` are all in seconds.
    """

    rx_hdr = cmd.rx_header

    if rx_hdr is None:  # TODO: raise TypeError
        _LOGGER.warning(f"cmd({cmd}): invalid (null) header not added to discovery")
        return

    if rx_hdr in self.discovery_cmds:
        _LOGGER.info(f"cmd({cmd}): duplicate header not added to discovery")
        return

    # a zero delay is left at exactly zero: only a real delay gets the offset
    first_delay = delay + random.uniform(0.05, 0.45) if delay else delay
    poll_interval = td(seconds=max(interval, self.MAX_CYCLE_SECS))

    self.discovery_cmds[rx_hdr] = {
        _SZ_COMMAND: cmd,
        _SZ_INTERVAL: poll_interval,
        _SZ_LAST_PKT: None,
        _SZ_NEXT_DUE: dt.now() + td(seconds=first_delay),
        _SZ_TIMEOUT: timeout,
        _SZ_FAILURES: 0,
    }

977 

978 def _start_discovery_poller(self) -> None: 

979 """Start the discovery poller (if it is not already running).""" 

980 

981 if self._discovery_poller and not self._discovery_poller.done(): 

982 return 

983 

984 self._discovery_poller = schedule_task(self._poll_discovery_cmds) 

985 self._discovery_poller.set_name(f"{self.id}_discovery_poller") 

986 self._gwy.add_task(self._discovery_poller) 

987 

988 async def _stop_discovery_poller(self) -> None: 

989 """Stop the discovery poller (only if it is running).""" 

990 if not self._discovery_poller or self._discovery_poller.done(): 

991 return 

992 

993 self._discovery_poller.cancel() 

994 with contextlib.suppress(asyncio.CancelledError): 

995 await self._discovery_poller 

996 

997 async def _poll_discovery_cmds(self) -> None: 

998 """Send any outstanding commands that are past due. 

999 

1000 If a relevant message was received recently enough, reschedule the corresponding 

1001 command for later. 

1002 """ 

1003 

1004 while True: 

1005 await self.discover() 

1006 

1007 if self.discovery_cmds: 

1008 next_due = min(t[_SZ_NEXT_DUE] for t in self.discovery_cmds.values()) 

1009 delay = max((next_due - dt.now()).total_seconds(), self.MIN_CYCLE_SECS) 

1010 else: 

1011 delay = self.MAX_CYCLE_SECS 

1012 

1013 await asyncio.sleep(min(delay, self.MAX_CYCLE_SECS)) 

1014 

async def discover(self) -> None:
    """Send each discovery command that is due, and reschedule every command.

    For each registered command, in turn:
     - if a stored message for its header is recent enough, that message is taken
       as the latest result and the next due time is moved on from its timestamp
     - if the command is not yet due, it is skipped
     - if its code (or code|ctx pair) is deprecated, it is skipped, but only after
       being rescheduled one interval from now
     - otherwise it is sent; a response resets the failure count, and no response
       increments it, with the next due time then set by backoff()
    """

    def find_latest_msg(hdr: HeaderT, task: dict) -> Message | None:
        """
        :return: the latest message for a header from any source (not just RPs).
        """
        msgs: list[Message] = [
            m
            for m in [self._get_msg_by_hdr(hdr[:5] + v + hdr[7:]) for v in (I_, RP)]
            if m is not None
        ]

        code = task[_SZ_COMMAND].code

        # For these codes, a message known to the TCS may also be used. Note that
        # self.tcs might be None during early discovery, so guard both paths.
        if code in (Code._000A, Code._30C9) and self.tcs:
            if self._gwy.msg_db:  # use bespoke MessageIndex qry
                sql = """
                    SELECT dtm from messages WHERE
                    code = ?
                    AND verb in (' I', 'RP')
                    AND ctx = 'True'
                    AND (src = ? OR dst = ?)
                """
                tcs_id = self.tcs.id[:_ID_SLICE]
                res = self._gwy.msg_db.qry(sql, (code, tcs_id, tcs_id))
                if len(res) > 0:
                    # append (not +=): res[0] is one Message, not a list of them
                    msgs.append(res[0])  # expect 1 Message in returned tuple
                else:
                    _LOGGER.debug(f"No msg found for hdr {hdr}, task code {code}")
            else:  # TODO(eb) remove next Q1 2026
                with contextlib.suppress(KeyError):  # no such message (yet)
                    msgs.append(self.tcs._msgz[code][I_][True])

        return max(msgs) if msgs else None

    def backoff(hdr: HeaderT, failures: int) -> td:
        """Backoff the interval if there are/were any failures."""

        if not _DBG_ENABLE_DISCOVERY_BACKOFF:  # FIXME: data gaps
            return self.discovery_cmds[hdr][_SZ_INTERVAL]  # type: ignore[no-any-return]

        if failures > 5:
            secs = 60 * 60 * 6
            _LOGGER.error(
                f"No response for {hdr} ({failures}/5): throttling to 1/6h"
            )
        elif failures > 2:
            _LOGGER.warning(
                f"No response for {hdr} ({failures}/5): retrying in {self.MAX_CYCLE_SECS}s"
            )
            secs = self.MAX_CYCLE_SECS
        else:
            _LOGGER.info(
                f"No response for {hdr} ({failures}/5): retrying in {self.MIN_CYCLE_SECS}s"
            )
            secs = self.MIN_CYCLE_SECS

        return td(seconds=secs)

    async def send_disc_cmd(
        hdr: HeaderT, task: dict, timeout: float = 15
    ) -> Packet | None:  # TODO: use constant instead of 15
        """Send a scheduled command and wait for/return the response.

        Returns None if the send fails with a ProtocolError, or does not complete
        within `timeout` seconds; both cases are logged as warnings.
        """

        try:
            pkt: Packet | None = await asyncio.wait_for(
                self._gwy.async_send_cmd(task[_SZ_COMMAND]),
                timeout=timeout,  # self.MAX_CYCLE_SECS?
            )

        # TODO: except: handle no QoS

        except exc.ProtocolError as err:  # InvalidStateError, SendTimeoutError
            _LOGGER.warning(f"{self}: Failed to send discovery cmd: {hdr}: {err}")

        except TimeoutError as err:  # safety valve timeout
            _LOGGER.warning(
                f"{self}: Failed to send discovery cmd: {hdr} within {timeout} secs: {err}"
            )

        else:
            return pkt

        return None

    # Iterate over a snapshot: the loop body awaits, so other code can run and
    # add a discovery cmd, which would break iteration over the live dict
    for hdr, task in list(self.discovery_cmds.items()):
        dt_now = dt.now()

        if (msg := find_latest_msg(hdr, task)) and (
            task[_SZ_NEXT_DUE] < msg.dtm + task[_SZ_INTERVAL]
        ):  # if a newer message is available, take it
            task[_SZ_FAILURES] = 0  # only if task[_SZ_LAST_PKT].verb == RP?
            task[_SZ_LAST_PKT] = msg._pkt
            task[_SZ_NEXT_DUE] = msg.dtm + task[_SZ_INTERVAL]

        if task[_SZ_NEXT_DUE] > dt_now:
            continue  # if (most recent) last_msg is not yet due...

        # since we may do I/O, check if the code|msg_id is deprecated
        task[_SZ_NEXT_DUE] = dt_now + task[_SZ_INTERVAL]  # might undeprecate later

        if not self._is_not_deprecated_cmd(task[_SZ_COMMAND].code):
            continue
        if not self._is_not_deprecated_cmd(
            task[_SZ_COMMAND].code, ctx=task[_SZ_COMMAND].payload[4:6]
        ):  # only for Code._3220
            continue

        # we'll have to do I/O...
        task[_SZ_NEXT_DUE] = dt_now + backoff(hdr, task[_SZ_FAILURES])  # JIC

        if pkt := await send_disc_cmd(hdr, task):  # TODO: OK 4 some exceptions
            task[_SZ_FAILURES] = 0  # only if task[_SZ_LAST_PKT].verb == RP?
            task[_SZ_LAST_PKT] = pkt
            task[_SZ_NEXT_DUE] = pkt.dtm + task[_SZ_INTERVAL]
        else:
            task[_SZ_FAILURES] += 1
            task[_SZ_LAST_PKT] = None
            task[_SZ_NEXT_DUE] = dt_now + backoff(hdr, task[_SZ_FAILURES])

1144 

1145 def _deprecate_code_ctx( 

1146 self, pkt: Packet, ctx: str = None, reset: bool = False 

1147 ) -> None: 

1148 """If a code|ctx is deprecated twice, stop polling for it.""" 

1149 

1150 def deprecate(supported_dict: dict[str, bool | None], idx: str) -> None: 

1151 if idx not in supported_dict: 

1152 supported_dict[idx] = None 

1153 elif supported_dict[idx] is None: 

1154 _LOGGER.info( 

1155 f"{pkt} < Polling now deprecated for code|ctx={idx}: " 

1156 "it appears to be unsupported" 

1157 ) 

1158 supported_dict[idx] = False 

1159 

1160 def reinstate(supported_dict: dict[str, bool | None], idx: str) -> None: 

1161 if self._is_not_deprecated_cmd(idx, None) is False: 

1162 _LOGGER.info( 

1163 f"{pkt} < Polling now reinstated for code|ctx={idx}: " 

1164 "it now appears supported" 

1165 ) 

1166 if idx in supported_dict: 

1167 supported_dict.pop(idx) 

1168 

1169 if ctx is None: 

1170 supported_cmds = self._supported_cmds 

1171 idx: str = pkt.code 

1172 else: 

1173 supported_cmds = self._supported_cmds_ctx 

1174 idx = f"{pkt.code}|{ctx}" 

1175 

1176 (reinstate if reset else deprecate)(supported_cmds, idx) 

1177 

1178 

class Entity(_Discovery):
    """The base class for Devices/Zones/Systems.

    It adds nothing of its own: all of its behaviour is inherited from _Discovery.
    """

1181 

1182 

class Parent(Entity):  # A System, Zone, DhwZone or a UfhController
    """A Parent can be a System (TCS), a heating Zone, a DHW Zone, or a UfhController.

    For a System, children include the appliance controller, the children of all Zones
    (incl. the DHW Zone), and also any UFH controllers.

    For a heating Zone, children are limited to a sensor, and a number of actuators.

    For the DHW Zone, the children are limited to a sensor, a DHW valve, and/or a
    heating valve.

    There is a `set_parent` method, but no `set_child` method: children are added via
    the private `_add_child` method.
    """

    actuator_by_id: dict[DeviceIdT, BdrSwitch | UfhCircuit | TrvActuator]
    actuators: list[BdrSwitch | UfhCircuit | TrvActuator]

    circuit_by_id: dict[str, Any]

    _app_cntrl: BdrSwitch | OtbGateway | None
    _dhw_sensor: DhwSensor | None
    _dhw_valve: BdrSwitch | None
    _htg_valve: BdrSwitch | None

    def __init__(
        self, *args: Any, child_id: str | None = None, **kwargs: Any
    ) -> None:
        """Initialise the parent, which starts with no children.

        :param child_id: the domain id of this parent (see `zone_idx`)
        """
        super().__init__(*args, **kwargs)

        self._child_id: str = child_id  # type: ignore[assignment]

        # self._sensor: Child = None
        self.child_by_id: dict[str, Child] = {}
        self.childs: list[Child] = []

    @property
    def zone_idx(self) -> str:
        """Return the domain id.

        For zones and circuits, the domain id is an idx, e.g.: '00', '01', '02'...
        For systems, it is 'FF', otherwise it is one of 'F9', 'FA' or 'FC'.
        """
        return self._child_id

    @zone_idx.setter  # TODO: should be a private setter
    def zone_idx(self, value: str) -> None:
        """Set the domain id (NOTE: the value is not currently validated)."""
        self._child_id = value

    def _add_child(
        self,
        child: Any,
        *,
        child_id: str | None = None,
        is_sensor: bool | None = None,
    ) -> None:
        """Add a child device to this Parent, after validating the association.

        Also sets various other parent-specific object references (e.g. parent._sensor).

        This method should be invoked by the child's corresponding `set_parent` method.

        Nothing is done if the child is already one of this parent's children.

        :raises exc.SystemSchemaInconsistent: if a different child already has the role
        :raises TypeError: if the parent/child_id/is_sensor combination is not valid
        """

        # NOTE: here to prevent circular references
        from .device import (
            BdrSwitch,
            DhwSensor,
            OtbGateway,
            OutSensor,
            TrvActuator,
            UfhCircuit,
            UfhController,
        )
        from .system import DhwZone, System, Zone

        if not hasattr(self, "childs") or child in self.childs:  # Any parent
            return

        assert isinstance(
            self, System | Zone | DhwZone | UfhController
        )  # TODO: remove me

        # NOTE: the order of these tests is significant, e.g. FA is handled twice
        if is_sensor and child_id == FA:  # DHW zone (sensor)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, DhwSensor)
            if self._dhw_sensor and self._dhw_sensor is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed dhw_sensor (from {self._dhw_sensor} to {child})"
                )
            self._dhw_sensor = child

        elif is_sensor and hasattr(self, SZ_SENSOR):  # HTG zone
            assert isinstance(self, Zone)  # TODO: remove me
            if self.sensor and self.sensor is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed zone sensor (from {self.sensor} to {child})"
                )
            self._sensor = child

        elif is_sensor:
            raise TypeError(
                f"not a valid combination for {self}: {child}|{child_id}|{is_sensor}"
            )

        elif hasattr(self, SZ_CIRCUITS):  # UFH circuit
            assert isinstance(self, UfhController)  # TODO: remove me
            # circuit_by_id is keyed by id, so test the id (not the child object)
            if child.id not in self.circuit_by_id:
                self.circuit_by_id[child.id] = child

        elif hasattr(self, SZ_ACTUATORS):  # HTG zone
            assert isinstance(self, Zone)  # TODO: remove me
            assert isinstance(child, BdrSwitch | UfhCircuit | TrvActuator)
            if child not in self.actuators:
                self.actuators.append(child)
                self.actuator_by_id[child.id] = child  # type: ignore[assignment,index]

        elif child_id == F9:  # DHW zone (HTG valve)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, BdrSwitch)
            if self._htg_valve and self._htg_valve is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed htg_valve (from {self._htg_valve} to {child})"
                )
            self._htg_valve = child

        elif child_id == FA:  # DHW zone (DHW valve)
            assert isinstance(self, DhwZone)  # TODO: remove me
            assert isinstance(child, BdrSwitch)
            if self._dhw_valve and self._dhw_valve is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed dhw_valve (from {self._dhw_valve} to {child})"
                )
            self._dhw_valve = child

        elif child_id == FC:  # Appliance Controller
            assert isinstance(self, System)  # TODO: remove me
            assert isinstance(child, BdrSwitch | OtbGateway)
            if self._app_cntrl and self._app_cntrl is not child:
                raise exc.SystemSchemaInconsistent(
                    f"{self} changed app_cntrl (from {self._app_cntrl} to {child})"
                )
            self._app_cntrl = child

        elif child_id == FF:  # System
            assert isinstance(self, System)  # TODO: remove me?
            assert isinstance(child, UfhController | OutSensor)

        else:
            raise TypeError(
                f"not a valid combination for {self}: {child}|{child_id}|{is_sensor}"
            )

        self.childs.append(child)
        self.child_by_id[child.id] = child

1330 

1331 

1332class Child(Entity): # A Zone, Device or a UfhCircuit 

1333 """A Device can be the Child of a Parent (a System, a heating Zone, or a DHW Zone). 

1334 

1335 A Device may/may not have a Parent, but all devices will have the gateway as a 

1336 parent, so that they can always be found via `gwy.child_by_id[device_id]`. 

1337 

1338 In addition, the gateway has `system_by_id`, the Systems have `zone_by_id`, and the 

1339 heating Zones have `actuator_by_id` dicts. 

1340 

1341 There is a `set_parent` method, but no `set_child` method. 

1342 """ 

1343 

def __init__(
    self,
    *args: Any,
    parent: Parent | None = None,
    is_sensor: bool | None = None,
    **kwargs: Any,
) -> None:
    """Initialise the child, storing its parent and is_sensor flag as given.

    The child_id starts as None, whatever the parent.
    """
    super().__init__(*args, **kwargs)

    self._parent = parent
    self._is_sensor = is_sensor

    self._child_id: str | None = None  # TODO: should be: str?

1357 

def _handle_msg(self, msg: Message) -> None:
    """Handle a message and then, possibly, eavesdrop a parent zone from it.

    Eavesdropping is attempted only if it is enabled in the gateway config, the
    message was sent to a Controller (and not to its own source), and this entity
    still lacks a parent or a child_id.
    """
    from .device import Controller, Device, UfhController

    super()._handle_msg(msg)

    if not self._gwy.config.enable_eavesdrop:
        return
    if msg.src is msg.dst or not isinstance(msg.dst, Controller):  # UfhController))
        return
    if self._parent and self._child_id:
        return  # both are already known

    # From here on: try to eavesdrop the parent zone
    if isinstance(msg.src, UfhController) or SZ_ZONE_IDX not in msg.payload:
        return

    if not isinstance(self, Device):  # FIXME: a mess... see issue ramses_cc #249
        return

    # the following is a mess - may just be better off deprecating it
    if self.type in DEV_TYPE_MAP.HEAT_ZONE_ACTUATORS:
        self.set_parent(msg.dst, child_id=msg.payload[SZ_ZONE_IDX])

    elif self.type in DEV_TYPE_MAP.THM_DEVICES:
        self.set_parent(msg.dst, child_id=msg.payload[SZ_ZONE_IDX], is_sensor=True)

1387 

def _get_parent(
    self,
    parent: Parent,
    *,
    child_id: str | None = None,
    is_sensor: bool | None = None,
) -> tuple[Parent, str | None]:
    """Get the device's parent, after validating it.

    The given parent is first resolved: a Controller is replaced by its TCS, and
    an Evohome is replaced by its DHW zone, or by one of its heating zones, when
    the child_id calls for that.

    :return: the resolved parent, and the (possibly inferred) child_id
    :raises exc.SystemSchemaInconsistent: if this device has a different parent
    :raises TypeError: if the parent, the device's role or the child_id is invalid
    """

    # NOTE: here to prevent circular references
    from .device import (
        BdrSwitch,
        Controller,
        DhwSensor,
        OtbGateway,
        OutSensor,
        Thermostat,
        TrvActuator,
        UfhCircuit,
        UfhController,
    )
    from .system import DhwZone, Evohome, System, Zone

    if isinstance(self, UfhController):
        child_id = FF

    if isinstance(parent, Controller):  # A controller can't be a Parent
        parent = parent.tcs

    if isinstance(parent, Evohome) and child_id:
        if child_id in (F9, FA):
            parent = parent.get_dhw_zone()
        # elif child_id == FC:
        #     pass
        elif int(child_id, 16) < parent._max_zones:
            parent = parent.get_htg_zone(child_id)

    elif isinstance(parent, Zone) and not child_id:
        child_id = child_id or parent.idx

    # elif isinstance(parent, DhwZone) and child_id:
    #     child_id = child_id or parent.idx  # ?"HW"

    elif isinstance(parent, UfhController) and not child_id:
        raise TypeError(
            f"{self}: can't set child_id to: {child_id} "
            f"(for Circuits, it must be a circuit_idx)"
        )

    # if child_id is None:
    #     child_id = parent._child_id  # or, for zones: parent.idx

    if self._parent and self._parent != parent:
        raise exc.SystemSchemaInconsistent(
            f"{self} can't change parent "
            f"({self._parent}_{self._child_id} to {parent}_{child_id})"
        )

    # if self._child_id is not None and self._child_id != child_id:
    #     raise CorruptStateError(
    #         f"{self} can't set domain to: {child_id}, "
    #         f"({self._parent}_{self._child_id} to {parent}_{child_id})"
    #     )

    # if self._parent:
    #     if self._parent.ctl is not parent:
    #         raise CorruptStateError(f"parent mismatch: {self._parent.ctl} is not {parent}")
    #     if self._child_id and self._child_id != child_id:
    #         raise CorruptStateError(f"child_id mismatch: {self._child_id} != {child_id}")

    # For each type of parent: the device classes it accepts, by role
    PARENT_RULES: dict[Any, dict] = {
        DhwZone: {SZ_ACTUATORS: (BdrSwitch,), SZ_SENSOR: (DhwSensor,)},
        System: {
            SZ_ACTUATORS: (BdrSwitch, OtbGateway, UfhController),
            SZ_SENSOR: (OutSensor,),
        },
        UfhController: {SZ_ACTUATORS: (UfhCircuit,), SZ_SENSOR: ()},
        Zone: {
            SZ_ACTUATORS: (BdrSwitch, TrvActuator, UfhCircuit),
            SZ_SENSOR: (Controller, Thermostat, TrvActuator),
        },
    }

    for k, v in PARENT_RULES.items():
        if isinstance(parent, k):
            rules = v
            break
    else:
        raise TypeError(
            f"for Parent {parent}: not a valid parent "
            f"(it must be {tuple(PARENT_RULES.keys())})"
        )

    if is_sensor and not isinstance(self, rules[SZ_SENSOR]):
        raise TypeError(
            f"for Parent {parent}: Sensor {self} must be {rules[SZ_SENSOR]}"
        )
    if not is_sensor and not isinstance(self, rules[SZ_ACTUATORS]):
        raise TypeError(
            f"for Parent {parent}: Actuator {self} must be {rules[SZ_ACTUATORS]}"
        )

    if isinstance(parent, Zone):
        if child_id != parent.idx:
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(it must match its parent's zone idx, {parent.idx})"
            )

    elif isinstance(parent, DhwZone):  # usu. FA (HW), could be F9
        if child_id not in (F9, FA):  # may not be known if eavesdrop'd
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(for DHW, it must be F9 or FA)"
            )

    elif isinstance(parent, System):  # usu. FC
        if child_id not in (FC, FF):  # was: not in (F9, FA, FC, "HW"):
            raise TypeError(
                f"{self}: can't set child_id to: {child_id} "
                f"(for TCS, it must be FC or FF)"
            )

    # Defensive only: the PARENT_RULES check above has already rejected any parent
    # that is not a DhwZone, System, UfhController or Zone
    elif not isinstance(parent, UfhController):  # is like CTL/TCS combined
        raise TypeError(
            f"{self}: can't set Parent to: {parent} "
            f"(it must be System, DHW, Zone, or UfhController)"
        )

    return parent, child_id

1514 

1515 # TODO: should be a private method 

def set_parent(
    self,
    parent: Parent | None,
    *,
    child_id: str | None = None,
    is_sensor: bool | None = None,
) -> Parent:
    """Set the device's parent, after validating it.

    This method will then invoke the parent's corresponding `_add_child` method.

    Devices don't have parents, rather: parents have children; a mis-configured
    system could easily leave a device as a child of multiple parents (or bound
    to multiple controllers).

    It is assumed that a device is only bound to one controller, either a (Evohome)
    controller, or an UFH controller.

    :return: the validated parent, which may differ from the one that was given
    :raises exc.SystemSchemaInconsistent: if the device has a different controller
    """

    from .device import (  # NOTE: here to prevent circular references
        Controller,
        UfhController,
    )

    parent, child_id = self._get_parent(
        parent, child_id=child_id, is_sensor=is_sensor
    )
    ctl = parent if isinstance(parent, UfhController) else parent.ctl

    if self.ctl and self.ctl is not ctl:
        # NOTE: assume a device is bound to only one CTL (usu. best practice)
        raise exc.SystemSchemaInconsistent(
            f"{self} can't change controller: {self.ctl} to {ctl} "
            "(or perhaps the device has multiple controllers?)"
        )

    parent._add_child(self, child_id=child_id, is_sensor=is_sensor)
    # parent.childs.append(self)
    # parent.child_by_id[self.id] = self

    self._child_id = child_id
    self._parent = parent

    assert isinstance(ctl, Controller)  # mypy hint

    self.ctl: Controller = ctl
    self.tcs: Evohome = ctl.tcs

    return parent