Coverage for tests/tests_rf/test_virt_network.py: 0%
103 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
3# TODO: Add assert_protocol_ready to VirtualRF factory (or in library?)
5"""Test the Virtual RF library - VirtualRF is used for testing."""
7import asyncio
9import pytest
10import serial # type: ignore[import-untyped]
12from ramses_rf import Address, Code, Command, Gateway
13from ramses_tx.schemas import DeviceIdT
14from ramses_tx.transport import PortTransport
15from tests_rf.virtual_rf import VirtualRf, rf_factory
17# other constants
18ASSERT_CYCLE_TIME = 0.001 # max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
19DEFAULT_MAX_SLEEP = 1
22GWY_CONFIG = {
23 "config": {
24 "disable_discovery": True, # we're testing discovery here
25 "enforce_known_list": False,
26 },
27}
30SCHEMA_0 = {
31 "orphans_hvac": ["40:000000"],
32 "known_list": {"40:000000": {"class": "REM"}},
33}
35SCHEMA_1 = {
36 "orphans_hvac": ["41:111111"],
37 "known_list": {"41:111111": {"class": "FAN"}},
38}
41# ######################################################################################
44async def assert_code_in_device_msgindex(
45 gwy: Gateway,
46 dev_id: DeviceIdT,
47 code: Code,
48 max_sleep: int = DEFAULT_MAX_SLEEP,
49 test_not: bool = False,
50) -> None:
51 """Fail if the device doesn't exist, or if it doesn't have the code in its msg_db."""
53 for _ in range(int(max_sleep / ASSERT_CYCLE_TIME)):
54 await asyncio.sleep(ASSERT_CYCLE_TIME)
55 if (
56 (_ := gwy.device_by_id.get(dev_id))
57 and gwy.msg_db
58 and (
59 gwy.msg_db.contains(src=dev_id, code=str(code))
60 or gwy.msg_db.contains(dst=dev_id, code=str(code))
61 )
62 ) != test_not:
63 break
64 assert (
65 (_ := gwy.device_by_id.get(dev_id))
66 and gwy.msg_db
67 and (
68 gwy.msg_db.contains(src=dev_id, code=str(code))
69 or gwy.msg_db.contains(dst=dev_id, code=str(code))
70 )
71 ) != test_not # TODO: fix me
74async def assert_devices(
75 gwy: Gateway, devices: list[str], max_sleep: int = DEFAULT_MAX_SLEEP
76) -> None:
77 """Fail if the two sets of devices are not equal."""
79 devices = [Address(d).id for d in devices]
81 for _ in range(int(max_sleep / ASSERT_CYCLE_TIME)):
82 await asyncio.sleep(ASSERT_CYCLE_TIME)
83 if len(gwy.devices) == len(devices):
84 break
85 assert sorted(d.id for d in gwy.devices) == sorted(devices)
88async def assert_this_pkt(
89 transport: PortTransport, cmd: Command, max_sleep: int = DEFAULT_MAX_SLEEP
90) -> None:
91 """Check, at the transport layer, that the current packet is as expected."""
92 for _ in range(int(max_sleep / ASSERT_CYCLE_TIME)):
93 await asyncio.sleep(ASSERT_CYCLE_TIME)
94 if transport._this_pkt and transport._this_pkt._frame == cmd._frame:
95 break
96 assert transport._this_pkt and transport._this_pkt._frame == cmd._frame
99# ### TESTS ############################################################################
102async def _test_virtual_rf_dev_disc(
103 rf: VirtualRf, gwy_0: Gateway, gwy_1: Gateway
104) -> None:
105 """Check the virtual RF network behaves as expected (device discovery)."""
107 ser_2 = serial.Serial(rf.ports[2])
109 # TEST 0: Tx of fingerprint packet with one on/one off
110 await gwy_0.start()
111 assert gwy_0._protocol._transport
113 await assert_devices(gwy_0, ["18:000000"])
114 await assert_devices(gwy_1, [])
116 await gwy_1.start()
117 assert gwy_1._protocol._transport
119 # NOTE: will pick up gwy 18:111111, since Foreign gwy detect has been removed
120 await assert_devices(gwy_0, ["18:000000", "18:111111"])
121 await assert_devices(gwy_1, ["18:111111"])
123 # TEST 1: Tx to all from GWY /dev/pty/0 (NB: no RSSI)
124 cmd = Command(" I --- 01:010000 --:------ 01:010000 1F09 003 0004B5")
125 gwy_0.send_cmd(cmd)
127 await assert_devices(gwy_0, ["01:010000", "18:000000", "18:111111"])
128 await assert_devices(gwy_1, ["01:010000", "18:111111"])
130 # TEST 2: Tx to all from non-GWY /dev/pty/2 (NB: no RSSI)
131 cmd = Command(" I --- 01:011111 --:------ 01:011111 1F09 003 0004B5")
132 ser_2.write(bytes(f"{cmd}\r\n".encode("ascii")))
134 await assert_devices(gwy_0, ["01:010000", "01:011111", "18:000000", "18:111111"])
135 await assert_devices(gwy_1, ["01:010000", "01:011111", "18:111111"])
137 # TEST 3: Rx only by *only one* GWY (NB: needs RSSI)
138 cmd = Command(" I --- 01:022222 --:------ 01:022222 1F09 003 0004B5")
139 list(rf._port_to_object.values())[1].write(bytes(f"000 {cmd}\r\n".encode("ascii")))
141 await assert_devices(gwy_0, ["01:010000", "01:011111", "18:000000", "18:111111"])
142 await assert_devices(gwy_1, ["01:010000", "01:011111", "01:022222", "18:111111"])
145async def _test_virtual_rf_pkt_flow(
146 rf: VirtualRf, gwy_0: Gateway, gwy_1: Gateway
147) -> None:
148 """Check the virtual RF network behaves as expected (packet flow)."""
150 # TEST 1:
151 await assert_code_in_device_msgindex(
152 gwy_0, "01:022222", Code._1F09, max_sleep=0, test_not=True
153 ) # device won't exist
155 cmd = Command(" I --- 01:022222 --:------ 01:022222 1F09 003 0004B5")
156 gwy_0.send_cmd(cmd, num_repeats=1)
158 await assert_devices(gwy_0, ["01:022222", "18:000000", "18:111111", "40:000000"])
159 await assert_code_in_device_msgindex(gwy_0, "01:022222", Code._1F09)
161 await assert_this_pkt(gwy_0._transport, cmd)
162 await assert_this_pkt(gwy_1._transport, cmd)
164 # TEST 2:
165 # await assert_code_in_device_msgindex(
166 # gwy_0, "40:000000", Code._22F1, max_sleep=0, test_not=True
167 # )
169 cmd = Command(" I --- 40:000000 --:------ 40:000000 22F1 003 000507")
170 gwy_0.send_cmd(cmd, num_repeats=1)
172 # await assert_code_in_device_msgindex(gwy_0, "40:000000", Code._22F1) # ?needs QoS
174 await assert_this_pkt(gwy_0._transport, cmd)
175 await assert_this_pkt(gwy_1._transport, cmd)
177 await assert_devices(gwy_0, ["01:022222", "18:000000", "18:111111", "40:000000"])
178 await assert_devices(gwy_1, ["01:022222", "18:111111", "40:000000", "41:111111"])
181# NOTE: does not use factory
182@pytest.mark.xdist_group(name="virt_serial")
183async def test_virtual_rf_dev_disc() -> None:
184 """Check the virtual RF network behaves as expected (device discovery)."""
186 rf = VirtualRf(3)
188 gwy_0: Gateway = None # type: ignore[assignment]
189 gwy_1: Gateway = None # type: ignore[assignment]
191 try:
192 rf.set_gateway(rf.ports[0], "18:000000")
193 gwy_0 = Gateway(rf.ports[0], **GWY_CONFIG)
194 await assert_devices(gwy_0, [])
196 rf.set_gateway(rf.ports[1], "18:111111")
197 gwy_1 = Gateway(rf.ports[1], **GWY_CONFIG)
198 await assert_devices(gwy_1, [])
200 await _test_virtual_rf_dev_disc(rf, gwy_0, gwy_1)
202 finally:
203 if gwy_0:
204 await gwy_0.stop()
205 if gwy_1:
206 await gwy_1.stop()
207 await rf.stop()
210# NOTE: uses factory
211@pytest.mark.xdist_group(name="virt_serial")
212async def test_virtual_rf_pkt_flow() -> None:
213 """Check the virtual RF network behaves as expected (packet flow)."""
215 rf: VirtualRf = None # type: ignore[assignment]
217 gwy_0: Gateway = None # type: ignore[assignment]
218 gwy_1: Gateway = None # type: ignore[assignment]
220 try:
221 rf, (gwy_0, gwy_1) = await rf_factory(
222 [GWY_CONFIG | SCHEMA_0, GWY_CONFIG | SCHEMA_1]
223 )
225 assert gwy_0._protocol._transport
226 # NOTE: will pick up gwy 18:111111, since Foreign gwy detect has been removed
227 await assert_devices(gwy_0, ["18:000000", "18:111111", "40:000000"])
229 assert gwy_1._protocol._transport
230 await assert_devices(gwy_1, ["18:111111", "41:111111"])
232 await _test_virtual_rf_pkt_flow(rf, gwy_0, gwy_1)
234 finally:
235 if rf:
236 if gwy_0:
237 await gwy_0.stop()
238 if gwy_1:
239 await gwy_1.stop()
240 await rf.stop()