Coverage for tests/tests_rf/test_binding_fsm.py: 0%

151 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2 

3# TODO: test addenda phase of binding handshake 

4# TODO: get test working with (and without) disabled QoS 

5 

6"""RAMSES RF - Test the binding protocol with a virtual RF. 

7 

NB: This test will likely fail with pytest-xdist (pytest -n x); maybe because of
concurrent access to pty.openpty().

10""" 

11 

12import asyncio 

13from datetime import datetime as dt 

14 

15import pytest 

16 

17from ramses_rf import Code, Command, Gateway, Message, Packet 

18from ramses_rf.binding_fsm import ( 

19 SZ_RESPONDENT, 

20 SZ_SUPPLICANT, 

21 BindStateBase, 

22 _BindStates, 

23) 

24from ramses_rf.device import Fakeable 

25from ramses_tx.protocol import PortProtocol 

26 

27from .virtual_rf import rf_factory 

28from .virtual_rf.helpers import ensure_fakeable 

29 

# patched constants
# NOTE(review): neither of these two names is referenced in the visible part of this
# module - confirm whether they are still applied (e.g. via a patch decorator).
DEFAULT_MAX_RETRIES = 0  # # ramses_tx.protocol
MAINTAIN_STATE_CHAIN = False  # # ramses_tx.protocol_fsm

# other constants
# Sleep interval between two state checks in assert_context_state()
ASSERT_CYCLE_TIME = 0.0005  # max_cycles_per_assert = max_sleep / ASSERT_CYCLE_TIME
# Default value of the max_sleep parameter of assert_context_state()
DEFAULT_MAX_SLEEP = 0.1

# Key of the expected packet flow within each TEST_SUITE_300 entry
PKT_FLOW = "packets"

# Indexes of the handshake steps within a packet flow (a flow need not have a _RATIFY)
_TENDER = 0
_ACCEPT = 1
_AFFIRM = 2
_RATIFY = 3


# Base gateway config; the tests merge it with a per-role known_list/orphans_hvac
GWY_CONFIG = {
    "config": {
        "disable_discovery": True,
        "disable_qos": False,  # this is required for this test
        "enforce_known_list": True,
    }
}

# Scheme names
# NOTE(review): the test suite below spells these as string literals and does not
# reference these constants - confirm whether they are still needed.
ITHO__ = "itho"
NUAIRE = "nuaire"
ORCON_ = "orcon"

# Binding scenarios, one dict per scenario (each becomes one parametrized test case):
#  - SZ_RESPONDENT / SZ_SUPPLICANT: {device_id: traits} for the device in that role
#  - PKT_FLOW: the expected packets, indexed by _TENDER, _ACCEPT, _AFFIRM and (when
#    present) _RATIFY
# The tests slice these strings at fixed offsets ([7:16] is the source device id,
# [46:] is the payload) and strip the spaces/hyphens from the payload before use, so
# the column layout of the first 46 characters must not be changed.
TEST_SUITE_300 = [
    # {  # THM to CTL: FIXME: affirm is I|1FC9|07, not I|1FC9|00
    #     SZ_RESPONDENT: {"01:085545": {"class": "CTL"}},
    #     SZ_SUPPLICANT: {"22:057520": {"class": "THM", "faked": True}},  # THM, not STA
    #     PKT_FLOW: (
    #         " I --- 22:057520 --:------ 22:057520 1FC9 024 00-2309-58E0B0 00-30C9-58E0B0 00-0008-58E0B0 00-1FC9-58E0B0",
    #         " W --- 01:085545 22:057520 --:------ 1FC9 006 07-2309-054E29",
    #         " I --- 22:057520 01:085545 --:------ 1FC9 006 00-2309-58E0B0",
    #     ),
    # },
    # #
    {  # RND to CTL
        SZ_RESPONDENT: {"01:220768": {"class": "CTL"}},
        SZ_SUPPLICANT: {"34:259472": {"class": "RND", "faked": True}},
        PKT_FLOW: (
            " I --- 34:259472 --:------ 34:259472 1FC9 024 00-2309-8BF590 00-30C9-8BF590 00-0008-8BF590 00-1FC9-8BF590",
            " W --- 01:220768 34:259472 --:------ 1FC9 006 01-2309-075E60",
            " I --- 34:259472 01:220768 --:------ 1FC9 006 01-2309-8BF590",
            # I --- 34:259472 63:262142 --:------ 10E0 038 00-0001C8380F01-00-F1FF070B07E6030507E15438375246323032350000000000000000000000",
            # I --- 34:259472 --:------ 34:259472 1060 003 00-FF01",
            # I --- 34:259472 --:------ 34:259472 0005 012 000A0000000F000000100000",
            # I --- 34:259472 --:------ 34:259472 000C 018 000A7FFFFFFF000F7FFFFFFF00107FFFFFFF",
        ),
    },
    #
    {  # CO2 to FAN
        SZ_RESPONDENT: {  # "_note": "Spider HRU"
            "18:126620": {"class": "FAN", "scheme": "itho"},
        },
        SZ_SUPPLICANT: {  # "_note": "Spider CO2"
            "37:154011": {"class": "CO2", "scheme": "itho", "faked": True}
        },
        PKT_FLOW: (
            " I --- 37:154011 --:------ 37:154011 1FC9 030 00-31E0-96599B 00-1298-96599B 00-2E10-96599B 01-10E0-96599B 00-1FC9-96599B",
            " W --- 18:126620 37:154011 --:------ 1FC9 012 00-31D9-49EE9C 00-31DA-49EE9C",
            " I --- 37:154011 18:126620 --:------ 1FC9 001 00",
            " I --- 37:154011 63:262142 --:------ 10E0 038 00-000100280901-01-FEFFFFFFFFFF140107E5564D532D31324333390000000000000000000000",
        ),
    },
    #
    {  # REM to FAN (nuaire)
        SZ_RESPONDENT: {  # "_note": "ECO-HEAT-HC"
            "30:098165": {"class": "FAN", "scheme": "nuaire"},
        },
        SZ_SUPPLICANT: {  # "_note": "4-way switch",
            "32:208628": {"class": "REM", "scheme": "nuaire", "faked": True}
        },
        PKT_FLOW: (
            " I --- 32:208628 --:------ 32:208628 1FC9 018 00-22F1-832EF4 6C-10E0-832EF4 00-1FC9-832EF4",
            " W --- 30:098165 32:208628 --:------ 1FC9 006 21-31DA-797F75",
            " I --- 32:208628 30:098165 --:------ 1FC9 001 21",
            " I --- 32:208628 63:262142 --:------ 10E0 030 00-0001C85A0101-6C-FFFFFFFFFFFF010607E0564D4E2D32334C4D48323300",
            # I --- 32:208628 --:------ 32:208628 1060 003 00-FF01",  # sends x3
        ),
    },
    #
    # {  # REM to FAN (orcon): FIXME: tender dst is 63:262142 (not dst=src)
    #     SZ_RESPONDENT: {  # "_note": "HRC-350"
    #         "32:155617": {"class": "FAN", "scheme": "orcon"},
    #     },
    #     SZ_SUPPLICANT: {  # "_note": "VMN-15LF01"
    #         "29:158183": {"class": "REM", "scheme": "orcon", "faked": True}
    #     },
    #     PKT_FLOW: (
    #         " I --- 29:158183 63:262142 --:------ 1FC9 024 00-22F1-7669E7 00-22F3-7669E7 67-10E0-7669E7 00-1FC9-7669E7",
    #         " W --- 32:155617 29:158183 --:------ 1FC9 012 00-31D9-825FE1 00-31DA-825FE1",
    #         " I --- 29:158183 32:155617 --:------ 1FC9 001 00",
    #         " I --- 29:158183 63:262142 --:------ 10E0 038 00-0001C8270901-67-FFFFFFFFFFFF0D0207E3564D4E2D31354C46303100000000000000000000",
    #         # I --- 29:158183 --:------ 29:158183 1060 003 00-FF01",
    #     ),
    # },
    # #
    {  # DIS to FAN
        SZ_RESPONDENT: {
            "32:155617": {"class": "FAN", "scheme": "orcon"},
        },
        SZ_SUPPLICANT: {
            "37:171871": {"class": "DIS", "faked": True}  # "scheme": "orcon",
        },  # , "scheme": "orcon"}},
        PKT_FLOW: (
            " I --- 37:171871 --:------ 37:171871 1FC9 024 00-22F1-969F5F 00-22F3-969F5F 67-10E0-969F5F 00-1FC9-969F5F",
            " W --- 32:155617 37:171871 --:------ 1FC9 012 00-31D9-825FE1 00-31DA-825FE1",
            " I --- 37:171871 32:155617 --:------ 1FC9 001 00",
            " I --- 37:171871 63:262142 --:------ 10E0 038 00-0001C8940301-67-FFFFFFFFFFFF1B0807E4564D492D313557534A3533000000000000000000",
        ),
    },
    #
    {  # DHW to CTL
        SZ_RESPONDENT: {"01:145038": {"class": "CTL"}},
        SZ_SUPPLICANT: {"07:045960": {"class": "DHW", "faked": True}},
        PKT_FLOW: (
            " I --- 07:045960 --:------ 07:045960 1FC9 012 00-1260-1CB388 00-1FC9-1CB388",
            " W --- 01:145038 07:045960 --:------ 1FC9 006 00-10A0-06368E",
            " I --- 07:045960 01:145038 --:------ 1FC9 006 00-1260-1CB388",
        ),  # TODO: need epilogue packets, if any (1060?)
    },
    #
]

156# TEST_SUITE_300 = [TEST_SUITE_300[-2]] 

157 

158# ### FIXTURES ######################################################################### 

159 

160 

def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    """Parametrize each test that requests ``test_set`` over TEST_SUITE_300.

    Tests that do not declare a ``test_set`` argument are left alone, so that
    other tests can be added to this module without breaking collection.

    :param metafunc: the pytest object describing the test being collected.
    """

    def id_fnc(test_set: dict) -> str:
        """Return a readable test id, e.g. 'RND binding to CTL'."""
        # each role holds one device; its traits carry the device class
        r_class = next(iter(test_set[SZ_RESPONDENT].values()))["class"]
        s_class = next(iter(test_set[SZ_SUPPLICANT].values()))["class"]
        return f"{s_class} binding to {r_class}"

    # parametrizing an argument the test does not have is a collection error
    if "test_set" in metafunc.fixturenames:
        metafunc.parametrize("test_set", TEST_SUITE_300, ids=id_fnc)

168 

169 

170# ###################################################################################### 

171 

172 

async def assert_context_state(
    device: Fakeable, state: type[BindStateBase], max_sleep: float = DEFAULT_MAX_SLEEP
) -> None:
    """Wait until the device's binding context reaches a state, or fail.

    The context state is polled every ASSERT_CYCLE_TIME seconds, for at most
    ``int(max_sleep / ASSERT_CYCLE_TIME)`` cycles. Each cycle sleeps before it
    checks, so control is yielded to the event loop at least once whenever at
    least one cycle is allowed.

    :param device: the device whose ``_bind_context.state`` is checked.
    :param state: the state class that the context is expected to reach.
    :param max_sleep: the budget, in seconds, from which the cycle count is derived.
    :raises AssertionError: if the device has no binding context, or if the
        expected state has not been reached once the cycles are used up.
    """
    assert device._bind_context

    cycles_left = int(max_sleep / ASSERT_CYCLE_TIME)
    while cycles_left > 0:
        await asyncio.sleep(ASSERT_CYCLE_TIME)
        if isinstance(device._bind_context.state, state):
            return
        cycles_left -= 1

    # out of cycles (or none were allowed): one last check decides the outcome
    assert isinstance(device._bind_context.state, state)

183 

184 

185# ### TESTS ############################################################################ 

186 

187 

188# TODO: test addenda phase of binding handshake 

async def _test_flow_10x(
    gwy_r: Gateway, gwy_s: Gateway, pkt_flow_expected: list[str]
) -> None:
    """Drive a binding handshake step-by-step and check each context state.

    The first device of each gateway takes the role of respondent (``gwy_r``)
    and supplicant (``gwy_s``). The binding contexts are driven directly
    through their private methods, and after every step the state of the
    context concerned is asserted, as is the equality of the packet sent by
    one side and the one received by the other.

    :param gwy_r: the gateway hosting the respondent.
    :param gwy_s: the gateway hosting the supplicant.
    :param pkt_flow_expected: the expected packets, indexed by _TENDER,
        _ACCEPT, _AFFIRM and (optionally) _RATIFY.
    """

    # asyncio.create_task() should be OK (no need to pass in an event loop)

    def expected_msg(step: int) -> Message:
        """Return the expected packet of a handshake step as a Message."""
        return Message(Packet(dt.now(), f"000 {pkt_flow_expected[step]}"))

    # a fourth packet in the expected flow means the binding includes an addenda
    has_addenda = len(pkt_flow_expected) > _RATIFY

    # STEP 0: Setup...
    respondent = gwy_r.devices[0]
    supplicant = gwy_s.devices[0]
    ensure_fakeable(respondent)

    assert isinstance(respondent, Fakeable)  # mypy
    assert isinstance(supplicant, Fakeable)  # mypy

    await assert_context_state(respondent, _BindStates.IS_IDLE_DEVICE)
    await assert_context_state(supplicant, _BindStates.IS_IDLE_DEVICE)

    assert respondent._bind_context
    assert supplicant._bind_context

    assert not respondent._bind_context.is_binding
    assert not supplicant._bind_context.is_binding

    #
    # Step R0: Respondent initial state
    respondent._bind_context.set_state(_BindStates.NEEDING_TENDER)
    await assert_context_state(respondent, _BindStates.NEEDING_TENDER)
    assert respondent._bind_context.is_binding

    #
    # Step S0: Supplicant initial state
    supplicant._bind_context.set_state(_BindStates.NEEDING_ACCEPT)  # type: ignore[unreachable]
    await assert_context_state(supplicant, _BindStates.NEEDING_ACCEPT)
    assert supplicant._bind_context.is_binding

    #
    # Step R1: Respondent expects an Offer
    offer_task = asyncio.create_task(respondent._bind_context._wait_for_offer())

    #
    # Step S1: Supplicant sends an Offer (makes Offer) and expects an Accept
    offer_bindings = expected_msg(_TENDER).payload["bindings"]
    offer_codes = [b[1] for b in offer_bindings if b[1] != Code._1FC9]

    offer_pkt = await supplicant._bind_context._make_offer(offer_codes)
    await assert_context_state(supplicant, _BindStates.NEEDING_ACCEPT)
    assert offer_pkt is not None

    tender = await offer_task
    await assert_context_state(respondent, _BindStates.NEEDING_AFFIRM)

    # use assert, not skip
    assert (
        isinstance(gwy_r._protocol, PortProtocol) and gwy_r._protocol._context
    ), "QoS protocol not enabled"

    assert tender._pkt == offer_pkt, "Resp's Msg doesn't match Supp's Offer cmd"

    accept_task = asyncio.create_task(
        supplicant._bind_context._wait_for_accept(tender)
    )

    #
    # Step R2: Respondent expects a Confirm after sending an Accept (accepts Offer)
    accept_bindings = expected_msg(_ACCEPT).payload["bindings"]
    accept_codes = [b[1] for b in accept_bindings]

    accept_pkt = await respondent._bind_context._accept_offer(tender, accept_codes)
    await assert_context_state(respondent, _BindStates.NEEDING_AFFIRM)
    assert accept_pkt is not None

    accept = await accept_task
    await assert_context_state(supplicant, _BindStates.TO_SEND_AFFIRM)

    assert accept._pkt == accept_pkt, "Supp's Msg doesn't match Resp's Accept cmd"

    confirm_task = asyncio.create_task(
        respondent._bind_context._wait_for_confirm(accept)
    )

    #
    # Step S2: Supplicant sends a Confirm (confirms Accept)
    confirm_bindings = expected_msg(_AFFIRM).payload["bindings"]
    confirm_codes = [b[1] for b in confirm_bindings if len(b) > 1]

    confirm_pkt = await supplicant._bind_context._confirm_accept(
        accept, confirm_code=confirm_codes
    )
    await assert_context_state(supplicant, _BindStates.HAS_BOUND_SUPP)
    assert confirm_pkt is not None

    if has_addenda:  # FIXME
        # HACK: easiest way
        supplicant._bind_context.set_state(_BindStates.TO_SEND_RATIFY)

    affirm = await confirm_task
    await assert_context_state(respondent, _BindStates.HAS_BOUND_RESP)

    if has_addenda:  # FIXME
        # HACK: easiest way
        respondent._bind_context.set_state(_BindStates.NEEDING_RATIFY)

    assert affirm._pkt == confirm_pkt, "Resp's Msg doesn't match Supp's Confirm cmd"

    #
    # Some bindings don't include an Addenda...
    if not has_addenda:  # i.e. no addenda FIXME
        return

    await assert_context_state(respondent, _BindStates.NEEDING_RATIFY)
    await assert_context_state(supplicant, _BindStates.TO_SEND_RATIFY)

    # TODO: the addenda phase is not exercised yet; scaffold kept for reference:

    # # Step R3: Respondent expects an Addenda (optional)
    # resp_task = asyncio.create_task(
    #     respondent._context._wait_for_addenda(accept, timeout=0.05)
    # )

    # # Step S3: Supplicant sends an Addenda (optional)
    # msg = Message(Packet(dt.now(), "000 " + pkt_flow_expected[_RATIFY]))
    # old code:
    # supplicant._msgz[msg.code] = {msg.verb: {msg._pkt._ctx: msg}}
    # now: only supplicant ?! explains a lot of failures
    # gwy_r.msg_db.add(msg)  # (for supplicant only?) >> remove from respondent/filter while adding
    # pkt = await supplicant._context._cast_addenda()
    # await assert_context_state(supplicant, _BindStates.HAS_BOUND_SUPP)
    # assert pkt is not None

    # await assert_context_state(respondent, _BindStates.HAS_BOUND_RESP)
    # await resp_task

    # ratify = resp_task.result()
    # assert ratify._pkt == pkt, "Resp's Msg doesn't match Supp's Addenda cmd"

319 

320 

321# TODO: test addenda phase of binding handshake 

async def _test_flow_20x(
    gwy_r: Gateway, gwy_s: Gateway, pkt_flow_expected: list[str]
) -> None:
    """Run a complete binding at device layer and check the resulting packets.

    The first device of each gateway takes the role of respondent (``gwy_r``)
    and supplicant (``gwy_s``). Both sides of the binding run concurrently;
    afterwards the flow reported by each side is compared, packet by packet,
    with the other side's flow and with the expected flow.

    :param gwy_r: the gateway hosting the respondent.
    :param gwy_s: the gateway hosting the supplicant.
    :param pkt_flow_expected: the expected packets, indexed by _TENDER,
        _ACCEPT, _AFFIRM and (optionally) _RATIFY, with the payload (from
        character 46 onwards) free of separators.
    """

    def binding_codes(payload: str) -> list[str]:
        """Return the 4-char code of each 12-char binding in a 1FC9 payload."""
        return [payload[i : i + 4] for i in range(2, len(payload), 12)]

    # STEP 0: Setup...
    respondent = gwy_r.devices[0]
    supplicant = gwy_s.devices[0]
    ensure_fakeable(respondent)

    assert isinstance(respondent, Fakeable)  # mypy
    assert isinstance(supplicant, Fakeable)  # mypy

    # chars 7-15 of a packet are its source device id
    assert respondent.id == pkt_flow_expected[_ACCEPT][7:16], "bad test suite config"
    assert supplicant.id == pkt_flow_expected[_TENDER][7:16], "bad test suite config"

    require_ratify = len(pkt_flow_expected) > _RATIFY

    # Respondent side: the codes/idx that it will send in its Accept
    accept_payload = pkt_flow_expected[_ACCEPT][46:]
    accept_codes = binding_codes(accept_payload)
    idx = accept_payload[:2]

    # Supplicant side: the codes that it will offer (1FC9 itself is excluded)
    offer_payload = pkt_flow_expected[_TENDER][46:]
    offer_codes = [c for c in binding_codes(offer_payload) if c != Code._1FC9]

    # an Affirm payload can be too short to carry a code, hence the `or None`
    confirm_code = pkt_flow_expected[_AFFIRM][48:52] or None
    ratify_cmd = Command(pkt_flow_expected[_RATIFY]) if require_ratify else None

    # Step R1: Respondent expects an Offer
    resp_task = asyncio.create_task(
        respondent._wait_for_binding_request(
            accept_codes, idx=idx, require_ratify=require_ratify
        )
    )

    # Step S1: Supplicant sends an Offer (makes Offer) and expects an Accept
    supp_task = asyncio.create_task(
        supplicant._initiate_binding_process(
            offer_codes, confirm_code=confirm_code, ratify_cmd=ratify_cmd
        )
    )

    # Step 2: Wait until flow is completed (or timeout)
    try:
        await asyncio.gather(resp_task, supp_task)
    finally:
        # gather() does not cancel the other task when one of them raises, so
        # do it here rather than leave a binding task pending after a failure
        for task in (resp_task, supp_task):
            if not task.done():
                task.cancel()

    resp_flow = resp_task.result()
    supp_flow = supp_task.result()

    for i, expected in enumerate(pkt_flow_expected):
        assert resp_flow[i] == supp_flow[i]
        assert resp_flow[i] == Command(expected)

        assert str(resp_flow[i]) == str(supp_flow[i])
        assert str(resp_flow[i]) == expected

378 

379 

380# TODO: binding working without QoS # @patch("ramses_tx.protocol._DBG_DISABLE_QOS", True) 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_100(test_set: dict[str, dict]) -> None:
    """Check packet flow / state change of a binding at context layer.

    Builds one gateway config per role from ``test_set``, creates a virtual RF
    with a gateway for each, runs ``_test_flow_10x`` against them and, whatever
    the outcome, stops the gateways and then the virtual RF.
    """

    roles = (SZ_RESPONDENT, SZ_SUPPLICANT)

    # the devices of every dict-valued entry of the test set, whatever its role
    known_devices = [
        (dev_id, traits)
        for entry in test_set.values()
        if isinstance(entry, dict)
        for dev_id, traits in entry.items()
    ]

    config = {
        role: GWY_CONFIG
        | {
            "known_list": dict(known_devices),  # a separate dict per gateway
            "orphans_hvac": list(test_set[role]),  # TODO: used by Heat domain too!
        }
        for role in roles
    }

    # strip the (cosmetic) spaces and hyphens from the payload of each packet
    pkt_flow = []
    for line in test_set.get(PKT_FLOW, []):
        header, payload = line[:46], line[46:]
        pkt_flow.append(header + payload.replace(" ", "").replace("-", ""))

    # can't use fixture for this, as new schema required for every test
    rf, gwys = await rf_factory([config[role] for role in roles])

    try:
        await _test_flow_10x(gwys[0], gwys[1], pkt_flow)
    finally:
        for gwy in gwys:
            await gwy.stop()
        await rf.stop()

407 

408 

409# TODO: binding working without QoS # @patch("ramses_tx.protocol._DBG_DISABLE_QOS", True) 

@pytest.mark.xdist_group(name="virt_serial")
async def test_flow_200(test_set: dict[str, dict]) -> None:
    """Check packet flow / state change of a binding at device layer.

    Builds one gateway config per role from ``test_set``, creates a virtual RF
    with a gateway for each, runs ``_test_flow_20x`` against them and, whatever
    the outcome, stops the gateways and then the virtual RF.
    """

    roles = (SZ_RESPONDENT, SZ_SUPPLICANT)

    # the devices of every dict-valued entry of the test set, whatever its role
    known_devices = [
        (dev_id, traits)
        for entry in test_set.values()
        if isinstance(entry, dict)
        for dev_id, traits in entry.items()
    ]

    config = {
        role: GWY_CONFIG
        | {
            "known_list": dict(known_devices),  # a separate dict per gateway
            "orphans_hvac": list(test_set[role]),  # TODO: used by Heat domain too!
        }
        for role in roles
    }

    # strip the (cosmetic) spaces and hyphens from the payload of each packet
    pkt_flow = []
    for line in test_set.get(PKT_FLOW, []):
        header, payload = line[:46], line[46:]
        pkt_flow.append(header + payload.replace(" ", "").replace("-", ""))

    # can't use fixture for this, as new schema required for every test
    # (can pop orphans_hvac)
    rf, gwys = await rf_factory([config[role] for role in roles])

    try:
        await _test_flow_20x(gwys[0], gwys[1], pkt_flow)
    finally:
        for gwy in gwys:
            await gwy.stop()
        await rf.stop()