Coverage for src/ramses_tx/protocol_fsm.py: 20%

305 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - RAMSES-II compatible packet protocol finite state machine.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7import logging 

8from collections.abc import Callable, Coroutine 

9from datetime import datetime as dt 

10from queue import Empty, Full, PriorityQueue 

11from threading import Lock 

12from typing import TYPE_CHECKING, Any, Final, TypeAlias 

13 

14from . import exceptions as exc 

15from .address import HGI_DEVICE_ID 

16from .command import Command 

17from .const import ( 

18 DEFAULT_BUFFER_SIZE, 

19 DEFAULT_ECHO_TIMEOUT, 

20 DEFAULT_RPLY_TIMEOUT, 

21 MAX_RETRY_LIMIT, 

22 MAX_SEND_TIMEOUT, 

23 Code, 

24 Priority, 

25) 

26from .packet import Packet 

27from .typing import QosParams 

28 

29if TYPE_CHECKING: 

30 from .protocol import RamsesProtocolT 

31 from .transport import RamsesTransportT 

32 from .typing import ExceptionT 

33 

34# 

35# NOTE: All debug flags should be False for deployment to end-users 

36_DBG_MAINTAIN_STATE_CHAIN: Final[bool] = False # maintain Context._prev_state 

37_DBG_USE_STRICT_TRANSITIONS: Final[bool] = False 

38 

39_LOGGER = logging.getLogger(__name__) 

40 

41 

42####################################################################################### 

43 

44_FutureT: TypeAlias = asyncio.Future[Packet] 

45_QueueEntryT: TypeAlias = tuple[Priority, dt, Command, QosParams, _FutureT] 

46 

47 

48class ProtocolContext: 

49 SEND_TIMEOUT_LIMIT = MAX_SEND_TIMEOUT 

50 

51 def __init__( 

52 self, 

53 protocol: RamsesProtocolT, 

54 /, 

55 *, 

56 echo_timeout: float = DEFAULT_ECHO_TIMEOUT, 

57 reply_timeout: float = DEFAULT_RPLY_TIMEOUT, 

58 max_retry_limit: int = MAX_RETRY_LIMIT, 

59 max_buffer_size: int = DEFAULT_BUFFER_SIZE, 

60 ) -> None: 

61 self._protocol = protocol 

62 self.echo_timeout = echo_timeout 

63 self.reply_timeout = reply_timeout 

64 self.max_retry_limit = min(max_retry_limit, MAX_RETRY_LIMIT) 

65 self.max_buffer_size = min(max_buffer_size, DEFAULT_BUFFER_SIZE) 

66 

67 self._loop = protocol._loop 

68 self._lock = Lock() # FIXME: threading lock, or asyncio lock? 

69 self._fut: _FutureT | None = None 

70 self._que: PriorityQueue[_QueueEntryT] = PriorityQueue( 

71 maxsize=self.max_buffer_size 

72 ) 

73 

74 self._expiry_timer: asyncio.Task[None] | None = None 

75 self._multiplier = 0 

76 self._state: _ProtocolStateT = None # type: ignore[assignment] 

77 

78 # TODO: pass this over as an instance parameter 

79 self._send_fnc: Callable[[Command], Coroutine[Any, Any, None]] = None # type: ignore[assignment] 

80 

81 self._cmd: Command | None = None 

82 self._qos: QosParams | None = None 

83 self._cmd_tx_count: int = 0 # was: None 

84 self._cmd_tx_limit: int = 0 

85 

86 self.set_state(Inactive) 

87 

88 def __repr__(self) -> str: 

89 msg = f"<ProtocolContext state={repr(self._state)[21:-1]}" 

90 if self._cmd is None: 

91 return msg + ">" 

92 if self._cmd_tx_count == 0: # was: is None 

93 return msg + ", tx_count=0/0>" 

94 return msg + f", tx_count={self._cmd_tx_count}/{self._cmd_tx_limit}>" 

95 

96 @property 

97 def state(self) -> _ProtocolStateT: 

98 return self._state 

99 

100 @property 

101 def is_sending(self) -> bool: # TODO: remove asserts 

102 if isinstance(self._state, WantEcho | WantRply): 

103 assert self._cmd is not None, f"{self}: Coding error" # mypy hint 

104 assert self._qos is not None, f"{self}: Coding error" # mypy hint 

105 assert self._fut is not None, f"{self}: Coding error" # mypy hint 

106 return True 

107 

108 assert self._cmd is None, f"{self}: Coding error" # mypy hint 

109 assert self._qos is None, f"{self}: Coding error" # mypy hint 

110 assert self._fut is None or self._fut.done(), ( 

111 f"{self}: Coding error" 

112 ) # mypy hint 

113 return False 

114 

115 def set_state( 

116 self, 

117 state_class: _ProtocolStateClassT, 

118 expired: bool = False, 

119 timed_out: bool = False, 

120 exception: Exception | None = None, 

121 result: Packet | None = None, 

122 ) -> None: 

123 async def expire_state_on_timeout() -> None: 

124 # a separate coro, so can be spawned off with create_task() 

125 

126 assert self._cmd is not None # mypy 

127 

128 assert isinstance(self.is_sending, bool), ( 

129 f"{self}: Coding error" 

130 ) # TODO: remove 

131 assert self._cmd_tx_count > 0, f"{self}: Coding error" # TODO: remove 

132 

133 if isinstance(self._state, WantEcho): # otherwise is WantRply 

134 delay = self.echo_timeout * (2**self._multiplier) 

135 # elif self._cmd.code == Code._0404: 

136 # delay = self.reply_timeout * (2**self._multiplier) * 2 

137 else: # isinstance(self._state, WantRply): 

138 delay = self.reply_timeout * (2**self._multiplier) 

139 

140 # assuming success, multiplier can be decremented... 

141 self._multiplier, old_val = max(0, self._multiplier - 1), self._multiplier 

142 

143 await asyncio.sleep(delay) # ideally, will be interrupted by wait_for() 

144 

145 # nope, was not successful, so multiplier should be incremented... 

146 self._multiplier = min(3, old_val + 1) 

147 

148 if isinstance(self._state, WantEcho): 

149 _LOGGER.warning( 

150 f"Timeout expired waiting for echo: {self} (delay={delay})" 

151 ) 

152 else: # isinstance(self._state, WantRply): 

153 _LOGGER.warning( 

154 f"Timeout expired waiting for reply: {self} (delay={delay})" 

155 ) 

156 

157 assert isinstance(self.is_sending, bool), ( 

158 f"{self}: Coding error" 

159 ) # TODO: remove 

160 

161 # Timer has expired, can we retry or are we done? 

162 assert isinstance(self._cmd_tx_count, int) 

163 

164 if self._cmd_tx_count < self._cmd_tx_limit: 

165 self.set_state(WantEcho, timed_out=True) 

166 else: 

167 self.set_state(IsInIdle, expired=True) 

168 

169 assert isinstance(self.is_sending, bool), ( 

170 f"{self}: Coding error" 

171 ) # TODO: remove 

172 

173 def effect_state(timed_out: bool) -> None: 

174 """Take any actions indicated by state, and optionally set expiry timer.""" 

175 # a separate function, so can be spawned off with call_soon() 

176 

177 assert isinstance(self.is_sending, bool), ( 

178 f"{self}: Coding error" 

179 ) # TODO: remove 

180 

181 if timed_out: 

182 assert self._cmd is not None, f"{self}: Coding error" # mypy hint 

183 self._send_cmd(self._cmd, is_retry=True) 

184 

185 if isinstance(self._state, IsInIdle): 

186 self._loop.call_soon_threadsafe(self._check_buffer_for_cmd) 

187 

188 elif isinstance(self._state, WantRply) and not self._qos.wait_for_reply: # type: ignore[union-attr] 

189 self.set_state(IsInIdle, result=self._state._echo_pkt) 

190 

191 elif isinstance(self._state, WantEcho | WantRply): 

192 self._expiry_timer = self._loop.create_task(expire_state_on_timeout()) 

193 

194 if self._expiry_timer is not None: 

195 self._expiry_timer.cancel("Changing state") 

196 self._expiry_timer = None 

197 

198 # when _fut.done(), three possibilities: 

199 # _fut.set_result() 

200 # _fut.set_exception() 

201 # _fut.cancel() (incl. via a send_cmd(qos.timeout) -> wait_for(timeout)) 

202 

203 # Changing the order of the following is fraught with danger 

204 # Formatitng: [TRACE_FSM] [State: Old->New] [Reason] | ... 

205 

206 current_state_name = self._state.__class__.__name__ 

207 new_state_name = state_class.__name__ 

208 transition = f"{current_state_name}->{new_state_name}" 

209 

210 if self._fut is None: # logging only - IsInIdle, Inactive 

211 _LOGGER.debug( 

212 f"FSM state changed {transition}: no active future (ctx={self})" 

213 ) 

214 assert self._cmd is None, f"{self}: Coding error" # mypy hint 

215 assert isinstance(self._state, IsInIdle | Inactive | None), ( 

216 f"{self}: Coding error" 

217 ) # mypy hint 

218 

219 elif self._fut.cancelled() and not isinstance(self._state, IsInIdle): 

220 # cancelled by wait_for(timeout), cancel("buffer overflow"), or other? 

221 # was for previous send_cmd if currently IsInIdle (+/- Inactive?) 

222 _LOGGER.debug( 

223 f"FSM state changed {transition}: future cancelled (expired={expired}, ctx={self})" 

224 ) 

225 assert self._cmd is not None, f"{self}: Coding error" # mypy hint 

226 assert isinstance(self._state, WantEcho | WantRply), ( 

227 f"{self}: Coding error" 

228 ) # mypy hint 

229 

230 elif exception: 

231 _LOGGER.debug( 

232 f"FSM state changed {transition}: exception occurred (error={exception}, ctx={self})" 

233 ) 

234 assert not self._fut.done(), ( 

235 f"{self}: Coding error ({self._fut})" 

236 ) # mypy hint 

237 assert isinstance(self._state, WantEcho | WantRply), ( 

238 f"{self}: Coding error" 

239 ) # mypy hint 

240 self._fut.set_exception(exception) # apologise to the sender 

241 

242 elif result: 

243 _LOGGER.debug( 

244 f"FSM state changed {transition}: result received (result={result._hdr}, ctx={self})" 

245 ) 

246 assert not self._fut.done(), ( 

247 f"{self}: Coding error ({self._fut})" 

248 ) # mypy hint 

249 assert isinstance(self._state, WantEcho | WantRply), ( 

250 f"{self}: Coding error" 

251 ) # mypy hint 

252 self._fut.set_result(result) 

253 

254 elif expired: # by expire_state_on_timeout(echo_timeout/reply_timeout) 

255 _LOGGER.debug(f"FSM state changed {transition}: timer expired (ctx={self})") 

256 assert not self._fut.done(), ( 

257 f"{self}: Coding error ({self._fut})" 

258 ) # mypy hint 

259 assert isinstance(self._state, WantEcho | WantRply), ( 

260 f"{self}: Coding error" 

261 ) # mypy hint 

262 self._fut.set_exception( 

263 exc.ProtocolSendFailed(f"{self}: Exceeded maximum retries") 

264 ) 

265 

266 else: # logging only - WantEcho, WantRply 

267 _LOGGER.debug(f"FSM state changed {transition}: successful (ctx={self})") 

268 assert self._fut is None or self._fut.cancelled() or not self._fut.done(), ( 

269 f"{self}: Coding error ({self._fut})" 

270 ) # mypy hint 

271 # sert isinstance(self._state, WantEcho | WantRply), f"{self}: Coding error" # mypy hint 

272 

273 prev_state = self._state # for _DBG_MAINTAIN_STATE_CHAIN 

274 

275 self._state = state_class(self) # keep atomic with tx_count / tx_limit calcs 

276 

277 if _DBG_MAINTAIN_STATE_CHAIN: # for debugging 

278 # tattr(prev_state, "_next_state", self._state) 

279 setattr(self._state, "_prev_state", prev_state) # noqa: B010 

280 

281 if timed_out: # isinstance(self._state, WantEcho): 

282 assert isinstance(self._cmd_tx_count, int), ( 

283 f"{self}: Coding error" 

284 ) # mypy hint 

285 self._cmd_tx_count += 1 

286 

287 elif isinstance(self._state, WantEcho): 

288 assert self._qos is not None, f"{self}: Coding error" # mypy hint 

289 # self._cmd_tx_limit = min(self._qos.max_retries, self.max_retry_limit) + 1 

290 self._cmd_tx_count = 1 

291 

292 elif not isinstance(self._state, WantRply): # IsInIdle, IsInactive 

293 self._cmd = self._qos = None 

294 self._cmd_tx_count = 0 # was: = None 

295 

296 assert isinstance(self.is_sending, bool) # TODO: remove 

297 

298 # remaining code spawned off with a call_soon(), so early return to caller 

299 self._loop.call_soon_threadsafe(effect_state, timed_out) # calls expire_state 

300 

301 if not isinstance(self._state, WantRply): 

302 # _LOGGER.debug("AFTER. = %s", self) 

303 return 

304 

305 assert self._qos is not None, f"{self}: Coding error" # mypy hint 

306 # _LOGGER.debug("AFTER. = %s: wait_for_reply=%s", self, self._qos.wait_for_reply) 

307 

308 def connection_made(self, transport: RamsesTransportT) -> None: 

309 # may want to set some instance variables, according to type of transport 

310 self._state.connection_made() 

311 

312 # TODO: Should we clear the buffer if connection is lost (and apologise to senders? 

313 def connection_lost(self, err: ExceptionT | None) -> None: 

314 self._state.connection_lost() 

315 

316 def pkt_received(self, pkt: Packet) -> Any: 

317 self._state.pkt_rcvd(pkt) 

318 

319 def pause_writing(self) -> None: 

320 self._state.writing_paused() 

321 

322 def resume_writing(self) -> None: 

323 self._state.writing_resumed() 

324 

325 async def send_cmd( 

326 self, 

327 send_fnc: Callable[[Command], Coroutine[Any, Any, None]], # TODO: remove 

328 cmd: Command, 

329 priority: Priority, 

330 qos: QosParams, 

331 ) -> Packet: 

332 self._send_fnc = send_fnc # TODO: REMOVE: make per Context, not per Command 

333 

334 if isinstance(self._state, Inactive): 

335 raise exc.ProtocolSendFailed(f"{self}: Send failed (no active transport?)") 

336 

337 assert self._loop is asyncio.get_running_loop() # BUG is here 

338 

339 fut: _FutureT = self._loop.create_future() 

340 try: 

341 self._que.put_nowait((priority, dt.now(), cmd, qos, fut)) 

342 except Full as err: 

343 fut.cancel("Send buffer overflow") 

344 raise exc.ProtocolSendFailed(f"{self}: Send buffer overflow") from err 

345 

346 if isinstance(self._state, IsInIdle): 

347 self._loop.call_soon_threadsafe(self._check_buffer_for_cmd) 

348 

349 timeout = min( # needs to be greater than worse-case via set_state engine 

350 qos.timeout, self.SEND_TIMEOUT_LIMIT 

351 ) # incl. time queued in buffer 

352 try: 

353 await asyncio.wait_for(fut, timeout=timeout) 

354 except TimeoutError as err: # incl. fut.cancel() 

355 msg = f"{self}: Expired global timer after {timeout} sec" 

356 _LOGGER.warning( 

357 "TOUT.. = %s: send_timeout=%s (%s)", self, timeout, self._cmd is cmd 

358 ) 

359 if self._cmd is cmd: # NOTE: # this cmd may not yet be self._cmd 

360 self.set_state( 

361 IsInIdle, expired=True 

362 ) # set_exception() will cause InvalidStateError 

363 raise exc.ProtocolSendFailed(msg) from err # make msg *before* state reset 

364 

365 try: 

366 return fut.result() 

367 except exc.ProtocolSendFailed: 

368 raise 

369 except (exc.ProtocolError, exc.TransportError) as err: # incl. ProtocolFsmError 

370 raise exc.ProtocolSendFailed(f"{self}: Send failed: {err}") from err 

371 

372 def _check_buffer_for_cmd(self) -> None: 

373 self._lock.acquire() 

374 assert isinstance(self.is_sending, bool), f"{self}: Coding error" # mypy hint 

375 

376 if self._fut is not None and not self._fut.done(): 

377 self._lock.release() 

378 return 

379 

380 while True: 

381 try: 

382 *_, self._cmd, self._qos, self._fut = self._que.get_nowait() 

383 except Empty: 

384 self._cmd = self._qos = self._fut = None 

385 self._lock.release() 

386 return 

387 

388 self._cmd_tx_count = 0 

389 self._cmd_tx_limit = min(self._qos.max_retries, self.max_retry_limit) + 1 

390 

391 assert isinstance(self._fut, asyncio.Future) # mypy hint 

392 if self._fut.done(): # e.g. TimeoutError 

393 self._que.task_done() 

394 continue 

395 

396 break 

397 

398 self._lock.release() 

399 

400 try: 

401 assert self._cmd is not None, f"{self}: Coding error" # mypy hint 

402 self._send_cmd(self._cmd) 

403 finally: 

404 self._que.task_done() 

405 

406 def _send_cmd(self, cmd: Command, is_retry: bool = False) -> None: 

407 """Wrapper to send a command with retries, until success or exception.""" 

408 

409 async def send_fnc_wrapper(cmd: Command) -> None: 

410 try: # the wrapped function (actual Tx.write) 

411 await self._send_fnc(cmd) 

412 except exc.TransportError as err: 

413 self.set_state(IsInIdle, exception=err) 

414 

415 # TODO: check what happens when exception here - why does it hang? 

416 assert cmd is not None, f"{self}: Coding error" 

417 

418 try: # the wrapped function (actual Tx.write) 

419 self._state.cmd_sent(cmd, is_retry=is_retry) 

420 except exc.ProtocolFsmError as err: 

421 self.set_state(IsInIdle, exception=err) 

422 else: 

423 self._loop.create_task(send_fnc_wrapper(cmd)) 

424 

425 

426# With wait_for_reply=False 

427# AFTER. = <ProtocolContext state=IsInIdle> 

428# BEFORE = <ProtocolContext state=IsInIdle cmd_=2349|RQ|01:145038|08, tx_count=0/4> 

429# AFTER. = <ProtocolContext state=WantEcho cmd_=2349|RQ|01:145038|08, tx_count=1/4> 

430# BEFORE = <ProtocolContext state=WantEcho echo=2349|RQ|01:145038|08, tx_count=1/4> 

431# AFTER. = <ProtocolContext state=WantRply echo=2349|RQ|01:145038|08, tx_count=1/4>: wait_for_reply=False 

432# 

433# BEFORE = <ProtocolContext state=WantRply echo=2349|RQ|01:145038|08, tx_count=1/4>: result=2349|RQ|01:145038|08 

434# AFTER. = <ProtocolContext state=IsInIdle> 

435 

436# With wait_for_reply=True 

437# AFTER. = <ProtocolContext state=IsInIdle> 

438# BEFORE = <ProtocolContext state=IsInIdle cmd_=0004|RQ|01:145038|05, tx_count=0/4> 

439# AFTER. = <ProtocolContext state=WantEcho cmd_=0004|RQ|01:145038|05, tx_count=1/4> 

440# BEFORE = <ProtocolContext state=WantEcho echo=0004|RQ|01:145038|05, tx_count=1/4> 

441# AFTER. = <ProtocolContext state=WantRply echo=0004|RQ|01:145038|05, tx_count=1/4>: wait_for_reply=True 

442# 

443# BEFORE = <ProtocolContext state=WantRply rply=0004|RP|01:145038|05, tx_count=1/4>: result=0004|RP|01:145038|05 

444# AFTER. = <ProtocolContext state=IsInIdle> 

445 

446####################################################################################### 

447 

448# NOTE: Because .dst / .src may switch from Address to Device from one pkt to the next: 

449# - use: pkt.dst.id == self._echo_pkt.src.id 

450# - not: pkt.dst is self._echo_pkt.src 

451 

452 

453class ProtocolStateBase: 

454 def __init__(self, context: ProtocolContext) -> None: 

455 self._context = context 

456 

457 self._sent_cmd: Command | None = None 

458 self._echo_pkt: Packet | None = None 

459 self._rply_pkt: Packet | None = None 

460 

461 def __repr__(self) -> str: 

462 msg = f"<ProtocolState state={self.__class__.__name__}" 

463 if self._rply_pkt: 

464 return msg + f" rply={self._rply_pkt._hdr}>" 

465 if self._echo_pkt: 

466 return msg + f" echo={self._echo_pkt._hdr}>" 

467 if self._sent_cmd: 

468 return msg + f" cmd_={self._sent_cmd._hdr}>" 

469 return msg + ">" 

470 

471 def connection_made(self) -> None: # For all states except Inactive 

472 """Do nothing, as (except for InActive) we're already connected.""" 

473 pass 

474 

475 def connection_lost(self) -> None: # Varies by states (not needed if Inactive) 

476 """Transition to Inactive, regardless of current state.""" 

477 

478 if isinstance(self._context._state, Inactive): 

479 return 

480 

481 if isinstance(self._context._state, IsInIdle): 

482 self._context.set_state(Inactive) 

483 return 

484 

485 self._context.set_state( 

486 Inactive, exception=exc.TransportError("Connection lost") 

487 ) 

488 

489 def pkt_rcvd(self, pkt: Packet) -> None: # Different for each state 

490 """Raise a NotImplementedError.""" 

491 raise NotImplementedError("Invalid state to receive a packet") 

492 

493 def writing_paused(self) -> None: # Currently same for all states (TBD) 

494 """Do nothing.""" 

495 pass 

496 

497 def writing_resumed(self) -> None: # Currently same for all states (TBD) 

498 """Do nothing.""" 

499 pass 

500 

501 def cmd_sent( # For all except IsInIdle, WantEcho 

502 self, cmd: Command, is_retry: bool | None = None 

503 ) -> None: 

504 raise exc.ProtocolFsmError(f"Invalid state to send a command: {self._context}") 

505 

506 

507class Inactive(ProtocolStateBase): 

508 """The Protocol is not connected to the transport layer.""" 

509 

510 def connection_made(self) -> None: 

511 """Transition to IsInIdle.""" 

512 self._context.set_state(IsInIdle) 

513 

514 def pkt_rcvd(self, pkt: Packet) -> None: # raise ProtocolFsmError 

515 """Raise an exception, as a packet is not expected in this state.""" 

516 

517 assert self._sent_cmd is None, f"{self}: Coding error" 

518 

519 if pkt.code != Code._PUZZ: 

520 _LOGGER.warning("%s: Invalid state to receive a packet", self._context) 

521 

522 

523class IsInIdle(ProtocolStateBase): 

524 """The Protocol is not in the process of sending a Command.""" 

525 

526 def pkt_rcvd(self, pkt: Packet) -> None: # Do nothing 

527 """Do nothing as we're not expecting an echo, nor a reply.""" 

528 

529 assert self._sent_cmd is None, f"{self}: Coding error" 

530 

531 pass 

532 

533 def cmd_sent( # Will expect an Echo 

534 self, cmd: Command, is_retry: bool | None = None 

535 ) -> None: 

536 """Transition to WantEcho.""" 

537 

538 assert self._sent_cmd is None and is_retry is False, f"{self}: Coding error" 

539 

540 self._sent_cmd = cmd 

541 

542 # HACK for headers with sentinel values: 

543 # I --- 18:000730 18:222222 --:------ 30C9 003 000333 # 30C9| I|18:000730, *but* will be: 30C9| I|18:222222 

544 # I --- --:------ --:------ 18:000730 0008 002 00BB # 0008| I|18:000730|00, *and* will be unchanged 

545 

546 if HGI_DEVICE_ID in cmd.tx_header: # HACK: what do I do about this 

547 cmd._hdr_ = cmd._hdr_.replace(HGI_DEVICE_ID, self._context._protocol.hgi_id) 

548 self._context.set_state(WantEcho) 

549 

550 

551class WantEcho(ProtocolStateBase): 

552 """The Protocol is waiting to receive an echo Packet.""" 

553 

554 # NOTE: unfortunately, the cmd's src / echo's src can be different: 

555 # RQ --- 18:000730 10:052644 --:------ 3220 005 0000050000 # RQ|10:048122|3220|05 

556 # RQ --- 18:198151 10:052644 --:------ 3220 005 0000050000 # RQ|10:048122|3220|05 

557 

558 def __init__(self, context: ProtocolContext) -> None: 

559 super().__init__(context) 

560 

561 self._sent_cmd = context._state._sent_cmd 

562 # if isinstance(context._state, WantEcho | WantRply): 

563 # self._echo_pkt = context._state._echo_pkt 

564 # else: 

565 # self._echo_pkt = None 

566 

567 def pkt_rcvd(self, pkt: Packet) -> None: # Check if pkt is expected Echo 

568 """If the pkt is the expected Echo, transition to IsInIdle, or WantRply.""" 

569 

570 # RQ --- 18:002563 01:078710 --:------ 2349 002 0200 # 2349|RQ|01:078710|02 

571 # RP --- 01:078710 18:002563 --:------ 2349 007 0201F400FFFFFF # 2349|RP|01:078710|02 

572 # W --- 30:257306 01:096339 --:------ 313F 009 0060002916050B07E7 # 313F| W|01:096339 

573 # I --- 01:096339 30:257306 --:------ 313F 009 00FC0029D6050B07E7 # 313F| I|01:096339 

574 

575 assert self._sent_cmd, f"{self}: Coding error" # mypy hint 

576 

577 # if self._sent_cmd.rx_header and pkt._hdr == self._sent_cmd.rx_header: 

578 # _LOGGER.error("hdr=%s", self._sent_cmd.rx_header) 

579 # _LOGGER.error("src=%s", self._sent_cmd.src.id) 

580 # _LOGGER.error("dst=%s", pkt.dst.id) 

581 

582 if ( 

583 self._sent_cmd.rx_header 

584 and pkt._hdr == self._sent_cmd.rx_header 

585 and ( 

586 pkt.dst.id == self._sent_cmd.src.id 

587 or ( # handle: 18:146440 == 18:000730 

588 self._sent_cmd.src.id == HGI_DEVICE_ID 

589 and pkt.dst.id == self._context._protocol.hgi_id 

590 ) 

591 ) 

592 ): 

593 _LOGGER.warning( 

594 "%s: Invalid state to receive a reply (expecting echo)", self._context 

595 ) 

596 

597 self._rply_pkt = pkt 

598 self._context.set_state(IsInIdle, result=pkt) 

599 return 

600 

601 # HACK for packets with addr sets like (issue is only with sentinel values?): 

602 # I --- --:------ --:------ 18:000730 0008 002 00BB 

603 

604 if HGI_DEVICE_ID in pkt._hdr: # HACK: what do I do about this? 

605 pkt__hdr = pkt._hdr_.replace(HGI_DEVICE_ID, self._context._protocol.hgi_id) 

606 else: 

607 pkt__hdr = pkt._hdr 

608 

609 if pkt__hdr != self._sent_cmd.tx_header: 

610 return 

611 

612 # # HACK: for testing - drop some packets 

613 # import random 

614 # if random.random() < 0.2: 

615 # return 

616 

617 self._echo_pkt = pkt 

618 if self._sent_cmd.rx_header: 

619 self._context.set_state(WantRply) 

620 else: 

621 self._context.set_state(IsInIdle, result=pkt) 

622 

623 def cmd_sent(self, cmd: Command, is_retry: bool | None = None) -> None: 

624 """Transition to WantEcho (i.e. a retransmit).""" 

625 

626 assert self._sent_cmd is not None and is_retry is True, f"{self}: Coding error" 

627 

628 # NOTE: don't self._context.set_state(WantEcho) here - may cause endless loop 

629 

630 

631class WantRply(ProtocolStateBase): 

632 """The Protocol is waiting to receive an reply Packet.""" 

633 

634 # NOTE: is possible get a false rply (same rx_header), e.g.: 

635 # RP --- 10:048122 18:198151 --:------ 3220 005 00C0050000 # 3220|RP|10:048122|05 

636 # RP --- 10:048122 01:145038 --:------ 3220 005 00C0050000 # 3220|RP|10:048122|05 

637 

638 # NOTE: unfortunately, the cmd's src / rply's dst can still be different: 

639 # RQ --- 18:000730 10:052644 --:------ 3220 005 0000050000 # 3220|RQ|10:048122|05 

640 # RP --- 10:048122 18:198151 --:------ 3220 005 00C0050000 # 3220|RP|10:048122|05 

641 

642 def __init__(self, context: ProtocolContext) -> None: 

643 super().__init__(context) 

644 

645 self._sent_cmd = context._state._sent_cmd 

646 self._echo_pkt = context._state._echo_pkt 

647 

648 def pkt_rcvd(self, pkt: Packet) -> None: # Check if pkt is expected Reply 

649 """If the pkt is the expected reply, transition to IsInIdle.""" 

650 

651 assert self._sent_cmd, f"{self}: Coding error" # mypy hint 

652 assert self._echo_pkt, f"{self}: Coding error" # mypy hint 

653 

654 # NOTE: beware collisions: same header, but is not reply (must check RSSI or src) 

655 # 2024-04-16 08:28:33.895 000 RQ --- 18:146440 10:048122 --:------ 3220 005 0000110000 # 3220|RQ|10:048122|11 

656 # 2024-04-16 08:28:33.910 052 RQ --- 01:145038 10:048122 --:------ 3220 005 0000110000 # 3220|RQ|10:048122|11 

657 

658 if pkt._hdr == self._sent_cmd.tx_header and pkt.src == self._echo_pkt.src: 

659 _LOGGER.warning( 

660 "%s: Invalid state to receive an echo (expecting reply)", self._context 

661 ) 

662 return # do not transition, wait until existing timer expires 

663 

664 # HACK: special case: if null log entry for log_idx=nn, then 

665 # HACK: rx_hdr will be 0418|RP|01:145038|00, and not 0418|RP|01:145038|nn 

666 # HACK: wait_for_reply must be true for RQ|0418 commands 

667 if ( 

668 self._sent_cmd.rx_header[:8] == "0418|RP|" # type: ignore[index] 

669 and self._sent_cmd.rx_header[:-2] == pkt._hdr[:-2] # type: ignore[index] 

670 and pkt.payload == "000000B0000000000000000000007FFFFF7000000000" 

671 ): 

672 self._rply_pkt = pkt 

673 

674 elif pkt._hdr != self._sent_cmd.rx_header: 

675 return 

676 

677 else: 

678 self._rply_pkt = pkt 

679 

680 self._context.set_state(IsInIdle, result=pkt) 

681 

682 

683####################################################################################### 

684 

685 

686_ProtocolStateT: TypeAlias = Inactive | IsInIdle | WantEcho | WantRply 

687 

688_ProtocolStateClassT: TypeAlias = ( 

689 type[Inactive] | type[IsInIdle] | type[WantEcho] | type[WantRply] 

690)