Coverage for tests/tests_rf/test_api_schedule.py: 0%
46 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
#!/usr/bin/env python3
"""RAMSES RF - Check get/set of zone/DHW schedules."""

import asyncio

import pytest

from ramses_rf import Gateway
from ramses_rf.device import Controller
from ramses_rf.system import Evohome, Zone
from ramses_rf.system.schedule import InnerScheduleT
from ramses_tx.address import HGI_DEVICE_ID, Address
from ramses_tx.protocol import PortProtocol
from ramses_tx.schemas import DeviceIdT

from .conftest import _GwyConfigDictT
from .virtual_rf import VirtualRf
TST_ID_ = Address("18:123456").id  # the id of the test HGI80-compatible device

# ### FIXTURES #########################################################################

# Apply the asyncio marker to every test in this module.
pytestmark = pytest.mark.asyncio()
@pytest.fixture()
def gwy_config() -> _GwyConfigDictT:
    """Return the gateway configuration used by the tests in this module.

    Discovery is disabled, QoS is left enabled and the known list is not
    enforced; the known list itself contains only the HGI device id.
    """
    return {
        "config": {
            "disable_discovery": True,
            "disable_qos": False,  # QoS is required for this test
            "enforce_known_list": False,
        },
        "known_list": {HGI_DEVICE_ID: {}},  # req'd to thwart foreign HGI blacklisting
    }
@pytest.fixture()
def gwy_dev_id() -> DeviceIdT:
    """Return the device id of the test gateway (the module-level ``TST_ID_``)."""
    return TST_ID_
#######################################################################################


# NOTE(review): this marker decorates a helper whose name starts with an
# underscore; with pytest's default collection rules it is presumably not
# collected itself, so the marker may have no effect here - confirm.
@pytest.mark.xdist_group(name="virt_serial")
async def _test_get_schedule(gwy: Gateway, ctl_id: DeviceIdT, idx: str) -> None:
    """Test obtaining the version and schedule.

    Args:
        gwy: The gateway under test; it must be bound to the running event loop
            and use a port protocol with QoS enabled.
        ctl_id: The device id of the controller.
        idx: The index of the heating zone whose schedule is retrieved.
    """

    assert gwy._loop is asyncio.get_running_loop()  # scope BUG is here
    assert isinstance(gwy._protocol, PortProtocol)  # mypy
    assert gwy._protocol._disable_qos is False  # QoS is required for this test

    # The returned device is not used directly; the call is kept as in the
    # original (presumably it instantiates the controller so gwy.tcs is set).
    _: Controller = gwy.get_device(ctl_id)  # type: ignore[assignment]

    tcs: Evohome | None = gwy.tcs
    assert isinstance(tcs, Evohome)  # mypy

    global_ver, did_io = await tcs._schedule_version()
    # Two separate asserts, so that a failure pinpoints the offending condition.
    assert isinstance(global_ver, int)
    assert did_io

    zon: Zone = tcs.get_htg_zone(idx)
    schedule: InnerScheduleT | None = await zon.get_schedule()
    assert schedule is not None
    assert len(schedule) == 7  # days of week
#######################################################################################

# Sample packet log; the 0404 RQ/RP pairs below are mirrored in TEST_SUITE:
# 2021-10-24T15:26:06.084723 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000100
# 2021-10-24T15:26:06.144750 046 RP --- 01:145038 18:013393 --:------ 0404 048 0120000829010368816DCCB10D80300C0551DB710C03515052310303300B73320A440111E0527D9D9C2722A252DF768E
# 2021-10-24T15:26:06.303681 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000203
# 2021-10-24T15:26:06.347579 046 RP --- 01:145038 18:013393 --:------ 0404 048 01200008290203A1AFFB6EFB39FAEEDD46FFDFCD59648AA729780A9E82A7E01978069E8167E025F0127809BC049E83E7
# 2021-10-24T15:26:06.381601 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000303
# 2021-10-24T15:26:06.417568 045 RP --- 01:145038 18:013393 --:------ 0404 038 012000081F0303E039780E5EBEFEB677A52DF66F5FAFB4F5E30578015E80178D77006F8713D1


# Maps a regex matching an outgoing RQ command to the RP packet to reply with.
TEST_SUITE: dict[str, str] = {
    r"RQ.* 18:.* 01:.* 0006 001 00": "RP --- 01:145038 18:013393 --:------ 0006 004 00050135",
    r"RQ.* 18:.* 01:.* 0404 007 01200008000100": "RP --- 01:145038 18:013393 --:------ 0404 048 0120000829010368816DCCB10D80300C0551DB710C03515052310303300B73320A440111E0527D9D9C2722A252DF768E",
    r"RQ.* 18:.* 01:.* 0404 007 01200008000203": "RP --- 01:145038 18:013393 --:------ 0404 048 01200008290203A1AFFB6EFB39FAEEDD46FFDFCD59648AA729780A9E82A7E01978069E8167E025F0127809BC049E83E7",
    r"RQ.* 18:.* 01:.* 0404 007 01200008000303": "RP --- 01:145038 18:013393 --:------ 0404 038 012000081F0303E039780E5EBEFEB677A52DF66F5FAFB4F5E30578015E80178D77006F8713D1",
}
async def test_get_schedule_fake(fake_evofw3: Gateway) -> None:
    """Test obtaining the schedule from a faked controller via Virtual RF.

    Args:
        fake_evofw3: A gateway whose transport exposes a ``virtual_rf`` extra.
    """

    assert fake_evofw3._transport  # mypy

    # Prime the virtual RF with the expected replies...
    rf: VirtualRf = fake_evofw3._transport.get_extra_info("virtual_rf")
    for cmd_regex, reply in TEST_SUITE.items():
        rf.add_reply_for_cmd(cmd_regex, reply)

    await _test_get_schedule(fake_evofw3, "01:145038", "01")
@pytest.mark.xdist_group(name="real_serial")
async def _test_get_schedule_mqtt(mqtt_evofw3: Gateway) -> None:
    """Test obtaining the schedule from a real controller via MQTT.

    Delegates to ``_test_get_schedule`` for controller 01:145038, zone "01".
    """

    await _test_get_schedule(mqtt_evofw3, "01:145038", "01")
@pytest.mark.xdist_group(name="real_serial")
async def _test_get_schedule_real(real_evofw3: Gateway) -> None:
    """Test obtaining the schedule from a real controller via RF.

    Delegates to ``_test_get_schedule`` for controller 01:145038, zone "01".
    """

    await _test_get_schedule(real_evofw3, "01:145038", "01")