Coverage for src/ramses_rf/system/heat.py: 32%

513 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - The evohome-compatible system.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7import logging 

8from datetime import datetime as dt, timedelta as td 

9from threading import Lock 

10from types import SimpleNamespace 

11from typing import TYPE_CHECKING, Any, NoReturn, TypeVar 

12 

13from ramses_rf.const import ( 

14 SYS_MODE_MAP, 

15 SZ_ACTUATORS, 

16 SZ_CHANGE_COUNTER, 

17 SZ_DATETIME, 

18 SZ_DEVICES, 

19 SZ_DHW_IDX, 

20 SZ_DOMAIN_ID, 

21 SZ_HEAT_DEMAND, 

22 SZ_LANGUAGE, 

23 SZ_SENSOR, 

24 SZ_SYSTEM_MODE, 

25 SZ_TEMPERATURE, 

26 SZ_ZONE_IDX, 

27 SZ_ZONE_MASK, 

28 SZ_ZONE_TYPE, 

29 SZ_ZONES, 

30) 

31from ramses_rf.device import ( 

32 BdrSwitch, 

33 Controller, 

34 Device, 

35 OtbGateway, 

36 Temperature, 

37 UfhController, 

38) 

39from ramses_rf.entity_base import Entity, Parent, class_by_attr 

40from ramses_rf.helpers import shrink 

41from ramses_rf.schemas import ( 

42 DEFAULT_MAX_ZONES, 

43 SCH_TCS, 

44 SCH_TCS_DHW, 

45 SCH_TCS_ZONES_ZON, 

46 SZ_APPLIANCE_CONTROL, 

47 SZ_CLASS, 

48 SZ_DHW_SYSTEM, 

49 SZ_MAX_ZONES, 

50 SZ_ORPHANS, 

51 SZ_SYSTEM, 

52 SZ_UFH_SYSTEM, 

53) 

54from ramses_tx import ( 

55 DEV_ROLE_MAP, 

56 DEV_TYPE_MAP, 

57 ZON_ROLE_MAP, 

58 Command, 

59 DeviceIdT, 

60 Message, 

61 Priority, 

62) 

63from ramses_tx.typed_dicts import PayDictT 

64 

65from .faultlog import FaultLog 

66from .zones import zone_factory 

67 

68if TYPE_CHECKING: 

69 from ramses_tx import Address, Packet 

70 

71 from .faultlog import FaultIdxT, FaultLogEntry 

72 from .zones import DhwZone, Zone 

73 

74 

75# TODO: refactor packet routing (filter *before* routing) 

76 

77 

78from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

79 F9, 

80 FA, 

81 FC, 

82 FF, 

83) 

84 

85from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

86 I_, 

87 RP, 

88 RQ, 

89 W_, 

90 Code, 

91) 

92 

93 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


# Type variables bound (by forward reference) to the system classes.
_SystemT = TypeVar("_SystemT", bound="Evohome")

_StoredHwT = TypeVar("_StoredHwT", bound="StoredHw")
_LogbookT = TypeVar("_LogbookT", bound="Logbook")
_MultiZoneT = TypeVar("_MultiZoneT", bound="MultiZone")


# Slugs of the known system classes (used as the `_SLUG` of a system class).
SYS_KLASS = SimpleNamespace(
    SYS="system",  # Generic (promotable?) system
    TCS="evohome",
    PRG="programmer",
)

109 

110 

class SystemBase(Parent, Entity):  # 3B00 (multi-relay)
    """The TCS base class.

    Wraps a controller (CTL) and tracks its appliance control (a BDR relay or an
    OTB gateway, child_id FC) and the system-wide (FC domain) heat demand.
    """

    _SLUG: str = None  # type: ignore[assignment]

    # TODO: check (code so complex, not sure if this is true)
    childs: list[Device]  # type: ignore[assignment]

    def __init__(self, ctl: Controller) -> None:
        """Create a TCS for the given controller.

        Raises:
            LookupError: the gateway already has a system for this controller.
            ValueError: ctl is not a Controller.
        """
        _LOGGER.debug("Creating a TCS for CTL: %s (%s)", ctl.id, self.__class__)

        if ctl.id in ctl._gwy.system_by_id:
            raise LookupError(f"Duplicate TCS for CTL: {ctl.id}")
        if not isinstance(ctl, Controller):  # TODO
            raise ValueError(f"Invalid CTL: {ctl} (is not a controller)")

        super().__init__(ctl._gwy)

        # FIXME: ZZZ entities must know their parent device ID and their own idx
        self._z_id = ctl.id  # the responsible device is the controller
        self._z_idx = None  # ? True (sentinel value to pick up arrays?)

        self.id: DeviceIdT = ctl.id

        self.ctl: Controller = ctl
        self.tcs: Evohome = self  # type: ignore[assignment]
        self._child_id = FF  # NOTE: domain_id

        self._app_cntrl: BdrSwitch | OtbGateway | None = None
        self._heat_demand = None

    def __repr__(self) -> str:
        """Return the controller ID followed by this class's slug."""
        return f"{self.ctl.id} ({self._SLUG})"

    def _setup_discovery_cmds(self) -> None:
        """Register the discovery commands for the relays and the TPI params.

        The second argument of `_add_discovery_cmd` is presumably an interval in
        seconds (24h for the 000C requests, 6h for the TPI params) - TODO confirm.
        """
        # super()._setup_discovery_cmds()

        for payload in (
            f"00{DEV_ROLE_MAP.APP}",  # appliance_control
            f"00{DEV_ROLE_MAP.HTG}",  # hotwater_valve
            f"01{DEV_ROLE_MAP.HTG}",  # heating_valve
        ):
            cmd = Command.from_attrs(RQ, self.ctl.id, Code._000C, payload)
            self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=0)

        cmd = Command.get_tpi_params(self.id)
        self._add_discovery_cmd(cmd, 60 * 60 * 6, delay=5)

    def _handle_msg(self, msg: Message) -> None:
        """Process a message routed to this system.

        A 000C message may identify the appliance control, a 3150 message for the
        FC domain is cached as the heat demand and, if eavesdropping is enabled
        and no appliance control is known yet, other traffic is inspected for it.
        """

        def eavesdrop_appliance_control(
            this: Message, *, prev: Message | None = None
        ) -> None:
            """Discover the heat relay (10: or 13:) for this system.

            There are 3 ways to find a controller's heat relay (in order of
            reliability):
            1.  The 3220 RQ/RP *to/from a 10:* (1x/5min)
            2a. The 3EF0 RQ/RP *to/from a 10:* (1x/1min)
            2b. The 3EF0 RQ (no RP) *to a 13:* (3x/60min)
            3.  The 3B00 I/I exchange between a CTL & a 13: (TPI cycle rate, usu. 6x/hr)

            Data from the CTL is considered 'authoritative'. The 1FC9 RQ/RP exchange
            to/from a CTL is too rare to be useful.
            """

            # 18:14:14.025 066 RQ --- 01:078710 10:067219 --:------ 3220 005 0000050000
            # 18:14:14.446 065 RP --- 10:067219 01:078710 --:------ 3220 005 00C00500FF
            # 14:41:46.599 064 RQ --- 01:078710 10:067219 --:------ 3EF0 001 00
            # 14:41:46.631 063 RP --- 10:067219 01:078710 --:------ 3EF0 006 0000100000FF

            # 06:49:03.465 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
            # 06:49:05.467 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
            # 06:49:07.468 045 RQ --- 01:145038 13:237335 --:------ 3EF0 001 00
            # 09:03:59.693 051  I --- 13:237335 --:------ 13:237335 3B00 002 00C8
            # 09:04:02.667 045  I --- 01:145038 --:------ 01:145038 3B00 002 FCC8

            if this.code not in (Code._22D9, Code._3220, Code._3B00, Code._3EF0):
                return

            # note the order: most to least reliable
            app_cntrl = None

            if (
                this.code in (Code._22D9, Code._3220) and this.verb == RQ
            ):  # TODO: RPs too?
                # dst could be an Address...
                if this.src == self.ctl and isinstance(this.dst, OtbGateway):  # type: ignore[unreachable]
                    app_cntrl = this.dst  # type: ignore[unreachable]

            elif this.code == Code._3EF0 and this.verb == RQ:
                # dst could be an Address...
                if this.src == self.ctl and isinstance(
                    this.dst,  # type: ignore[unreachable]
                    BdrSwitch | OtbGateway,
                ):
                    app_cntrl = this.dst  # type: ignore[unreachable]

            # NOTE(review): the only call site in this method passes no `prev`, so
            # this branch cannot match as written
            elif this.code == Code._3B00 and this.verb == I_ and prev is not None:
                if this.src == self.ctl and isinstance(prev.src, BdrSwitch):  # type: ignore[unreachable]
                    if prev.code == this.code and prev.verb == this.verb:  # type: ignore[unreachable]
                        app_cntrl = prev.src

            if app_cntrl is not None:
                app_cntrl.set_parent(self, child_id=FC)  # type: ignore[unreachable]

        # # assert msg.src is self.ctl, f"msg inappropriately routed to {self}"

        super()._handle_msg(msg)

        if msg.code == Code._000C:
            if msg.payload[SZ_ZONE_TYPE] == DEV_ROLE_MAP.APP and msg.payload.get(
                SZ_DEVICES
            ):
                self._gwy.get_device(
                    msg.payload[SZ_DEVICES][0], parent=self, child_id=FC
                )  # sets self._app_cntrl
            return

        if msg.code == Code._3150:
            if msg.payload.get(SZ_DOMAIN_ID) == FC and msg.verb in (I_, RP):
                self._heat_demand = msg.payload

        if self._gwy.config.enable_eavesdrop and not self.appliance_control:
            eavesdrop_appliance_control(msg)

    @property
    def appliance_control(self) -> BdrSwitch | OtbGateway | None:
        """The TCS relay, aka 'appliance control' (BDR or OTB).

        Falls back to the child device with child_id FC, but only when exactly one
        such child exists.
        """
        if self._app_cntrl:
            return self._app_cntrl
        app_cntrl = [d for d in self.childs if d._child_id == FC]
        return app_cntrl[0] if len(app_cntrl) == 1 else None  # type: ignore[return-value]

    @property
    def tpi_params(self) -> PayDictT._1100 | None:  # 1100
        """Return the value of the 1100 message, if any."""
        return self._msg_value(Code._1100)  # type: ignore[return-value]

    @property
    def heat_demand(self) -> float | None:  # 3150/FC
        """Return the heat demand of the FC domain from the 3150 message, if any."""
        return self._msg_value(Code._3150, domain_id=FC, key=SZ_HEAT_DEMAND)  # type: ignore[return-value]

    @property
    def is_calling_for_heat(self) -> NoReturn:
        """Deprecated: always raises NotImplementedError."""
        raise NotImplementedError(
            f"{self}: is_calling_for_heat attr is deprecated, use bool(heat_demand)"
        )

    @property
    def schema(self) -> dict[str, Any]:
        """Return the system's schema.

        Contains the appliance control's ID (or None) and the sorted IDs of the
        present child devices that have no child_id (the orphans).
        """

        schema: dict[str, Any] = {SZ_SYSTEM: {}}

        schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL] = (
            self.appliance_control.id if self.appliance_control else None
        )

        schema[SZ_ORPHANS] = sorted(
            [
                d.id
                for d in self.childs  # HACK: UFC
                if not d._child_id and d._is_present  # TODO: and d is not self.ctl
            ]  # and not isinstance(d, UfhController)
        )  # devices without a parent zone, NB: CTL can be a sensor for a zone

        return schema

    @property
    def _schema_min(self) -> dict[str, Any]:
        """Return the system's minimalised schema.

        Only an OTB appliance control, zone sensors that are a CTL, zone actuators
        that are a TR0, and any non-empty orphans list are retained.
        """

        schema: dict[str, Any] = self.schema
        result: dict[str, Any] = {}

        try:
            if schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL][:2] == DEV_TYPE_MAP.OTB:  # DEX
                result[SZ_SYSTEM] = {
                    SZ_APPLIANCE_CONTROL: schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL]
                }
        except (IndexError, TypeError):  # e.g. the appliance control is None
            result[SZ_SYSTEM] = {SZ_APPLIANCE_CONTROL: None}

        zones = {}
        # the schema built by this base class has no zones key, so do not assume
        # it is present (it is added by subclasses that have zones)
        for idx, zone in schema.get(SZ_ZONES, {}).items():
            _zone = {}
            if zone[SZ_SENSOR] and zone[SZ_SENSOR][:2] == DEV_TYPE_MAP.CTL:  # DEX
                _zone = {SZ_SENSOR: zone[SZ_SENSOR]}
            if devices := [
                d for d in zone[SZ_ACTUATORS] if d[:2] == DEV_TYPE_MAP.TR0
            ]:  # DEX
                _zone.update({SZ_ACTUATORS: devices})
            if _zone:
                zones[idx] = _zone
        if zones:
            result[SZ_ZONES] = zones

        result |= {
            k: v
            for k, v in schema.items()
            if k in ("orphans",) and v  # add UFH?
        }

        return result  # TODO: check against vol schema

    @property
    def params(self) -> dict[str, Any]:
        """Return the system's configuration."""

        params: dict[str, Any] = {SZ_SYSTEM: {}}
        params[SZ_SYSTEM]["tpi_params"] = self._msg_value(Code._1100)
        return params

    @property
    def status(self) -> dict[str, Any]:
        """Return the system's current state."""

        status: dict[str, Any] = {SZ_SYSTEM: {}}
        status[SZ_SYSTEM]["heat_demand"] = self.heat_demand

        # the status of each child device, keyed (and ordered) by device ID
        status[SZ_DEVICES] = {
            d.id: d.status for d in sorted(self.childs, key=lambda x: x.id)
        }

        return status

334 

335 

class MultiZone(SystemBase):  # 0005 (+/- 000C?)
    """A TCS mixin that creates heating zones and routes messages to them."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the (empty) zone collections."""
        super().__init__(*args, **kwargs)

        self.zones: list[Zone] = []
        self.zone_by_idx: dict[str, Zone] = {}  # should not include HW
        # taken from the gateway config, if set there, else the default
        self._max_zones: int = getattr(
            self._gwy.config, SZ_MAX_ZONES, DEFAULT_MAX_ZONES
        )

        self._prev_30c9: Message | None = None  # used to eavesdrop zone sensors

    def _setup_discovery_cmds(self) -> None:
        """Add a 0005 request for each heating zone type, and for the sensors."""
        super()._setup_discovery_cmds()

        for zone_type in list(ZON_ROLE_MAP.HEAT_ZONES) + [ZON_ROLE_MAP.SEN]:
            cmd = Command.from_attrs(RQ, self.id, Code._0005, f"00{zone_type}")
            self._add_discovery_cmd(cmd, 60 * 60 * 24, delay=0)

    def _handle_msg(self, msg: Message) -> None:
        """Process any relevant message.

        If `zone_idx` in payload, route any messages to the corresponding zone.
        """

        def eavesdrop_zones(this: Message, *, prev: Message | None = None) -> None:
            # create (if required) a zone for every zone_idx in the array payload
            # NOTE: reads the enclosing `msg`, not the `this` parameter
            [
                self.get_htg_zone(v)
                for d in msg.payload
                for k, v in d.items()
                if k == SZ_ZONE_IDX
            ]

        def eavesdrop_zone_sensors(
            this: Message, *, prev: Message | None = None
        ) -> None:
            """Determine each zone's sensor by matching zone/sensor temperatures."""

            def _testable_zones(changed_zones: dict[str, float]) -> dict[float, str]:
                # sensor-less zones whose changed temp is unique amongst the
                # changed zones, keyed by that temp
                return {
                    t1: i1
                    for i1, t1 in changed_zones.items()
                    if self.zone_by_idx[i1].sensor is None
                    and t1 not in [t2 for i2, t2 in changed_zones.items() if i2 != i1]
                }

            # remember this 30C9 for next time, and compare with the previous one
            # (the `prev` parameter is overwritten here)
            self._prev_30c9, prev = this, self._prev_30c9
            if prev is None:
                return  # type: ignore[unreachable]

            # TODO: use msgz/I, not RP
            secs: int = self._msg_value(Code._1F09, key="remaining_seconds")  # type: ignore[assignment]
            if secs is None or this.dtm > prev.dtm + td(seconds=secs + 5):
                return  # can only compare against 30C9 pkt from the last cycle

            # _LOGGER.warning("System state (before): %s", self.schema)

            changed_zones: dict[str, float] = {
                z[SZ_ZONE_IDX]: z[SZ_TEMPERATURE]
                for z in this.payload
                if z not in prev.payload and z[SZ_TEMPERATURE] is not None
            }  # zones with changed temps
            if not changed_zones:
                return  # ctl's 30C9 says no zones have changed temps during this cycle

            testable_zones = _testable_zones(changed_zones)
            if not testable_zones:
                return  # no testable zones

            # keyed by temperature, so a later device replaces an earlier one
            # that has the same temperature
            testable_sensors = {
                d.temperature: d
                for d in self._gwy.devices  # NOTE: *not* self.childs
                if isinstance(d, Temperature)  # d.addr.type in DEVICE_HAS_ZONE_SENSOR
                and d.ctl in (self.ctl, None)
                and d.temperature is not None
                and d._msgs[Code._30C9].dtm > prev.dtm  # changed during last cycle
            }
            if not testable_sensors:
                return  # no testable sensors

            matched_pairs = {
                sensor: zone_idx
                for temp_z, zone_idx in testable_zones.items()
                for temp_s, sensor in testable_sensors.items()
                if temp_z == temp_s
            }

            for sensor, zone_idx in matched_pairs.items():
                zone = self.zone_by_idx[zone_idx]
                self._gwy.get_device(sensor.id, parent=zone, is_sensor=True)

            # _LOGGER.warning("System state (after): %s", self.schema)

            # now see if we can allocate the controller as a sensor...
            if any(z for z in self.zones if z.sensor is self.ctl):
                return  # the controller is already a sensor

            # re-evaluated, as some zones may have just been given a sensor
            remaining_zones = _testable_zones(changed_zones)
            if len(remaining_zones) != 1:
                return  # no testable zones

            temp, zone_idx = tuple(remaining_zones.items())[0]

            # can safely(?) assume this zone is using the CTL as a sensor...
            if not [s for s in testable_sensors if s == temp]:
                zone = self.zone_by_idx[zone_idx]
                self._gwy.get_device(self.ctl.id, parent=zone, is_sensor=True)

            # _LOGGER.warning("System state (finally): %s", self.schema)

        def handle_msg_by_zone_idx(zone_idx: str, msg: Message) -> None:
            # only existing zones get the message: unknown zones are not created
            if zone := self.zone_by_idx.get(zone_idx):
                zone._handle_msg(msg)
            # elif self._gwy.config.enable_eavesdrop:
            #     self.get_htg_zone(zone_idx)._handle_msg(msg)

        super()._handle_msg(msg)

        # ignore messages that are not zone-related
        if msg.code not in (Code._0005, Code._000A, Code._2309, Code._30C9) and (
            SZ_ZONE_IDX not in msg.payload  # 0004,0008,0009,000C,0404,12B0,2349,3150
        ):
            return

        # TODO: a I/0005 may have changed: del or add zones
        if msg.code == Code._0005:
            if (zone_type := msg.payload[SZ_ZONE_TYPE]) in ZON_ROLE_MAP.HEAT_ZONES:
                # create/update a zone, with its class, for each flagged index
                [
                    self.get_htg_zone(
                        f"{idx:02X}", **{SZ_CLASS: ZON_ROLE_MAP[zone_type]}
                    )
                    for idx, flag in enumerate(msg.payload[SZ_ZONE_MASK])
                    if flag == 1
                ]
            elif zone_type in DEV_ROLE_MAP.HEAT_DEVICES:
                [
                    self.get_htg_zone(f"{idx:02X}", msg=msg)
                    for idx, flag in enumerate(msg.payload[SZ_ZONE_MASK])
                    if flag == 1
                ]
            return

        # TODO: a I/000C may have changed: del or add devices
        if msg.code == Code._000C:
            if msg.payload[SZ_ZONE_TYPE] not in DEV_ROLE_MAP.HEAT_DEVICES:
                return
            if msg.payload[SZ_DEVICES]:
                self.get_htg_zone(msg.payload[SZ_ZONE_IDX], msg=msg)
            elif zon := self.zone_by_idx.get(msg.payload[SZ_ZONE_IDX]):
                zon._handle_msg(msg)  # tell existing zone: no device
            return

        # the CTL knows, but does not announce temps for multiroom_mode zones
        if msg.code == Code._30C9 and msg._has_array:
            for z in self.zones:
                if z.idx not in (x[SZ_ZONE_IDX] for x in msg.payload):
                    z._get_temp()

        # If some zones still don't have a sensor, maybe eavesdrop?
        if self._gwy.config.enable_eavesdrop and (
            msg.code in (Code._000A, Code._2309, Code._30C9) and msg._has_array
        ):  # could do Code._000A, but only 1/hr
            eavesdrop_zones(msg)

        # Route all messages to their zones, incl. 000C, 0404, others
        if isinstance(msg.payload, dict):
            if zone_idx := msg.payload.get(SZ_ZONE_IDX):
                handle_msg_by_zone_idx(zone_idx, msg)
            # TODO: elif msg.payload.get(SZ_DOMAIN_ID) == FA:  # DHW

        elif isinstance(msg.payload, list) and len(msg.payload):
            # TODO: elif msg.payload.get(SZ_DOMAIN_ID) == FA:  # DHW
            if isinstance(msg.payload[0], dict):  # e.g. 1FC9 is a list of lists:
                for z_dict in msg.payload:
                    handle_msg_by_zone_idx(z_dict.get(SZ_ZONE_IDX), msg)

        # If some zones still don't have a sensor, maybe eavesdrop?
        if (  # TODO: edge case: 1 zone with CTL as SEN
            self._gwy.config.enable_eavesdrop
            and msg.code == Code._30C9
            and (msg._has_array or len(self.zones) == 1)
            and any(z for z in self.zones if not z.sensor)
        ):
            eavesdrop_zone_sensors(msg)

    # TODO: should be a private method
    def get_htg_zone(
        self, zone_idx: str, *, msg: Message | None = None, **schema: Any
    ) -> Zone:
        """Return a heating zone, create it if required.

        First, use the schema to create/update it, then pass it any msg to handle.

        Heating zones are uniquely identified by a tcs_id|zone_idx pair.
        If a zone is created, attach it to this TCS.
        """

        schema = shrink(SCH_TCS_ZONES_ZON(schema))

        zon: Zone = self.zone_by_idx.get(zone_idx)  # type: ignore[assignment]
        if zon is None:  # not found in tcs, create it
            zon = zone_factory(self, zone_idx, msg=msg, **schema)  # type: ignore[unreachable]
            self.zone_by_idx[zon.idx] = zon
            self.zones.append(zon)

        elif schema:
            zon._update_schema(**schema)

        if msg:
            zon._handle_msg(msg)
        return zon

    @property
    def schema(self) -> dict[str, Any]:
        """Return the parent's schema, plus the schema of each zone (by idx)."""
        return {
            **super().schema,
            SZ_ZONES: {z.idx: z.schema for z in sorted(self.zones)},
        }

    @property
    def params(self) -> dict[str, Any]:
        """Return the parent's params, plus the params of each zone (by idx)."""
        return {
            **super().params,
            SZ_ZONES: {z.idx: z.params for z in sorted(self.zones)},
        }

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's status, plus the status of each zone (by idx)."""
        return {
            **super().status,
            SZ_ZONES: {z.idx: z.status for z in sorted(self.zones)},
        }

567 

568 

class ScheduleSync(SystemBase):  # 0006 (+/- 0404?)
    """A TCS mixin that tracks the global schedule version (code 0006).

    It also provides a per-zone lock, used to stop concurrent get_schedules.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the cached 0006 message and the zone lock."""
        super().__init__(*args, **kwargs)

        self._msg_0006: Message = None  # type: ignore[assignment]

        # used to stop concurrent get_schedules
        self.zone_lock = Lock()  # FIXME: threading lock, or asyncio lock?
        self.zone_lock_idx: str | None = None

    def _setup_discovery_cmds(self) -> None:
        """Add the request for the schedule version to the discovery commands."""
        super()._setup_discovery_cmds()

        cmd = Command.get_schedule_version(self.id)
        self._add_discovery_cmd(cmd, 60 * 5, delay=5)

    def _handle_msg(self, msg: Message) -> None:  # NOTE: active
        """Periodically retrieve the latest global change counter."""

        super()._handle_msg(msg)

        if msg.code == Code._0006:
            self._msg_0006 = msg

    async def _schedule_version(self, *, force_io: bool = False) -> tuple[int, bool]:
        """Return the global schedule version number, and an indication if I/O was done.

        If `force_io`, then RQ the latest change counter from the TCS rather than
        rely upon a recent (cached) value.

        Cached values are only used if less than 3 minutes old.
        """

        # RQ --- 30:185469 01:037519 --:------ 0006 001 00
        # RP --- 01:037519 30:185469 --:------ 0006 004 000500E6

        if (
            not force_io
            and self._msg_0006
            and self._msg_0006.dtm > dt.now() - td(minutes=3)
        ):
            return (
                self._msg_0006.payload[SZ_CHANGE_COUNTER],
                False,
            )  # global_ver, did_io

        cmd = Command.get_schedule_version(self.ctl.id)
        pkt = await self._gwy.async_send_cmd(
            cmd, wait_for_reply=True, priority=Priority.HIGH
        )
        if pkt:
            self._msg_0006 = Message(pkt)

        # NOTE(review): if pkt was falsy and no 0006 has been cached yet, then
        # self._msg_0006 is still None here - confirm whether that can occur
        return self._msg_0006.payload[SZ_CHANGE_COUNTER], True  # global_ver, did_io

    def _refresh_schedules(self) -> None:
        """Create a task to re-fetch the schedule of each zone, and of the DHW."""
        zone: Zone

        for zone in getattr(self, SZ_ZONES, []):
            self._gwy._loop.create_task(zone.get_schedule(force_io=True))
        if isinstance(self, StoredHw) and self.dhw:
            self._gwy._loop.create_task(self.dhw.get_schedule(force_io=True))

    async def _obtain_lock(self, zone_idx: str) -> None:
        """Claim the zone lock for zone_idx, polling for up to 3 minutes.

        Raises:
            TimeoutError: the lock was still held for another zone at the deadline.
        """
        timeout_dtm = dt.now() + td(minutes=3)
        while dt.now() < timeout_dtm:
            # a context manager ensures the lock is always released
            with self.zone_lock:
                if self.zone_lock_idx is None:
                    self.zone_lock_idx = zone_idx

            if self.zone_lock_idx == zone_idx:
                break
            await asyncio.sleep(0.005)  # gives the other zone enough time

        else:
            raise TimeoutError(
                f"Unable to obtain lock for {zone_idx} (used by {self.zone_lock_idx})"
            )

    def _release_lock(self) -> None:
        """Release the zone lock, regardless of which zone holds it."""
        with self.zone_lock:
            self.zone_lock_idx = None

    @property
    def schedule_version(self) -> int | None:
        """Return the change counter from the 0006 message, if any."""
        return self._msg_value(Code._0006, key=SZ_CHANGE_COUNTER)  # type: ignore[return-value]

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's status, plus the schedule version."""
        return {
            **super().status,
            "schedule_version": self.schedule_version,
        }

664 

665 

class Language(SystemBase):  # 0100
    """A TCS mixin that exposes the system's language (code 0100)."""

    def _setup_discovery_cmds(self) -> None:
        """Add the request for the system language to the discovery commands."""
        super()._setup_discovery_cmds()

        language_cmd = Command.get_system_language(self.id)
        self._add_discovery_cmd(language_cmd, 24 * 60 * 60, delay=15 * 60)

    @property
    def language(self) -> str | None:
        """Return the language from the 0100 message, if any."""
        value = self._msg_value(Code._0100, key=SZ_LANGUAGE)
        return value  # type: ignore[return-value]

    @property
    def params(self) -> dict[str, Any]:
        """Return the parent's params, with the language added to the system's."""
        result = super().params
        language = self.language
        result[SZ_SYSTEM][SZ_LANGUAGE] = language
        return result

682 

683 

class Logbook(SystemBase):  # 0418
    """A TCS mixin that exposes the system's fault log (code 0418)."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the event/fault placeholders and the fault log."""
        super().__init__(*args, **kwargs)

        self._prev_event: Message = None  # type: ignore[assignment]
        self._this_event: Message = None  # type: ignore[assignment]
        self._prev_fault: Message = None  # type: ignore[assignment]
        self._this_fault: Message = None  # type: ignore[assignment]

        self._faultlog: FaultLog = FaultLog(self)

    def _setup_discovery_cmds(self) -> None:
        """Add the request for log entry 0 to the discovery commands."""
        super()._setup_discovery_cmds()

        first_entry_cmd = Command.get_system_log_entry(self.id, 0)
        self._add_discovery_cmd(first_entry_cmd, 5 * 60, delay=5)

    def _handle_msg(self, msg: Message) -> None:  # NOTE: active
        """Process a message, passing any 0418 on to the fault log."""
        super()._handle_msg(msg)

        if msg.code != Code._0418:  # and msg.verb in (I_, RP):
            return
        self._faultlog.handle_msg(msg)

    async def get_faultlog(
        self,
        /,
        *,
        start: int = 0,
        limit: int | None = None,
        force_refresh: bool = False,
    ) -> dict[FaultIdxT, FaultLogEntry] | None:
        """Return the result of the fault log's own get_faultlog()."""
        faultlog = await self._faultlog.get_faultlog(
            start=start, limit=limit, force_refresh=force_refresh
        )
        return faultlog

    @property
    def active_faults(self) -> tuple[str, ...] | None:
        """Return the most recently logged faults that are not restored."""
        if self._faultlog.active_faults is not None:
            return tuple(str(fault) for fault in self._faultlog.active_faults)
        return None

    @property
    def latest_event(self) -> str | None:
        """Return the most recently logged event (fault or restore), if any."""
        if self._faultlog.latest_event:
            return str(self._faultlog.latest_event)
        return None

    @property
    def latest_fault(self) -> str | None:
        """Return the most recently logged fault, if any."""
        if self._faultlog.latest_fault:
            return str(self._faultlog.latest_fault)
        return None

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's status, plus the fault log summary."""
        result: dict[str, Any] = dict(super().status)
        result["active_faults"] = self.active_faults
        result["latest_event"] = self.latest_event
        result["latest_fault"] = self.latest_fault
        return result

752 

753 

class StoredHw(SystemBase):  # 10A0, 1260, 1F41
    """A TCS mixin that creates a DHW zone and routes messages to it."""

    MIN_SETPOINT = 30.0  # NOTE: these may be removed
    MAX_SETPOINT = 85.0
    DEFAULT_SETPOINT = 50.0

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the system without a DHW zone."""
        super().__init__(*args, **kwargs)
        self._dhw: DhwZone = None  # type: ignore[assignment]

    def _setup_discovery_cmds(self) -> None:
        """Add the 000C request for the DHW sensor to the discovery commands."""
        super()._setup_discovery_cmds()

        payloads = (
            f"00{DEV_ROLE_MAP.DHW}",  # dhw_sensor
            # f"00{DEV_ROLE_MAP.HTG}",  # hotwater_valve
            # f"01{DEV_ROLE_MAP.HTG}",  # heating_valve
        )
        for dhw_payload in payloads:
            dhw_cmd = Command.from_attrs(RQ, self.id, Code._000C, dhw_payload)
            self._add_discovery_cmd(dhw_cmd, 24 * 60 * 60, delay=0)

    def _handle_msg(self, msg: Message) -> None:
        """Process a message, routing any that is DHW-related to the DHW zone."""
        super()._handle_msg(msg)

        if not isinstance(msg.payload, dict):
            return

        # Code._0008, Code._000C, Code._0404, Code._10A0, Code._1260, Code._1F41
        is_for_dhw = (
            msg.payload.get(SZ_DHW_IDX) is not None
            or msg.payload.get(SZ_DOMAIN_ID) in (F9, FA)
            or msg.payload.get(SZ_ZONE_IDX) == "HW"
        )
        if not is_for_dhw:
            return

        # TODO: a I/0005 may have changed zones & may need a restart (del) or not (add)
        is_dhw_device_list = (
            msg.code == Code._000C
            and msg.payload[SZ_ZONE_TYPE] in DEV_ROLE_MAP.DHW_DEVICES
        )
        if not is_dhw_device_list:
            # RQ --- 18:002563 01:078710 --:------ 10A0 001 00  # every 4h
            # RP --- 01:078710 18:002563 --:------ 10A0 006 00157C0003E8

            # Route all messages to their zones, incl. 000C, 0404, others
            self.get_dhw_zone(msg=msg)
            return

        if msg.payload[SZ_DEVICES]:
            self.get_dhw_zone(msg=msg)  # create DHW zone if required
        elif self._dhw:
            self._dhw._handle_msg(msg)  # tell existing DHW zone: no device

    # TODO: should be a private method
    def get_dhw_zone(self, *, msg: Message | None = None, **schema: Any) -> DhwZone:
        """Return the DHW zone, creating it first if there is none.

        The schema is applied first (to create or to update the zone), then the
        zone is given the msg, if any, to handle.

        DHW zones are uniquely identified by a controller ID.
        If a DHW zone is created, attach it to this TCS.
        """

        schema = shrink(SCH_TCS_DHW(schema))

        if self._dhw:
            if schema:
                self._dhw._update_schema(**schema)
        else:
            self._dhw = zone_factory(self, "HW", msg=msg, **schema)  # type: ignore[assignment]

        if msg:
            self._dhw._handle_msg(msg)
        return self._dhw

    @property
    def dhw(self) -> DhwZone | None:
        """Return the DHW zone, if any."""
        return self._dhw

    @property
    def dhw_sensor(self) -> Device | None:
        """Return the DHW zone's sensor, if there is a DHW zone."""
        if self._dhw:
            return self._dhw.sensor
        return None

    @property
    def hotwater_valve(self) -> Device | None:
        """Return the DHW zone's hotwater valve, if there is a DHW zone."""
        if self._dhw:
            return self._dhw.hotwater_valve
        return None

    @property
    def heating_valve(self) -> Device | None:
        """Return the DHW zone's heating valve, if there is a DHW zone."""
        if self._dhw:
            return self._dhw.heating_valve
        return None

    @property
    def schema(self) -> dict[str, Any]:
        """Return the parent's schema, plus that of the DHW zone (else {})."""
        result: dict[str, Any] = dict(super().schema)
        result[SZ_DHW_SYSTEM] = self._dhw.schema if self._dhw else {}
        return result

    @property
    def params(self) -> dict[str, Any]:
        """Return the parent's params, plus those of the DHW zone (else {})."""
        result: dict[str, Any] = dict(super().params)
        result[SZ_DHW_SYSTEM] = self._dhw.params if self._dhw else {}
        return result

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's status, plus that of the DHW zone (else {})."""
        result: dict[str, Any] = dict(super().status)
        result[SZ_DHW_SYSTEM] = self._dhw.status if self._dhw else {}
        return result

860 

861 

class SysMode(SystemBase):  # 2E04
    """A TCS mixin that exposes, and can set, the system mode (code 2E04)."""

    def _setup_discovery_cmds(self) -> None:
        """Add the request for the system mode to the discovery commands."""
        super()._setup_discovery_cmds()

        mode_cmd = Command.get_system_mode(self.id)
        self._add_discovery_cmd(mode_cmd, 5 * 60, delay=5)

    @property
    def system_mode(self) -> dict[str, Any] | None:  # 2E04
        """Return the value of the 2E04 message, if any."""
        mode = self._msg_value(Code._2E04)
        return mode  # type: ignore[return-value]

    def set_mode(
        self, system_mode: int | str | None, *, until: dt | str | None = None
    ) -> asyncio.Task[Packet]:
        """Set a system mode for a specified duration, or indefinitely.

        :param system_mode: 2-digit item from SYS_MODE_MAP, positional
        :param until: optional: end of set period
        :return: the result of the gateway's send_cmd() for the command
        """
        mode_cmd = Command.set_system_mode(self.id, system_mode, until=until)
        task = self._gwy.send_cmd(
            mode_cmd, priority=Priority.HIGH, wait_for_reply=True
        )
        return task

    def set_auto(self) -> asyncio.Task[Packet]:
        """Revert system to Auto, set non-PermanentOverride zones to FollowSchedule."""
        auto_mode = SYS_MODE_MAP.AUTO
        return self.set_mode(auto_mode)

    def reset_mode(self) -> asyncio.Task[Packet]:
        """Revert system to Auto, force *all* zones to FollowSchedule."""
        reset_mode = SYS_MODE_MAP.AUTO_WITH_RESET
        return self.set_mode(reset_mode)

    @property
    def params(self) -> dict[str, Any]:
        """Return the parent's params, with the system mode added to the system's."""
        result = super().params
        mode = self.system_mode
        result[SZ_SYSTEM][SZ_SYSTEM_MODE] = mode
        return result

899 

900 

class Datetime(SystemBase):  # 313F
    """A TCS mixin that can get, set and sanity-check the system time (313F)."""

    def _setup_discovery_cmds(self) -> None:
        """Add the request for the system time to the discovery commands."""
        super()._setup_discovery_cmds()

        cmd = Command.get_system_time(self.id)
        self._add_discovery_cmd(cmd, 60 * 60, delay=0)

    def _handle_msg(self, msg: Message) -> None:
        """Process a message, warning if a 313F datetime is more than 5 mins out.

        The datetime in the payload is compared with the gateway's own time, but
        only if the gateway has a transport.
        """
        super()._handle_msg(msg)

        # FIXME: refactoring protocol stack
        if msg.code == Code._313F and msg.verb in (I_, RP) and self._gwy._transport:
            diff = abs(dt.fromisoformat(msg.payload[SZ_DATETIME]) - self._gwy._dt_now())
            if diff > td(minutes=5):
                # lazy %-args: the text is only formatted if the record is emitted
                _LOGGER.warning("%r < excessive datetime difference: %s", msg, diff)

    async def get_datetime(self) -> dt | None:
        """Request the system time, and return the datetime from the reply."""
        cmd = Command.get_system_time(self.id)
        pkt = await self._gwy.async_send_cmd(cmd, wait_for_reply=True)
        msg = Message._from_pkt(pkt)
        return dt.fromisoformat(msg.payload[SZ_DATETIME])

    async def set_datetime(self, dtm: dt) -> Packet:
        """Set the date and time of the system."""

        cmd = Command.set_system_time(self.id, dtm)
        return await self._gwy.async_send_cmd(cmd, priority=Priority.HIGH)

928 

929 

class UfHeating(SystemBase):
    """A TCS mixin that reports on the system's UFH controllers."""

    def _ufh_ctls(self) -> list[UfhController]:
        """Return the child devices that are UFH controllers, sorted."""
        return sorted(d for d in self.childs if isinstance(d, UfhController))

    @property
    def schema(self) -> dict[str, Any]:
        """Return the parent's schema, plus that of each UFH controller (by ID)."""
        result: dict[str, Any] = dict(super().schema)
        result[SZ_UFH_SYSTEM] = {ufc.id: ufc.schema for ufc in self._ufh_ctls()}
        return result

    @property
    def params(self) -> dict[str, Any]:
        """Return the parent's params, plus those of each UFH controller (by ID)."""
        result: dict[str, Any] = dict(super().params)
        result[SZ_UFH_SYSTEM] = {ufc.id: ufc.params for ufc in self._ufh_ctls()}
        return result

    @property
    def status(self) -> dict[str, Any]:
        """Return the parent's status, plus that of each UFH controller (by ID)."""
        result: dict[str, Any] = dict(super().status)
        result[SZ_UFH_SYSTEM] = {ufc.id: ufc.status for ufc in self._ufh_ctls()}
        return result

954 

955 

class System(StoredHw, Datetime, Logbook, SystemBase):
    """The Temperature Control System class.

    In addition to what its bases provide, it keeps the most recent I/RP message
    per domain_id for relay demand (0008), relay failsafe (0009) and heat demand
    (3150), and reports them in the status property.
    """

    _SLUG: str = SYS_KLASS.SYS

    def __init__(self, ctl: Controller, **kwargs: Any) -> None:
        super().__init__(ctl, **kwargs)

        # Latest message seen per domain_id; populated by _handle_msg()
        self._heat_demands: dict[str, Any] = {}
        self._relay_demands: dict[str, Any] = {}
        self._relay_failsafes: dict[str, Any] = {}

    def _update_schema(self, **schema: Any) -> None:
        """Update a CH/DHW system with new schema attrs.

        Raise an exception if the new schema is not a superset of the existing schema.
        """

        _schema: dict[str, Any]
        schema = shrink(SCH_TCS(schema))

        if schema.get(SZ_SYSTEM) and (
            dev_id := schema[SZ_SYSTEM].get(SZ_APPLIANCE_CONTROL)
        ):
            self._app_cntrl = self._gwy.get_device(dev_id, parent=self, child_id=FC)  # type: ignore[assignment]

        if _schema := (schema.get(SZ_DHW_SYSTEM)):  # type: ignore[assignment]
            self.get_dhw_zone(**_schema)  # self._dhw = ...

        # Heating zones are only handled by systems that are also a MultiZone
        if not isinstance(self, MultiZone):
            return

        if _schema := (schema.get(SZ_ZONES)):  # type: ignore[assignment]
            # a plain loop: the calls are made for their side effects only
            for idx, zone_schema in _schema.items():
                self.get_htg_zone(idx, **zone_schema)

    @classmethod
    def create_from_schema(cls, ctl: Controller, **schema: Any) -> System:
        """Create a CH/DHW system for a CTL and set its schema attrs.

        The appropriate System class should have been determined by a factory.
        Schema attrs include: class (klass) & others.
        """

        tcs = cls(ctl)
        tcs._update_schema(**schema)
        return tcs

    def _handle_msg(self, msg: Message) -> None:
        """Cache the latest I/RP message per domain_id for codes 0008, 0009 and 3150.

        Raises:
            AssertionError: an I/RP with a domain_id has a code that is neither
                cached here nor in the list of codes known to carry a domain_id.
        """

        super()._handle_msg(msg)

        if not isinstance(msg.payload, dict):
            return

        idx = msg.payload.get(SZ_DOMAIN_ID)
        if not idx or msg.verb not in (I_, RP):
            return

        if msg.code == Code._0008:
            self._relay_demands[idx] = msg
        elif msg.code == Code._0009:
            self._relay_failsafes[idx] = msg
        elif msg.code == Code._3150:
            self._heat_demands[idx] = msg
        elif msg.code not in (
            Code._0001,
            Code._000C,
            Code._0404,
            Code._0418,
            Code._1100,
            Code._3B00,
        ):
            # raised explicitly (not `assert False`) so the check survives python -O
            raise AssertionError(f"Unexpected code with a domain_id: {msg.code}")

    @property
    def heat_demands(self) -> dict[str, Any] | None:  # 3150
        """Return the cached heat demand per domain_id, or None if none was seen."""

        # FC: 00-C8 (no F9, FA), TODO: deprecate as FC only?
        if not self._heat_demands:
            return None
        return {k: v.payload["heat_demand"] for k, v in self._heat_demands.items()}

    @property
    def relay_demands(self) -> dict[str, Any] | None:  # 0008
        """Return the cached relay demand per domain_id, or None if none was seen."""

        # FC: 00-C8, F9: 00-C8, FA: 00 or C8 only (01: all 3, 02: FC/FA only)
        if not self._relay_demands:
            return None
        return {k: v.payload["relay_demand"] for k, v in self._relay_demands.items()}

    @property
    def relay_failsafes(self) -> dict[str, Any] | None:  # 0009
        """Return None if no 0009 was seen, otherwise (for now) an empty dict."""

        if not self._relay_failsafes:
            return None
        return {}  # FIXME: failsafe_enabled

    @property
    def status(self) -> dict[str, Any]:
        """Return the system's current state."""

        status = super().status
        # assert SZ_SYSTEM in status  # TODO: removeme

        status[SZ_SYSTEM]["heat_demands"] = self.heat_demands
        status[SZ_SYSTEM]["relay_demands"] = self.relay_demands
        status[SZ_SYSTEM]["relay_failsafes"] = self.relay_failsafes

        return status

1059 

1060 

class Evohome(ScheduleSync, Language, SysMode, MultiZone, UfHeating, System):
    """The evohome system: System combined with the schedule-sync, language,
    system-mode, multi-zone and UFH mix-ins.
    """

    _SLUG: str = SYS_KLASS.TCS  # evohome

    # older evohome don't have zone_type=ELE

1065 

1066 

class Chronotherm(Evohome):
    """An Evohome subclass that differs only in its slug (SYS_KLASS.SYS)."""

    _SLUG: str = SYS_KLASS.SYS

1069 

1070 

class Hometronics(System):
    """A System subclass (without the Evohome mix-ins) for Hometronics controllers."""

    _SLUG: str = SYS_KLASS.SYS

    # These have only ever been seen from a Hometronics controller
    # .I --- 01:023389 --:------ 01:023389 2D49 003 00C800
    # .I --- 01:023389 --:------ 01:023389 2D49 003 01C800
    # .I --- 01:023389 --:------ 01:023389 2D49 003 880000
    # .I --- 01:023389 --:------ 01:023389 2D49 003 FD0000

    # Hometronic does not react to W/2349 but rather requires W/2309

    #
    # def _setup_discovery_cmds(self) -> None:
    #     # super()._setup_discovery_cmds()

    #     # will RP to: 0005/configured_zones_alt, but not: configured_zones
    #     # will RP to: 0004

    # Codes listed as supported/unsupported for RQs (work in progress)
    RQ_SUPPORTED = (  # TODO: WIP
        Code._0004,
        Code._000C,
        Code._2E04,
        Code._313F,
    )
    RQ_UNSUPPORTED = ("xxxx",)  # 10E0?

1091 

1092 

class Programmer(Evohome):
    """An Evohome subclass that differs only in its slug (SYS_KLASS.PRG)."""

    _SLUG: str = SYS_KLASS.PRG

1095 

1096 

class Sundial(Evohome):
    """An Evohome subclass that differs only in its slug (SYS_KLASS.SYS)."""

    _SLUG: str = SYS_KLASS.SYS

1099 

1100 

# Lookup of system classes in this module by their _SLUG, e.g. {"evohome": Evohome}
# NOTE(review): several classes here share SYS_KLASS.SYS; which of them the map
# keeps depends on class_by_attr() - confirm.
SYS_CLASS_BY_SLUG: dict[str, type[System]] = class_by_attr(
    __name__,
    "_SLUG",
)

1103 

1104 

def system_factory(
    ctl: Controller, *, msg: Message | None = None, **schema: Any
) -> System:
    """Return the system class for a given controller/schema (defaults to evohome).

    The class is chosen from the schema's class entry (if it names a known
    system class), otherwise Evohome is used; the system is then built via that
    class's create_from_schema().

    Args:
        ctl: the controller the system is created for.
        msg: passed through to the class selection, which currently ignores it.
        **schema: the system's schema; also forwarded to create_from_schema().

    Returns:
        The newly created system instance.
    """

    def best_tcs_class(
        ctl_addr: Address,
        *,
        msg: Message | None = None,
        eavesdrop: bool = False,
        **schema: Any,
    ) -> type[System]:
        """Return the system class for a given CTL/schema (defaults to evohome).

        Note: msg and eavesdrop are accepted but not (yet) used for the choice.
        """

        klass: str = schema.get(SZ_CLASS)  # type: ignore[assignment]

        # a specified system class always takes precedence (even if it is wrong)...
        if klass and (cls := SYS_CLASS_BY_SLUG.get(klass)):
            _LOGGER.debug(
                f"Using an explicitly-defined system class for: {ctl_addr} ({cls._SLUG})"
            )
            return cls

        # otherwise (incl. an unrecognised class), use the default system class...
        # log the slug of the class actually returned, as the branch above does
        _LOGGER.debug(
            f"Using a generic system class for: {ctl_addr} ({Evohome._SLUG})"
        )
        return Evohome

    tcs_class = best_tcs_class(
        ctl.addr,
        msg=msg,
        eavesdrop=ctl._gwy.config.enable_eavesdrop,
        **schema,
    )
    return tcs_class.create_from_schema(ctl, **schema)