Coverage for src/ramses_rf/system/schedule.py: 34%

237 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Expose an 0404 schedule (is a stateful process).""" 

3 

4# TODO: use schemas from evohome_async 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import logging 

10import struct 

11import zlib 

12from collections.abc import Iterable 

13from datetime import timedelta as td 

14from typing import TYPE_CHECKING, Any, Final, NotRequired, TypeAlias, TypedDict 

15 

16import voluptuous as vol # type: ignore[import, unused-ignore] 

17 

18from ramses_rf.const import ( 

19 SZ_FRAG_NUMBER, 

20 SZ_FRAGMENT, 

21 SZ_SCHEDULE, 

22 SZ_TOTAL_FRAGS, 

23 SZ_ZONE_IDX, 

24) 

25from ramses_tx.command import Command 

26from ramses_tx.const import SZ_CHANGE_COUNTER, Priority 

27from ramses_tx.message import Message 

28from ramses_tx.packet import Packet 

29 

30from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

31 I_, 

32 RP, 

33 RQ, 

34 W_, 

35 Code, 

36) 

37 

38if TYPE_CHECKING: 

39 from ramses_rf.system.zones import DhwZone, Zone 

40 

41 

42class EmptyDictT(TypedDict): 

43 pass 

44 

45 

46class SwitchPointDhw(TypedDict): 

47 time_of_day: str 

48 enabled: bool 

49 

50 

51class SwitchPointZon(TypedDict): 

52 time_of_day: str 

53 heat_setpoint: float 

54 

55 

56SwitchPointT: TypeAlias = SwitchPointDhw | SwitchPointZon 

57SwitchPointsT: TypeAlias = list[SwitchPointDhw] | list[SwitchPointZon] 

58 

59 

60class DayOfWeek(TypedDict): 

61 day_of_week: int 

62 switchpoints: SwitchPointsT 

63 

64 

65DayOfWeekT: TypeAlias = DayOfWeek 

66InnerScheduleT: TypeAlias = list[DayOfWeek] 

67 

68 

69class _OuterSchedule(TypedDict): 

70 zone_idx: str 

71 schedule: InnerScheduleT 

72 

73 

74class _EmptySchedule(TypedDict): 

75 zone_idx: str 

76 schedule: NotRequired[EmptyDictT | None] 

77 

78 

79OuterScheduleT: TypeAlias = _OuterSchedule | _EmptySchedule 

80 

81 

82_LOGGER = logging.getLogger(__name__) 

83 

84 

85FIVE_MINS = td(minutes=5) 

86 

87SZ_MSG: Final = "msg" 

88 

89SZ_DAY_OF_WEEK: Final = "day_of_week" 

90SZ_HEAT_SETPOINT: Final = "heat_setpoint" 

91SZ_SWITCHPOINTS: Final = "switchpoints" 

92SZ_TIME_OF_DAY: Final = "time_of_day" 

93SZ_ENABLED: Final = "enabled" 

94 

95REGEX_TIME_OF_DAY: Final = r"^([0-1][0-9]|2[0-3]):[0-5][05]$" 

96 

97 

98def schema_sched(schema_switchpoint: vol.Schema) -> vol.Schema: 

99 schema_sched_day = vol.Schema( 

100 { 

101 vol.Required(SZ_DAY_OF_WEEK): int, 

102 vol.Required(SZ_SWITCHPOINTS): vol.All( 

103 [schema_switchpoint], vol.Length(min=1) 

104 ), 

105 }, 

106 extra=vol.PREVENT_EXTRA, 

107 ) 

108 return vol.Schema( 

109 vol.All([schema_sched_day], vol.Length(min=0, max=7)), 

110 extra=vol.PREVENT_EXTRA, 

111 ) 

112 

113 

114SCH_SWITCHPOINT_DHW = vol.Schema( 

115 { 

116 vol.Required(SZ_TIME_OF_DAY): vol.Match(REGEX_TIME_OF_DAY), 

117 vol.Required(SZ_ENABLED): bool, 

118 }, 

119 extra=vol.PREVENT_EXTRA, 

120) 

121 

122SCH_SWITCHPOINT_ZON = vol.Schema( 

123 { 

124 vol.Required(SZ_TIME_OF_DAY): vol.Match(REGEX_TIME_OF_DAY), 

125 vol.Required(SZ_HEAT_SETPOINT): vol.All( 

126 vol.Coerce(float), vol.Range(min=5, max=35) 

127 ), 

128 }, 

129 extra=vol.PREVENT_EXTRA, 

130) 

131 

132SCH_SCHEDULE_DHW = schema_sched(SCH_SWITCHPOINT_DHW) 

133SCH_SCHEDULE_DHW_OUTER = vol.Schema( 

134 { 

135 vol.Required(SZ_ZONE_IDX): "HW", 

136 vol.Required(SZ_SCHEDULE): SCH_SCHEDULE_DHW, 

137 }, 

138 extra=vol.PREVENT_EXTRA, 

139) 

140 

141SCH_SCHEDULE_ZON = schema_sched(SCH_SWITCHPOINT_ZON) 

142SCH_SCHEDULE_ZON_OUTER = vol.Schema( 

143 { 

144 vol.Required(SZ_ZONE_IDX): vol.Match(r"0[0-F]"), 

145 vol.Required(SZ_SCHEDULE): SCH_SCHEDULE_ZON, 

146 }, 

147 extra=vol.PREVENT_EXTRA, 

148) 

149 

150SCH_FULL_SCHEDULE = vol.Schema( 

151 vol.Any(SCH_SCHEDULE_DHW_OUTER, SCH_SCHEDULE_ZON_OUTER), 

152 extra=vol.PREVENT_EXTRA, 

153) 

154 

155 

156_PayloadT: TypeAlias = dict[str, Any] # Message payload 

157_PayloadSetT: TypeAlias = list[_PayloadT | None] 

158 

159_FragmentT: TypeAlias = str 

160_FragmentSetT: TypeAlias = list[_FragmentT] 

161 

162EMPTY_PAYLOAD_SET: _PayloadSetT = [None] 

163 

164 

165# TODO: make stateful (a la binding) 

166class Schedule: # 0404 

167 """The schedule of a zone.""" 

168 

169 def __init__(self, zone: DhwZone | Zone) -> None: 

170 _LOGGER.debug("Schedule(zon=%s).__init__()", zone) 

171 

172 self.id = zone.id 

173 self._zone = zone 

174 self.idx = zone.idx 

175 

176 self.ctl = zone.ctl 

177 self.tcs = zone.tcs 

178 self._gwy = zone._gwy 

179 

180 self._full_schedule: OuterScheduleT | EmptyDictT = {} 

181 

182 self._payload_set: _PayloadSetT = EMPTY_PAYLOAD_SET # Rx'd 

183 self._fragments: _FragmentSetT = [] # to Tx 

184 

185 self._global_ver = 0 # None is a sentinel for 'dont know' 

186 self._sched_ver = 0 # the global_ver when this schedule was retrieved 

187 

188 def __str__(self) -> str: 

189 return f"{self._zone} (schedule)" 

190 

191 def _handle_msg(self, msg: Message) -> None: 

192 """Process a schedule packet: if possible, create the corresponding schedule.""" 

193 

194 if msg.code == Code._0006: # keep up, in cause is useful to know in future 

195 self._global_ver = msg.payload[SZ_CHANGE_COUNTER] 

196 return 

197 

198 if msg.code != Code._0404: 

199 return 

200 

201 # can do via here, or via gwy.async_send_cmd(cmd) 

202 # next line also in self._get_schedule(), so protected here with a lock 

203 if msg.payload[SZ_TOTAL_FRAGS] != 0xFF and self.tcs.zone_lock_idx != self.idx: 

204 self._payload_set = self._update_payload_set(self._payload_set, msg.payload) 

205 

206 async def _is_dated(self, *, force_io: bool = False) -> tuple[bool, bool]: 

207 """Indicate if it is possible that a more recent schedule is available. 

208 

209 If required, retrieve the latest global version (change counter) from the 

210 TCS. 

211 

212 There may be a false positive if another zone's schedule is changed when 

213 this zone's schedule has not. There may be a false negative if this zone's 

214 schedule was changed only very recently and a cached global version was 

215 used. 

216 

217 If `force_io`, then a true negative is guaranteed (it forces an RQ|0006 unless 

218 self._global_ver > self._sched_ver). 

219 """ 

220 

221 # this will not cause an I/O... 

222 if ( 

223 not force_io 

224 and not self._sched_ver 

225 or (self._global_ver and self._global_ver > self._sched_ver) 

226 ): 

227 return True, False # is_dated, did_io 

228 

229 # this may cause an I/O... 

230 self._global_ver, did_io = await self.tcs._schedule_version() 

231 if did_io or self._global_ver > self._sched_ver: 

232 return self._global_ver > self._sched_ver, did_io # is_dated, did_io 

233 

234 if force_io: # this will cause an I/O... 

235 self._global_ver, did_io = await self.tcs._schedule_version( 

236 force_io=force_io 

237 ) 

238 

239 return self._global_ver > self._sched_ver, did_io # is_dated, did_io 

240 

241 async def get_schedule( 

242 self, *, force_io: bool = False, timeout: float = 15 

243 ) -> InnerScheduleT | None: 

244 """Retrieve/return the brief schedule of a zone. 

245 

246 Return the cached schedule (which may have been eavesdropped) only if the 

247 global change counter has not increased. 

248 Otherwise, RQ the latest schedule from the controller and return that. 

249 

250 If `force_io`, then the latest schedule is guaranteed (it forces an RQ|0006). 

251 """ 

252 

253 try: 

254 await asyncio.wait_for( 

255 self._get_schedule(force_io=force_io), timeout=timeout 

256 ) 

257 except TimeoutError as err: 

258 raise TimeoutError( 

259 f"Failed to obtain schedule within {timeout} secs" 

260 ) from err 

261 # TODO: raise a more parochial exception 

262 return self.schedule 

263 

264 async def _get_schedule(self, *, force_io: bool = False) -> None: 

265 """Retrieve/return the schedule of a zone (sets self._full_schedule).""" 

266 

267 async def get_fragment(frag_num: int) -> _PayloadT: # may: TimeoutError? 

268 """Retrieve a schedule fragment from the controller.""" 

269 

270 frag_set_size = 0 if frag_num == 1 else _len(self._payload_set) 

271 cmd = Command.get_schedule_fragment( 

272 self.ctl.id, self.idx, frag_num, frag_set_size 

273 ) 

274 pkt: Packet = await self._gwy.async_send_cmd( 

275 cmd, wait_for_reply=True, priority=Priority.HIGH 

276 ) 

277 msg = Message(pkt) 

278 assert isinstance(msg.payload, dict) # mypy check 

279 return msg.payload # may: TimeoutError? 

280 

281 is_dated, did_io = await self._is_dated(force_io=force_io) 

282 if is_dated: 

283 self._full_schedule = {} # keep frags, maybe only other scheds have changed 

284 if self._full_schedule: 

285 return 

286 

287 await self.tcs._obtain_lock(self.idx) # maybe raise TimeOutError 

288 

289 if not did_io: # must know the version of the schedule about to be RQ'd 

290 self._global_ver, _ = await self.tcs._schedule_version(force_io=True) 

291 

292 self._payload_set[0] = None # if 1st frag valid: schedule very likely unchanged 

293 while frag_num := next( 

294 i for i, f in enumerate(self._payload_set, 1) if f is None 

295 ): 

296 fragment = await get_fragment(frag_num) 

297 # next line also in self._handle_msg(), so protected there with a lock 

298 self._payload_set = self._update_payload_set(self._payload_set, fragment) 

299 if self._full_schedule: # TODO: potential for infinite loop? 

300 self._sched_ver = self._global_ver # type: ignore[unreachable] 

301 break 

302 

303 self.tcs._release_lock() 

304 

305 def _proc_payload_set(self, payload_set: _PayloadSetT) -> OuterScheduleT | None: 

306 """Process a payload set and return the full schedule (sets `self._schedule`). 

307 

308 If the schedule is for DHW, set the `zone_idx` key to 'HW' (to avoid confusing 

309 with zone '00'). 

310 """ 

311 

312 # TODO: relying upon caller to ensure set is only empty or full 

313 

314 if payload_set == EMPTY_PAYLOAD_SET: 

315 self._full_schedule = {SZ_ZONE_IDX: self.idx} 

316 return self._full_schedule 

317 

318 try: 

319 schedule = fragz_to_full_sched( 

320 payload[SZ_FRAGMENT] for payload in payload_set if payload 

321 ) # TODO: messy - what is set not full 

322 except zlib.error: 

323 return None # TODO: raise a more parochial exception 

324 

325 if self.idx == "HW": 

326 schedule[SZ_ZONE_IDX] = "HW" 

327 self._full_schedule = schedule 

328 

329 return self._full_schedule # NOTE: not self.schedule 

330 

331 def _update_payload_set( 

332 self, payload_set: _PayloadSetT, payload: _PayloadT 

333 ) -> _PayloadSetT: 

334 """Add a fragment to a frag set and process/return the new set. 

335 

336 If the frag set is complete, check for a schedule (sets `self._schedule`). 

337 

338 If required, start a new frag set with the fragment. 

339 """ 

340 

341 def init_payload_set(payload: _PayloadT) -> _PayloadSetT: 

342 payload_set: _PayloadSetT = [None] * payload[SZ_TOTAL_FRAGS] 

343 payload_set[payload[SZ_FRAG_NUMBER] - 1] = payload 

344 return payload_set 

345 

346 if payload[SZ_TOTAL_FRAGS] is None: # zone has no schedule 

347 payload_set = EMPTY_PAYLOAD_SET 

348 self._proc_payload_set(payload_set) 

349 return payload_set 

350 

351 if payload[SZ_TOTAL_FRAGS] != _len(payload_set): # sched has changed 

352 return init_payload_set(payload) 

353 

354 payload_set[payload[SZ_FRAG_NUMBER] - 1] = payload 

355 if None in payload_set or self._proc_payload_set( 

356 payload_set 

357 ): # sets self._schedule 

358 return payload_set 

359 

360 return init_payload_set(payload) 

361 

362 async def set_schedule( 

363 self, schedule: InnerScheduleT, force_refresh: bool = False 

364 ) -> InnerScheduleT | None: 

365 """Set the schedule of a zone.""" 

366 

367 async def put_fragment(frag_num: int, frag_cnt: int, fragment: str) -> None: 

368 """Send a schedule fragment to the controller.""" 

369 

370 cmd = Command.set_schedule_fragment( 

371 self.ctl.id, self.idx, frag_num, frag_cnt, fragment 

372 ) 

373 await self._gwy.async_send_cmd( 

374 cmd, wait_for_reply=True, priority=Priority.HIGH 

375 ) 

376 

377 def normalise_validate(schedule: InnerScheduleT) -> _OuterSchedule: 

378 full_schedule: _OuterSchedule 

379 

380 if self.idx == "HW": 

381 full_schedule = {SZ_ZONE_IDX: "HW", SZ_SCHEDULE: schedule} 

382 schedule_schema = SCH_SCHEDULE_DHW_OUTER 

383 else: 

384 full_schedule = {SZ_ZONE_IDX: self.idx, SZ_SCHEDULE: schedule} 

385 schedule_schema = SCH_SCHEDULE_ZON_OUTER 

386 

387 try: 

388 full_schedule = schedule_schema(full_schedule) 

389 except vol.MultipleInvalid as err: 

390 raise TypeError(f"failed to set schedule: {err}") from err 

391 

392 if self.idx == "HW": # HACK: to avoid confusing dhw with zone '00' 

393 full_schedule[SZ_ZONE_IDX] = "00" 

394 

395 return full_schedule 

396 

397 full_schedule: _OuterSchedule = normalise_validate(schedule) 

398 self._fragments = full_sched_to_fragz(full_schedule) 

399 

400 await self.tcs._obtain_lock(self.idx) # maybe raise TimeOutError 

401 

402 try: 

403 for num, frag in enumerate(self._fragments, 1): 

404 await put_fragment(num, len(self._fragments), frag) 

405 except TimeoutError as err: 

406 raise TimeoutError(f"failed to set schedule: {err}") from err 

407 else: 

408 if not force_refresh: 

409 self._global_ver, _ = await self.tcs._schedule_version(force_io=True) 

410 # assert self._global_ver > self._sched_ver 

411 self._sched_ver = self._global_ver 

412 finally: 

413 self.tcs._release_lock() 

414 

415 if force_refresh: 

416 await self.get_schedule(force_io=True) # sets self._full_schedule 

417 else: 

418 self._full_schedule = full_schedule 

419 

420 return self.schedule 

421 

422 @property 

423 def schedule(self) -> InnerScheduleT | None: 

424 """Return the current (not full) schedule, if any.""" 

425 if not self._full_schedule: # can be {} 

426 return None 

427 result: InnerScheduleT = self._full_schedule.get(SZ_SCHEDULE) # type: ignore[assignment] 

428 return result 

429 

430 @property 

431 def version(self) -> int | None: 

432 """Return the version associated with the current schedule, if any.""" 

433 return self._sched_ver if self._full_schedule else None 

434 

435 

436# TODO: deprecate in favour of len(payload_set) 

437def _len(payload_set: _PayloadSetT) -> int: 

438 """Return the total number of fragments in the complete frag set. 

439 

440 Return 0 if the expected set size is unknown (sentinel value as per RAMSES II). 

441 

442 Uses frag_set[i][SZ_TOTAL_FRAGS] instead of `len(frag_set)` (is necessary?). 

443 """ 

444 # for frag in (f for f in payload_set if f is not None): # they will all match 

445 # assert len(payload_set) == frag[SZ_TOTAL_FRAGS] # TODO: remove 

446 # assert isinstance(frag[SZ_TOTAL_FRAGS], int) # mypy check 

447 # result: int = frag[SZ_TOTAL_FRAGS] 

448 # return result 

449 

450 # assert payload_set == EMPTY_PAYLOAD_SET # TODO: remove 

451 # return 0 # sentinel value as per RAMSES protocol 

452 return len(payload_set) 

453 

454 

455def fragz_to_full_sched(fragments: Iterable[_FragmentT]) -> _OuterSchedule: 

456 """Convert a tuple of fragments strs (a blob) into a schedule. 

457 

458 May raise a `zlib.error` exception. 

459 """ 

460 

461 def setpoint(value: int) -> dict[str, bool | float]: 

462 if value in (0, 1): 

463 return {SZ_ENABLED: bool(value)} 

464 return {SZ_HEAT_SETPOINT: value / 100} 

465 

466 raw_schedule = zlib.decompress(bytearray.fromhex("".join(fragments))) 

467 

468 old_day = 0 

469 schedule: InnerScheduleT = [] 

470 switchpoints: SwitchPointsT = [] # type: ignore[assignment, unused-ignore] 

471 

472 idx: int 

473 dow: int 

474 tod: int 

475 val: int 

476 

477 for i in range(0, len(raw_schedule), 20): 

478 idx, dow, tod, val = _struct_unpack(raw_schedule[i : i + 20]) 

479 

480 if dow > old_day: 

481 schedule.append({SZ_DAY_OF_WEEK: old_day, SZ_SWITCHPOINTS: switchpoints}) 

482 old_day, switchpoints = dow, [] # type: ignore[assignment, unused-ignore] 

483 

484 switchpoint: SwitchPointDhw | SwitchPointZon = { 

485 SZ_TIME_OF_DAY: "{:02d}:{:02d}".format(*divmod(tod, 60)) 

486 } | setpoint(val) # type: ignore[assignment] 

487 switchpoints.append(switchpoint) # type: ignore[arg-type] 

488 

489 schedule.append({SZ_DAY_OF_WEEK: old_day, SZ_SWITCHPOINTS: switchpoints}) 

490 

491 return {SZ_ZONE_IDX: f"{idx:02X}", SZ_SCHEDULE: schedule} 

492 

493 

494def full_sched_to_fragz(full_schedule: _OuterSchedule) -> list[_FragmentT]: 

495 """Convert a schedule into a set of fragments (a blob). 

496 

497 May raise `KeyError`, `zlib.error` exceptions. 

498 """ 

499 

500 cobj = zlib.compressobj(level=9, wbits=14) 

501 frags: list[bytes] = [] 

502 

503 days_of_week: InnerScheduleT = full_schedule[SZ_SCHEDULE] 

504 for week_day in days_of_week: 

505 switchpoints: SwitchPointsT = week_day[SZ_SWITCHPOINTS] 

506 for switchpoint in switchpoints: 

507 frags.append(_struct_pack(full_schedule, week_day, switchpoint)) 

508 

509 blob = (b"".join(cobj.compress(f) for f in frags) + cobj.flush()).hex().upper() 

510 

511 return [blob[i : i + 82] for i in range(0, len(blob), 82)] 

512 

513 

514def _struct_pack( 

515 full_schedule: OuterScheduleT, 

516 week_day: DayOfWeekT, 

517 switchpoint: SwitchPointDhw | SwitchPointZon, 

518) -> bytes: 

519 idx_: str = full_schedule[SZ_ZONE_IDX] 

520 dow_: int = week_day[SZ_DAY_OF_WEEK] 

521 tod_: str = switchpoint[SZ_TIME_OF_DAY] 

522 

523 idx = int(idx_, 16) 

524 dow = int(dow_) 

525 tod = int(tod_[:2]) * 60 + int(tod_[3:]) 

526 

527 if SZ_HEAT_SETPOINT in switchpoint: 

528 val = int(switchpoint[SZ_HEAT_SETPOINT] * 100) # type: ignore[typeddict-item] 

529 else: 

530 val = int(bool(switchpoint[SZ_ENABLED])) 

531 

532 return struct.pack("<xxxxBxxxBxxxHxxHxx", idx, dow, tod, val) 

533 

534 

535def _struct_unpack(raw_schedule: bytes) -> tuple[int, int, int, int]: 

536 idx, dow, tod, val, _ = struct.unpack("<xxxxBxxxBxxxHxxHH", raw_schedule) 

537 return idx, dow, tod, val 

538 

539 

540# 16:27:56.942 000 RQ --- 18:006402 01:145038 --:------ 0006 001 00 

541# 16:27:56.958 038 RP --- 01:145038 18:006402 --:------ 0006 004 00050009 

542 

543# 16:27:57.005 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0100 

544# 16:27:57.068 037 RP --- 01:145038 18:006402 --:------ 0404 048 0120000829-0103-68816DCFCB0980301045D1994C3E624916660956604596600516E1D285094112F566F5B80C072222A2 

545# 16:27:57.114 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0203 

546# 16:27:57.161 038 RP --- 01:145038 18:006402 --:------ 0404 048 0120000829-0203-52DF92C79CEA7EDA91C7F06997FDEFC620B287D6143C054FC153F01C780E3C079E03CFC033F00C3C03 

547# 16:27:57.202 000 RQ --- 18:006402 01:145038 --:------ 0404 007 0120000800-0303 

548# 16:27:57.245 038 RP --- 01:145038 18:006402 --:------ 0404 045 0120000826-0303-CF83E7C1F3E079F0CADC3E5E696BFECC944EED5BF5DEAD7AAD45F0227811BCD87937936E24CF