Coverage for tests/tests_rf/test_protocol_fsm.py: 0%

199 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Test the binding protocol with a virtual RF 

3 

4NB: This test will likely fail with pytest -n x, because of the protocol's throttle 

5limits. 

6""" 

7 

8import asyncio 

9import random 

10from collections.abc import AsyncGenerator, Awaitable 

11from datetime import datetime as dt 

12 

13import pytest 

14import serial # type: ignore[import-untyped] 

15 

16from ramses_rf import Command, Message, Packet 

17from ramses_tx import exceptions as exc 

18from ramses_tx.protocol import PortProtocol, ReadProtocol, protocol_factory 

19from ramses_tx.protocol_fsm import ( 

20 Inactive, 

21 IsInIdle, 

22 ProtocolContext, 

23 WantEcho, 

24 WantRply, 

25 _ProtocolStateT, 

26) 

27from ramses_tx.transport import transport_factory 

28from ramses_tx.typing import QosParams 

29 

30from .virtual_rf import VirtualRf 

31 

# patched constants
# NOTE(review): neither of these two names is read anywhere in this module; the
# trailing "ramses_tx.protocol" notes suggest they are picked up by a patching
# hook defined elsewhere to override that module's values - TODO confirm.
DEFAULT_MAX_RETRIES = 0 # # ramses_tx.protocol
MAX_DUTY_CYCLE = 1.0 # # ramses_tx.protocol

# other constants
# Delay (in seconds) before a canned reply is written to the peer serial port.
CALL_LATER_DELAY = 0.001 # FIXME: this is hardware-specific

# Polling interval used by assert_protocol_state(), and the default ceiling on
# how long it will poll for.
ASSERT_CYCLE_TIME = 0.0005 # max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
DEFAULT_MAX_SLEEP = 0.1


# TODO: better handling than AttributeError for this...
# Command("RQ --- 18:111111 01:222222 --:------ 12B0 003 07")

# Canned traffic: each *_STR_* is a frame, each *_CMD_* the Command built from it,
# and each *_PKT_* a Packet built from it. The Packets are stamped with the time
# at which this module is imported.
II_CMD_STR_0 = " I --- 01:006056 --:------ 01:006056 1F09 003 0005C8"
II_CMD_0 = Command(II_CMD_STR_0)
II_PKT_0 = Packet(dt.now(), f"... {II_CMD_STR_0}")

# TIP: using 18:000730 as the source will prevent impersonation alerts

# First RQ, and the RP that answers it
RQ_CMD_STR_0 = "RQ --- 18:000730 01:222222 --:------ 12B0 001 00"
RP_CMD_STR_0 = "RP --- 01:222222 18:000730 --:------ 12B0 003 000000"

RQ_CMD_0 = Command(RQ_CMD_STR_0)
RQ_PKT_0 = Packet(dt.now(), f"... {RQ_CMD_STR_0}")
RP_PKT_0 = Packet(dt.now(), f"... {RP_CMD_STR_0}")

# Second RQ/RP pair: same code (12B0) as the first, but a different payload
RQ_CMD_STR_1 = "RQ --- 18:000730 01:222222 --:------ 12B0 001 01"
RP_CMD_STR_1 = "RP --- 01:222222 18:000730 --:------ 12B0 003 010000"

RQ_CMD_1 = Command(RQ_CMD_STR_1)
RQ_PKT_1 = Packet(dt.now(), f"... {RQ_CMD_STR_1}")
RP_PKT_1 = Packet(dt.now(), f"... {RP_CMD_STR_1}")

65 

66 

67# ### FIXTURES ######################################################################### 

68 

69 

@pytest.fixture()
async def protocol(rf: VirtualRf) -> AsyncGenerator[PortProtocol, None]:
    """Yield a PortProtocol (QoS enabled) connected to the first virtual RF port.

    Setup asserts the protocol's default timeouts, that its FSM is Inactive
    before a transport is attached, and that it IsInIdle afterwards. The virtual
    RF is stored in the transport's extra info, under "virtual_rf", so that tests
    can reach the other ports.

    Teardown closes the transport, asserts that the FSM returns to Inactive, and
    stops the virtual RF. The transport is closed and the virtual RF is stopped
    even when one of the teardown assertions fails, so that a failing test cannot
    leak them (or hide its own failure behind a follow-on one).
    """

    def _msg_handler(msg: Message) -> None:
        pass

    protocol = protocol_factory(_msg_handler)

    # These values should be asserted as needed for subsequent tests
    assert isinstance(protocol, PortProtocol)  # mypy
    assert isinstance(protocol._context, ProtocolContext)  # mypy

    protocol._disable_qos = False  # HACK: needed for tests to succeed (default: None?)

    assert protocol._context.echo_timeout == 0.5
    assert protocol._context.reply_timeout == 0.5
    assert protocol._context.SEND_TIMEOUT_LIMIT == 20.0

    await assert_protocol_state(protocol, Inactive, max_sleep=0)

    transport = await transport_factory(protocol, port_name=rf.ports[0], port_config={})
    transport._extra["virtual_rf"] = rf  # injected to aid any debugging

    try:
        # inside the try, so the transport is closed if this check fails
        await assert_protocol_state(protocol, IsInIdle, max_sleep=0)

        yield protocol

    except serial.SerialException as err:
        transport._close(exc=err)
        raise

    except (AssertionError, asyncio.InvalidStateError, TimeoutError):
        transport.close()
        raise

    else:
        try:
            await assert_protocol_state(protocol, IsInIdle)
        finally:
            transport.close()  # close the port even if the idle check failed

    finally:
        try:
            await assert_protocol_state(protocol, Inactive, max_sleep=0.1)
        finally:
            await rf.stop()  # always release the virtual RF

112 

113 

114# ###################################################################################### 

115 

116 

async def assert_protocol_state(
    protocol: PortProtocol | ReadProtocol,
    expected_state: type[_ProtocolStateT],
    max_sleep: float = DEFAULT_MAX_SLEEP,
) -> None:
    """Give the protocol's FSM time to reach a state, then assert that it has.

    The state is polled at most int(max_sleep / ASSERT_CYCLE_TIME) times, with a
    sleep of ASSERT_CYCLE_TIME before each poll; polling stops early once the
    state is an instance of expected_state. With max_sleep=0 there is no polling
    (and no sleep): the state is simply asserted.

    Raises AssertionError if the protocol is not a PortProtocol with a
    ProtocolContext, or if the expected state was not reached.
    """
    assert isinstance(protocol, PortProtocol)  # mypy
    assert isinstance(protocol._context, ProtocolContext)  # mypy

    polls_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while polls_left > 0:
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if isinstance(protocol._context.state, expected_state):
            break
        polls_left -= 1

    assert isinstance(protocol._context.state, expected_state)

130 

131 

def assert_protocol_state_detail(
    protocol: PortProtocol, cmd: Command | None, num_sends: int
) -> None:
    """Assert the FSM's bookkeeping for the command (if any) being sent.

    Checks, in this order, that:
     - the current state's cmd_sent equals cmd
     - the context's transmit counter equals num_sends
     - the state is a WantEcho/WantRply exactly when cmd is truthy
    """
    context = protocol._context
    assert isinstance(context, ProtocolContext)  # mypy

    state = context.state

    assert state.cmd_sent == cmd
    assert context._cmd_tx_count == num_sends

    is_waiting = isinstance(state, (WantEcho, WantRply))
    assert bool(cmd) is is_waiting

140 

141 

async def async_pkt_received(  # type: ignore[no-any-unimported]
    protocol: PortProtocol,
    pkt: Packet,
    method: int = 0,
    ser: None | serial.Serial = None,
) -> None:
    """Deliver a packet to the protocol by one of several routes.

    The route is selected by method:
     - 0: call protocol.pkt_received(pkt) directly
     - 1: schedule protocol.pkt_received(pkt) via the loop's call_soon()
     - 2: write the packet's frame to ser immediately
     - 3: schedule that write via the loop's call_soon()
     - any other value: schedule that write after CALL_LATER_DELAY seconds

    ser is only consulted for the routes that write to it (method not 0 or 1).

    Raises AssertionError if such a route is selected and ser is None.
    """
    # await assert_protocol_state(protocol, ProtocolState.IDLE, max_sleep=0)
    # assert_state_temp(protocol, None, 0)

    if method == 0:
        protocol.pkt_received(pkt)
        return

    if method == 1:
        protocol._loop.call_soon(protocol.pkt_received, pkt)
        return

    assert ser is not None
    # the frame is the packet's text, ASCII-encoded and CRLF-terminated
    frame = str(pkt).encode("ascii") + b"\r\n"

    if method == 2:
        ser.write(frame)
    elif method == 3:
        protocol._loop.call_soon(ser.write, frame)
    else:
        # the module's shared delay, rather than a second copy of the literal
        protocol._loop.call_later(CALL_LATER_DELAY, ser.write, frame)

    # await assert_protocol_state(protocol, ProtocolState.IDLE, max_sleep=0)
    # assert_state_temp(protocol, None, 0)

171 

172 

173# ### TESTS ############################################################################ 

174 

175 

async def _test_flow_30x(protocol: PortProtocol) -> None:
    """Send I and RQ commands in turn, answering the RQs from a peer serial port.

    Every command is sent with wait_for_reply=True. The sends of the I command
    are expected to return the command itself; the sends of the RQ commands are
    expected to return the RP packet that is written, after CALL_LATER_DELAY, to
    the second virtual RF port.

    The peer port is closed (and any reply still waiting to be written is
    cancelled) when the flow ends, whether it passes or fails.
    """
    assert (
        protocol._transport is not None
    )  # mypy: fixture ensures transport is connected

    # STEP 0: Setup...
    rf: VirtualRf = protocol._transport._extra["virtual_rf"]
    ser = serial.Serial(rf.ports[1])
    pending_replies: list[asyncio.TimerHandle] = []

    def reply_later(pkt: Packet) -> None:
        """Schedule pkt to be written to the peer port after CALL_LATER_DELAY."""
        frame = str(pkt).encode("ascii") + b"\r\n"
        pending_replies.append(
            protocol._loop.call_later(CALL_LATER_DELAY, ser.write, frame)
        )

    qos = QosParams(wait_for_reply=True)

    try:
        # STEP 1: Send an I cmd (no reply)...
        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_1"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        # STEP 2: Send an RQ cmd, then receive the corresponding RP pkt...
        task = rf._loop.create_task(
            protocol._send_cmd(RQ_CMD_0, qos=qos), name="send_2"
        )
        reply_later(RP_PKT_0)
        assert await task == RP_PKT_0

        # STEP 3: Send an I cmd (no reply) *twice*...
        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_3A"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_3B"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        # STEP 4: Send an RQ cmd, then receive the corresponding RP pkt...
        task = rf._loop.create_task(
            protocol._send_cmd(RQ_CMD_1, qos=qos), name="send_4A"
        )
        # sk = rf._loop.create_task(protocol._send_cmd(RQ_CMD_1, qos=qos), name="send_4B")

        # TODO: make these deterministic so ser replies *only after* it receives cmd
        # the RP for the *other* RQ is written first: the send must not accept it
        reply_later(RP_PKT_0)
        reply_later(RP_PKT_1)

        assert await task == RP_PKT_1

    finally:
        for handle in pending_replies:
            handle.cancel()  # no effect if the write has already been made
        ser.close()

217 

218 

async def _test_flow_401(protocol: PortProtocol) -> None:
    """Queue 24 sensor-temp commands at once and check each send's result.

    All the sends (wait_for_reply=False) are created as tasks before any is
    awaited, then gathered; each task's result must equal the command it sent.
    """
    qos = QosParams(wait_for_reply=False)
    loop = protocol._loop

    def make_cmd(idx: int) -> Command:
        return Command.put_sensor_temp("03:123456", idx)

    sends = {
        idx: loop.create_task(protocol._send_cmd(make_cmd(idx), qos=qos))
        for idx in range(24)
    }

    assert await asyncio.gather(*sends.values())

    for idx, send in sends.items():
        pkt = send.result()
        assert pkt == make_cmd(idx)

234 

235 

async def _test_flow_402(protocol: PortProtocol) -> None:
    """Queue 24 sensor-temp commands at once, then await them in a shuffled order.

    All the sends (wait_for_reply=False) are created as tasks, in index order,
    before any is awaited; each task's result must equal the command it sent.
    """
    qos = QosParams(wait_for_reply=False)
    loop = protocol._loop

    def make_cmd(idx: int) -> Command:
        return Command.put_sensor_temp("03:123456", idx)

    order = list(range(24))
    sends = {
        idx: loop.create_task(protocol._send_cmd(make_cmd(idx), qos=qos))
        for idx in order
    }

    random.shuffle(order)  # await the sends in a different order to their queueing

    for idx in order:
        pkt = await sends[idx]
        assert pkt == make_cmd(idx)

251 

252 

async def _test_flow_qos_helper(
    send_cmd_coro: Awaitable, will_fail: bool = False
) -> None:
    """Await a send, and require that it raises ProtocolSendFailed.

    NOTE(review): will_fail is accepted but never consulted - a failed send is
    always expected, whatever its value. It is left as is, so that existing
    callers are unaffected.

    Raises AssertionError if the awaitable completes without raising.
    """
    try:
        await send_cmd_coro
    except exc.ProtocolSendFailed:
        return

    # raised explicitly, because an `assert False` is stripped under `python -O`
    raise AssertionError(f"Had expected {exc.ProtocolSendFailed}")

262 

263 

async def _test_flow_60x(protocol: PortProtocol, num_cmds: int = 1) -> None:
    """Send num_cmds get_zone_temp commands concurrently, without awaiting replies.

    One task (named cmd_XX, where XX is the zone index in hex) is created per
    command, each sent with wait_for_reply=False; the tasks are then gathered,
    and the list of their results must be truthy.
    """
    loop = protocol._loop

    sends = [
        loop.create_task(
            protocol._send_cmd(
                Command.get_zone_temp("01:123456", f"{zone:02X}"),
                qos=QosParams(wait_for_reply=False),
            ),
            name=f"cmd_{zone:02X}",
        )
        for zone in range(num_cmds)
    ]

    assert await asyncio.gather(*sends)

274 

275 

async def _test_flow_qos(protocol: PortProtocol) -> None:
    """Send commands with each flavour of qos argument and check what comes back.

    A put_sensor_temp and then a get_system_time command are each sent with: no
    qos argument, qos=None, a default QosParams, and QosParams with
    wait_for_reply of None, False and True. Every send is expected to return a
    pkt equal to the command, except for the get_system_time sent with
    wait_for_reply=True, which is expected to raise ProtocolSendFailed. A final
    put_sensor_temp is then sent, and expected to return its echo.
    """
    assert isinstance(protocol._context, ProtocolContext)  # mypy

    # HACK: to reduce test time
    protocol._context.SEND_TIMEOUT_LIMIT = 0.01
    protocol._context.max_retry_limit = 0

    no_reply = "Should be echo as there's no reply to wait for"

    async def expect_echo(
        cmd: Command, reason: str, **send_kwargs: QosParams | None
    ) -> None:
        """Send cmd, and require that the pkt returned compares equal to it."""
        pkt = await protocol._send_cmd(cmd, **send_kwargs)
        assert pkt == cmd, reason

    #
    # ### Simple test for an I (does not expect any reply)...

    await expect_echo(
        Command.put_sensor_temp("03:000111", 19.5), no_reply
    )  # qos == QosParams()

    await expect_echo(
        Command.put_sensor_temp("03:000222", 19.5), no_reply, qos=None
    )  # qos == QosParams()

    await expect_echo(
        Command.put_sensor_temp("03:000333", 19.5), no_reply, qos=QosParams()
    )

    await expect_echo(
        Command.put_sensor_temp("03:000444", 19.5),
        "should be echo as there is no wait_for_reply",
        qos=QosParams(wait_for_reply=None),
    )

    await expect_echo(
        Command.put_sensor_temp("03:000555", 19.5),
        "should be echo as there is no wait_for_reply",
        qos=QosParams(wait_for_reply=False),
    )

    await expect_echo(
        Command.put_sensor_temp("03:000666", 19.5),
        no_reply,
        qos=QosParams(wait_for_reply=True),
    )

    # # ### Simple test for an RQ (expects an RP)...

    await expect_echo(Command.get_system_time("01:000111"), no_reply)

    await expect_echo(Command.get_system_time("01:000222"), no_reply, qos=None)

    await expect_echo(
        Command.get_system_time("01:000333"), no_reply, qos=QosParams()
    )

    await expect_echo(
        Command.get_system_time("01:000444"),
        "Should be echo as there is no wait_for_reply",
        qos=QosParams(wait_for_reply=None),
    )

    await expect_echo(
        Command.get_system_time("01:000555"),
        "Should be echo as there is no wait_for_reply",
        qos=QosParams(wait_for_reply=False),
    )

    # the only send here that must fail: it waits (briefly) for a reply
    rq_cmd = Command.get_system_time("01:000666")
    await _test_flow_qos_helper(
        protocol._send_cmd(rq_cmd, qos=QosParams(wait_for_reply=True, timeout=0.05))
    )

    # # ### Simple test for an I (does not expect any reply)...

    last_cmd = Command.put_sensor_temp("03:000999", 19.5)
    last_pkt = await protocol._send_cmd(last_cmd)
    assert last_pkt == last_cmd

341 

342 

343# ###################################################################################### 

344 

345 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_300(protocol: PortProtocol) -> None:
    """Send I/RQ/I/I/RQ cmds in turn, checking the pkt returned by each send."""
    await _test_flow_30x(protocol)

350 

351 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_401(protocol: PortProtocol) -> None:
    """Throw a bunch of commands at once, and see that all are echo'd."""
    await _test_flow_401(protocol)

356 

357 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_402(protocol: PortProtocol) -> None:
    """Throw a bunch of commands at once, await them in a random order, and see
    that all are echo'd.
    """
    await _test_flow_402(protocol)

362 

363 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_601(protocol: PortProtocol) -> None:
    """Check the wait_for_reply kwarg (as False), with a single command."""
    await _test_flow_60x(protocol)

368 

369 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_602(protocol: PortProtocol) -> None:
    """Check the wait_for_reply kwarg (as False), with two concurrent commands."""
    await _test_flow_60x(protocol, num_cmds=2)

374 

375 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_qos(protocol: PortProtocol) -> None:
    """Check the qos kwarg of _send_cmd(), in particular its wait_for_reply."""
    await _test_flow_qos(protocol)

380 

381 

382# @pytest_asyncio.fixture 

383# async def async_benchmark(benchmark: pytest.FixtureDef) -> Callable[..., None]: 

384# event_loop = asyncio.get_running_loop() 

385 

386# def _wrapper(func: Callable, *args: Any, **kwargs: Any) -> None: 

387# if asyncio.iscoroutinefunction(func): 

388 

389# @benchmark 

390# def _(): 

391# return event_loop.run_until_complete(func(*args, **kwargs)) 

392 

393# else: 

394# benchmark(func, *args, **kwargs) 

395 

396# return _wrapper 

397 

398 

399# @pytest.mark.xdist_group(name="virt_serial") 

400# def test_benchmark_100(async_benchmark) -> None: 

401# async_benchmark(_test_flow_10x) 

402 

403 

404# @pytest.mark.xdist_group(name="virt_serial") 

405# def test_benchmark_300(async_benchmark) -> None: 

406# async_benchmark(_test_flow_30x)