Coverage for src/ramses_tx/protocol_fsm.py: 20%
305 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - RAMSES-II compatible packet protocol finite state machine."""
4from __future__ import annotations
6import asyncio
7import logging
8from collections.abc import Callable, Coroutine
9from datetime import datetime as dt
10from queue import Empty, Full, PriorityQueue
11from threading import Lock
12from typing import TYPE_CHECKING, Any, Final, TypeAlias
14from . import exceptions as exc
15from .address import HGI_DEVICE_ID
16from .command import Command
17from .const import (
18 DEFAULT_BUFFER_SIZE,
19 DEFAULT_ECHO_TIMEOUT,
20 DEFAULT_RPLY_TIMEOUT,
21 MAX_RETRY_LIMIT,
22 MAX_SEND_TIMEOUT,
23 Code,
24 Priority,
25)
26from .packet import Packet
27from .typing import QosParams
29if TYPE_CHECKING:
30 from .protocol import RamsesProtocolT
31 from .transport import RamsesTransportT
32 from .typing import ExceptionT
#
# NOTE: All debug flags should be False for deployment to end-users

# When True, ProtocolContext.set_state() stores the previous state object on the
# newly created state object (as `_prev_state`), to aid debugging.
_DBG_MAINTAIN_STATE_CHAIN: Final[bool] = False

# NOTE(review): not referenced in the visible part of this module - confirm usage.
_DBG_USE_STRICT_TRANSITIONS: Final[bool] = False

_LOGGER = logging.getLogger(__name__)

#######################################################################################

# The future that a sender awaits; it resolves to a Packet (the echo or the reply).
_FutureT: TypeAlias = asyncio.Future[Packet]

# One entry of the send buffer: (priority, time queued, command, QoS, future).
_QueueEntryT: TypeAlias = tuple[Priority, dt, Command, QosParams, _FutureT]
class ProtocolContext:
    """The context of the protocol's finite state machine (FSM).

    The context owns the current state object, a priority queue of commands
    waiting to be sent, and the command/QoS/future of the command currently in
    flight. State objects call back into `set_state()` to request a transition.
    """

    SEND_TIMEOUT_LIMIT = MAX_SEND_TIMEOUT

    def __init__(
        self,
        protocol: RamsesProtocolT,
        /,
        *,
        echo_timeout: float = DEFAULT_ECHO_TIMEOUT,
        reply_timeout: float = DEFAULT_RPLY_TIMEOUT,
        max_retry_limit: int = MAX_RETRY_LIMIT,
        max_buffer_size: int = DEFAULT_BUFFER_SIZE,
    ) -> None:
        """Initialise the context, leaving it in the Inactive state.

        :param protocol: the owning protocol; its `_loop` is used for scheduling
        :param echo_timeout: base delay (seconds) to wait for an echo
        :param reply_timeout: base delay (seconds) to wait for a reply
        :param max_retry_limit: retry ceiling, capped at MAX_RETRY_LIMIT
        :param max_buffer_size: queue size, capped at DEFAULT_BUFFER_SIZE
        """
        self._protocol = protocol
        self.echo_timeout = echo_timeout
        self.reply_timeout = reply_timeout
        self.max_retry_limit = min(max_retry_limit, MAX_RETRY_LIMIT)
        self.max_buffer_size = min(max_buffer_size, DEFAULT_BUFFER_SIZE)

        self._loop = protocol._loop
        self._lock = Lock()  # FIXME: threading lock, or asyncio lock?
        self._fut: _FutureT | None = None
        self._que: PriorityQueue[_QueueEntryT] = PriorityQueue(
            maxsize=self.max_buffer_size
        )

        self._expiry_timer: asyncio.Task[None] | None = None
        self._multiplier = 0
        self._state: _ProtocolStateT = None  # type: ignore[assignment]

        # Strong references to in-flight send tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._send_tasks: set[asyncio.Task[None]] = set()

        # TODO: pass this over as an instance parameter
        self._send_fnc: Callable[[Command], Coroutine[Any, Any, None]] = None  # type: ignore[assignment]

        self._cmd: Command | None = None
        self._qos: QosParams | None = None
        self._cmd_tx_count: int = 0  # was: None
        self._cmd_tx_limit: int = 0

        self.set_state(Inactive)

    def __repr__(self) -> str:
        """Return a short description: the state and, if sending, the tx count."""
        # [21:-1] strips the leading "<ProtocolState state=" and the trailing ">"
        msg = f"<ProtocolContext state={repr(self._state)[21:-1]}"
        if self._cmd is None:
            return msg + ">"
        if self._cmd_tx_count == 0:  # was: is None
            return msg + ", tx_count=0/0>"
        return msg + f", tx_count={self._cmd_tx_count}/{self._cmd_tx_limit}>"

    @property
    def state(self) -> _ProtocolStateT:
        """Return the current state object."""
        return self._state

    @property
    def is_sending(self) -> bool:  # TODO: remove asserts
        """Return True if the FSM is waiting for an echo or a reply.

        Also asserts that `_cmd`, `_qos` and `_fut` are consistent with that.
        """
        if isinstance(self._state, WantEcho | WantRply):
            assert self._cmd is not None, f"{self}: Coding error"  # mypy hint
            assert self._qos is not None, f"{self}: Coding error"  # mypy hint
            assert self._fut is not None, f"{self}: Coding error"  # mypy hint
            return True

        assert self._cmd is None, f"{self}: Coding error"  # mypy hint
        assert self._qos is None, f"{self}: Coding error"  # mypy hint
        assert self._fut is None or self._fut.done(), (
            f"{self}: Coding error"
        )  # mypy hint
        return False

    def set_state(
        self,
        state_class: _ProtocolStateClassT,
        expired: bool = False,
        timed_out: bool = False,
        exception: Exception | None = None,
        result: Packet | None = None,
    ) -> None:
        """Transition to a new state, resolving the sender's future if needed.

        :param state_class: the class of the state to enter
        :param expired: the retries are exhausted (or the global timer expired)
        :param timed_out: the echo/reply timer expired, and a re-send is wanted
        :param exception: if set, is passed to the future via set_exception()
        :param result: if set, is passed to the future via set_result()

        Any side effects of the new state (re-send, checking the buffer, arming
        the expiry timer) are deferred to the event loop via call_soon.
        """

        async def expire_state_on_timeout() -> None:
            # a separate coro, so can be spawned off with create_task()

            assert self._cmd is not None  # mypy

            assert isinstance(self.is_sending, bool), (
                f"{self}: Coding error"
            )  # TODO: remove
            assert self._cmd_tx_count > 0, f"{self}: Coding error"  # TODO: remove

            if isinstance(self._state, WantEcho):  # otherwise is WantRply
                delay = self.echo_timeout * (2**self._multiplier)
            # elif self._cmd.code == Code._0404:
            #     delay = self.reply_timeout * (2**self._multiplier) * 2
            else:  # isinstance(self._state, WantRply):
                delay = self.reply_timeout * (2**self._multiplier)

            # assuming success, multiplier can be decremented...
            self._multiplier, old_val = max(0, self._multiplier - 1), self._multiplier

            await asyncio.sleep(delay)  # ideally, will be interrupted by wait_for()

            # nope, was not successful, so multiplier should be incremented...
            self._multiplier = min(3, old_val + 1)

            if isinstance(self._state, WantEcho):
                _LOGGER.warning(
                    f"Timeout expired waiting for echo: {self} (delay={delay})"
                )
            else:  # isinstance(self._state, WantRply):
                _LOGGER.warning(
                    f"Timeout expired waiting for reply: {self} (delay={delay})"
                )

            assert isinstance(self.is_sending, bool), (
                f"{self}: Coding error"
            )  # TODO: remove

            # Timer has expired, can we retry or are we done?
            assert isinstance(self._cmd_tx_count, int)

            if self._cmd_tx_count < self._cmd_tx_limit:
                self.set_state(WantEcho, timed_out=True)
            else:
                self.set_state(IsInIdle, expired=True)

            assert isinstance(self.is_sending, bool), (
                f"{self}: Coding error"
            )  # TODO: remove

        def effect_state(timed_out: bool) -> None:
            """Take any actions indicated by state, and optionally set expiry timer."""
            # a separate function, so can be spawned off with call_soon()

            assert isinstance(self.is_sending, bool), (
                f"{self}: Coding error"
            )  # TODO: remove

            if timed_out:
                assert self._cmd is not None, f"{self}: Coding error"  # mypy hint
                self._send_cmd(self._cmd, is_retry=True)

            if isinstance(self._state, IsInIdle):
                self._loop.call_soon_threadsafe(self._check_buffer_for_cmd)

            elif isinstance(self._state, WantRply) and not self._qos.wait_for_reply:  # type: ignore[union-attr]
                # the sender only wanted the echo, so it is the result
                self.set_state(IsInIdle, result=self._state._echo_pkt)

            elif isinstance(self._state, WantEcho | WantRply):
                self._expiry_timer = self._loop.create_task(expire_state_on_timeout())

        if self._expiry_timer is not None:
            self._expiry_timer.cancel("Changing state")
            self._expiry_timer = None

        # when _fut.done(), three possibilities:
        #   _fut.set_result()
        #   _fut.set_exception()
        #   _fut.cancel() (incl. via a send_cmd(qos.timeout) -> wait_for(timeout))

        # Changing the order of the following is fraught with danger
        # Formatting: [TRACE_FSM] [State: Old->New] [Reason] | ...

        current_state_name = self._state.__class__.__name__
        new_state_name = state_class.__name__
        transition = f"{current_state_name}->{new_state_name}"

        if self._fut is None:  # logging only - IsInIdle, Inactive
            _LOGGER.debug(
                f"FSM state changed {transition}: no active future (ctx={self})"
            )
            assert self._cmd is None, f"{self}: Coding error"  # mypy hint
            assert isinstance(self._state, IsInIdle | Inactive | None), (
                f"{self}: Coding error"
            )  # mypy hint

        elif self._fut.cancelled() and not isinstance(self._state, IsInIdle):
            # cancelled by wait_for(timeout), cancel("buffer overflow"), or other?
            # was for previous send_cmd if currently IsInIdle (+/- Inactive?)
            _LOGGER.debug(
                f"FSM state changed {transition}: future cancelled (expired={expired}, ctx={self})"
            )
            assert self._cmd is not None, f"{self}: Coding error"  # mypy hint
            assert isinstance(self._state, WantEcho | WantRply), (
                f"{self}: Coding error"
            )  # mypy hint

        elif exception:
            _LOGGER.debug(
                f"FSM state changed {transition}: exception occurred (error={exception}, ctx={self})"
            )
            assert not self._fut.done(), (
                f"{self}: Coding error ({self._fut})"
            )  # mypy hint
            assert isinstance(self._state, WantEcho | WantRply), (
                f"{self}: Coding error"
            )  # mypy hint
            self._fut.set_exception(exception)  # apologise to the sender

        elif result:
            _LOGGER.debug(
                f"FSM state changed {transition}: result received (result={result._hdr}, ctx={self})"
            )
            assert not self._fut.done(), (
                f"{self}: Coding error ({self._fut})"
            )  # mypy hint
            assert isinstance(self._state, WantEcho | WantRply), (
                f"{self}: Coding error"
            )  # mypy hint
            self._fut.set_result(result)

        elif expired:  # by expire_state_on_timeout(echo_timeout/reply_timeout)
            _LOGGER.debug(f"FSM state changed {transition}: timer expired (ctx={self})")
            assert not self._fut.done(), (
                f"{self}: Coding error ({self._fut})"
            )  # mypy hint
            assert isinstance(self._state, WantEcho | WantRply), (
                f"{self}: Coding error"
            )  # mypy hint
            self._fut.set_exception(
                exc.ProtocolSendFailed(f"{self}: Exceeded maximum retries")
            )

        else:  # logging only - WantEcho, WantRply
            _LOGGER.debug(f"FSM state changed {transition}: successful (ctx={self})")
            assert self._fut is None or self._fut.cancelled() or not self._fut.done(), (
                f"{self}: Coding error ({self._fut})"
            )  # mypy hint
            # (disabled) assert isinstance(self._state, WantEcho | WantRply)

        prev_state = self._state  # for _DBG_MAINTAIN_STATE_CHAIN

        self._state = state_class(self)  # keep atomic with tx_count / tx_limit calcs

        if _DBG_MAINTAIN_STATE_CHAIN:  # for debugging
            # (disabled) setattr(prev_state, "_next_state", self._state)
            setattr(self._state, "_prev_state", prev_state)  # noqa: B010

        if timed_out:  # isinstance(self._state, WantEcho):
            assert isinstance(self._cmd_tx_count, int), (
                f"{self}: Coding error"
            )  # mypy hint
            self._cmd_tx_count += 1

        elif isinstance(self._state, WantEcho):
            assert self._qos is not None, f"{self}: Coding error"  # mypy hint
            # self._cmd_tx_limit = min(self._qos.max_retries, self.max_retry_limit) + 1
            self._cmd_tx_count = 1

        elif not isinstance(self._state, WantRply):  # IsInIdle, IsInactive
            self._cmd = self._qos = None
            self._cmd_tx_count = 0  # was: = None

        assert isinstance(self.is_sending, bool)  # TODO: remove

        # remaining code spawned off with a call_soon(), so early return to caller
        self._loop.call_soon_threadsafe(effect_state, timed_out)  # calls expire_state

        if not isinstance(self._state, WantRply):
            # _LOGGER.debug("AFTER. = %s", self)
            return

        assert self._qos is not None, f"{self}: Coding error"  # mypy hint
        # _LOGGER.debug("AFTER. = %s: wait_for_reply=%s", self, self._qos.wait_for_reply)

    def connection_made(self, transport: RamsesTransportT) -> None:
        """Forward the event to the current state."""
        # may want to set some instance variables, according to type of transport
        self._state.connection_made()

    # TODO: Should we clear the buffer if connection is lost (and apologise to senders?
    def connection_lost(self, err: ExceptionT | None) -> None:
        """Forward the event to the current state."""
        self._state.connection_lost()

    def pkt_received(self, pkt: Packet) -> Any:
        """Pass a received packet to the current state."""
        self._state.pkt_rcvd(pkt)

    def pause_writing(self) -> None:
        """Forward the event to the current state."""
        self._state.writing_paused()

    def resume_writing(self) -> None:
        """Forward the event to the current state."""
        self._state.writing_resumed()

    async def send_cmd(
        self,
        send_fnc: Callable[[Command], Coroutine[Any, Any, None]],  # TODO: remove
        cmd: Command,
        priority: Priority,
        qos: QosParams,
    ) -> Packet:
        """Queue a command for sending, and wait for the resulting packet.

        :param send_fnc: the coroutine function that performs the actual write
        :param cmd: the command to send
        :param priority: the position of the command in the send queue
        :param qos: the QoS parameters (timeout, max_retries, wait_for_reply)
        :returns: the packet the future was resolved with
        :raises ProtocolSendFailed: if Inactive, the buffer is full, the global
            timer expires, or the send fails with a protocol/transport error
        """
        self._send_fnc = send_fnc  # TODO: REMOVE: make per Context, not per Command

        if isinstance(self._state, Inactive):
            raise exc.ProtocolSendFailed(f"{self}: Send failed (no active transport?)")

        assert self._loop is asyncio.get_running_loop()  # BUG is here

        fut: _FutureT = self._loop.create_future()
        try:
            self._que.put_nowait((priority, dt.now(), cmd, qos, fut))
        except Full as err:
            fut.cancel("Send buffer overflow")
            raise exc.ProtocolSendFailed(f"{self}: Send buffer overflow") from err

        if isinstance(self._state, IsInIdle):
            self._loop.call_soon_threadsafe(self._check_buffer_for_cmd)

        timeout = min(  # needs to be greater than worse-case via set_state engine
            qos.timeout, self.SEND_TIMEOUT_LIMIT
        )  # incl. time queued in buffer
        try:
            await asyncio.wait_for(fut, timeout=timeout)
        except TimeoutError as err:  # incl. fut.cancel()
            msg = f"{self}: Expired global timer after {timeout} sec"
            _LOGGER.warning(
                "TOUT.. = %s: send_timeout=%s (%s)", self, timeout, self._cmd is cmd
            )
            if self._cmd is cmd:  # NOTE: # this cmd may not yet be self._cmd
                self.set_state(
                    IsInIdle, expired=True
                )  # set_exception() will cause InvalidStateError
            raise exc.ProtocolSendFailed(msg) from err  # make msg *before* state reset

        try:
            return fut.result()
        except exc.ProtocolSendFailed:
            raise
        except (exc.ProtocolError, exc.TransportError) as err:  # incl. ProtocolFsmError
            raise exc.ProtocolSendFailed(f"{self}: Send failed: {err}") from err

    def _check_buffer_for_cmd(self) -> None:
        """Take the next pending command from the buffer, if any, and send it.

        Does nothing if a command is still in flight. Queue entries whose future
        is already done (e.g. the sender timed out) are discarded.
        """
        # The context manager guarantees the lock is released, even if the
        # assertion (or anything else) raises while it is held.
        with self._lock:
            assert isinstance(self.is_sending, bool), f"{self}: Coding error"  # mypy hint

            if self._fut is not None and not self._fut.done():
                return  # a command is still in flight

            while True:
                try:
                    *_, self._cmd, self._qos, self._fut = self._que.get_nowait()
                except Empty:
                    self._cmd = self._qos = self._fut = None
                    return

                self._cmd_tx_count = 0
                self._cmd_tx_limit = min(self._qos.max_retries, self.max_retry_limit) + 1

                assert isinstance(self._fut, asyncio.Future)  # mypy hint
                if self._fut.done():  # e.g. TimeoutError
                    self._que.task_done()
                    continue

                break

        # the send itself is done outside of the lock
        try:
            assert self._cmd is not None, f"{self}: Coding error"  # mypy hint
            self._send_cmd(self._cmd)
        finally:
            self._que.task_done()

    def _send_cmd(self, cmd: Command, is_retry: bool = False) -> None:
        """Notify the state that a command is being sent, then schedule the write.

        A ProtocolFsmError from the state, or a TransportError from the write,
        returns the FSM to IsInIdle with that exception set on the future.
        """

        async def send_fnc_wrapper(cmd: Command) -> None:
            try:  # the wrapped function (actual Tx.write)
                await self._send_fnc(cmd)
            except exc.TransportError as err:
                self.set_state(IsInIdle, exception=err)

        # TODO: check what happens when exception here - why does it hang?
        assert cmd is not None, f"{self}: Coding error"

        try:
            self._state.cmd_sent(cmd, is_retry=is_retry)
        except exc.ProtocolFsmError as err:
            self.set_state(IsInIdle, exception=err)
        else:
            # keep a strong reference until done, so the task cannot be GC'd
            task = self._loop.create_task(send_fnc_wrapper(cmd))
            self._send_tasks.add(task)
            task.add_done_callback(self._send_tasks.discard)
426# With wait_for_reply=False
427# AFTER. = <ProtocolContext state=IsInIdle>
428# BEFORE = <ProtocolContext state=IsInIdle cmd_=2349|RQ|01:145038|08, tx_count=0/4>
429# AFTER. = <ProtocolContext state=WantEcho cmd_=2349|RQ|01:145038|08, tx_count=1/4>
430# BEFORE = <ProtocolContext state=WantEcho echo=2349|RQ|01:145038|08, tx_count=1/4>
431# AFTER. = <ProtocolContext state=WantRply echo=2349|RQ|01:145038|08, tx_count=1/4>: wait_for_reply=False
432#
433# BEFORE = <ProtocolContext state=WantRply echo=2349|RQ|01:145038|08, tx_count=1/4>: result=2349|RQ|01:145038|08
434# AFTER. = <ProtocolContext state=IsInIdle>
436# With wait_for_reply=True
437# AFTER. = <ProtocolContext state=IsInIdle>
438# BEFORE = <ProtocolContext state=IsInIdle cmd_=0004|RQ|01:145038|05, tx_count=0/4>
439# AFTER. = <ProtocolContext state=WantEcho cmd_=0004|RQ|01:145038|05, tx_count=1/4>
440# BEFORE = <ProtocolContext state=WantEcho echo=0004|RQ|01:145038|05, tx_count=1/4>
441# AFTER. = <ProtocolContext state=WantRply echo=0004|RQ|01:145038|05, tx_count=1/4>: wait_for_reply=True
442#
443# BEFORE = <ProtocolContext state=WantRply rply=0004|RP|01:145038|05, tx_count=1/4>: result=0004|RP|01:145038|05
444# AFTER. = <ProtocolContext state=IsInIdle>
446#######################################################################################
448# NOTE: Because .dst / .src may switch from Address to Device from one pkt to the next:
449# - use: pkt.dst.id == self._echo_pkt.src.id
450# - not: pkt.dst is self._echo_pkt.src
class ProtocolStateBase:
    """Base class for the states of the protocol FSM.

    Holds a reference to the owning context, plus the command sent and the
    echo/reply packets seen so far (each initially None).
    """

    def __init__(self, context: ProtocolContext) -> None:
        self._context = context

        self._sent_cmd: Command | None = None
        self._echo_pkt: Packet | None = None
        self._rply_pkt: Packet | None = None

    def __repr__(self) -> str:
        """Return the state name and the header of the most advanced item held."""
        prefix = f"<ProtocolState state={type(self).__name__}"
        # checked in order of precedence: reply, then echo, then command
        candidates = (
            ("rply", self._rply_pkt),
            ("echo", self._echo_pkt),
            ("cmd_", self._sent_cmd),
        )
        for label, item in candidates:
            if item:
                return prefix + f" {label}={item._hdr}>"
        return prefix + ">"

    def connection_made(self) -> None:  # For all states except Inactive
        """Ignore the event (Inactive overrides this)."""

    def connection_lost(self) -> None:  # Varies by states (not needed if Inactive)
        """Move the context to Inactive, unless it is already Inactive.

        If the context is not IsInIdle, a TransportError is passed along with
        the transition.
        """
        current = self._context._state

        if isinstance(current, Inactive):
            return

        if isinstance(current, IsInIdle):
            self._context.set_state(Inactive)
        else:
            self._context.set_state(
                Inactive, exception=exc.TransportError("Connection lost")
            )

    def pkt_rcvd(self, pkt: Packet) -> None:  # Different for each state
        """Always raise NotImplementedError; each state supplies its own."""
        raise NotImplementedError("Invalid state to receive a packet")

    def writing_paused(self) -> None:  # Currently same for all states (TBD)
        """Ignore the event."""

    def writing_resumed(self) -> None:  # Currently same for all states (TBD)
        """Ignore the event."""

    def cmd_sent(  # For all except IsInIdle, WantEcho
        self, cmd: Command, is_retry: bool | None = None
    ) -> None:
        """Raise ProtocolFsmError, as sending is invalid unless overridden."""
        raise exc.ProtocolFsmError(f"Invalid state to send a command: {self._context}")
class Inactive(ProtocolStateBase):
    """State: the Protocol has no connection to the transport layer."""

    def connection_made(self) -> None:
        """Move the context to IsInIdle."""
        self._context.set_state(IsInIdle)

    def pkt_rcvd(self, pkt: Packet) -> None:
        """Log a warning for an unexpected packet (puzzle packets are ignored)."""

        assert self._sent_cmd is None, f"{self}: Coding error"

        if pkt.code == Code._PUZZ:
            return
        _LOGGER.warning("%s: Invalid state to receive a packet", self._context)
class IsInIdle(ProtocolStateBase):
    """State: no Command is currently being sent."""

    def pkt_rcvd(self, pkt: Packet) -> None:
        """Ignore the packet: neither an echo nor a reply is expected."""

        assert self._sent_cmd is None, f"{self}: Coding error"

    def cmd_sent(  # Will expect an Echo
        self, cmd: Command, is_retry: bool | None = None
    ) -> None:
        """Record the command as sent and move the context to WantEcho."""

        assert self._sent_cmd is None and is_retry is False, f"{self}: Coding error"

        self._sent_cmd = cmd

        # HACK for headers with sentinel values:
        # I --- 18:000730 18:222222 --:------ 30C9 003 000333  # 30C9| I|18:000730, *but* will be: 30C9| I|18:222222
        # I --- --:------ --:------ 18:000730 0008 002 00BB    # 0008| I|18:000730|00, *and* will be unchanged

        if HGI_DEVICE_ID in cmd.tx_header:  # HACK: what do I do about this
            # swap the sentinel gateway id in the cached header for the real one
            sentinel_hdr = cmd._hdr_
            cmd._hdr_ = sentinel_hdr.replace(
                HGI_DEVICE_ID, self._context._protocol.hgi_id
            )

        self._context.set_state(WantEcho)
class WantEcho(ProtocolStateBase):
    """State: a Command was sent, and its echo Packet is awaited."""

    # NOTE: unfortunately, the cmd's src / echo's src can be different:
    # RQ --- 18:000730 10:052644 --:------ 3220 005 0000050000  # RQ|10:048122|3220|05
    # RQ --- 18:198151 10:052644 --:------ 3220 005 0000050000  # RQ|10:048122|3220|05

    def __init__(self, context: ProtocolContext) -> None:
        """Carry over the sent command from the context's current state."""
        super().__init__(context)

        self._sent_cmd = context._state._sent_cmd

    def pkt_rcvd(self, pkt: Packet) -> None:  # Check if pkt is expected Echo
        """Handle a packet: the echo moves to WantRply, or finishes in IsInIdle.

        A matching reply that arrives before the echo is accepted as the result.
        Any other packet is ignored.
        """

        # RQ --- 18:002563 01:078710 --:------ 2349 002 0200                # 2349|RQ|01:078710|02
        # RP --- 01:078710 18:002563 --:------ 2349 007 0201F400FFFFFF      # 2349|RP|01:078710|02
        #  W --- 30:257306 01:096339 --:------ 313F 009 0060002916050B07E7  # 313F| W|01:096339
        #  I --- 01:096339 30:257306 --:------ 313F 009 00FC0029D6050B07E7  # 313F| I|01:096339

        assert self._sent_cmd, f"{self}: Coding error"  # mypy hint
        cmd = self._sent_cmd

        # Case 1: the reply has arrived before the echo was seen
        if cmd.rx_header and pkt._hdr == cmd.rx_header:
            is_for_sender = pkt.dst.id == cmd.src.id or (
                # handle: 18:146440 == 18:000730
                cmd.src.id == HGI_DEVICE_ID
                and pkt.dst.id == self._context._protocol.hgi_id
            )
            if is_for_sender:
                _LOGGER.warning(
                    "%s: Invalid state to receive a reply (expecting echo)",
                    self._context,
                )
                self._rply_pkt = pkt
                self._context.set_state(IsInIdle, result=pkt)
                return

        # HACK for packets with addr sets like (issue is only with sentinel values?):
        # I --- --:------ --:------ 18:000730 0008 002 00BB

        echo_hdr = (
            pkt._hdr_.replace(HGI_DEVICE_ID, self._context._protocol.hgi_id)
            if HGI_DEVICE_ID in pkt._hdr  # HACK: what do I do about this?
            else pkt._hdr
        )

        # Case 2: not the echo of the command that was sent
        if echo_hdr != cmd.tx_header:
            return

        # Case 3: the expected echo
        self._echo_pkt = pkt
        if cmd.rx_header:
            self._context.set_state(WantRply)
            return
        self._context.set_state(IsInIdle, result=pkt)

    def cmd_sent(self, cmd: Command, is_retry: bool | None = None) -> None:
        """Accept a re-transmit of the command; the state is not changed."""

        assert self._sent_cmd is not None and is_retry is True, f"{self}: Coding error"

        # NOTE: don't self._context.set_state(WantEcho) here - may cause endless loop
class WantRply(ProtocolStateBase):
    """State: the echo has been seen, and a reply Packet is awaited."""

    # NOTE: is possible get a false rply (same rx_header), e.g.:
    # RP --- 10:048122 18:198151 --:------ 3220 005 00C0050000  # 3220|RP|10:048122|05
    # RP --- 10:048122 01:145038 --:------ 3220 005 00C0050000  # 3220|RP|10:048122|05

    # NOTE: unfortunately, the cmd's src / rply's dst can still be different:
    # RQ --- 18:000730 10:052644 --:------ 3220 005 0000050000  # 3220|RQ|10:048122|05
    # RP --- 10:048122 18:198151 --:------ 3220 005 00C0050000  # 3220|RP|10:048122|05

    def __init__(self, context: ProtocolContext) -> None:
        """Carry over the sent command and its echo from the current state."""
        super().__init__(context)

        self._sent_cmd = context._state._sent_cmd
        self._echo_pkt = context._state._echo_pkt

    def pkt_rcvd(self, pkt: Packet) -> None:  # Check if pkt is expected Reply
        """If the pkt is the expected reply, move the context to IsInIdle.

        A repeat of the echo only logs a warning; any other non-matching packet
        is ignored. In both cases the state is left unchanged.
        """

        assert self._sent_cmd, f"{self}: Coding error"  # mypy hint
        assert self._echo_pkt, f"{self}: Coding error"  # mypy hint

        # NOTE: beware collisions: same header, but is not reply (must check RSSI or src)
        # 2024-04-16 08:28:33.895 000 RQ --- 18:146440 10:048122 --:------ 3220 005 0000110000  # 3220|RQ|10:048122|11
        # 2024-04-16 08:28:33.910 052 RQ --- 01:145038 10:048122 --:------ 3220 005 0000110000  # 3220|RQ|10:048122|11

        # Compare the ids, not the objects: .src may be an Address in one pkt
        # and a Device in the next.
        if (
            pkt._hdr == self._sent_cmd.tx_header
            and pkt.src.id == self._echo_pkt.src.id
        ):
            _LOGGER.warning(
                "%s: Invalid state to receive an echo (expecting reply)", self._context
            )
            return  # do not transition, wait until existing timer expires

        rx_header = self._sent_cmd.rx_header

        # HACK: special case: if null log entry for log_idx=nn, then
        # HACK: rx_hdr will be 0418|RP|01:145038|00, and not 0418|RP|01:145038|nn
        # HACK: wait_for_reply must be true for RQ|0418 commands
        is_null_log_entry = (
            rx_header[:8] == "0418|RP|"  # type: ignore[index]
            and rx_header[:-2] == pkt._hdr[:-2]  # type: ignore[index]
            and pkt.payload == "000000B0000000000000000000007FFFFF7000000000"
        )

        if not is_null_log_entry and pkt._hdr != rx_header:
            return  # not the reply that is being waited for

        self._rply_pkt = pkt
        self._context.set_state(IsInIdle, result=pkt)
683#######################################################################################
# An instance of any one of the four concrete FSM states.
_ProtocolStateT: TypeAlias = Inactive | IsInIdle | WantEcho | WantRply

# The class (not an instance) of any one of the four concrete FSM states.
_ProtocolStateClassT: TypeAlias = (
    type[Inactive] | type[IsInIdle] | type[WantEcho] | type[WantRply]
)