Coverage for tests/tests_rf/test_virt_network.py: 0%

103 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2 

3# TODO: Add assert_protocol_ready to VirtualRF factory (or in library?) 

4 

5"""Test the Virtual RF library - VirtualRF is used for testing.""" 

6 

7import asyncio 

8 

9import pytest 

10import serial # type: ignore[import-untyped] 

11 

12from ramses_rf import Address, Code, Command, Gateway 

13from ramses_tx.schemas import DeviceIdT 

14from ramses_tx.transport import PortTransport 

15from tests_rf.virtual_rf import VirtualRf, rf_factory 

16 

# Polling parameters shared by the assert_* helpers in this module
ASSERT_CYCLE_TIME = 0.001  # seconds per poll; polls per assert = max_sleep / ASSERT_CYCLE_TIME
DEFAULT_MAX_SLEEP = 1  # seconds an assert_* helper polls for, unless told otherwise


# Keyword arguments common to every Gateway created by these tests
# NOTE(review): the comment here used to say "we're testing discovery here",
# yet discovery is disabled - confirm which of the two is intended
GWY_CONFIG = {
    "config": {"disable_discovery": True, "enforce_known_list": False},
}


# Extra config merged into GWY_CONFIG for the 1st gateway: one known REM device
SCHEMA_0 = {
    "orphans_hvac": ["40:000000"],
    "known_list": {"40:000000": {"class": "REM"}},
}

# Extra config merged into GWY_CONFIG for the 2nd gateway: one known FAN device
SCHEMA_1 = {
    "orphans_hvac": ["41:111111"],
    "known_list": {"41:111111": {"class": "FAN"}},
}

39 

40 

41# ###################################################################################### 

42 

43 

async def assert_code_in_device_msgindex(
    gwy: Gateway,
    dev_id: DeviceIdT,
    code: Code,
    max_sleep: int = DEFAULT_MAX_SLEEP,
    test_not: bool = False,
) -> None:
    """Fail if the device doesn't exist, or if it doesn't have the code in its msg_db.

    The condition is polled every ASSERT_CYCLE_TIME seconds, for up to max_sleep
    seconds, and asserted once more after polling stops. With max_sleep=0 there is
    no polling: the condition is asserted immediately.

    If test_not is True the check is inverted: fail if the device exists and the
    code is in the gateway's msg_db (as src or as dst).

    Raises:
        AssertionError: if the expected state is not reached in time.
    """

    def code_is_indexed() -> bool:
        """Return True only if the device exists and its code is in the msg_db."""
        # An explicit bool is essential: a bare `a and b and c` chain yields
        # None (or another falsy operand) when the device or msg_db is missing,
        # and `None != False` is True - which made the assertion pass vacuously.
        if not gwy.device_by_id.get(dev_id) or not gwy.msg_db:
            return False
        return bool(
            gwy.msg_db.contains(src=dev_id, code=str(code))
            or gwy.msg_db.contains(dst=dev_id, code=str(code))
        )

    for _ in range(int(max_sleep / ASSERT_CYCLE_TIME)):
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if code_is_indexed() != test_not:
            break
    assert code_is_indexed() != test_not, (
        f"{dev_id}: code {code} in msg_db is {code_is_indexed()}, "
        f"expected {not test_not}"
    )

72 

73 

async def assert_devices(
    gwy: Gateway, devices: list[str], max_sleep: int = DEFAULT_MAX_SLEEP
) -> None:
    """Fail if the two sets of devices are not equal.

    The gateway's device ids are compared with the expected ids (each normalised
    via Address(...).id) every ASSERT_CYCLE_TIME seconds, for up to max_sleep
    seconds, and asserted once more after polling stops.

    Raises:
        AssertionError: if the gateway's device ids never match those expected.
    """

    expected = sorted(Address(d).id for d in devices)

    def known_ids() -> list[str]:
        """Return the sorted ids of the devices the gateway currently knows."""
        return sorted(d.id for d in gwy.devices)

    for _ in range(int(max_sleep / ASSERT_CYCLE_TIME)):
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        # Poll on the asserted condition itself, not merely on the device count:
        # the right number of the wrong devices should keep waiting, not fail early
        if known_ids() == expected:
            break
    assert known_ids() == expected, f"devices: {known_ids()}, expected: {expected}"

86 

87 

async def assert_this_pkt(
    transport: PortTransport, cmd: Command, max_sleep: int = DEFAULT_MAX_SLEEP
) -> None:
    """Check, at the transport layer, that the current packet is as expected.

    The transport's current packet is compared, by frame, with that of cmd every
    ASSERT_CYCLE_TIME seconds, for up to max_sleep seconds, and asserted once
    more after polling stops.
    """

    def is_current_pkt() -> bool:
        """Return True if the transport has a current packet with cmd's frame."""
        pkt = transport._this_pkt
        return bool(pkt and pkt._frame == cmd._frame)

    cycles_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while cycles_left > 0:
        cycles_left -= 1
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if is_current_pkt():
            break

    assert is_current_pkt()

97 

98 

99# ### TESTS ############################################################################ 

100 

101 

async def _test_virtual_rf_dev_disc(
    rf: VirtualRf, gwy_0: Gateway, gwy_1: Gateway
) -> None:
    """Check the virtual RF network behaves as expected (device discovery).

    Starts both gateways itself (gwy_0 first, then gwy_1), then sends packets by
    three routes - via gwy_0, via a raw serial port (rf.ports[2]), and via a
    direct write to the second of rf's port objects - checking after each which
    devices each gateway knows about.
    """

    # Use a context manager so the raw serial port is closed even if an assert fails
    with serial.Serial(rf.ports[2]) as ser_2:
        # TEST 0: Tx of fingerprint packet with one on/one off
        await gwy_0.start()
        assert gwy_0._protocol._transport

        await assert_devices(gwy_0, ["18:000000"])
        await assert_devices(gwy_1, [])

        await gwy_1.start()
        assert gwy_1._protocol._transport

        # NOTE: will pick up gwy 18:111111, since Foreign gwy detect has been removed
        await assert_devices(gwy_0, ["18:000000", "18:111111"])
        await assert_devices(gwy_1, ["18:111111"])

        # TEST 1: Tx to all from GWY /dev/pty/0 (NB: no RSSI)
        cmd = Command(" I --- 01:010000 --:------ 01:010000 1F09 003 0004B5")
        gwy_0.send_cmd(cmd)

        await assert_devices(gwy_0, ["01:010000", "18:000000", "18:111111"])
        await assert_devices(gwy_1, ["01:010000", "18:111111"])

        # TEST 2: Tx to all from non-GWY /dev/pty/2 (NB: no RSSI)
        cmd = Command(" I --- 01:011111 --:------ 01:011111 1F09 003 0004B5")
        ser_2.write(f"{cmd}\r\n".encode("ascii"))

        await assert_devices(
            gwy_0, ["01:010000", "01:011111", "18:000000", "18:111111"]
        )
        await assert_devices(gwy_1, ["01:010000", "01:011111", "18:111111"])

        # TEST 3: Rx only by *only one* GWY (NB: needs RSSI)
        cmd = Command(" I --- 01:022222 --:------ 01:022222 1F09 003 0004B5")
        port_1 = list(rf._port_to_object.values())[1]
        port_1.write(f"000 {cmd}\r\n".encode("ascii"))

        # gwy_0 is expected to be unchanged: only gwy_1 should see 01:022222
        await assert_devices(
            gwy_0, ["01:010000", "01:011111", "18:000000", "18:111111"]
        )
        await assert_devices(
            gwy_1, ["01:010000", "01:011111", "01:022222", "18:111111"]
        )

143 

144 

async def _test_virtual_rf_pkt_flow(
    rf: VirtualRf, gwy_0: Gateway, gwy_1: Gateway
) -> None:
    """Check the virtual RF network behaves as expected (packet flow).

    Two packets are sent via gwy_0; after each, the current packet of both
    gateways' transports is checked, as are the devices known to the gateways.
    """

    both_gateways = (gwy_0, gwy_1)
    gwy_0_devices = ["01:022222", "18:000000", "18:111111", "40:000000"]

    # TEST 1: a 1F09 packet from a device that gwy_0 doesn't yet know
    await assert_code_in_device_msgindex(
        gwy_0, "01:022222", Code._1F09, max_sleep=0, test_not=True
    )  # device won't exist

    pkt_1f09 = Command(" I --- 01:022222 --:------ 01:022222 1F09 003 0004B5")
    gwy_0.send_cmd(pkt_1f09, num_repeats=1)

    await assert_devices(gwy_0, gwy_0_devices)
    await assert_code_in_device_msgindex(gwy_0, "01:022222", Code._1F09)

    for gwy in both_gateways:
        await assert_this_pkt(gwy._transport, pkt_1f09)

    # TEST 2: a 22F1 packet from the device in gwy_0's schema
    # NOTE: the msg_db checks for 40:000000 / 22F1 remain disabled (?needs QoS):
    # await assert_code_in_device_msgindex(
    #     gwy_0, "40:000000", Code._22F1, max_sleep=0, test_not=True
    # )

    pkt_22f1 = Command(" I --- 40:000000 --:------ 40:000000 22F1 003 000507")
    gwy_0.send_cmd(pkt_22f1, num_repeats=1)

    # await assert_code_in_device_msgindex(gwy_0, "40:000000", Code._22F1)  # ?needs QoS

    for gwy in both_gateways:
        await assert_this_pkt(gwy._transport, pkt_22f1)

    await assert_devices(gwy_0, gwy_0_devices)
    await assert_devices(gwy_1, ["01:022222", "18:111111", "40:000000", "41:111111"])

179 

180 

181# NOTE: does not use factory 

@pytest.mark.xdist_group(name="virt_serial")
async def test_virtual_rf_dev_disc() -> None:
    """Check the virtual RF network behaves as expected (device discovery).

    Builds a 3-port virtual RF by hand, with a gateway on each of the first two
    ports, runs the device discovery checks, then tears everything down.
    """

    rf = VirtualRf(3)

    gwy_0: Gateway = None  # type: ignore[assignment]
    gwy_1: Gateway = None  # type: ignore[assignment]

    try:
        rf.set_gateway(rf.ports[0], "18:000000")
        gwy_0 = Gateway(rf.ports[0], **GWY_CONFIG)
        await assert_devices(gwy_0, [])

        rf.set_gateway(rf.ports[1], "18:111111")
        gwy_1 = Gateway(rf.ports[1], **GWY_CONFIG)
        await assert_devices(gwy_1, [])

        await _test_virtual_rf_dev_disc(rf, gwy_0, gwy_1)

    finally:
        # Nested try/finally: a failure stopping one gateway must not prevent the
        # other gateway, or the virtual RF itself, from being stopped
        try:
            if gwy_0 is not None:
                await gwy_0.stop()
        finally:
            try:
                if gwy_1 is not None:
                    await gwy_1.stop()
            finally:
                await rf.stop()

208 

209 

210# NOTE: uses factory 

@pytest.mark.xdist_group(name="virt_serial")
async def test_virtual_rf_pkt_flow() -> None:
    """Check the virtual RF network behaves as expected (packet flow).

    Obtains the virtual RF and its two gateways from rf_factory, checks the
    devices each gateway starts with, runs the packet flow checks, then tears
    everything down.
    """

    rf: VirtualRf = None  # type: ignore[assignment]

    gwy_0: Gateway = None  # type: ignore[assignment]
    gwy_1: Gateway = None  # type: ignore[assignment]

    try:
        rf, (gwy_0, gwy_1) = await rf_factory(
            [GWY_CONFIG | SCHEMA_0, GWY_CONFIG | SCHEMA_1]
        )

        assert gwy_0._protocol._transport
        # NOTE: will pick up gwy 18:111111, since Foreign gwy detect has been removed
        await assert_devices(gwy_0, ["18:000000", "18:111111", "40:000000"])

        assert gwy_1._protocol._transport
        await assert_devices(gwy_1, ["18:111111", "41:111111"])

        await _test_virtual_rf_pkt_flow(rf, gwy_0, gwy_1)

    finally:
        # If rf_factory itself failed, there is nothing here to tear down
        if rf is not None:
            # Nested try/finally: a failure stopping one gateway must not prevent
            # the other gateway, or the virtual RF itself, from being stopped
            try:
                if gwy_0 is not None:
                    await gwy_0.stop()
            finally:
                try:
                    if gwy_1 is not None:
                        await gwy_1.stop()
                finally:
                    await rf.stop()