Coverage for tests/tests_rf/test_api_schedule.py: 0%

46 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Check get/set of zone/DHW schedules.""" 

3 

4import asyncio 

5 

6import pytest 

7 

8from ramses_rf import Gateway 

9from ramses_rf.device import Controller 

10from ramses_rf.system import Evohome, Zone 

11from ramses_rf.system.schedule import InnerScheduleT 

12from ramses_tx.address import HGI_DEVICE_ID, Address 

13from ramses_tx.protocol import PortProtocol 

14from ramses_tx.schemas import DeviceIdT 

15 

16from .conftest import _GwyConfigDictT 

17from .virtual_rf import VirtualRf 

18 

19TST_ID_ = Address("18:123456").id # the id of the test HGI80-compatible device 

20 

21 

22# ### FIXTURES ######################################################################### 

23 

24pytestmark = pytest.mark.asyncio() 

25 

26 

27@pytest.fixture() 

28def gwy_config() -> _GwyConfigDictT: 

29 return { 

30 "config": { 

31 "disable_discovery": True, 

32 "disable_qos": False, # QoS is required for this test 

33 "enforce_known_list": False, 

34 }, 

35 "known_list": {HGI_DEVICE_ID: {}}, # req'd to thwart foreign HGI blacklisting 

36 } 

37 

38 

39@pytest.fixture() 

40def gwy_dev_id() -> DeviceIdT: 

41 return TST_ID_ 

42 

43 

44####################################################################################### 

45 

46 

47@pytest.mark.xdist_group(name="virt_serial") 

48async def _test_get_schedule(gwy: Gateway, ctl_id: DeviceIdT, idx: str) -> None: 

49 """Test obtaining the version and schedule.""" 

50 

51 assert gwy._loop is asyncio.get_running_loop() # scope BUG is here 

52 assert isinstance(gwy._protocol, PortProtocol) # mypy 

53 assert gwy._protocol._disable_qos is False # QoS is required for this test 

54 

55 _: Controller = gwy.get_device(ctl_id) # type: ignore[assignment] 

56 

57 tcs: Evohome | None = gwy.tcs 

58 assert isinstance(tcs, Evohome) # mypy 

59 

60 global_ver, did_io = await tcs._schedule_version() 

61 assert isinstance(global_ver, int) and did_io 

62 

63 zon: Zone = tcs.get_htg_zone(idx) 

64 schedule: InnerScheduleT | None = await zon.get_schedule() 

65 assert schedule is not None 

66 assert len(schedule) == 7 # days of week 

67 

68 

69####################################################################################### 

70 

71# 2021-10-24T15:26:06.084723 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000100 

72# 2021-10-24T15:26:06.144750 046 RP --- 01:145038 18:013393 --:------ 0404 048 0120000829010368816DCCB10D80300C0551DB710C03515052310303300B73320A440111E0527D9D9C2722A252DF768E 

73# 2021-10-24T15:26:06.303681 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000203 

74# 2021-10-24T15:26:06.347579 046 RP --- 01:145038 18:013393 --:------ 0404 048 01200008290203A1AFFB6EFB39FAEEDD46FFDFCD59648AA729780A9E82A7E01978069E8167E025F0127809BC049E83E7 

75# 2021-10-24T15:26:06.381601 023 RQ --- 18:013393 01:145038 --:------ 0404 007 01200008000303 

76# 2021-10-24T15:26:06.417568 045 RP --- 01:145038 18:013393 --:------ 0404 038 012000081F0303E039780E5EBEFEB677A52DF66F5FAFB4F5E30578015E80178D77006F8713D1 

77 

78 

79TEST_SUITE = { 

80 r"RQ.* 18:.* 01:.* 0006 001 00": "RP --- 01:145038 18:013393 --:------ 0006 004 00050135", 

81 r"RQ.* 18:.* 01:.* 0404 007 01200008000100": "RP --- 01:145038 18:013393 --:------ 0404 048 0120000829010368816DCCB10D80300C0551DB710C03515052310303300B73320A440111E0527D9D9C2722A252DF768E", 

82 r"RQ.* 18:.* 01:.* 0404 007 01200008000203": "RP --- 01:145038 18:013393 --:------ 0404 048 01200008290203A1AFFB6EFB39FAEEDD46FFDFCD59648AA729780A9E82A7E01978069E8167E025F0127809BC049E83E7", 

83 r"RQ.* 18:.* 01:.* 0404 007 01200008000303": "RP --- 01:145038 18:013393 --:------ 0404 038 012000081F0303E039780E5EBEFEB677A52DF66F5FAFB4F5E30578015E80178D77006F8713D1", 

84} 

85 

86 

87async def test_get_schedule_fake(fake_evofw3: Gateway) -> None: 

88 """Test obtaining the schedule from a faked controller via Virtual RF.""" 

89 

90 assert fake_evofw3._transport # mypy 

91 

92 # Prime the virtual RF with the expected replies... 

93 rf: VirtualRf = fake_evofw3._transport.get_extra_info("virtual_rf") 

94 for k, v in TEST_SUITE.items(): 

95 rf.add_reply_for_cmd(k, v) 

96 

97 await _test_get_schedule(fake_evofw3, "01:145038", "01") 

98 

99 

100@pytest.mark.xdist_group(name="real_serial") 

101async def _test_get_schedule_mqtt(mqtt_evofw3: Gateway) -> None: 

102 """Test obtaining the schedule from a real controller via MQTT.""" 

103 

104 await _test_get_schedule(mqtt_evofw3, "01:145038", "01") 

105 

106 

107@pytest.mark.xdist_group(name="real_serial") 

108async def _test_get_schedule_real(real_evofw3: Gateway) -> None: 

109 """Test obtaining the schedule from a real controller via RF.""" 

110 

111 await _test_get_schedule(real_evofw3, "01:145038", "01")