Coverage for tests/tests_rf/test_protocol_fsm.py: 0%
199 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
#!/usr/bin/env python3
"""RAMSES RF - Test the protocol FSM (send/echo/reply flows) with a virtual RF.

NB: This test will likely fail with pytest -n x, because of the protocol's throttle
limits.
"""
8import asyncio
9import random
10from collections.abc import AsyncGenerator, Awaitable
11from datetime import datetime as dt
13import pytest
14import serial # type: ignore[import-untyped]
16from ramses_rf import Command, Message, Packet
17from ramses_tx import exceptions as exc
18from ramses_tx.protocol import PortProtocol, ReadProtocol, protocol_factory
19from ramses_tx.protocol_fsm import (
20 Inactive,
21 IsInIdle,
22 ProtocolContext,
23 WantEcho,
24 WantRply,
25 _ProtocolStateT,
26)
27from ramses_tx.transport import transport_factory
28from ramses_tx.typing import QosParams
30from .virtual_rf import VirtualRf
# patched constants
# NOTE(review): neither name is referenced in the visible part of this module
DEFAULT_MAX_RETRIES = 0  # ramses_tx.protocol
MAX_DUTY_CYCLE = 1.0  # ramses_tx.protocol

# other constants
CALL_LATER_DELAY = 0.001  # FIXME: this is hardware-specific

ASSERT_CYCLE_TIME = 0.0005  # max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
DEFAULT_MAX_SLEEP = 0.1


# TODO: better handling than AttributeError for this...
# Command("RQ --- 18:111111 01:222222 --:------ 12B0 003 07")

# Frames, as strings: one I (no reply expected), and two RQ/RP pairs
# TIP: using 18:000730 as the source will prevent impersonation alerts
II_CMD_STR_0 = " I --- 01:006056 --:------ 01:006056 1F09 003 0005C8"

RQ_CMD_STR_0 = "RQ --- 18:000730 01:222222 --:------ 12B0 001 00"
RP_CMD_STR_0 = "RP --- 01:222222 18:000730 --:------ 12B0 003 000000"

RQ_CMD_STR_1 = "RQ --- 18:000730 01:222222 --:------ 12B0 001 01"
RP_CMD_STR_1 = "RP --- 01:222222 18:000730 --:------ 12B0 003 010000"

# The cmds that the tests will send
II_CMD_0 = Command(II_CMD_STR_0)
RQ_CMD_0 = Command(RQ_CMD_STR_0)
RQ_CMD_1 = Command(RQ_CMD_STR_1)

# The pkts that the tests compare against (or inject); "..." is in the RSSI position
II_PKT_0 = Packet(dt.now(), f"... {II_CMD_STR_0}")
RQ_PKT_0 = Packet(dt.now(), f"... {RQ_CMD_STR_0}")
RP_PKT_0 = Packet(dt.now(), f"... {RP_CMD_STR_0}")
RQ_PKT_1 = Packet(dt.now(), f"... {RQ_CMD_STR_1}")
RP_PKT_1 = Packet(dt.now(), f"... {RP_CMD_STR_1}")
67# ### FIXTURES #########################################################################
@pytest.fixture()
async def protocol(rf: VirtualRf) -> AsyncGenerator[PortProtocol, None]:
    """Yield a PortProtocol that is connected to the first port of the virtual RF.

    Setup checks the protocol's default timeouts, and that creating the transport
    takes the FSM from Inactive to IsInIdle.

    Teardown closes the transport, checks the FSM has returned to Inactive, and
    stops the virtual RF. The transport is closed and the RF is stopped even if
    one of the teardown assertions fails, so a failing test cannot leak them.
    """

    def _msg_handler(msg: Message) -> None:
        pass  # messages are deliberately discarded

    protocol = protocol_factory(_msg_handler)

    # These values should be asserted as needed for subsequent tests
    assert isinstance(protocol, PortProtocol)  # mypy
    assert isinstance(protocol._context, ProtocolContext)  # mypy

    protocol._disable_qos = False  # HACK: needed for tests to succeed (default: None?)

    assert protocol._context.echo_timeout == 0.5
    assert protocol._context.reply_timeout == 0.5
    assert protocol._context.SEND_TIMEOUT_LIMIT == 20.0

    await assert_protocol_state(protocol, Inactive, max_sleep=0)

    transport = await transport_factory(protocol, port_name=rf.ports[0], port_config={})
    transport._extra["virtual_rf"] = rf  # injected to aid any debugging

    await assert_protocol_state(protocol, IsInIdle, max_sleep=0)

    try:
        yield protocol

    except serial.SerialException as err:
        transport._close(exc=err)
        raise

    except (AssertionError, asyncio.InvalidStateError, TimeoutError):
        transport.close()
        raise

    else:
        try:
            await assert_protocol_state(protocol, IsInIdle)
        finally:
            # close regardless, otherwise the Inactive check below also fails and
            # masks the more useful IsInIdle failure
            transport.close()

    finally:
        try:
            await assert_protocol_state(protocol, Inactive, max_sleep=0.1)
        finally:
            await rf.stop()  # always release the virtual RF, even if the assert fails
114# ######################################################################################
async def assert_protocol_state(
    protocol: PortProtocol | ReadProtocol,
    expected_state: type[_ProtocolStateT],
    max_sleep: float = DEFAULT_MAX_SLEEP,
) -> None:
    """Assert the protocol's FSM reaches the expected state within max_sleep seconds.

    The state is polled once per ASSERT_CYCLE_TIME, sleeping *before* each check.
    With max_sleep=0 there is no sleep at all: the state is checked immediately.

    Raises:
        AssertionError: if the protocol is not a PortProtocol with a
            ProtocolContext, or the state is not as expected once polling ends.
    """
    assert isinstance(protocol, PortProtocol)  # mypy
    assert isinstance(protocol._context, ProtocolContext)  # mypy

    cycles_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while cycles_left > 0:
        cycles_left -= 1
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if isinstance(protocol._context.state, expected_state):
            break

    # the final word: also covers the case where no polling was done
    assert isinstance(protocol._context.state, expected_state)
def assert_protocol_state_detail(
    protocol: PortProtocol, cmd: Command | None, num_sends: int
) -> None:
    """Assert the FSM's current cmd and its tx count are as expected.

    Also checks that the FSM is in a WantEcho/WantRply state if, and only if, a
    cmd is given.
    """
    ctx = protocol._context
    assert isinstance(ctx, ProtocolContext)  # mypy

    state = ctx.state
    assert state.cmd_sent == cmd
    assert ctx._cmd_tx_count == num_sends

    is_awaiting_pkt = isinstance(state, (WantEcho, WantRply))
    assert bool(cmd) is is_awaiting_pkt
async def async_pkt_received(  # type: ignore[no-any-unimported]
    protocol: PortProtocol,
    pkt: Packet,
    method: int = 0,
    ser: None | serial.Serial = None,
) -> None:
    """Deliver a pkt to the protocol via one of several routes.

    The route is selected by method:
        0: call protocol.pkt_received() directly
        1: schedule protocol.pkt_received() with call_soon()
        2: write the pkt's frame to ser immediately
        3: schedule the write to ser with call_soon()
        other: schedule the write to ser with call_later(CALL_LATER_DELAY)

    Args:
        protocol: the protocol that is to receive the pkt.
        pkt: the pkt to deliver.
        method: the route to use (see above).
        ser: the serial port to write to; required unless method is 0 or 1.

    Raises:
        AssertionError: if ser is None when method is neither 0 nor 1.
    """
    if method == 0:
        protocol.pkt_received(pkt)
        return

    if method == 1:
        protocol._loop.call_soon(protocol.pkt_received, pkt)
        return

    assert ser is not None  # all remaining routes go via the serial port
    frame = str(pkt).encode("ascii") + b"\r\n"  # encode() already returns bytes

    if method == 2:
        ser.write(frame)
    elif method == 3:
        protocol._loop.call_soon(ser.write, frame)
    else:
        # same delay as the tests use when they schedule their own replies
        protocol._loop.call_later(CALL_LATER_DELAY, ser.write, frame)
173# ### TESTS ############################################################################
async def _test_flow_30x(protocol: PortProtocol) -> None:
    """Send I and RQ cmds, injecting the RPs via the virtual RF's second port.

    Every send uses wait_for_reply=True: the I cmds are expected to return their
    own echo, the RQ cmds are expected to return the matching RP.

    The serial port used to inject the replies is closed when the flow ends,
    whether or not a step fails.
    """
    assert (
        protocol._transport is not None
    )  # mypy: fixture ensures transport is connected

    def _frame(pkt: Packet) -> bytes:
        """Return the pkt as the bytes to be written to the serial port."""
        return str(pkt).encode("ascii") + b"\r\n"

    # STEP 0: Setup...
    rf: VirtualRf = protocol._transport._extra["virtual_rf"]

    with serial.Serial(rf.ports[1]) as ser:  # previously, was never closed
        qos = QosParams(wait_for_reply=True)

        # STEP 1: Send an I cmd (no reply)...
        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_1"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        # STEP 2: Send an RQ cmd, then receive the corresponding RP pkt...
        task = rf._loop.create_task(
            protocol._send_cmd(RQ_CMD_0, qos=qos), name="send_2"
        )
        protocol._loop.call_later(CALL_LATER_DELAY, ser.write, _frame(RP_PKT_0))
        assert await task == RP_PKT_0

        # STEP 3: Send an I cmd (no reply) *twice*...
        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_3A"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        task = rf._loop.create_task(
            protocol._send_cmd(II_CMD_0, qos=qos), name="send_3B"
        )
        assert await task == II_CMD_0  # no reply pkt expected

        # STEP 4: Send an RQ cmd, then receive the corresponding RP pkt...
        task = rf._loop.create_task(
            protocol._send_cmd(RQ_CMD_1, qos=qos), name="send_4A"
        )
        # sk = rf._loop.create_task(protocol._send_cmd(RQ_CMD_1, qos=qos), name="send_4B")

        # TODO: make these deterministic so ser replies *only after* it receives cmd
        # the first write is the RP for RQ_CMD_0, which is not the cmd just sent
        protocol._loop.call_later(CALL_LATER_DELAY, ser.write, _frame(RP_PKT_0))
        protocol._loop.call_later(CALL_LATER_DELAY, ser.write, _frame(RP_PKT_1))

        assert await task == RP_PKT_1
async def _test_flow_401(protocol: PortProtocol) -> None:
    """Queue 24 cmds at once, await them all together, then check each result.

    The sends use wait_for_reply=False, so each is expected to return a pkt that
    is equal to the cmd that was sent.
    """
    qos = QosParams(wait_for_reply=False)

    indices = list(range(24))
    pending = {
        idx: protocol._loop.create_task(
            protocol._send_cmd(Command.put_sensor_temp("03:123456", idx), qos=qos)
        )
        for idx in indices
    }

    results = await asyncio.gather(*pending.values())
    assert results

    for idx, task in pending.items():
        assert task.result() == Command.put_sensor_temp("03:123456", idx)
async def _test_flow_402(protocol: PortProtocol) -> None:
    """Queue 24 cmds at once, then await them one-by-one in a shuffled order.

    The sends use wait_for_reply=False, so each is expected to return a pkt that
    is equal to the cmd that was sent.
    """
    qos = QosParams(wait_for_reply=False)

    order = list(range(24))

    # the tasks are created in ascending order...
    pending = {}
    for idx in order:
        coro = protocol._send_cmd(Command.put_sensor_temp("03:123456", idx), qos=qos)
        pending[idx] = protocol._loop.create_task(coro)

    # ...but are awaited in a random one
    random.shuffle(order)

    for idx in order:
        assert await pending[idx] == Command.put_sensor_temp("03:123456", idx)
async def _test_flow_qos_helper(
    send_cmd_coro: Awaitable, will_fail: bool = False
) -> None:
    """Await a send, and require that it raises ProtocolSendFailed.

    Args:
        send_cmd_coro: the awaitable (a pending send) to await.
        will_fail: currently unused (a failure is always expected); retained so
            that the signature is unchanged for any existing callers.

    Raises:
        AssertionError: if the awaitable completes without raising.
    """
    try:
        await send_cmd_coro
    except exc.ProtocolSendFailed:
        return

    # an explicit raise, as an `assert False` would be stripped under python -O
    raise AssertionError(f"Had expected {exc.ProtocolSendFailed}")
async def _test_flow_60x(protocol: PortProtocol, num_cmds: int = 1) -> None:
    """Send num_cmds get_zone_temp cmds concurrently, and await all of them.

    Each cmd is for a different zone index, and is sent with wait_for_reply=False.
    NB: the final assert needs a non-empty list of results, so num_cmds must be > 0.
    """
    tasks = [
        protocol._loop.create_task(
            protocol._send_cmd(
                Command.get_zone_temp("01:123456", f"{idx:02X}"),
                qos=QosParams(wait_for_reply=False),
            ),
            name=f"cmd_{idx:02X}",
        )
        for idx in range(num_cmds)
    ]

    results = await asyncio.gather(*tasks)
    assert results
async def _test_flow_qos(protocol: PortProtocol) -> None:
    """Check what a send returns for each variant of the qos kwarg.

    Every variant is tried with an I cmd (put_sensor_temp) and then with an RQ cmd
    (get_system_time). All are expected to return the echo of the cmd, except for
    the RQ with wait_for_reply=True, which is expected to fail as nothing replies.
    """
    ctx = protocol._context
    assert isinstance(ctx, ProtocolContext)  # mypy

    # HACK: to reduce test time
    ctx.SEND_TIMEOUT_LIMIT = 0.01
    ctx.max_retry_limit = 0

    no_reply = "Should be echo as there's no reply to wait for"

    async def expect_echo(cmd: Command, send: Awaitable, why: str) -> None:
        """Await the send, and require that the returned pkt is equal to the cmd."""
        pkt = await send
        assert pkt == cmd, why

    #
    # ### Simple test for an I (does not expect any reply)...

    cmd = Command.put_sensor_temp("03:000111", 19.5)
    await expect_echo(cmd, protocol._send_cmd(cmd), no_reply)  # qos == QosParams()

    cmd = Command.put_sensor_temp("03:000222", 19.5)
    await expect_echo(
        cmd, protocol._send_cmd(cmd, qos=None), no_reply
    )  # qos == QosParams()

    cmd = Command.put_sensor_temp("03:000333", 19.5)
    await expect_echo(cmd, protocol._send_cmd(cmd, qos=QosParams()), no_reply)

    cmd = Command.put_sensor_temp("03:000444", 19.5)
    await expect_echo(
        cmd,
        protocol._send_cmd(cmd, qos=QosParams(wait_for_reply=None)),
        "should be echo as there is no wait_for_reply",
    )

    cmd = Command.put_sensor_temp("03:000555", 19.5)
    await expect_echo(
        cmd,
        protocol._send_cmd(cmd, qos=QosParams(wait_for_reply=False)),
        "should be echo as there is no wait_for_reply",
    )

    cmd = Command.put_sensor_temp("03:000666", 19.5)
    await expect_echo(
        cmd, protocol._send_cmd(cmd, qos=QosParams(wait_for_reply=True)), no_reply
    )

    # # ### Simple test for an RQ (expects an RP)...

    cmd = Command.get_system_time("01:000111")
    await expect_echo(cmd, protocol._send_cmd(cmd), no_reply)

    cmd = Command.get_system_time("01:000222")
    await expect_echo(cmd, protocol._send_cmd(cmd, qos=None), no_reply)

    cmd = Command.get_system_time("01:000333")
    await expect_echo(cmd, protocol._send_cmd(cmd, qos=QosParams()), no_reply)

    cmd = Command.get_system_time("01:000444")
    await expect_echo(
        cmd,
        protocol._send_cmd(cmd, qos=QosParams(wait_for_reply=None)),
        "Should be echo as there is no wait_for_reply",
    )

    cmd = Command.get_system_time("01:000555")
    await expect_echo(
        cmd,
        protocol._send_cmd(cmd, qos=QosParams(wait_for_reply=False)),
        "Should be echo as there is no wait_for_reply",
    )

    # the only send that has to wait for an RP: expected to raise ProtocolSendFailed
    cmd = Command.get_system_time("01:000666")
    failing_send = protocol._send_cmd(
        cmd, qos=QosParams(wait_for_reply=True, timeout=0.05)
    )
    await _test_flow_qos_helper(failing_send)

    # # ### Simple test for an I (does not expect any reply)...
    # presumably confirms that the protocol can still send after the failure above

    cmd = Command.put_sensor_temp("03:000999", 19.5)
    assert await protocol._send_cmd(cmd) == cmd
343# ######################################################################################
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_300(protocol: PortProtocol) -> None:
    """Send I and RQ cmds, with the RP pkts injected via the virtual RF."""
    await _test_flow_30x(protocol)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_401(protocol: PortProtocol) -> None:
    """Queue a batch of cmds at once, gather them, and see that all are echo'd."""
    await _test_flow_401(protocol)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_402(protocol: PortProtocol) -> None:
    """Queue a batch of cmds at once, await them in a random order, check all echo."""
    await _test_flow_402(protocol)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_601(protocol: PortProtocol) -> None:
    """Send a single cmd with wait_for_reply=False, and check it completes."""
    await _test_flow_60x(protocol)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_602(protocol: PortProtocol) -> None:
    """Send two cmds concurrently with wait_for_reply=False, and check both complete."""
    await _test_flow_60x(protocol, num_cmds=2)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_qos(protocol: PortProtocol) -> None:
    """Check the result of a send for each variant of the qos (wait_for_reply) kwarg."""
    await _test_flow_qos(protocol)
382# @pytest_asyncio.fixture
383# async def async_benchmark(benchmark: pytest.FixtureDef) -> Callable[..., None]:
384# event_loop = asyncio.get_running_loop()
386# def _wrapper(func: Callable, *args: Any, **kwargs: Any) -> None:
387# if asyncio.iscoroutinefunction(func):
389# @benchmark
390# def _():
391# return event_loop.run_until_complete(func(*args, **kwargs))
393# else:
394# benchmark(func, *args, **kwargs)
396# return _wrapper
399# @pytest.mark.xdist_group(name="virt_serial")
400# def test_benchmark_100(async_benchmark) -> None:
401# async_benchmark(_test_flow_10x)
404# @pytest.mark.xdist_group(name="virt_serial")
405# def test_benchmark_300(async_benchmark) -> None:
406# async_benchmark(_test_flow_30x)