Coverage for tests/tests_rf/test_use_regex.py: 0%

70 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Test the use_regex feature.""" 

3 

4import asyncio 

5from datetime import datetime as dt 

6 

7import pytest 

8import serial # type: ignore[import-untyped] 

9 

10from ramses_rf import Command, Gateway, Packet 

11from ramses_tx.protocol import PortProtocol 

12from ramses_tx.schemas import SZ_INBOUND, SZ_OUTBOUND, SZ_USE_REGEX 

13from ramses_tx.transport import _str 

14from tests_rf.virtual_rf import VirtualRf 

15 

# other constants

# Polling interval and default polling budget used by assert_this_pkt(); both are
# durations in seconds (the interval is handed directly to asyncio.sleep).
ASSERT_CYCLE_TIME = 0.0005  # max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
DEFAULT_MAX_SLEEP = 0.1


# Rewrite rules supplied to the gateway's use_regex option: each key is a pattern
# and each value its replacement (note the \g<n> group back-references).
RULES_INBOUND = {
    "63:262143": "04:262143",
    "(W.*) 1FC9 (...) 21": "\\g<1> 1FC9 \\g<2> 00",
    "--:------ --:------ 12:215819": "01:215819 --:------ 01:215819",
    "000C 006 02(04|08)00FFFFFF": "000C 006 02\\g<1>0013FFFF",
}

# The outbound rules reverse the first three inbound rules (the 000C rule has no
# outbound counterpart).
RULES_OUTBOUND = {
    "04:262143": "63:262143",
    "(W.*) 1FC9 (...) 00": "\\g<1> 1FC9 \\g<2> 21",
    "01:215819 --:------ 01:215819": "--:------ --:------ 12:215819",
}

RULES_COMBINED = {SZ_INBOUND: RULES_INBOUND, SZ_OUTBOUND: RULES_OUTBOUND}

# Frame pairs keyed by the frame as written, mapped to the frame expected on the
# far side once the rules above have been applied.
TESTS_OUTBOUND = {  # sent, received by other
    " I 003 01:215819 --:------ 01:215819 0009 003 0000FF": (
        " I 003 --:------ --:------ 12:215819 0009 003 0000FF"
    ),
    " I --- 04:262143 --:------ 04:262143 30C9 003 000713": (
        " I --- 63:262143 --:------ 63:262143 30C9 003 000713"
    ),
    " I --- 04:262143 --:------ 01:182924 2309 003 0205DC": (
        " I --- 63:262143 --:------ 01:182924 2309 003 0205DC"
    ),
    # NOTE: the below doesn't work with QoS, as expects a response pkt (would be an I)
    " W --- 30:098165 32:206251 --:------ 1FC9 006 0031DA797F75": (
        " W --- 30:098165 32:206251 --:------ 1FC9 006 2131DA797F75"
    ),
}

TESTS_INBOUND = {  # sent by other, received
    " I 003 --:------ --:------ 12:215819 0009 003 0000FF": (
        " I 003 01:215819 --:------ 01:215819 0009 003 0000FF"
    ),
    " I --- 63:262143 --:------ 63:262143 30C9 003 000713": (
        " I --- 04:262143 --:------ 04:262143 30C9 003 000713"
    ),
    " I --- 63:262143 --:------ 01:182924 2309 003 0205DC": (
        " I --- 04:262143 --:------ 01:182924 2309 003 0205DC"
    ),
    # NOTE: the below won't work with QoS - why? - still the case with refactored QoS
    " W --- 30:098165 32:206251 --:------ 1FC9 006 2131DA797F75": (
        " W --- 30:098165 32:206251 --:------ 1FC9 006 0031DA797F75"
    ),
    "RP --- 01:182924 30:068640 --:------ 000C 006 020400FFFFFF": (
        "RP --- 01:182924 30:068640 --:------ 000C 006 02040013FFFF"
    ),
    "RP --- 01:182924 30:068640 --:------ 000C 006 020800FFFFFF": (
        "RP --- 01:182924 30:068640 --:------ 000C 006 02080013FFFF"
    ),
}

# Base keyword arguments for Gateway(); each test layers its use_regex rules on top.
GWY_CONFIG = {
    "config": {
        "disable_discovery": True,
        "disable_qos": False,  # QoS is required for this test
        "enforce_known_list": False,
    }
}

61 

62 

63# ### FIXTURES ######################################################################### 

64 

65 

@pytest.fixture(autouse=True)
def patches_for_tests(monkeypatch: pytest.MonkeyPatch) -> None:
    """Apply the module attribute overrides that every test in this file runs under.

    Sets ``ramses_tx.protocol._DBG_DISABLE_IMPERSONATION_ALERTS`` to True and
    ``ramses_tx.transport.MIN_INTER_WRITE_GAP`` to 0; monkeypatch undoes both
    when the test finishes.
    """
    overrides = (
        ("ramses_tx.protocol._DBG_DISABLE_IMPERSONATION_ALERTS", True),
        ("ramses_tx.transport.MIN_INTER_WRITE_GAP", 0),
    )
    for dotted_target, new_value in overrides:
        monkeypatch.setattr(dotted_target, new_value)

70 

71 

async def assert_this_pkt(
    gwy: Gateway, expected: Command, max_sleep: float = DEFAULT_MAX_SLEEP
) -> None:
    """Wait until the gateway's current message carries the expected frame.

    Polls ``gwy._this_msg`` every ASSERT_CYCLE_TIME seconds, for at most
    ``int(max_sleep / ASSERT_CYCLE_TIME)`` polls, stopping early on a match.

    Raises:
        AssertionError: if, once polling ends, there is no current message or
            its packet frame differs from ``expected._frame``.
    """
    max_cycles = int(max_sleep / ASSERT_CYCLE_TIME)

    cycle = 0
    while cycle < max_cycles:
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        current = gwy._this_msg
        if current and current._pkt._frame == expected._frame:
            break
        cycle += 1

    # Checked unconditionally so that a timeout surfaces as an assertion failure.
    assert gwy._this_msg and gwy._this_msg._pkt._frame == expected._frame

81 

82 

83# ### TESTS ############################################################################ 

84 

85 

@pytest.mark.xdist_group(name="virt_serial")
async def test_regex_inbound_() -> None:
    """Check the inbound filters work as expected (this test works without QoS).

    Each key of TESTS_INBOUND is written to the second virtual RF port, and the
    gateway on the first port is then expected to report the mapped frame.
    """

    rf = VirtualRf(2)

    # NOTE: the absence of reciprocal outbound tests is intentional
    # Copy both levels of GWY_CONFIG before adding use_regex: updating the shared
    # module-level dict in place would leak this test's rules into other tests.
    config = {section: dict(options) for section, options in GWY_CONFIG.items()}
    config["config"].update({SZ_USE_REGEX: {SZ_INBOUND: RULES_INBOUND}})  # type: ignore[dict-item]

    gwy_0 = Gateway(rf.ports[0], **config)
    ser_1 = serial.Serial(rf.ports[1])

    try:
        await gwy_0.start()
        assert gwy_0._protocol._transport

        for cmd, pkt in TESTS_INBOUND.items():
            ser_1.write(bytes(cmd.encode("ascii")) + b"\r\n")

            await assert_this_pkt(gwy_0, Command(pkt))

    finally:
        try:
            await gwy_0.stop()
            await rf.stop()
        finally:
            # Always release the serial port, even if the shutdown above fails.
            ser_1.close()

111 

112 

@pytest.mark.xdist_group(name="virt_serial")
async def test_regex_with_qos() -> None:
    """Check the in/outbound regex filters work as expected with QoS.

    Outbound: each TESTS_OUTBOUND command with no rx_header is sent through the
    gateway; the returned packet must match what was sent, and the frame read
    from the second virtual RF port must match the mapped frame.
    Inbound: each TESTS_INBOUND key is written to the second port, and the
    gateway is expected to report the mapped frame.

    The test is skipped when the gateway's protocol is not a PortProtocol with a
    (truthy) context.
    """

    rf = VirtualRf(2)

    # Copy both levels of GWY_CONFIG before adding use_regex: updating the shared
    # module-level dict in place would leak this test's rules into other tests.
    config = {section: dict(options) for section, options in GWY_CONFIG.items()}
    config["config"].update({SZ_USE_REGEX: RULES_COMBINED})  # type: ignore[dict-item]

    gwy_0 = Gateway(rf.ports[0], **config)
    ser_1 = serial.Serial(rf.ports[1])

    if not isinstance(gwy_0._protocol, PortProtocol) or not gwy_0._protocol._context:
        try:
            await rf.stop()
        finally:
            ser_1.close()  # do not leak the serial port on the skip path
        pytest.skip("QoS protocol not enabled")

    # pytest.skip() raises, so this line is only reached with the QoS protocol
    assert gwy_0._protocol._disable_qos is False  # needed for successful tests

    try:
        await gwy_0.start()
        assert gwy_0._protocol._transport

        _ = ser_1.read(ser_1.in_waiting)  # ser_1.flush() doesn't work?

        for before, after in TESTS_OUTBOUND.items():
            cmd = Command(before)
            if cmd.rx_header:  # we won't be getting any replies
                continue

            pkt_src = await gwy_0.async_send_cmd(cmd)
            assert str(pkt_src) == before

            pkt_dst = Packet.from_port(dt.now(), _str(ser_1.read(ser_1.in_waiting)))
            assert str(pkt_dst) == after

        for before, after in TESTS_INBOUND.items():
            ser_1.write(bytes(before.encode("ascii")) + b"\r\n")
            await assert_this_pkt(gwy_0, Command(after))

    finally:
        try:
            await gwy_0.stop()
            await rf.stop()
        finally:
            # Always release the serial port, even if the shutdown above fails.
            ser_1.close()