Coverage for tests/tests_rf/test_use_regex.py: 0%
70 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Test the use_regex feature."""
4import asyncio
5from datetime import datetime as dt
7import pytest
8import serial # type: ignore[import-untyped]
10from ramses_rf import Command, Gateway, Packet
11from ramses_tx.protocol import PortProtocol
12from ramses_tx.schemas import SZ_INBOUND, SZ_OUTBOUND, SZ_USE_REGEX
13from ramses_tx.transport import _str
14from tests_rf.virtual_rf import VirtualRf
# other constants

# Default upper bound, in seconds, on how long assert_this_pkt() polls for a match
DEFAULT_MAX_SLEEP = 0.1

# Sleep between polls, in seconds: max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
ASSERT_CYCLE_TIME = 0.0005
# Regex rules for received frames: each key is a pattern, each value its replacement.
# Raw strings keep the ``\g<n>`` group back-references readable.
# NOTE(review): presumably applied with re.sub()-style semantics - confirm in ramses_tx.
RULES_INBOUND = {
    r"63:262143": r"04:262143",
    r"(W.*) 1FC9 (...) 21": r"\g<1> 1FC9 \g<2> 00",
    r"--:------ --:------ 12:215819": r"01:215819 --:------ 01:215819",
    r"000C 006 02(04|08)00FFFFFF": r"000C 006 02\g<1>0013FFFF",
}
# Regex rules for transmitted frames: each key is a pattern, each value its replacement.
# These are the mirror image of the first three inbound rules.
RULES_OUTBOUND = {
    r"04:262143": r"63:262143",
    r"(W.*) 1FC9 (...) 00": r"\g<1> 1FC9 \g<2> 21",
    r"01:215819 --:------ 01:215819": r"--:------ --:------ 12:215819",
}
# Both rule sets together, keyed by direction (the value given to SZ_USE_REGEX)
RULES_COMBINED = {
    SZ_INBOUND: RULES_INBOUND,
    SZ_OUTBOUND: RULES_OUTBOUND,
}
# Outbound cases: key is the frame sent by the gateway, value is the frame
# expected to be received by the other port (i.e. sent -> received by other).
TESTS_OUTBOUND = {
    " I 003 01:215819 --:------ 01:215819 0009 003 0000FF": (
        " I 003 --:------ --:------ 12:215819 0009 003 0000FF"
    ),
    " I --- 04:262143 --:------ 04:262143 30C9 003 000713": (
        " I --- 63:262143 --:------ 63:262143 30C9 003 000713"
    ),
    " I --- 04:262143 --:------ 01:182924 2309 003 0205DC": (
        " I --- 63:262143 --:------ 01:182924 2309 003 0205DC"
    ),
    # NOTE: the below doesn't work with QoS, as expects a response pkt (would be an I)
    " W --- 30:098165 32:206251 --:------ 1FC9 006 0031DA797F75": (
        " W --- 30:098165 32:206251 --:------ 1FC9 006 2131DA797F75"
    ),
}
# Inbound cases: key is the frame written by the other port, value is the frame
# the gateway is expected to see (i.e. sent by other -> received).
TESTS_INBOUND = {
    " I 003 --:------ --:------ 12:215819 0009 003 0000FF": (
        " I 003 01:215819 --:------ 01:215819 0009 003 0000FF"
    ),
    " I --- 63:262143 --:------ 63:262143 30C9 003 000713": (
        " I --- 04:262143 --:------ 04:262143 30C9 003 000713"
    ),
    " I --- 63:262143 --:------ 01:182924 2309 003 0205DC": (
        " I --- 04:262143 --:------ 01:182924 2309 003 0205DC"
    ),
    # NOTE: the below won't work with QoS - why? - still the case with refactored QoS
    " W --- 30:098165 32:206251 --:------ 1FC9 006 2131DA797F75": (
        " W --- 30:098165 32:206251 --:------ 1FC9 006 0031DA797F75"
    ),
    "RP --- 01:182924 30:068640 --:------ 000C 006 020400FFFFFF": (
        "RP --- 01:182924 30:068640 --:------ 000C 006 02040013FFFF"
    ),
    "RP --- 01:182924 30:068640 --:------ 000C 006 020800FFFFFF": (
        "RP --- 01:182924 30:068640 --:------ 000C 006 02080013FFFF"
    ),
}
# Baseline gateway kwargs shared by the tests in this module; each test adds its
# own SZ_USE_REGEX entry to the "config" section before creating the Gateway.
GWY_CONFIG = {
    "config": {
        "disable_discovery": True,
        # QoS is required for this test
        "disable_qos": False,
        "enforce_known_list": False,
    },
}
63# ### FIXTURES #########################################################################
@pytest.fixture(autouse=True)
def patches_for_tests(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch two ramses_tx module attributes for every test in this module.

    The fixture is autouse, so tests need not request it; monkeypatch restores
    the original attribute values when each test finishes.
    """
    overrides = (
        ("ramses_tx.protocol._DBG_DISABLE_IMPERSONATION_ALERTS", True),
        ("ramses_tx.transport.MIN_INTER_WRITE_GAP", 0),
    )
    for target, value in overrides:
        monkeypatch.setattr(target, value)
async def assert_this_pkt(
    gwy: Gateway, expected: Command, max_sleep: float = DEFAULT_MAX_SLEEP
) -> None:
    """Check, at the gateway layer, that the current packet is as expected.

    Polls ``gwy._this_msg`` every ASSERT_CYCLE_TIME seconds, for at most
    ``int(max_sleep / ASSERT_CYCLE_TIME)`` cycles, until the frame of its packet
    equals ``expected._frame``.

    Raises:
        AssertionError: if no matching message is seen within the polling window.
    """

    def frame_matches() -> bool:
        # True only once a message exists and its packet frame is the expected one
        msg = gwy._this_msg
        return bool(msg and msg._pkt._frame == expected._frame)

    cycles_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while cycles_left > 0:
        cycles_left -= 1
        await asyncio.sleep(ASSERT_CYCLE_TIME)  # always yield before checking
        if frame_matches():
            break

    # Final check: fails the test if polling ran out (or never ran at all)
    assert frame_matches()
83# ### TESTS ############################################################################
@pytest.mark.xdist_group(name="virt_serial")
async def test_regex_inbound_() -> None:
    """Check the inbound filters work as expected (this test works without QoS).

    Each key of TESTS_INBOUND is written as a raw frame to the second virtual RF
    port; the gateway on the first port must then report the corresponding value
    as its current packet.
    """

    rf = VirtualRf(2)

    # NOTE: the absence of reciprocal outbound tests is intentional
    # Build a fresh config: updating GWY_CONFIG in place would leak this test's
    # regex rules into every other test that shares the module-level dict.
    config = {
        "config": {
            **GWY_CONFIG["config"],
            SZ_USE_REGEX: {SZ_INBOUND: RULES_INBOUND},
        }
    }

    gwy_0 = Gateway(rf.ports[0], **config)
    ser_1 = serial.Serial(rf.ports[1])

    try:
        await gwy_0.start()
        assert gwy_0._protocol._transport

        for cmd, pkt in TESTS_INBOUND.items():
            ser_1.write(bytes(cmd.encode("ascii")) + b"\r\n")

            await assert_this_pkt(gwy_0, Command(pkt))

    finally:
        ser_1.close()  # release the serial port opened by this test
        await gwy_0.stop()
        await rf.stop()
@pytest.mark.xdist_group(name="virt_serial")
async def test_regex_with_qos() -> None:
    """Check the in/outbound regex filters work as expected with QoS.

    Outbound: each TESTS_OUTBOUND key is sent via the gateway and the frame read
    from the second virtual RF port must equal the corresponding value (commands
    that expect a reply are not sent).
    Inbound: each TESTS_INBOUND key is written to the second port and the gateway
    must report the corresponding value as its current packet.

    The test is skipped if the gateway's protocol is not a PortProtocol with a
    context (i.e. QoS is not available).
    """

    rf = VirtualRf(2)

    # Build a fresh config: updating GWY_CONFIG in place would leak this test's
    # regex rules into every other test that shares the module-level dict.
    config = {
        "config": {
            **GWY_CONFIG["config"],
            SZ_USE_REGEX: RULES_COMBINED,
        }
    }

    gwy_0 = Gateway(rf.ports[0], **config)
    ser_1 = serial.Serial(rf.ports[1])

    if not isinstance(gwy_0._protocol, PortProtocol) or not gwy_0._protocol._context:
        ser_1.close()  # do not leak the serial port when skipping
        await rf.stop()
        pytest.skip("QoS protocol not enabled")

    assert gwy_0._protocol._disable_qos is False  # needed for successful tests

    try:
        await gwy_0.start()
        assert gwy_0._protocol._transport

        _ = ser_1.read(ser_1.in_waiting)  # ser_1.flush() doesn't work?

        for before, after in TESTS_OUTBOUND.items():
            cmd = Command(before)
            if cmd.rx_header:  # we won't be getting any replies
                continue

            pkt_src = await gwy_0.async_send_cmd(cmd)  # , timeout=DEFAULT_WAIT_TIMEOUT)
            assert str(pkt_src) == before

            pkt_dst = Packet.from_port(dt.now(), _str(ser_1.read(ser_1.in_waiting)))
            assert str(pkt_dst) == after

        for before, after in TESTS_INBOUND.items():
            ser_1.write(bytes(before.encode("ascii")) + b"\r\n")
            await assert_this_pkt(gwy_0, Command(after))

    finally:
        ser_1.close()  # release the serial port opened by this test
        await gwy_0.stop()
        await rf.stop()