Coverage for tests/tests_rf/test_binding_fsm.py: 0%
151 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
3# TODO: test addenda phase of binding handshake
4# TODO: get test working with (and without) disabled QoS
6"""RAMSES RF - Test the binding protocol with a virtual RF.
8NB: This test will likely fail with pytest-xdist (pytest -n x); maybe because of
9concurrent access to pty.openpty().
10"""
12import asyncio
13from datetime import datetime as dt
15import pytest
17from ramses_rf import Code, Command, Gateway, Message, Packet
18from ramses_rf.binding_fsm import (
19 SZ_RESPONDENT,
20 SZ_SUPPLICANT,
21 BindStateBase,
22 _BindStates,
23)
24from ramses_rf.device import Fakeable
25from ramses_tx.protocol import PortProtocol
27from .virtual_rf import rf_factory
28from .virtual_rf.helpers import ensure_fakeable
# patched constants (the trailing comment names the module each one belongs to)
DEFAULT_MAX_RETRIES = 0  # ramses_tx.protocol
MAINTAIN_STATE_CHAIN = False  # ramses_tx.protocol_fsm

# other constants
# a state assertion polls every ASSERT_CYCLE_TIME seconds, for at most max_sleep
# seconds, i.e. max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
ASSERT_CYCLE_TIME, DEFAULT_MAX_SLEEP = 0.0005, 0.1

PKT_FLOW = "packets"  # key of the expected packet flow within a test set

# index of each handshake packet within a packet flow
_TENDER, _ACCEPT, _AFFIRM, _RATIFY = range(4)


# base configuration shared by both gateways of a test
GWY_CONFIG = {
    "config": {
        "disable_discovery": True,
        "disable_qos": False,  # this is required for this test
        "enforce_known_list": True,
    },
}

# names of the device schemes (match the "scheme" values in TEST_SUITE_300)
ITHO__, NUAIRE, ORCON_ = "itho", "nuaire", "orcon"
# Each test set names one respondent, one (faked) supplicant, and the packets that
# are expected to be exchanged between them during the binding handshake. Payloads
# are written with spaces/dashes for legibility; the tests strip these separators.
TEST_SUITE_300 = [
    # {  # THM to CTL: FIXME: affirm is I|1FC9|07, not I|1FC9|00
    #     SZ_RESPONDENT: {"01:085545": {"class": "CTL"}},
    #     SZ_SUPPLICANT: {"22:057520": {"class": "THM", "faked": True}},  # THM, not STA
    #     PKT_FLOW: (
    #         " I --- 22:057520 --:------ 22:057520 1FC9 024 00-2309-58E0B0 00-30C9-58E0B0 00-0008-58E0B0 00-1FC9-58E0B0",
    #         " W --- 01:085545 22:057520 --:------ 1FC9 006 07-2309-054E29",
    #         " I --- 22:057520 01:085545 --:------ 1FC9 006 00-2309-58E0B0",
    #     ),
    # },
    # #
    {  # RND to CTL
        SZ_RESPONDENT: {"01:220768": {"class": "CTL"}},
        SZ_SUPPLICANT: {"34:259472": {"class": "RND", "faked": True}},
        PKT_FLOW: (
            " I --- 34:259472 --:------ 34:259472 1FC9 024 00-2309-8BF590 00-30C9-8BF590 00-0008-8BF590 00-1FC9-8BF590",
            " W --- 01:220768 34:259472 --:------ 1FC9 006 01-2309-075E60",
            " I --- 34:259472 01:220768 --:------ 1FC9 006 01-2309-8BF590",
            # I --- 34:259472 63:262142 --:------ 10E0 038 00-0001C8380F01-00-F1FF070B07E6030507E15438375246323032350000000000000000000000",
            # I --- 34:259472 --:------ 34:259472 1060 003 00-FF01",
            # I --- 34:259472 --:------ 34:259472 0005 012 000A0000000F000000100000",
            # I --- 34:259472 --:------ 34:259472 000C 018 000A7FFFFFFF000F7FFFFFFF00107FFFFFFF",
        ),
    },
    #
    {  # CO2 to FAN
        SZ_RESPONDENT: {  # "_note": "Spider HRU"
            "18:126620": {"class": "FAN", "scheme": ITHO__},
        },
        SZ_SUPPLICANT: {  # "_note": "Spider CO2"
            "37:154011": {"class": "CO2", "scheme": ITHO__, "faked": True}
        },
        PKT_FLOW: (
            " I --- 37:154011 --:------ 37:154011 1FC9 030 00-31E0-96599B 00-1298-96599B 00-2E10-96599B 01-10E0-96599B 00-1FC9-96599B",
            " W --- 18:126620 37:154011 --:------ 1FC9 012 00-31D9-49EE9C 00-31DA-49EE9C",
            " I --- 37:154011 18:126620 --:------ 1FC9 001 00",
            " I --- 37:154011 63:262142 --:------ 10E0 038 00-000100280901-01-FEFFFFFFFFFF140107E5564D532D31324333390000000000000000000000",
        ),
    },
    #
    {  # REM to FAN (nuaire)
        SZ_RESPONDENT: {  # "_note": "ECO-HEAT-HC"
            "30:098165": {"class": "FAN", "scheme": NUAIRE},
        },
        SZ_SUPPLICANT: {  # "_note": "4-way switch",
            "32:208628": {"class": "REM", "scheme": NUAIRE, "faked": True}
        },
        PKT_FLOW: (
            " I --- 32:208628 --:------ 32:208628 1FC9 018 00-22F1-832EF4 6C-10E0-832EF4 00-1FC9-832EF4",
            " W --- 30:098165 32:208628 --:------ 1FC9 006 21-31DA-797F75",
            " I --- 32:208628 30:098165 --:------ 1FC9 001 21",
            " I --- 32:208628 63:262142 --:------ 10E0 030 00-0001C85A0101-6C-FFFFFFFFFFFF010607E0564D4E2D32334C4D48323300",
            # I --- 32:208628 --:------ 32:208628 1060 003 00-FF01",  # sends x3
        ),
    },
    #
    # {  # REM to FAN (orcon): FIXME: tender dst is 63:262142 (not dst=src)
    #     SZ_RESPONDENT: {  # "_note": "HRC-350"
    #         "32:155617": {"class": "FAN", "scheme": "orcon"},
    #     },
    #     SZ_SUPPLICANT: {  # "_note": "VMN-15LF01"
    #         "29:158183": {"class": "REM", "scheme": "orcon", "faked": True}
    #     },
    #     PKT_FLOW: (
    #         " I --- 29:158183 63:262142 --:------ 1FC9 024 00-22F1-7669E7 00-22F3-7669E7 67-10E0-7669E7 00-1FC9-7669E7",
    #         " W --- 32:155617 29:158183 --:------ 1FC9 012 00-31D9-825FE1 00-31DA-825FE1",
    #         " I --- 29:158183 32:155617 --:------ 1FC9 001 00",
    #         " I --- 29:158183 63:262142 --:------ 10E0 038 00-0001C8270901-67-FFFFFFFFFFFF0D0207E3564D4E2D31354C46303100000000000000000000",
    #         # I --- 29:158183 --:------ 29:158183 1060 003 00-FF01",
    #     ),
    # },
    # #
    {  # DIS to FAN
        SZ_RESPONDENT: {
            "32:155617": {"class": "FAN", "scheme": ORCON_},
        },
        SZ_SUPPLICANT: {
            "37:171871": {"class": "DIS", "faked": True}  # "scheme": "orcon",
        },  # , "scheme": "orcon"}},
        PKT_FLOW: (
            " I --- 37:171871 --:------ 37:171871 1FC9 024 00-22F1-969F5F 00-22F3-969F5F 67-10E0-969F5F 00-1FC9-969F5F",
            " W --- 32:155617 37:171871 --:------ 1FC9 012 00-31D9-825FE1 00-31DA-825FE1",
            " I --- 37:171871 32:155617 --:------ 1FC9 001 00",
            " I --- 37:171871 63:262142 --:------ 10E0 038 00-0001C8940301-67-FFFFFFFFFFFF1B0807E4564D492D313557534A3533000000000000000000",
        ),
    },
    #
    {  # DHW to CTL
        SZ_RESPONDENT: {"01:145038": {"class": "CTL"}},
        SZ_SUPPLICANT: {"07:045960": {"class": "DHW", "faked": True}},
        PKT_FLOW: (
            " I --- 07:045960 --:------ 07:045960 1FC9 012 00-1260-1CB388 00-1FC9-1CB388",
            " W --- 01:145038 07:045960 --:------ 1FC9 006 00-10A0-06368E",
            " I --- 07:045960 01:145038 --:------ 1FC9 006 00-1260-1CB388",
        ),  # TODO: need epilogue packets, if any (1060?)
    },
    #
]
# TEST_SUITE_300 = [TEST_SUITE_300[-2]]
158# ### FIXTURES #########################################################################
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    """Parametrize the `test_set` argument with every entry of TEST_SUITE_300.

    Each case gets an id of the form "<supplicant class> binding to
    <respondent class>", taken from the first device listed for each role.
    """

    def id_fnc(test_set: dict) -> str:
        # only the first device of each role contributes to the id
        respondent_schema = [*test_set[SZ_RESPONDENT].values()][0]
        r_class = respondent_schema["class"]
        supplicant_schema = [*test_set[SZ_SUPPLICANT].values()][0]
        s_class = supplicant_schema["class"]
        return " binding to ".join((s_class, r_class))

    metafunc.parametrize("test_set", TEST_SUITE_300, ids=id_fnc)
170# ######################################################################################
async def assert_context_state(
    device: Fakeable, state: type[BindStateBase], max_sleep: float = DEFAULT_MAX_SLEEP
) -> None:
    """Assert that the device's binding context reaches the given state in time.

    The state is polled every ASSERT_CYCLE_TIME seconds, for at most
    int(max_sleep / ASSERT_CYCLE_TIME) cycles; polling stops as soon as the
    state is an instance of `state`. An AssertionError is raised if the device
    has no binding context, or if the state is still wrong after polling.
    """
    assert device._bind_context

    cycles_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while cycles_left > 0:
        cycles_left -= 1
        # always sleep before checking, so that pending tasks get a chance to run
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if isinstance(device._bind_context.state, state):
            break

    assert isinstance(device._bind_context.state, state)
185# ### TESTS ############################################################################
188# TODO: test addenda phase of binding handshake
async def _test_flow_10x(
    gwy_r: Gateway, gwy_s: Gateway, pkt_flow_expected: list[str]
) -> None:
    """Check the change of state during a binding at context layer.

    Drives the binding contexts of the first device of each gateway through the
    Offer / Accept / Confirm steps of the handshake. After every step, the state
    of both contexts is asserted, as is that each side received the very packet
    the other side sent.

    Args:
        gwy_r: The gateway whose first device is the respondent.
        gwy_s: The gateway whose first device is the supplicant.
        pkt_flow_expected: The expected packets, indexed by _TENDER, _ACCEPT,
            _AFFIRM and, if the binding includes an addenda, _RATIFY.

    Raises:
        AssertionError: If a state or packet is not as expected, or if the
            respondent's gateway is not using the QoS protocol.
    """

    # asyncio.create_task() should be OK (no need to pass in an event loop)

    # a flow with a fourth packet is one that includes an addenda
    has_addenda = len(pkt_flow_expected) > _RATIFY  # FIXME

    # STEP 0: Setup...
    respondent = gwy_r.devices[0]
    supplicant = gwy_s.devices[0]
    ensure_fakeable(respondent)

    assert isinstance(respondent, Fakeable)  # mypy
    assert isinstance(supplicant, Fakeable)  # mypy

    await assert_context_state(respondent, _BindStates.IS_IDLE_DEVICE)
    await assert_context_state(supplicant, _BindStates.IS_IDLE_DEVICE)

    assert respondent._bind_context
    assert supplicant._bind_context

    assert not respondent._bind_context.is_binding
    assert not supplicant._bind_context.is_binding

    #
    # Step R0: Respondent initial state
    respondent._bind_context.set_state(_BindStates.NEEDING_TENDER)
    await assert_context_state(respondent, _BindStates.NEEDING_TENDER)
    assert respondent._bind_context.is_binding

    #
    # Step S0: Supplicant initial state
    supplicant._bind_context.set_state(_BindStates.NEEDING_ACCEPT)  # type: ignore[unreachable]
    await assert_context_state(supplicant, _BindStates.NEEDING_ACCEPT)
    assert supplicant._bind_context.is_binding

    #
    # Step R1: Respondent expects an Offer
    resp_task = asyncio.create_task(respondent._bind_context._wait_for_offer())

    #
    # Step S1: Supplicant sends an Offer (makes Offer) and expects an Accept
    msg = Message(Packet(dt.now(), "000 " + pkt_flow_expected[_TENDER]))
    codes = [b[1] for b in msg.payload["bindings"] if b[1] != Code._1FC9]

    pkt = await supplicant._bind_context._make_offer(codes)
    await assert_context_state(supplicant, _BindStates.NEEDING_ACCEPT)
    assert pkt is not None

    await resp_task
    await assert_context_state(respondent, _BindStates.NEEDING_AFFIRM)

    if not isinstance(gwy_r._protocol, PortProtocol) or not gwy_r._protocol._context:
        # fail (do not skip); raised explicitly, as `assert False` is removed by -O
        raise AssertionError("QoS protocol not enabled")

    tender = resp_task.result()
    assert tender._pkt == pkt, "Resp's Msg doesn't match Supp's Offer cmd"

    supp_task = asyncio.create_task(supplicant._bind_context._wait_for_accept(tender))

    #
    # Step R2: Respondent expects a Confirm after sending an Accept (accepts Offer)
    msg = Message(Packet(dt.now(), "000 " + pkt_flow_expected[_ACCEPT]))
    codes = [b[1] for b in msg.payload["bindings"]]

    pkt = await respondent._bind_context._accept_offer(tender, codes)
    await assert_context_state(respondent, _BindStates.NEEDING_AFFIRM)
    assert pkt is not None

    await supp_task
    await assert_context_state(supplicant, _BindStates.TO_SEND_AFFIRM)

    accept = supp_task.result()
    assert accept._pkt == pkt, "Supp's Msg doesn't match Resp's Accept cmd"

    resp_task = asyncio.create_task(respondent._bind_context._wait_for_confirm(accept))

    #
    # Step S2: Supplicant sends a Confirm (confirms Accept)
    msg = Message(Packet(dt.now(), "000 " + pkt_flow_expected[_AFFIRM]))
    codes = [b[1] for b in msg.payload["bindings"] if len(b) > 1]

    pkt = await supplicant._bind_context._confirm_accept(accept, confirm_code=codes)
    await assert_context_state(supplicant, _BindStates.HAS_BOUND_SUPP)
    assert pkt is not None

    if has_addenda:  # FIXME
        # HACK: easiest way
        supplicant._bind_context.set_state(_BindStates.TO_SEND_RATIFY)

    await resp_task
    await assert_context_state(respondent, _BindStates.HAS_BOUND_RESP)

    if has_addenda:  # FIXME
        # HACK: easiest way
        respondent._bind_context.set_state(_BindStates.NEEDING_RATIFY)

    affirm = resp_task.result()
    assert affirm._pkt == pkt, "Resp's Msg doesn't match Supp's Confirm cmd"

    #
    # Some bindings don't include an Addenda...
    if not has_addenda:  # i.e. no addenda FIXME
        return

    await assert_context_state(respondent, _BindStates.NEEDING_RATIFY)
    await assert_context_state(supplicant, _BindStates.TO_SEND_RATIFY)

    # # Step R3: Respondent expects an Addenda (optional)
    # resp_task = asyncio.create_task(
    #     respondent._context._wait_for_addenda(accept, timeout=0.05)
    # )

    # # Step S3: Supplicant sends an Addenda (optional)
    # msg = Message(Packet(dt.now(), "000 " + pkt_flow_expected[_RATIFY]))
    # old code:
    # supplicant._msgz[msg.code] = {msg.verb: {msg._pkt._ctx: msg}}
    # now: only supplicant ?! explains a lot of failures
    # gwy_r.msg_db.add(msg)  # (for supplicant only?) >> remove from respondent/filter while adding
    # pkt = await supplicant._context._cast_addenda()
    # await assert_context_state(supplicant, _BindStates.HAS_BOUND_SUPP)
    # assert pkt is not None

    # await assert_context_state(respondent, _BindStates.HAS_BOUND_RESP)
    # await resp_task

    # ratify = resp_task.result()
    # assert ratify._pkt == pkt, "Resp's Msg doesn't match Supp's Addenda cmd"
321# TODO: test addenda phase of binding handshake
322async def _test_flow_20x(
323 gwy_r: Gateway, gwy_s: Gateway, pkt_flow_expected: list[str]
324) -> None:
325 """Check the change of state during a binding at device layer."""
327 # STEP 0: Setup...
328 respondent = gwy_r.devices[0]
329 supplicant = gwy_s.devices[0]
330 ensure_fakeable(respondent)
332 assert isinstance(respondent, Fakeable) # mypy
333 assert isinstance(supplicant, Fakeable) # mypy
335 assert respondent.id == pkt_flow_expected[_ACCEPT][7:16], "bad test suite config"
336 assert supplicant.id == pkt_flow_expected[_TENDER][7:16], "bad test suite config"
338 # Step R1: Respondent expects an Offer
339 payload = pkt_flow_expected[_ACCEPT][46:]
340 accept_codes = [payload[i : i + 4] for i in range(2, len(payload), 12)]
342 idx = payload[:2]
343 require_ratify = len(pkt_flow_expected) > _RATIFY
345 resp_coro = respondent._wait_for_binding_request(
346 accept_codes, idx=idx, require_ratify=require_ratify
347 )
348 resp_task = asyncio.create_task(resp_coro)
350 # Step S1: Supplicant sends an Offer (makes Offer) and expects an Accept
351 payload = pkt_flow_expected[_TENDER][46:]
352 offer_codes = [payload[i : i + 4] for i in range(2, len(payload), 12)]
353 offer_codes = [c for c in offer_codes if c != Code._1FC9]
355 confirm_code = pkt_flow_expected[_AFFIRM][48:52] or None
356 if len(pkt_flow_expected) > _RATIFY:
357 ratify_cmd = Command(pkt_flow_expected[_RATIFY])
358 else:
359 ratify_cmd = None
361 supp_coro = supplicant._initiate_binding_process(
362 offer_codes, confirm_code=confirm_code, ratify_cmd=ratify_cmd
363 )
364 supp_task = asyncio.create_task(supp_coro)
366 # Step 2: Wait until flow is completed (or timeout)
367 await asyncio.gather(resp_task, supp_task)
369 resp_flow = resp_task.result()
370 supp_flow = supp_task.result()
372 for i in range(len(pkt_flow_expected)):
373 assert resp_flow[i] == supp_flow[i]
374 assert resp_flow[i] == Command(pkt_flow_expected[i])
376 assert str(resp_flow[i]) == str(supp_flow[i])
377 assert str(resp_flow[i]) == pkt_flow_expected[i]
380# TODO: binding working without QoS # @patch("ramses_tx.protocol._DBG_DISABLE_QOS", True)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_100(test_set: dict[str, dict]) -> None:
    """Check packet flow / state change of a binding at context layer.

    Creates a virtual RF with two gateways (one for the respondent, one for the
    supplicant), runs the context-layer flow, and then stops the gateways and
    the virtual RF, whether or not the flow succeeded.
    """

    # the device entries of the test set (i.e. not its packet flow)
    devices = [d for d in test_set.values() if isinstance(d, dict)]

    # both gateways know of all the devices, but each has only its own orphan
    config = {
        role: GWY_CONFIG
        | {
            "known_list": {k: v for d in devices for k, v in d.items()},
            "orphans_hvac": list(test_set[role]),  # TODO: used by Heat domain too!
        }
        for role in (SZ_RESPONDENT, SZ_SUPPLICANT)
    }

    # strip the separators from the payloads (which start at char 46)
    pkt_flow = [
        x[:46] + x[46:].replace(" ", "").replace("-", "")
        for x in test_set.get(PKT_FLOW, [])
    ]

    # can't use fixture for this, as new schema required for every test
    rf, gwys = await rf_factory([config[SZ_RESPONDENT], config[SZ_SUPPLICANT]])

    try:
        await _test_flow_10x(gwys[0], gwys[1], pkt_flow)
    finally:
        try:
            for gwy in gwys:
                await gwy.stop()
        finally:
            # stop the virtual RF even if a gateway failed to stop
            await rf.stop()
409# TODO: binding working without QoS # @patch("ramses_tx.protocol._DBG_DISABLE_QOS", True)
@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_200(test_set: dict[str, dict]) -> None:
    """Check packet flow / state change of a binding at device layer.

    Creates a virtual RF with two gateways (one for the respondent, one for the
    supplicant), runs the device-layer flow, and then stops the gateways and
    the virtual RF, whether or not the flow succeeded.
    """

    # the device entries of the test set (i.e. not its packet flow)
    devices = [d for d in test_set.values() if isinstance(d, dict)]

    # both gateways know of all the devices, but each has only its own orphan
    config = {
        role: GWY_CONFIG
        | {
            "known_list": {k: v for d in devices for k, v in d.items()},
            "orphans_hvac": list(test_set[role]),  # TODO: used by Heat domain too!
        }
        for role in (SZ_RESPONDENT, SZ_SUPPLICANT)
    }

    # strip the separators from the payloads (which start at char 46)
    pkt_flow = [
        x[:46] + x[46:].replace(" ", "").replace("-", "")
        for x in test_set.get(PKT_FLOW, [])
    ]

    # can't use fixture for this, as new schema required for every test
    rf, gwys = await rf_factory(
        [config[SZ_RESPONDENT], config[SZ_SUPPLICANT]]
    )  # can pop orphans_hvac

    try:
        await _test_flow_20x(gwys[0], gwys[1], pkt_flow)
    finally:
        try:
            for gwy in gwys:
                await gwy.stop()
        finally:
            # stop the virtual RF even if a gateway failed to stop
            await rf.stop()