Coverage for src/ramses_rf/device/heat.py: 44%

657 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - devices from the CH/DHW (heat) domain.""" 

3 

4from __future__ import annotations 

5 

6import logging 

7from collections.abc import Callable 

8from typing import TYPE_CHECKING, Any, Final 

9 

10from ramses_rf import exceptions as exc 

11from ramses_rf.const import ( 

12 DEV_ROLE_MAP, 

13 DEV_TYPE_MAP, 

14 DOMAIN_TYPE_MAP, 

15 SZ_DEVICES, 

16 SZ_DOMAIN_ID, 

17 SZ_HEAT_DEMAND, 

18 SZ_PRESSURE, 

19 SZ_RELAY_DEMAND, 

20 SZ_SETPOINT, 

21 SZ_TEMPERATURE, 

22 SZ_UFH_IDX, 

23 SZ_WINDOW_OPEN, 

24 SZ_ZONE_IDX, 

25 SZ_ZONE_MASK, 

26 SZ_ZONE_TYPE, 

27 ZON_ROLE_MAP, 

28 DevType, 

29) 

30from ramses_rf.device import Device 

31from ramses_rf.entity_base import Child, Entity, Parent, class_by_attr 

32from ramses_rf.helpers import shrink 

33from ramses_rf.schemas import SCH_TCS, SZ_ACTUATORS, SZ_CIRCUITS 

34from ramses_tx import NON_DEV_ADDR, Command, Priority 

35from ramses_tx.const import SZ_NUM_REPEATS, SZ_PRIORITY, MsgId 

36from ramses_tx.opentherm import ( 

37 PARAMS_DATA_IDS, 

38 SCHEMA_DATA_IDS, 

39 STATUS_DATA_IDS, 

40 SZ_MSG_ID, 

41 SZ_MSG_NAME, 

42 SZ_MSG_TYPE, 

43 SZ_VALUE, 

44 OtMsgType, 

45) 

46from ramses_tx.ramses import CODES_OF_HEAT_DOMAIN_ONLY, CODES_ONLY_FROM_CTL 

47from ramses_tx.typed_dicts import PayDictT 

48 

49from .base import BatteryState, DeviceHeat, Fakeable 

50 

51from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

52 F9, 

53 FA, 

54 FC, 

55 FF, 

56) 

57 

58from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

59 I_, 

60 RP, 

61 RQ, 

62 W_, 

63 Code, 

64) 

65 

66from ramses_tx.const import ( 

67 SZ_BOILER_OUTPUT_TEMP, 

68 SZ_BOILER_RETURN_TEMP, 

69 SZ_BOILER_SETPOINT, 

70 SZ_BURNER_FAILED_STARTS, 

71 SZ_BURNER_HOURS, 

72 SZ_BURNER_STARTS, 

73 SZ_CH_ACTIVE, 

74 SZ_CH_ENABLED, 

75 SZ_CH_MAX_SETPOINT, 

76 SZ_CH_PUMP_HOURS, 

77 SZ_CH_PUMP_STARTS, 

78 SZ_CH_SETPOINT, 

79 SZ_CH_WATER_PRESSURE, 

80 SZ_COOLING_ACTIVE, 

81 SZ_COOLING_ENABLED, 

82 SZ_DHW_ACTIVE, 

83 SZ_DHW_BLOCKING, 

84 SZ_DHW_BURNER_HOURS, 

85 SZ_DHW_BURNER_STARTS, 

86 SZ_DHW_ENABLED, 

87 SZ_DHW_FLOW_RATE, 

88 SZ_DHW_PUMP_HOURS, 

89 SZ_DHW_PUMP_STARTS, 

90 SZ_DHW_SETPOINT, 

91 SZ_DHW_TEMP, 

92 SZ_FAULT_PRESENT, 

93 SZ_FLAME_ACTIVE, 

94 SZ_FLAME_SIGNAL_LOW, 

95 SZ_MAX_REL_MODULATION, 

96 SZ_OEM_CODE, 

97 SZ_OTC_ACTIVE, 

98 SZ_OUTSIDE_TEMP, 

99 SZ_REL_MODULATION_LEVEL, 

100 SZ_SUMMER_MODE, 

101) 

102 

103if TYPE_CHECKING: 

104 from ramses_rf.system import Evohome, Zone 

105 from ramses_tx import Address, Message, Packet 

106 from ramses_tx.opentherm import OtDataId 

107 

108 

109QOS_LOW = {SZ_PRIORITY: Priority.LOW} # FIXME: deprecate QoS in kwargs 

110QOS_MID = {SZ_PRIORITY: Priority.HIGH} # FIXME: deprecate QoS in kwargs 

111QOS_MAX = {SZ_PRIORITY: Priority.HIGH, SZ_NUM_REPEATS: 3} # FIXME: deprecate QoS... 

112 

113# 

114# NOTE: All debug flags should be False for deployment to end-users 

115_DBG_ENABLE_DEPRECATION: Final[bool] = False 

116_DBG_EXTRA_OTB_DISCOVERY: Final[bool] = False 

117 

118_LOGGER = logging.getLogger(__name__) 

119 

120 

121class Actuator(DeviceHeat): # 3EF0, 3EF1 (for 10:/13:) 

122 # .I --- 13:109598 --:------ 13:109598 3EF0 003 00C8FF # event-driven, 00/C8 

123 # RP --- 13:109598 18:002563 --:------ 0008 002 00C8 # 00/C8, as above 

124 # RP --- 13:109598 18:002563 --:------ 3EF1 007 0000BF-00BFC8FF # 00/C8, as above 

125 

126 # RP --- 10:048122 18:140805 --:------ 3EF1 007 007FFF-003C2A10 # 10:s only RP, always 7FFF 

127 # RP --- 13:109598 18:199952 --:------ 3EF1 007 0001B8-01B800FF # 13:s only RP 

128 

129 # RP --- 10:047707 18:199952 --:------ 3EF0 009 001110-0A00FF-033100 # 10:s only RP 

130 # RP --- 10:138926 34:010253 --:------ 3EF0 006 002E11-0000FF # 10:s only RP 

131 # .I --- 13:209679 --:------ 13:209679 3EF0 003 00C8FF # 13:s only I 

132 

133 ACTUATOR_CYCLE: Final = "actuator_cycle" 

134 ACTUATOR_ENABLED: Final = "actuator_enabled" # boolean 

135 ACTUATOR_STATE: Final = "actuator_state" 

136 MODULATION_LEVEL: Final = "modulation_level" # percentage (0.0-1.0) 

137 

138 def _handle_msg(self, msg: Message) -> None: # NOTE: active 

139 super()._handle_msg(msg) 

140 

141 if isinstance(self, OtbGateway): 

142 return 

143 

144 if self._gwy.config.disable_discovery: 

145 return 

146 

147 # TODO: why are we doing this here? Should simply use discovery poller! 

148 if msg.code == Code._3EF0 and msg.verb == I_ and not self.is_faked: 

149 # lf._send_cmd(Command.get_relay_demand(self.id), qos=QOS_LOW) 

150 self._send_cmd( 

151 Command.from_attrs(RQ, self.id, Code._3EF1, "00"), **QOS_LOW 

152 ) # actuator cycle 

153 

154 @property 

155 def actuator_cycle(self) -> dict | None: # 3EF1 

156 return self._msg_value(Code._3EF1) 

157 

158 @property 

159 def actuator_state(self) -> dict | None: # 3EF0 

160 return self._msg_value(Code._3EF0) 

161 

162 @property 

163 def status(self) -> dict[str, Any]: 

164 return { 

165 **super().status, 

166 self.ACTUATOR_CYCLE: self.actuator_cycle, 

167 self.ACTUATOR_STATE: self.actuator_state, 

168 } 

169 

170 

171class HeatDemand(DeviceHeat): # 3150 

172 HEAT_DEMAND: Final = SZ_HEAT_DEMAND # percentage valve open (0.0-1.0) 

173 

174 @property 

175 def heat_demand(self) -> float | None: # 3150 

176 return self._msg_value(Code._3150, key=self.HEAT_DEMAND) 

177 

178 @property 

179 def status(self) -> dict[str, Any]: 

180 return { 

181 **super().status, 

182 self.HEAT_DEMAND: self.heat_demand, 

183 } 

184 

185 

186class Setpoint(DeviceHeat): # 2309 

187 SETPOINT: Final = SZ_SETPOINT # degrees Celsius 

188 

189 @property 

190 def setpoint(self) -> float | None: # 2309 

191 return self._msg_value(Code._2309, key=self.SETPOINT) 

192 

193 @property 

194 def status(self) -> dict[str, Any]: 

195 return { 

196 **super().status, 

197 self.SETPOINT: self.setpoint, 

198 } 

199 

200 

201class Weather(DeviceHeat): # 0002 

202 TEMPERATURE: Final = SZ_TEMPERATURE # TODO: deprecate 

203 

204 @property 

205 def temperature(self) -> float | None: # 0002 

206 return self._msg_value(Code._0002, key=SZ_TEMPERATURE) 

207 

208 @temperature.setter 

209 def temperature(self, value: float | None) -> None: 

210 """Fake the outdoor temperature of the sensor.""" 

211 

212 if not self.is_faked: 

213 raise exc.DeviceNotFaked(f"{self}: Faking is not enabled") 

214 

215 cmd = Command.put_outdoor_temp(self.id, value) 

216 self._gwy.send_cmd(cmd, num_repeats=2, priority=Priority.HIGH) 

217 

218 @property 

219 def status(self) -> dict[str, Any]: 

220 return { 

221 **super().status, 

222 self.TEMPERATURE: self.temperature, 

223 } 

224 

225 

226class RelayDemand(DeviceHeat): # 0008 

227 # .I --- 01:054173 --:------ 01:054173 1FC9 018 03-0008-04D39D FC-3B00-04D39D 03-1FC9-04D39D 

228 # .W --- 13:123456 01:054173 --:------ 1FC9 006 00-3EF0-35E240 

229 # .I --- 01:054173 13:123456 --:------ 1FC9 006 00-FFFF-04D39D 

230 

231 # Some either 00/C8, others 00-C8 

232 # .I --- 01:145038 --:------ 01:145038 0008 002 0314 # ZON valve zone (ELE too?) 

233 # .I --- 01:145038 --:------ 01:145038 0008 002 F914 # HTG valve 

234 # .I --- 01:054173 --:------ 01:054173 0008 002 FA00 # DHW valve 

235 # .I --- 01:145038 --:------ 01:145038 0008 002 FC14 # appliance_relay 

236 

237 # RP --- 13:109598 18:199952 --:------ 0008 002 0000 

238 # RP --- 13:109598 18:199952 --:------ 0008 002 00C8 

239 

240 RELAY_DEMAND: Final = SZ_RELAY_DEMAND # percentage (0.0-1.0) 

241 

242 def _setup_discovery_cmds(self) -> None: 

243 super()._setup_discovery_cmds() 

244 

245 if not self.is_faked: # discover_flag & Discover.STATUS and 

246 self._add_discovery_cmd(Command.get_relay_demand(self.id), 60 * 15) 

247 

248 @property 

249 def relay_demand(self) -> float | None: # 0008 

250 return self._msg_value(Code._0008, key=self.RELAY_DEMAND) 

251 

252 @property 

253 def status(self) -> dict[str, Any]: 

254 return { 

255 **super().status, 

256 self.RELAY_DEMAND: self.relay_demand, 

257 } 

258 

259 

260class DhwTemperature(DeviceHeat): # 1260 

261 TEMPERATURE: Final = SZ_TEMPERATURE # TODO: deprecate 

262 

263 @property 

264 def temperature(self) -> float | None: # 1260 

265 return self._msg_value(Code._1260, key=SZ_TEMPERATURE) 

266 

267 @temperature.setter 

268 def temperature(self, value: float | None) -> None: 

269 """Fake the DHW temperature of the sensor.""" 

270 

271 if not self.is_faked: 

272 raise exc.DeviceNotFaked(f"{self}: Faking is not enabled") 

273 

274 cmd = Command.put_dhw_temp(self.id, value) 

275 self._gwy.send_cmd(cmd, num_repeats=2, priority=Priority.HIGH) 

276 

277 @property 

278 def status(self) -> dict[str, Any]: 

279 return { 

280 **super().status, 

281 self.TEMPERATURE: self.temperature, 

282 } 

283 

284 

285class Temperature(DeviceHeat): # 30C9 

286 # .I --- 34:145039 --:------ 34:145039 1FC9 012 00-30C9-8A368F 00-1FC9-8A368F 

287 # .W --- 01:054173 34:145039 --:------ 1FC9 006 03-2309-04D39D # real CTL 

288 # .I --- 34:145039 01:054173 --:------ 1FC9 006 00-30C9-8A368F 

289 @property 

290 def temperature(self) -> float | None: # 30C9 

291 return self._msg_value(Code._30C9, key=SZ_TEMPERATURE) 

292 

293 @temperature.setter 

294 def temperature(self, value: float | None) -> None: 

295 """Fake the indoor temperature of the sensor.""" 

296 

297 if not self.is_faked: 

298 raise exc.DeviceNotFaked(f"{self}: Faking is not enabled") 

299 

300 cmd = Command.put_sensor_temp(self.id, value) 

301 self._gwy.send_cmd(cmd, num_repeats=2, priority=Priority.HIGH) 

302 

303 @property 

304 def status(self) -> dict[str, Any]: 

305 return { 

306 **super().status, 

307 SZ_TEMPERATURE: self.temperature, 

308 } 

309 

310 

311class RfgGateway(DeviceHeat): # RFG (30:) 

312 """The RFG100 base class.""" 

313 

314 _SLUG = DevType.RFG 

315 _STATE_ATTR = None 

316 

317 

318class Controller(DeviceHeat): # CTL (01): 

319 """The Controller base class.""" 

320 

321 HEAT_DEMAND: Final = SZ_HEAT_DEMAND 

322 

323 _SLUG = DevType.CTL 

324 _STATE_ATTR = HEAT_DEMAND 

325 

326 def __init__(self, *args: Any, **kwargs: Any) -> None: 

327 super().__init__(*args, **kwargs) 

328 

329 # self.ctl = None 

330 self.tcs = None # TODO: = self? 

331 self._make_tcs_controller(**kwargs) # NOTE: must create_from_schema first 

332 

333 def _handle_msg(self, msg: Message) -> None: 

334 super()._handle_msg(msg) 

335 

336 self.tcs._handle_msg(msg) 

337 

338 def _make_tcs_controller( 

339 self, *, msg: Message | None = None, **schema: Any 

340 ) -> None: # CH/DHW 

341 """Attach a TCS (create/update as required) after passing it any msg.""" 

342 

343 def get_system(*, msg: Message | None = None, **schema: Any) -> Evohome: 

344 """Return a TCS (temperature control system), create it if required. 

345 

346 Use the schema to create/update it, then pass it any msg to handle. 

347 

348 TCSs are uniquely identified by a controller ID. 

349 If a TCS is created, attach it to this device (which should be a CTL). 

350 """ 

351 

352 from ramses_rf.system import system_factory 

353 

354 schema = shrink(SCH_TCS(schema)) 

355 

356 if not self.tcs: 

357 self.tcs = system_factory(self, msg=msg, **schema) 

358 

359 elif schema: 

360 self.tcs._update_schema(**schema) 

361 

362 if msg: 

363 self.tcs._handle_msg(msg) 

364 return self.tcs 

365 

366 super()._make_tcs_controller(msg=None, **schema) 

367 

368 self.tcs = get_system(msg=msg, **schema) 

369 

370 

371class Programmer(Controller): # PRG (23): 

372 """The Controller base class.""" 

373 

374 _SLUG = DevType.PRG 

375 

376 

377class UfhController(Parent, DeviceHeat): # UFC (02): 

378 """The UFC class, the HCE80 that controls the UFH zones.""" 

379 

380 HEAT_DEMAND: Final = SZ_HEAT_DEMAND 

381 

382 _SLUG = DevType.UFC 

383 _STATE_ATTR = HEAT_DEMAND 

384 

385 _child_id = FA 

386 _iz_controller = True 

387 

388 childs: list[UfhCircuit] # TODO: check (code so complex, not sure if this is true) 

389 

390 # 12:27:24.398 067 I --- 02:000921 --:------ 01:191718 3150 002 0360 

391 # 12:27:24.546 068 I --- 02:000921 --:------ 01:191718 3150 002 065A 

392 # 12:27:24.693 067 I --- 02:000921 --:------ 01:191718 3150 002 045C 

393 # 12:27:24.824 059 I --- 01:191718 --:------ 01:191718 3150 002 FC5C 

394 # 12:27:24.857 067 I --- 02:000921 --:------ 02:000921 3150 006 0060-015A-025C 

395 

396 def __init__(self, *args: Any, **kwargs: Any) -> None: 

397 super().__init__(*args, **kwargs) 

398 

399 self.circuit_by_id = {f"{i:02X}": {} for i in range(8)} 

400 

401 self._setpoints: Message | None = None 

402 self._heat_demand: Message | None = None 

403 self._heat_demands: Message | None = None 

404 self._relay_demand: Message | None = None 

405 self._relay_demand_fa: Message | None = None 

406 

407 def _setup_discovery_cmds(self) -> None: 

408 super()._setup_discovery_cmds() 

409 

410 # Only RPs are: 0001, 0005/000C, 10E0, 000A/2309 & 22D0 

411 

412 cmd = Command.from_attrs(RQ, self.id, Code._0005, f"00{DEV_ROLE_MAP.UFH}") 

413 self._add_discovery_cmd(cmd, 60 * 60 * 24) 

414 

415 # TODO: this needs work 

416 # if discover_flag & Discover.PARAMS: # only 2309 has any potential? 

417 for ufc_idx in self.circuit_by_id: 

418 cmd = Command.get_zone_config(self.id, ufc_idx) 

419 self._add_discovery_cmd(cmd, 60 * 60 * 6) 

420 

421 cmd = Command.get_zone_setpoint(self.id, ufc_idx) 

422 self._add_discovery_cmd(cmd, 60 * 60 * 6) 

423 

424 for ufc_idx in range(8): 

425 payload = f"{ufc_idx:02X}{DEV_ROLE_MAP.UFH}" 

426 cmd = Command.from_attrs(RQ, self.id, Code._000C, payload) 

427 self._add_discovery_cmd(cmd, 60 * 60 * 24) 

428 

429 def _handle_msg(self, msg: Message) -> None: 

430 super()._handle_msg(msg) 

431 

432 # Several assumptions are made, regarding 000C pkts: 

433 # - UFC bound only to CTL (not, e.g. SEN) 

434 # - all circuits bound to the same controller 

435 

436 if msg.code == Code._0005: # system_zones 

437 # {'zone_type': '09', 'zone_mask': [1, 1, 1, 1, 1, 0, 0, 0], 'zone_class': 'underfloor_heating'} 

438 

439 if msg.payload[SZ_ZONE_TYPE] not in (ZON_ROLE_MAP.ACT, ZON_ROLE_MAP.UFH): 

440 return # ignoring ZON_ROLE_MAP.SEN for now 

441 

442 for idx, flag in enumerate(msg.payload[SZ_ZONE_MASK]): 

443 ufh_idx = f"{idx:02X}" 

444 if not flag: 

445 self.circuit_by_id[ufh_idx] = {SZ_ZONE_IDX: None} 

446 # FIXME: this causing tests to fail when read-only protocol 

447 # elif SZ_ZONE_IDX not in self.circuit_by_id[ufh_idx]: 

448 # cmd = Command.from_attrs( 

449 # RQ, self.ctl.id, Code._000C, f"{ufh_idx}{DEV_ROLE_MAP.UFH}" 

450 # ) 

451 # self._send_cmd(cmd) 

452 

453 elif msg.code == Code._0008: # relay_demand 

454 if msg.payload.get(SZ_DOMAIN_ID) == FC: 

455 self._relay_demand = msg 

456 else: # FA 

457 self._relay_demand_fa = msg 

458 

459 elif msg.code == Code._000C: # zone_devices 

460 # {'zone_type': '09', 'ufh_idx': '00', 'zone_idx': '09', 'device_role': 'ufh_actuator', 'devices': ['01:095421']} 

461 # {'zone_type': '09', 'ufh_idx': '07', 'zone_idx': None, 'device_role': 'ufh_actuator', 'devices': []} 

462 

463 if msg.payload[SZ_ZONE_TYPE] not in (ZON_ROLE_MAP.ACT, ZON_ROLE_MAP.UFH): 

464 return # ignoring ZON_ROLE_MAP.SEN for now 

465 

466 ufh_idx = msg.payload[SZ_UFH_IDX] # circuit idx 

467 self.circuit_by_id[ufh_idx] = {SZ_ZONE_IDX: msg.payload[SZ_ZONE_IDX]} 

468 if msg.payload[SZ_ZONE_IDX] is not None: # [SZ_DEVICES][0] will be the CTL 

469 self.set_parent( 

470 self._gwy.get_device(msg.payload[SZ_DEVICES][0]).tcs, 

471 # child_id=msg.payload[SZ_ZONE_IDX], 

472 ) 

473 

474 elif msg.code == Code._22C9: # setpoint_bounds 

475 # .I --- 02:017205 --:------ 02:017205 22C9 024 00076C0A280101076C0A28010... 

476 # .I --- 02:017205 --:------ 02:017205 22C9 006 04076C0A2801 

477 self._setpoints = msg 

478 

479 elif msg.code == Code._3150: # heat_demands 

480 if isinstance(msg.payload, list): # the circuit demands 

481 self._heat_demands = msg 

482 elif msg.payload.get(SZ_DOMAIN_ID) == FC: 

483 self._heat_demand = msg 

484 elif ( 

485 (zone_idx := msg.payload.get(SZ_ZONE_IDX)) 

486 and isinstance(msg.dst, Device) 

487 and (tcs := msg.dst.tcs) 

488 and (zone := tcs.zone_by_idx.get(zone_idx)) 

489 ): 

490 zone._handle_msg(msg) 

491 

492 # elif msg.code not in (Code._10E0, Code._22D0): 

493 # print("xxx") 

494 # "0008|FA/FC", "22C9|array", "22D0|none", "3150|ZZ/array(/FC?)" 

495 

496 # TODO: should be a private method 

497 def get_circuit( 

498 self, cct_idx: str, *, msg: Message | None = None, **schema: Any 

499 ) -> Any: 

500 """Return a UFH circuit, create it if required. 

501 

502 First, use the schema to create/update it, then pass it any msg to handle. 

503 

504 Circuits are uniquely identified by a UFH controller ID|cct_idx pair. 

505 If a circuit is created, attach it to this UFC. 

506 """ 

507 

508 schema = {} # shrink(SCH_CCT(schema)) 

509 

510 cct: UfhCircuit = self.child_by_id.get(cct_idx) 

511 if not cct: 

512 cct = UfhCircuit(self, cct_idx) 

513 self.child_by_id[cct_idx] = cct 

514 self.childs.append(cct) 

515 

516 elif schema: 

517 cct._update_schema(**schema) 

518 

519 if msg: 

520 cct._handle_msg(msg) 

521 return cct 

522 

523 # @property 

524 # def circuits(self) -> dict: # 000C 

525 # return self.circuit_by_id 

526 

527 @property 

528 def heat_demand(self) -> float | None: # 3150|FC (there is also 3150|FA) 

529 return self._msg_value_msg(self._heat_demand, key=self.HEAT_DEMAND) 

530 

531 @property 

532 def heat_demands(self) -> dict | None: # 3150|ufh_idx array 

533 # return self._heat_demands.payload if self._heat_demands else None 

534 return self._msg_value_msg(self._heat_demands) 

535 

536 @property 

537 def relay_demand(self) -> dict | None: # 0008|FC 

538 return self._msg_value_msg(self._relay_demand, key=SZ_RELAY_DEMAND) 

539 

540 @property 

541 def relay_demand_fa(self) -> dict | None: # 0008|FA 

542 return self._msg_value_msg(self._relay_demand_fa, key=SZ_RELAY_DEMAND) 

543 

544 @property 

545 def setpoints(self) -> dict | None: # 22C9|ufh_idx array 

546 if self._setpoints is None: 

547 return None 

548 

549 return { 

550 c[SZ_UFH_IDX]: { 

551 k: v for k, v in c.items() if k in ("temp_low", "temp_high") 

552 } 

553 for c in self._setpoints.payload 

554 } 

555 

556 @property # id, type 

557 def schema(self) -> dict[str, Any]: 

558 return { 

559 **super().schema, 

560 SZ_CIRCUITS: self.circuit_by_id, 

561 } 

562 

563 @property # setpoint, config, mode (not schedule) 

564 def params(self) -> dict[str, Any]: 

565 return { 

566 **super().params, 

567 SZ_CIRCUITS: self.setpoints, 

568 } 

569 

570 @property 

571 def status(self) -> dict[str, Any]: 

572 return { 

573 **super().status, 

574 SZ_HEAT_DEMAND: self.heat_demand, 

575 SZ_RELAY_DEMAND: self.relay_demand, 

576 f"{SZ_RELAY_DEMAND}_fa": self.relay_demand_fa, 

577 } 

578 

579 

580class DhwSensor(DhwTemperature, BatteryState, Fakeable): # DHW (07): 10A0, 1260 

581 """The DHW class, such as a CS92.""" 

582 

583 DHW_PARAMS: Final = "dhw_params" 

584 

585 _SLUG: str = DevType.DHW 

586 _STATE_ATTR = DhwTemperature.TEMPERATURE 

587 

588 def __init__(self, *args: Any, **kwargs: Any) -> None: 

589 super().__init__(*args, **kwargs) 

590 

591 self._child_id = FA # NOTE: domain_id 

592 

593 def _handle_msg(self, msg: Message) -> None: # NOTE: active 

594 super()._handle_msg(msg) 

595 

596 if self._gwy.config.disable_discovery: 

597 return 

598 

599 # TODO: why are we doing this here? Should simply use dscovery poller! 

600 # The following is required, as CTLs don't send spontaneously 

601 if msg.code == Code._1260 and self.ctl: 

602 # update the controller DHW temp 

603 self._send_cmd(Command.get_dhw_temp(self.ctl.id)) 

604 

605 async def initiate_binding_process(self) -> Packet: 

606 return await super()._initiate_binding_process(Code._1260) 

607 

608 @property 

609 def dhw_params(self) -> PayDictT._10A0 | None: 

610 return self._msg_value(Code._10A0) 

611 

612 @property 

613 def params(self) -> dict[str, Any]: 

614 return { 

615 **super().params, 

616 self.DHW_PARAMS: self.dhw_params, 

617 } 

618 

619 

620class OutSensor(Weather, Fakeable): # OUT: 17 

621 """The OUT class (external sensor), such as a HB85/HB95.""" 

622 

623 # LUMINOSITY = "luminosity" # lux 

624 # WINDSPEED = "windspeed" # km/h 

625 

626 _SLUG = DevType.OUT 

627 _STATE_ATTR = SZ_TEMPERATURE 

628 

629 # async def initiate_binding_process(self) -> Packet: 

630 # return await super()._initiate_binding_process(...) 

631 

632 

633def _to_msg_id(data_id: OtDataId) -> MsgId: 

634 return f"{data_id:02X}" 

635 

636 

637# NOTE: config.use_native_ot should enforce sends, but not reads from _msgz DB 

638class OtbGateway(Actuator, HeatDemand): # OTB (10): 3220 (22D9, others) 

639 """The OTB class, specifically an OpenTherm Bridge (R8810A Bridge).""" 

640 

641 # see: https://www.opentherm.eu/request-details/?post_ids=2944 

642 # see: https://www.automatedhome.co.uk/vbulletin/showthread.php?6400-(New)-cool-mode-in-Evohome 

643 

644 _SLUG = DevType.OTB 

645 _STATE_ATTR = SZ_REL_MODULATION_LEVEL 

646 

647 OT_TO_RAMSES: dict[MsgId, Code] = { # TODO: move to opentherm.py 

648 MsgId._00: Code._3EF0, # master/slave status (actuator_state) 

649 MsgId._01: Code._22D9, # boiler_setpoint 

650 MsgId._0E: Code._3EF0, # max_rel_modulation_level (is a PARAM?) 

651 MsgId._11: Code._3EF0, # rel_modulation_level (actuator_state, also Code._3EF1) 

652 MsgId._12: Code._1300, # ch_water_pressure 

653 MsgId._13: Code._12F0, # dhw_flow_rate 

654 MsgId._19: Code._3200, # boiler_output_temp 

655 MsgId._1A: Code._1260, # dhw_temp 

656 MsgId._1B: Code._1290, # outside_temp 

657 MsgId._1C: Code._3210, # boiler_return_temp 

658 MsgId._38: Code._10A0, # dhw_setpoint (is a PARAM) 

659 MsgId._39: Code._1081, # ch_max_setpoint (is a PARAM) 

660 } 

661 RAMSES_TO_OT: dict[Code, MsgId] = { 

662 v: k for k, v in OT_TO_RAMSES.items() if v != Code._3EF0 

663 } # also 10A0? 

664 

665 def __init__(self, *args: Any, **kwargs: Any) -> None: 

666 super().__init__(*args, **kwargs) 

667 

668 self._child_id = FC # NOTE: domain_id 

669 

670 # TODO(eb): cleanup 

671 if self._gwy.msg_db: 

672 self._add_record(id=self.id, code=Code._3220, verb="RP") 

673 # adds a "sim" RP opentherm_msg to the SQLite MessageIndex with code _3220 

674 # causes exc when fetching ALL, when no "real" msg was added to _msgs_. We skip those. 

675 else: 

676 self._msgz[Code._3220] = {RP: {}} # No ctx! (not None) 

677 

678 # lf._use_ot = self._gwy.config.use_native_ot 

679 self._msgs_ot: dict[MsgId, Message] = {} 

680 # lf._msgs_ot_ctl_polled = {} 

681 

682 def _setup_discovery_cmds(self) -> None: 

683 def which_cmd(use_native_ot: str, msg_id: MsgId) -> Command | None: 

684 """Create a OT cmd, or its RAMSES equivalent, depending.""" 

685 # we know RQ|3220 is an option, question is: use that, or RAMSES or nothing? 

686 if use_native_ot in ("always", "prefer"): 

687 return Command.get_opentherm_data(self.id, msg_id) 

688 if msg_id in self.OT_TO_RAMSES: # is: in ("avoid", "never") 

689 return Command.from_attrs(RQ, self.id, self.OT_TO_RAMSES[msg_id], "00") 

690 if use_native_ot == "avoid": 

691 return Command.get_opentherm_data(self.id, msg_id) 

692 return None # use_native_ot == "never" 

693 

694 super()._setup_discovery_cmds() 

695 

696 # always send at least one of RQ|3EF0 or RQ|3220|00 (status) 

697 if self._gwy.config.use_native_ot != "never": 

698 self._add_discovery_cmd(Command.get_opentherm_data(self.id, MsgId._00), 60) 

699 

700 if self._gwy.config.use_native_ot != "always": 

701 self._add_discovery_cmd( 

702 Command.from_attrs(RQ, self.id, Code._3EF0, "00"), 60 

703 ) 

704 self._add_discovery_cmd( # NOTE: this code is a WIP 

705 Command.from_attrs(RQ, self.id, Code._2401, "00"), 60 

706 ) 

707 

708 for data_id in SCHEMA_DATA_IDS: # From OT v2.2: version numbers 

709 if cmd := which_cmd(self._gwy.config.use_native_ot, _to_msg_id(data_id)): 

710 self._add_discovery_cmd(cmd, 6 * 3600, delay=180) 

711 

712 for data_id in PARAMS_DATA_IDS: # params or L/T state 

713 if cmd := which_cmd(self._gwy.config.use_native_ot, _to_msg_id(data_id)): 

714 self._add_discovery_cmd(cmd, 3600, delay=90) 

715 

716 for data_id in STATUS_DATA_IDS: # except "00", see above 

717 if data_id == 0x00: 

718 continue 

719 if cmd := which_cmd(self._gwy.config.use_native_ot, _to_msg_id(data_id)): 

720 self._add_discovery_cmd(cmd, 300, delay=15) 

721 

722 if _DBG_EXTRA_OTB_DISCOVERY: # TODO: these are WIP, but do vary in payload 

723 for code in ( 

724 Code._2401, # WIP - modulation_level + flags? 

725 Code._3221, # R8810A/20A 

726 Code._3223, # R8810A/20A 

727 ): 

728 self._add_discovery_cmd(Command.from_attrs(RQ, self.id, code, "00"), 60) 

729 

730 if _DBG_EXTRA_OTB_DISCOVERY: # TODO: these are WIP, appear FIXED in payload 

731 for code in ( 

732 Code._0150, # payload always "000000", R8820A only? 

733 Code._1098, # payload always "00C8", R8820A only? 

734 Code._10B0, # payload always "0000", R8820A only? 

735 Code._1FD0, # payload always "0000000000000000" 

736 Code._2400, # payload always "0000000F" 

737 Code._2410, # payload always "000000000000000000000000010000000100000C" 

738 Code._2420, # payload always "0000001000000... 

739 ): # TODO: to test against BDR91T 

740 self._add_discovery_cmd( 

741 Command.from_attrs(RQ, self.id, code, "00"), 300 

742 ) 

743 

744 def _handle_msg(self, msg: Message) -> None: 

745 super()._handle_msg(msg) 

746 

747 if msg.verb not in (I_, RP): 

748 return 

749 

750 if msg.code == Code._3220: 

751 self._handle_3220(msg) 

752 elif msg.code in self.RAMSES_TO_OT: 

753 self._handle_code(msg) 

754 

755 def _handle_3220(self, msg: Message) -> None: 

756 """Handle 3220-based messages.""" 

757 

758 # NOTE: Reserved msgs have null data, but that msg_id may later be OK! 

759 if msg.payload[SZ_MSG_TYPE] == OtMsgType.RESERVED: 

760 return 

761 

762 # NOTE: Some msgs have invalid data, but that msg_id may later be OK! 

763 if msg.payload.get(SZ_VALUE) is None: 

764 return 

765 

766 # msg_id is int in msg payload/opentherm.py, but MsgId (str) in this module 

767 msg_id = _to_msg_id(msg.payload[SZ_MSG_ID]) 

768 self._msgs_ot[msg_id] = msg 

769 

770 if not _DBG_ENABLE_DEPRECATION: # FIXME: data gaps 

771 return 

772 

773 reset = msg.payload[SZ_MSG_TYPE] not in ( 

774 OtMsgType.DATA_INVALID, 

775 OtMsgType.UNKNOWN_DATAID, 

776 OtMsgType.RESERVED, # but some are ?always reserved 

777 ) 

778 self._deprecate_code_ctx(msg._pkt, ctx=msg_id, reset=reset) 

779 

780 def _handle_code(self, msg: Message) -> None: 

781 """Handle non-3220-based messages.""" 

782 

783 if msg.code == Code._3EF0 and msg.verb == I_: 

784 # NOTE: this is development/discovery code # chasing flags 

785 # self._send_cmd( 

786 # Command.get_opentherm_data(self.id, MsgId._00), **QOS_MID 

787 # ) # FIXME: deprecate QoS in kwargs 

788 return 

789 

790 if msg.code in (Code._10A0, Code._3EF1): 

791 return 

792 

793 if not _DBG_ENABLE_DEPRECATION: # FIXME: data gaps 

794 return 

795 

796 # TODO: can be temporarily 7FFF? 

797 if msg._pkt.payload[2:] == "7FFF" or ( 

798 msg.code == Code._1300 and msg._pkt.payload[2:] == "09F6" 

799 ): # latter is CH water pressure 

800 self._deprecate_code_ctx(msg._pkt) 

801 else: 

802 self._deprecate_code_ctx(msg._pkt, reset=True) 

803 

804 def _ot_msg_flag(self, msg_id: MsgId, flag_idx: int) -> bool | None: 

805 flags: list = self._ot_msg_value(msg_id) 

806 return bool(flags[flag_idx]) if flags else None 

807 

808 @staticmethod 

809 def _ot_msg_name(msg: Message) -> str: # TODO: remove 

810 return ( 

811 msg.payload[SZ_MSG_NAME] 

812 if isinstance(msg.payload[SZ_MSG_NAME], str) 

813 else f"{msg.payload[SZ_MSG_ID]:02X}" 

814 ) 

815 

816 def _ot_msg_value(self, msg_id: MsgId) -> int | float | list | None: 

817 # data_id = int(msg_id, 16) 

818 if (msg := self._msgs_ot.get(msg_id)) and not msg._expired: 

819 # TODO: value_hb/_lb 

820 return msg.payload.get(SZ_VALUE) # type: ignore[no-any-return] 

821 return None 

822 

823 def _result_by_callback( 

824 self, cbk_ot: Callable | None, cbk_ramses: Callable | None 

825 ) -> Any | None: 

826 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`.""" 

827 

828 if self._gwy.config.use_native_ot == "always": 

829 return cbk_ot() if cbk_ot else None 

830 if self._gwy.config.use_native_ot == "prefer": 

831 if cbk_ot and (result := cbk_ot()) is not None: 

832 return result 

833 

834 result_ramses = cbk_ramses() if cbk_ramses is not None else None 

835 if self._gwy.config.use_native_ot == "avoid" and result_ramses is None: 

836 return cbk_ot() if cbk_ot else None 

837 return result_ramses # incl. use_native_ot == "never" 

838 

839 def _result_by_lookup( 

840 self, 

841 code: Code, 

842 /, 

843 *, 

844 key: str, 

845 ) -> Any | None: 

846 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`.""" 

847 # assert code in self.RAMSES_TO_OT and kwargs.get("key"): 

848 

849 if self._gwy.config.use_native_ot == "always": 

850 return self._ot_msg_value(self.RAMSES_TO_OT[code]) 

851 

852 if self._gwy.config.use_native_ot == "prefer": 

853 if (result_ot := self._ot_msg_value(self.RAMSES_TO_OT[code])) is not None: 

854 return result_ot 

855 

856 result_ramses = self._msg_value(code, key=key) 

857 if self._gwy.config.use_native_ot == "avoid" and result_ramses is None: 

858 return self._ot_msg_value(self.RAMSES_TO_OT[code]) 

859 

860 return result_ramses # incl. use_native_ot == "never" 

861 

862 def _result_by_value( 

863 self, result_ot: Any | None, result_ramses: Any | None 

864 ) -> Any | None: 

865 """Return a value using OpenTherm or RAMSES as per `config.use_native_ot`.""" 

866 # 

867 

868 if self._gwy.config.use_native_ot == "always": 

869 return result_ot 

870 

871 if self._gwy.config.use_native_ot == "prefer": 

872 if result_ot is not None: 

873 return result_ot 

874 

875 # 

876 elif self._gwy.config.use_native_ot == "avoid" and result_ramses is None: 

877 return result_ot 

878 

879 return result_ramses # incl. use_native_ot == "never" 

880 

881 @property # TODO 

882 def bit_2_4(self) -> bool | None: # 2401 - WIP 

883 return self._msg_flag(Code._2401, "_flags_2", 4) 

884 

885 @property # TODO 

886 def bit_2_5(self) -> bool | None: # 2401 - WIP 

887 return self._msg_flag(Code._2401, "_flags_2", 5) 

888 

889 @property # TODO 

890 def bit_2_6(self) -> bool | None: # 2401 - WIP 

891 return self._msg_flag(Code._2401, "_flags_2", 6) 

892 

893 @property # TODO 

894 def bit_2_7(self) -> bool | None: # 2401 - WIP 

895 return self._msg_flag(Code._2401, "_flags_2", 7) 

896 

897 @property # TODO 

898 def bit_3_7(self) -> bool | None: # 3EF0 (byte 3, only OTB) 

899 return self._msg_flag(Code._3EF0, "_flags_3", 7) 

900 

901 @property # TODO 

902 def bit_6_6(self) -> bool | None: # 3EF0 ?dhw_enabled (byte 3, only R8820A?) 

903 return self._msg_flag(Code._3EF0, "_flags_6", 6) 

904 

905 @property # TODO 

906 def percent(self) -> float | None: # 2401 - WIP (~3150|FC) 

907 return self._msg_value(Code._2401, key=SZ_HEAT_DEMAND) 

908 

909 @property # TODO 

910 def value(self) -> int | None: # 2401 - WIP 

911 return self._msg_value(Code._2401, key="_value_2") 

912 

913 @property 

914 def boiler_output_temp(self) -> float | None: # 3220|19, or 3200 

915 # _LOGGER.warning( 

916 # "code=%s, 3220=%s, both=%s", 

917 # self._msg_value(Code._3200, key=SZ_TEMPERATURE), 

918 # self._ot_msg_value(str(self.RAMSES_TO_OT[Code._3200])), 

919 # self._result_by_lookup(Code._3200, key=SZ_TEMPERATURE), 

920 # ) 

921 

922 return self._result_by_lookup(Code._3200, key=SZ_TEMPERATURE) 

923 

924 @property 

925 def boiler_return_temp(self) -> float | None: # 3220|1C, or 3210 

926 return self._result_by_lookup(Code._3210, key=SZ_TEMPERATURE) 

927 

928 @property 

929 def boiler_setpoint(self) -> float | None: # 3220|01, or 22D9 

930 return self._result_by_lookup(Code._22D9, key=SZ_SETPOINT) 

931 

932 @property 

933 def ch_max_setpoint(self) -> float | None: # 3220|39, or 1081 

934 return self._result_by_lookup(Code._1081, key=SZ_SETPOINT) 

935 

936 @property # TODO: no OT equivalent 

937 def ch_setpoint(self) -> float | None: # 3EF0 (byte 7, only R8820A?) 

938 return self._result_by_value( 

939 None, self._msg_value(Code._3EF0, key=SZ_CH_SETPOINT) 

940 ) 

941 

942 @property 

943 def ch_water_pressure(self) -> float | None: # 3220|12, or 1300 

944 return self._result_by_lookup(Code._1300, key=SZ_PRESSURE) 

945 

946 @property 

947 def dhw_flow_rate(self) -> float | None: # 3220|13, or 12F0 

948 return self._result_by_lookup(Code._12F0, key=SZ_DHW_FLOW_RATE) 

949 

950 @property 

951 def dhw_setpoint(self) -> float | None: # 3220|38, or 10A0 

952 return self._result_by_lookup(Code._10A0, key=SZ_SETPOINT) 

953 

954 @property 

955 def dhw_temp(self) -> float | None: # 3220|1A, or 1260 

956 return self._result_by_lookup(Code._1260, key=SZ_TEMPERATURE) 

957 

958 @property # TODO: no reliable OT equivalent? 

959 def max_rel_modulation(self) -> float | None: # 3220|0E, or 3EF0 (byte 8) 

960 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

961 return self._msg_value(Code._3EF0, key=SZ_MAX_REL_MODULATION) 

962 return self._result_by_value( 

963 self._ot_msg_value(MsgId._0E), # NOTE: not reliable? 

964 self._msg_value(Code._3EF0, key=SZ_MAX_REL_MODULATION), 

965 ) 

966 

967 @property 

968 def oem_code(self) -> float | None: # 3220|73, no known RAMSES equivalent 

969 return self._ot_msg_value(MsgId._73) 

970 

971 @property 

972 def outside_temp(self) -> float | None: # 3220|1B, 1290 

973 return self._result_by_lookup(Code._1290, key=SZ_TEMPERATURE) 

974 

975 @property # TODO: no reliable OT equivalent? 

976 def rel_modulation_level(self) -> float | None: # 3220|11, or 3EF0/3EF1 

977 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

978 return self._msg_value((Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL) 

979 return self._result_by_value( 

980 self._ot_msg_value(MsgId._11), # NOTE: not reliable? 

981 self._msg_value((Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL), 

982 ) 

983 

984 @property # TODO: no reliable OT equivalent? 

985 def ch_active(self) -> bool | None: # 3220|00, or 3EF0 (byte 3) 

986 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

987 return self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE) 

988 return self._result_by_value( 

989 self._ot_msg_flag(MsgId._00, 8 + 1), # NOTE: not reliable? 

990 self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE), 

991 ) 

992 

993 @property # TODO: no reliable OT equivalent? 

994 def ch_enabled(self) -> bool | None: # 3220|00, or 3EF0 (byte 6) 

995 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

996 return self._msg_value(Code._3EF0, key=SZ_CH_ENABLED) 

997 return self._result_by_value( 

998 self._ot_msg_flag(MsgId._00, 0), # NOTE: not reliable? 

999 self._msg_value(Code._3EF0, key=SZ_CH_ENABLED), 

1000 ) 

1001 

1002 @property 

1003 def cooling_active(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1004 return self._result_by_value(self._ot_msg_flag(MsgId._00, 8 + 4), None) 

1005 

1006 @property 

1007 def cooling_enabled(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1008 return self._result_by_value(self._ot_msg_flag(MsgId._00, 2), None) 

1009 

1010 @property # TODO: no reliable OT equivalent? 

1011 def dhw_active(self) -> bool | None: # 3220|00, or 3EF0 (byte 3) 

1012 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

1013 return self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE) 

1014 return self._result_by_value( 

1015 self._ot_msg_flag(MsgId._00, 8 + 2), # NOTE: not reliable? 

1016 self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE), 

1017 ) 

1018 

1019 @property 

1020 def dhw_blocking(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1021 return self._result_by_value(self._ot_msg_flag(MsgId._00, 6), None) 

1022 

1023 @property 

1024 def dhw_enabled(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1025 return self._result_by_value(self._ot_msg_flag(MsgId._00, 1), None) 

1026 

1027 @property 

1028 def fault_present(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1029 return self._result_by_value(self._ot_msg_flag(MsgId._00, 8), None) 

1030 

1031 @property # TODO: no reliable OT equivalent? 

1032 def flame_active(self) -> bool | None: # 3220|00, or 3EF0 (byte 3) 

1033 if self._gwy.config.use_native_ot == "prefer": # HACK: there'll always be 3EF0 

1034 return self._msg_value(Code._3EF0, key="flame_on") 

1035 return self._result_by_value( 

1036 self._ot_msg_flag(MsgId._00, 8 + 3), # NOTE: not reliable? 

1037 self._msg_value(Code._3EF0, key="flame_on"), 

1038 ) 

1039 

1040 @property 

1041 def otc_active(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1042 return self._result_by_value(self._ot_msg_flag(MsgId._00, 3), None) 

1043 

1044 @property 

1045 def summer_mode(self) -> bool | None: # 3220|00, TODO: no known RAMSES 

1046 return self._result_by_value(self._ot_msg_flag(MsgId._00, 5), None) 

1047 

1048 @property 

1049 def opentherm_schema(self) -> dict[str, Any]: 

1050 result: dict[str, Any] = { 

1051 self._ot_msg_name(v): v.payload 

1052 for k, v in self._msgs_ot.items() 

1053 if self._supported_cmds_ctx.get(k) and int(k, 16) in SCHEMA_DATA_IDS 

1054 } 

1055 return { 

1056 m: {k: v for k, v in p.items() if k.startswith(SZ_VALUE)} 

1057 for m, p in result.items() 

1058 } 

1059 

1060 @property 

1061 def opentherm_counters(self) -> dict[str, Any]: # all are U16 

1062 return { 

1063 SZ_BURNER_HOURS: self._ot_msg_value(MsgId._78), 

1064 SZ_BURNER_STARTS: self._ot_msg_value(MsgId._74), 

1065 SZ_BURNER_FAILED_STARTS: self._ot_msg_value(MsgId._71), 

1066 SZ_CH_PUMP_HOURS: self._ot_msg_value(MsgId._79), 

1067 SZ_CH_PUMP_STARTS: self._ot_msg_value(MsgId._75), 

1068 SZ_DHW_BURNER_HOURS: self._ot_msg_value(MsgId._7B), 

1069 SZ_DHW_BURNER_STARTS: self._ot_msg_value(MsgId._77), 

1070 SZ_DHW_PUMP_HOURS: self._ot_msg_value(MsgId._7A), 

1071 SZ_DHW_PUMP_STARTS: self._ot_msg_value(MsgId._76), 

1072 SZ_FLAME_SIGNAL_LOW: self._ot_msg_value(MsgId._72), 

1073 } # 0x73 is not a counter: is OEM diagnostic code... 

1074 

1075 @property 

1076 def opentherm_params(self) -> dict[str, Any]: # F8_8, U8, {"hb": S8, "lb": S8} 

1077 result = { 

1078 self._ot_msg_name(v): v.payload 

1079 for k, v in self._msgs_ot.items() 

1080 if self._supported_cmds_ctx.get(k) and int(k, 16) in PARAMS_DATA_IDS 

1081 } 

1082 return { 

1083 m: {k: v for k, v in p.items() if k.startswith(SZ_VALUE)} 

1084 for m, p in result.items() 

1085 } 

1086 

1087 @property 

1088 def opentherm_status(self) -> dict[str, Any]: # F8_8, U16 (only OEM_CODE) or bool 

1089 return { # most these are in: STATUS_DATA_IDS 

1090 SZ_BOILER_OUTPUT_TEMP: self._ot_msg_value(MsgId._19), 

1091 SZ_BOILER_RETURN_TEMP: self._ot_msg_value(MsgId._1C), 

1092 SZ_BOILER_SETPOINT: self._ot_msg_value(MsgId._01), 

1093 # SZ_CH_MAX_SETPOINT: self._ot_msg_value(MsgId._39), # in PARAMS_DATA_IDS 

1094 SZ_CH_WATER_PRESSURE: self._ot_msg_value(MsgId._12), 

1095 SZ_DHW_FLOW_RATE: self._ot_msg_value(MsgId._13), 

1096 # SZ_DHW_SETPOINT: self._ot_msg_value(MsgId._38), # in PARAMS_DATA_IDS 

1097 SZ_DHW_TEMP: self._ot_msg_value(MsgId._1A), 

1098 SZ_OEM_CODE: self._ot_msg_value(MsgId._73), 

1099 SZ_OUTSIDE_TEMP: self._ot_msg_value(MsgId._1B), 

1100 SZ_REL_MODULATION_LEVEL: self._ot_msg_value(MsgId._11), 

1101 # 

1102 # SZ...: self._ot_msg_value(MsgId._05), # in STATUS_DATA_IDS 

1103 # SZ...: self._ot_msg_value(MsgId._18), # in STATUS_DATA_IDS 

1104 # 

1105 SZ_CH_ACTIVE: self._ot_msg_flag(MsgId._00, 8 + 1), 

1106 SZ_CH_ENABLED: self._ot_msg_flag(MsgId._00, 0), 

1107 SZ_COOLING_ACTIVE: self._ot_msg_flag(MsgId._00, 8 + 4), 

1108 SZ_COOLING_ENABLED: self._ot_msg_flag(MsgId._00, 2), 

1109 SZ_DHW_ACTIVE: self._ot_msg_flag(MsgId._00, 8 + 2), 

1110 SZ_DHW_BLOCKING: self._ot_msg_flag(MsgId._00, 6), 

1111 SZ_DHW_ENABLED: self._ot_msg_flag(MsgId._00, 1), 

1112 SZ_FAULT_PRESENT: self._ot_msg_flag(MsgId._00, 8), 

1113 SZ_FLAME_ACTIVE: self._ot_msg_flag(MsgId._00, 8 + 3), 

1114 SZ_SUMMER_MODE: self._ot_msg_flag(MsgId._00, 5), 

1115 SZ_OTC_ACTIVE: self._ot_msg_flag(MsgId._00, 3), 

1116 } 

1117 

1118 @property 

1119 def ramses_schema(self) -> PayDictT.EMPTY: 

1120 return {} 

1121 

1122 @property 

1123 def ramses_params(self) -> dict[str, float | None]: 

1124 return { 

1125 SZ_MAX_REL_MODULATION: self.max_rel_modulation, 

1126 } 

1127 

1128 @property 

1129 def ramses_status(self) -> dict[str, Any]: 

1130 return { 

1131 SZ_BOILER_OUTPUT_TEMP: self._msg_value(Code._3200, key=SZ_TEMPERATURE), 

1132 SZ_BOILER_RETURN_TEMP: self._msg_value(Code._3210, key=SZ_TEMPERATURE), 

1133 SZ_BOILER_SETPOINT: self._msg_value(Code._22D9, key=SZ_SETPOINT), 

1134 SZ_CH_MAX_SETPOINT: self._msg_value(Code._1081, key=SZ_SETPOINT), 

1135 SZ_CH_SETPOINT: self._msg_value(Code._3EF0, key=SZ_CH_SETPOINT), 

1136 SZ_CH_WATER_PRESSURE: self._msg_value(Code._1300, key=SZ_PRESSURE), 

1137 SZ_DHW_FLOW_RATE: self._msg_value(Code._12F0, key=SZ_DHW_FLOW_RATE), 

1138 SZ_DHW_SETPOINT: self._msg_value(Code._1300, key=SZ_SETPOINT), 

1139 SZ_DHW_TEMP: self._msg_value(Code._1260, key=SZ_TEMPERATURE), 

1140 SZ_OUTSIDE_TEMP: self._msg_value(Code._1290, key=SZ_TEMPERATURE), 

1141 SZ_REL_MODULATION_LEVEL: self._msg_value( 

1142 (Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL 

1143 ), 

1144 # 

1145 SZ_CH_ACTIVE: self._msg_value(Code._3EF0, key=SZ_CH_ACTIVE), 

1146 SZ_CH_ENABLED: self._msg_value(Code._3EF0, key=SZ_CH_ENABLED), 

1147 SZ_DHW_ACTIVE: self._msg_value(Code._3EF0, key=SZ_DHW_ACTIVE), 

1148 SZ_FLAME_ACTIVE: self._msg_value(Code._3EF0, key=SZ_FLAME_ACTIVE), 

1149 } 

1150 

1151 @property 

1152 def traits(self) -> dict[str, Any]: 

1153 return { 

1154 **super().traits, 

1155 "opentherm_traits": self.supported_cmds_ot, 

1156 "ramses_ii_traits": self.supported_cmds, 

1157 } 

1158 

1159 @property 

1160 def schema(self) -> dict[str, Any]: 

1161 return { 

1162 **super().schema, 

1163 "opentherm_schema": self.opentherm_schema, 

1164 "ramses_ii_schema": self.ramses_schema, 

1165 } 

1166 

1167 @property 

1168 def params(self) -> dict[str, Any]: 

1169 return { 

1170 **super().params, 

1171 "opentherm_params": self.opentherm_params, 

1172 "ramses_ii_params": self.ramses_params, 

1173 } 

1174 

1175 @property 

1176 def status(self) -> dict[str, Any]: 

1177 return { 

1178 **super().status, # incl. actuator_cycle, actuator_state 

1179 # 

1180 SZ_BOILER_OUTPUT_TEMP: self.boiler_output_temp, 

1181 SZ_BOILER_RETURN_TEMP: self.boiler_return_temp, 

1182 SZ_BOILER_SETPOINT: self.boiler_setpoint, 

1183 SZ_CH_SETPOINT: self.ch_setpoint, 

1184 SZ_CH_MAX_SETPOINT: self.ch_max_setpoint, 

1185 SZ_CH_WATER_PRESSURE: self.ch_water_pressure, 

1186 SZ_DHW_FLOW_RATE: self.dhw_flow_rate, 

1187 SZ_DHW_SETPOINT: self.dhw_setpoint, 

1188 SZ_DHW_TEMP: self.dhw_temp, 

1189 SZ_OEM_CODE: self.oem_code, 

1190 SZ_OUTSIDE_TEMP: self.outside_temp, 

1191 SZ_REL_MODULATION_LEVEL: self.rel_modulation_level, 

1192 # 

1193 SZ_CH_ACTIVE: self.ch_active, 

1194 SZ_CH_ENABLED: self.ch_enabled, 

1195 SZ_COOLING_ACTIVE: self.cooling_active, 

1196 SZ_COOLING_ENABLED: self.cooling_enabled, 

1197 SZ_DHW_ACTIVE: self.dhw_active, 

1198 SZ_DHW_BLOCKING: self.dhw_blocking, 

1199 SZ_DHW_ENABLED: self.dhw_enabled, 

1200 SZ_FAULT_PRESENT: self.fault_present, 

1201 SZ_FLAME_ACTIVE: self.flame_active, 

1202 SZ_SUMMER_MODE: self.summer_mode, 

1203 SZ_OTC_ACTIVE: self.otc_active, 

1204 # 

1205 # "status_opentherm": self.opentherm_status, 

1206 # "status_ramses_ii": self.ramses_status, 

1207 } 

1208 

1209 

1210class Thermostat(BatteryState, Setpoint, Temperature, Fakeable): # THM (..): 

1211 """The THM/STA class, such as a TR87RF.""" 

1212 

1213 _SLUG = DevType.THM 

1214 _STATE_ATTR = SZ_TEMPERATURE 

1215 

1216 def _handle_msg(self, msg: Message) -> None: 

1217 super()._handle_msg(msg) 

1218 

1219 if msg.verb != I_ or self._iz_controller is not None: 

1220 return 

1221 

1222 # NOTE: this has only been tested on a 12:, does it work for a 34: too? 

1223 if all( 

1224 ( 

1225 msg._addrs[0] is self.addr, 

1226 msg._addrs[1] is NON_DEV_ADDR, 

1227 msg._addrs[2] is self.addr, 

1228 ) 

1229 ): 

1230 if self._iz_controller is None: 

1231 # _LOGGER.info(f"{msg!r} # IS_CONTROLLER (10): is FALSE") 

1232 self._iz_controller = False 

1233 elif self._iz_controller: # TODO: raise CorruptStateError 

1234 _LOGGER.error(f"{msg!r} # IS_CONTROLLER (11): was TRUE, now False") 

1235 

1236 if msg.code in CODES_ONLY_FROM_CTL: # TODO: raise CorruptPktError 

1237 _LOGGER.error(f"{msg!r} # IS_CONTROLLER (12); is CORRUPT PKT") 

1238 

1239 elif all( 

1240 ( 

1241 msg._addrs[0] is NON_DEV_ADDR, 

1242 msg._addrs[1] is NON_DEV_ADDR, 

1243 msg._addrs[2] is self.addr, 

1244 ) 

1245 ): 

1246 if self._iz_controller is None: 

1247 # _LOGGER.info(f"{msg!r} # IS_CONTROLLER (20): is TRUE") 

1248 self._iz_controller = msg 

1249 self._make_tcs_controller(msg=msg) 

1250 elif self._iz_controller is False: # TODO: raise CorruptStateError 

1251 _LOGGER.error(f"{msg!r} # IS_CONTROLLER (21): was FALSE, now True") 

1252 

1253 async def initiate_binding_process(self) -> Packet: 

1254 return await super()._initiate_binding_process( 

1255 (Code._2309, Code._30C9, Code._0008) 

1256 ) 

1257 

1258 

1259class BdrSwitch(Actuator, RelayDemand): # BDR (13): 

1260 """The BDR class, such as a BDR91. 

1261 

1262 BDR91s can be used in six distinct modes, including: 

1263 

1264 - x2 boiler controller (FC/TPI): either traditional, or newer heat pump-aware 

1265 - x1 electric heat zones (0x/ELE) 

1266 - x1 zone valve zones (0x/VAL) 

1267 - x2 DHW thingys (F9/DHW, FA/DHW) 

1268 """ 

1269 

1270 ACTIVE: Final = "active" 

1271 TPI_PARAMS: Final = "tpi_params" 

1272 

1273 _SLUG = DevType.BDR 

1274 _STATE_ATTR = "active" 

1275 

1276 # def __init__(self, *args: Any, **kwargs: Any) -> None: 

1277 # super().__init__(*args, **kwargs) 

1278 

1279 # if kwargs.get(SZ_DOMAIN_ID) == FC: # TODO: F9/FA/FC, zone_idx 

1280 # self.ctl._set_app_cntrl(self) 

1281 

1282 def _setup_discovery_cmds(self) -> None: 

1283 """Discover BDRs. 

1284 

1285 The BDRs have one of six roles: 

1286 - heater relay *or* a heat pump relay (alternative to an OTB) 

1287 - DHW hot water valve *or* DHW heating valve 

1288 - Zones: Electric relay *or* Zone valve relay 

1289 

1290 They all seem to respond thus (TODO: heat pump/zone valve relay): 

1291 - all BDR91As will (erractically) RP to these RQs 

1292 0016, 1FC9 & 0008, 1100, 3EF1 

1293 - all BDR91As will *not* RP to these RQs 

1294 0009, 10E0, 3B00, 3EF0 

1295 - a BDR91A will *periodically* send an I/3B00/00C8 if it is the heater relay 

1296 """ 

1297 

1298 super()._setup_discovery_cmds() 

1299 

1300 if self.is_faked: 

1301 return 

1302 

1303 self._add_discovery_cmd(Command.get_tpi_params(self.id), 6 * 3600) # params 

1304 self._add_discovery_cmd( 

1305 Command.from_attrs(RQ, self.id, Code._3EF1, "00"), 

1306 60 if self._child_id in (F9, FA, FC) else 300, 

1307 ) # status 

1308 

1309 @property 

1310 def active(self) -> bool | None: # 3EF0, 3EF1 

1311 """Return the actuator's current state.""" 

1312 result = self._msg_value((Code._3EF0, Code._3EF1), key=self.MODULATION_LEVEL) 

1313 return None if result is None else bool(result) 

1314 

1315 @property 

1316 def role(self) -> str | None: 

1317 """Return the role of the BDR91A (there are six possibilities).""" 

1318 

1319 # TODO: use self._parent? 

1320 if self._child_id in DOMAIN_TYPE_MAP: 

1321 return DOMAIN_TYPE_MAP[self._child_id] 

1322 elif self._parent and isinstance(self._parent, Zone): 

1323 # TODO: remove need for isinstance 

1324 return self._parent.heating_type 

1325 

1326 # if Code._3B00 in _msgs and _msgs[Code._3B00].verb == I_: 

1327 # self._is_tpi = True 

1328 # if Code._1FC9 in _msgs and _msgs[Code._1FC9].verb == RP: 

1329 # if Code._3B00 in _msgs[Code._1FC9].raw_payload: 

1330 # self._is_tpi = True 

1331 

1332 return None 

1333 

1334 @property 

1335 def tpi_params(self) -> PayDictT._10A0 | None: 

1336 return self._msg_value(Code._1100) 

1337 

1338 @property 

1339 def schema(self) -> dict[str, Any]: 

1340 return { 

1341 **super().schema, 

1342 "role": self.role, 

1343 } 

1344 

1345 @property 

1346 def params(self) -> dict[str, Any]: 

1347 return { 

1348 **super().params, 

1349 self.TPI_PARAMS: self.tpi_params, 

1350 } 

1351 

1352 @property 

1353 def status(self) -> dict[str, Any]: 

1354 return { 

1355 **super().status, 

1356 self.ACTIVE: self.active, 

1357 } 

1358 

1359 

1360class TrvActuator(BatteryState, HeatDemand, Setpoint, Temperature): # TRV (04): 

1361 """The TRV class, such as a HR92.""" 

1362 

1363 WINDOW_OPEN: Final = SZ_WINDOW_OPEN 

1364 

1365 _SLUG = DevType.TRV 

1366 _STATE_ATTR = SZ_HEAT_DEMAND 

1367 

1368 @property 

1369 def heat_demand(self) -> float | None: # 3150 

1370 if (heat_demand := super().heat_demand) is None: 

1371 if self._msg_value(Code._3150) is None and self.setpoint is False: 

1372 return 0 # instead of None (no 3150s sent when setpoint is False) 

1373 return heat_demand 

1374 

1375 @property 

1376 def window_open(self) -> bool | None: # 12B0 

1377 return self._msg_value(Code._12B0, key=self.WINDOW_OPEN) 

1378 

1379 @property 

1380 def status(self) -> dict[str, Any]: 

1381 return { 

1382 **super().status, 

1383 self.WINDOW_OPEN: self.window_open, 

1384 } 

1385 

1386 

1387class JimDevice(Actuator): # BDR (08): 

1388 _SLUG: str = DevType.JIM 

1389 _STATE_ATTR = None 

1390 

1391 

1392class JstDevice(RelayDemand): # BDR (31): 

1393 _SLUG: str = DevType.JST 

1394 _STATE_ATTR = None 

1395 

1396 

1397class UfhCircuit(Child, Entity): # FIXME 

1398 """The UFH circuit class (UFC:circuit is much like CTL/TCS:zone). 

1399 

1400 NOTE: for circuits, there's a difference between : 

1401 - `self.ctl`: the UFH controller, and 

1402 - `self.tcs.ctl`: the Evohome controller 

1403 """ 

1404 

1405 _SLUG: str = None 

1406 _STATE_ATTR = None 

1407 

1408 def __init__(self, ufc: UfhController, ufh_idx: str) -> None: 

1409 super().__init__(ufc._gwy) 

1410 

1411 # FIXME: gwy.msg_db entities must know their parent device ID and their own idx 

1412 self._z_id = ufc.id 

1413 self._z_idx = ufh_idx 

1414 

1415 self.id: str = f"{ufc.id}_{ufh_idx}" 

1416 

1417 self.ufc: UfhController = ufc 

1418 self._child_id = ufh_idx 

1419 

1420 # TODO: _ctl should be: .ufc? .ctl? 

1421 self._ctl: Controller = None 

1422 self._zone: Zone | None = None 

1423 

1424 # def __str__(self) -> str: 

1425 # return f"{self.id} ({self._zone and self._zone._child_id})" 

1426 

1427 def _update_schema(self, **kwargs: Any) -> None: 

1428 raise NotImplementedError 

1429 

1430 def _handle_msg(self, msg: Message) -> None: 

1431 super()._handle_msg(msg) 

1432 

1433 if msg.code != Code._000C or not msg.payload[SZ_DEVICES]: # zone_devices 

1434 return 

1435 

1436 # FIXME: is messy 

1437 if not (dev_ids := msg.payload[SZ_DEVICES]): 

1438 return 

1439 if len(dev_ids) != 1: 

1440 raise exc.PacketPayloadInvalid("No devices") 

1441 

1442 # ctl = self._gwy.device_by_id.get(dev_ids[0]) 

1443 ctl: Controller = self._gwy.get_device(dev_ids[0]) 

1444 if not ctl or (self._ctl and self._ctl is not ctl): 

1445 raise exc.PacketPayloadInvalid("No CTL") 

1446 self._ctl = ctl 

1447 

1448 ctl._make_tcs_controller() 

1449 # self.set_parent(ctl.tcs) 

1450 

1451 zon = ctl.tcs.get_htg_zone(msg.payload[SZ_ZONE_IDX]) 

1452 if not zon: 

1453 raise exc.PacketPayloadInvalid("No Zone") 

1454 if self._zone and self._zone is not zon: 

1455 raise exc.PacketPayloadInvalid("Wrong Zone") 

1456 self._zone = zon 

1457 

1458 if self.ufc not in self._zone.actuators: 

1459 schema = {SZ_ACTUATORS: [self.ufc.id], SZ_CIRCUITS: [self.id]} 

1460 self._zone._update_schema(**schema) 

1461 

1462 @property 

1463 def ufx_idx(self) -> str: 

1464 return self._child_id 

1465 

1466 @property 

1467 def zone_idx(self) -> str | None: 

1468 if self._zone: 

1469 return self._zone._child_id 

1470 return None 

1471 

1472 

1473# e.g. {"CTL": Controller} 

1474HEAT_CLASS_BY_SLUG: dict[str, type[DeviceHeat]] = class_by_attr(__name__, "_SLUG") 

1475 

1476_HEAT_VC_PAIR_BY_CLASS = { 

1477 DevType.DHW: ((I_, Code._1260),), 

1478 DevType.OTB: ((I_, Code._3220), (RP, Code._3220)), 

1479} 

1480 

1481 

1482def class_dev_heat( 

1483 dev_addr: Address, *, msg: Message | None = None, eavesdrop: bool = False 

1484) -> type[DeviceHeat]: 

1485 """Return a device class, but only if the device must be from the CH/DHW group. 

1486 

1487 May return a device class, DeviceHeat (which will need promotion). 

1488 """ 

1489 

1490 if dev_addr.type in DEV_TYPE_MAP.THM_DEVICES: 

1491 return HEAT_CLASS_BY_SLUG[DevType.THM] 

1492 

1493 try: 

1494 slug = DEV_TYPE_MAP.slug(dev_addr.type) 

1495 except KeyError: 

1496 pass 

1497 else: 

1498 return HEAT_CLASS_BY_SLUG[slug] 

1499 

1500 if not eavesdrop: 

1501 raise TypeError(f"No CH/DHW class for: {dev_addr} (no eavesdropping)") 

1502 

1503 if msg and msg.code in CODES_OF_HEAT_DOMAIN_ONLY: 

1504 return DeviceHeat 

1505 

1506 raise TypeError(f"No CH/DHW class for: {dev_addr} (unknown type: {dev_addr.type})")