Coverage for src/ramses_tx/typing.py: 79%
57 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Typing for RamsesProtocol & RamsesTransport."""
4import asyncio
5from collections.abc import Callable
6from datetime import datetime as dt
7from typing import Any, Protocol, TypeVar
9from serial import Serial # type: ignore[import-untyped]
11from .command import Command
12from .const import (
13 DEFAULT_GAP_DURATION,
14 DEFAULT_MAX_RETRIES,
15 DEFAULT_NUM_REPEATS,
16 DEFAULT_SEND_TIMEOUT,
17 DEFAULT_WAIT_FOR_REPLY,
18 Priority,
19)
20from .message import Message
21from .packet import Packet
# Plain alias: a serial port name is passed around as an ordinary str.
SerPortNameT = str

# Callables that receive a Message: a filter returns a bool, a handler returns
# nothing.
MsgFilterT = Callable[[Message], bool]
MsgHandlerT = Callable[[Message], None]

# NOTE(review): the bound is type[Exception] (i.e. an exception *class*), yet
# this TypeVar annotates the `exc`/`err` parameters of the protocols in this
# module - confirm whether exception instances were intended.
ExceptionT = TypeVar(
    "ExceptionT",
    bound=type[Exception],
)
class QosParams:
    """Bundle the QoS settings with their associated packet/timestamp state."""

    def __init__(
        self,
        *,
        max_retries: int | None = DEFAULT_MAX_RETRIES,
        timeout: float | None = DEFAULT_SEND_TIMEOUT,
        wait_for_reply: bool | None = DEFAULT_WAIT_FOR_REPLY,
    ) -> None:
        """Store the QoS settings and reset all state attributes to None.

        :param max_retries: kept as given unless it is None, in which case
            DEFAULT_MAX_RETRIES is used (so an explicit 0 is preserved).
        :param timeout: any falsy value (None or 0) is replaced by
            DEFAULT_SEND_TIMEOUT.
        :param wait_for_reply: stored exactly as given; None is not coerced.
        """

        # Only None selects the default here: 0 is a legitimate value.
        if max_retries is None:
            max_retries = DEFAULT_MAX_RETRIES
        self._max_retries = max_retries

        # Unlike max_retries, every falsy timeout falls back to the default.
        self._timeout = timeout if timeout else DEFAULT_SEND_TIMEOUT

        # Kept verbatim, so callers can tell False and None apart.
        self._wait_for_reply = wait_for_reply

        # Packets: nothing has been seen yet.
        self._echo_pkt: Packet | None = None
        self._rply_pkt: Packet | None = None

        # Timestamps: nothing has happened yet.
        self._dt_cmd_sent: dt | None = None
        self._dt_echo_rcvd: dt | None = None
        self._dt_rply_rcvd: dt | None = None

    @property
    def max_retries(self) -> int:
        """Return the stored max_retries value."""
        return self._max_retries

    @property
    def timeout(self) -> float:
        """Return the stored timeout value."""
        return self._timeout

    @property
    def wait_for_reply(self) -> bool | None:
        """Return wait_for_reply exactly as it was passed in (may be None)."""
        return self._wait_for_reply
class SendParams:
    """Bundle the Send settings with their associated timestamp state."""

    def __init__(
        self,
        *,
        gap_duration: float | None = DEFAULT_GAP_DURATION,
        num_repeats: int | None = DEFAULT_NUM_REPEATS,
        priority: Priority | None = Priority.DEFAULT,
    ) -> None:
        """Store the Send settings and reset all timestamps to None.

        Every argument is replaced by its default whenever it is falsy (not
        only when it is None).

        :param gap_duration: falls back to DEFAULT_GAP_DURATION.
        :param num_repeats: falls back to DEFAULT_NUM_REPEATS.
        :param priority: falls back to Priority.DEFAULT.
        """

        # Truthiness (not an `is None` test) decides whether a default is used.
        self._gap_duration = (
            gap_duration if gap_duration else DEFAULT_GAP_DURATION
        )
        self._num_repeats = num_repeats if num_repeats else DEFAULT_NUM_REPEATS
        self._priority = priority if priority else Priority.DEFAULT

        # Timestamps: nothing has happened yet.
        self._dt_cmd_arrived: dt | None = None
        self._dt_cmd_queued: dt | None = None
        self._dt_cmd_sent: dt | None = None

    @property
    def gap_duration(self) -> float:
        """Return the stored gap_duration value."""
        return self._gap_duration

    @property
    def num_repeats(self) -> int:
        """Return the stored num_repeats value."""
        return self._num_repeats

    @property
    def priority(self) -> Priority:
        """Return the stored priority value."""
        return self._priority
class xRamsesTransportT(Protocol):
    """Structural type (a typing.Protocol) modelled on asyncio.Transport.

    Every member is a stub: this class only describes the expected shape of a
    transport, it implements nothing.
    """

    _is_closing: bool
    # _is_reading: bool

    def __init__(  # type: ignore[no-any-unimported]
        self,
        protocol: asyncio.Protocol,
        pkt_source: Serial | dict[str, str] | str,
        loop: asyncio.AbstractEventLoop | None = None,
        extra: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        ...

    # --- private members ---

    def _dt_now(self) -> dt:
        ...

    def _abort(self, exc: ExceptionT) -> None:
        # only in serial transport
        ...

    def _close(self, exc: ExceptionT | None = None) -> None:
        ...

    # --- members sharing their names with asyncio.Transport's public API ---

    def close(self) -> None:
        """Close the transport gracefully.

        Schedules a call to `transport._protocol.connection_lost(None)`."""
        ...

    def get_extra_info(self, name: str, default: Any | None = None) -> Any:
        ...

    def is_closing(self) -> bool:
        ...

    def is_reading(self) -> bool:
        ...

    def pause_reading(self) -> None:
        ...

    def resume_reading(self) -> None:
        ...

    # NOTE RamsesProtocol will not invoke write() directly
    def write(self, data: bytes) -> None:
        ...

    # --- other members ---

    def send_frame(self, frame: str) -> None:
        ...

    # NOTE this should not be included - perhaps it belongs to a subclass
    # @staticmethod
    # def is_hgi80(serial_port: SerPortName) -> None | bool: ...
class xRamsesProtocolT(Protocol):
    """Structural type (a typing.Protocol) modelled on asyncio.Protocol.

    Every member is a stub: this class only describes the expected shape of a
    protocol, it implements nothing.
    """

    _msg_handler: MsgHandlerT
    _pause_writing: bool
    _transport: xRamsesTransportT

    def __init__(self, msg_handler: MsgHandlerT) -> None:
        ...

    # --- connection life-cycle ---

    def connection_made(self, transport: xRamsesTransportT) -> None:
        ...

    def connection_lost(self, err: ExceptionT | None) -> None:
        ...

    @property
    def wait_connection_lost(self) -> asyncio.Future[ExceptionT | None]:
        ...

    # --- write flow control ---

    def pause_writing(self) -> None:
        ...

    def resume_writing(self) -> None:
        ...

    # --- message handlers, packets and commands ---

    def add_handler(
        self,
        msg_handler: MsgHandlerT,
        /,
        *,
        msg_filter: MsgFilterT | None = None,
    ) -> Callable[[], None]:
        # The handler is positional-only and the filter is keyword-only.
        # NOTE(review): the returned zero-argument callable presumably removes
        # the handler again - confirm against the implementation.
        ...

    def pkt_received(self, pkt: Packet) -> None:
        ...

    async def send_cmd(
        self,
        cmd: Command,
        /,
        *,
        gap_duration: float = DEFAULT_GAP_DURATION,
        num_repeats: int = DEFAULT_NUM_REPEATS,
        priority: Priority = Priority.DEFAULT,
        qos: QosParams | None = None,
    ) -> Packet | None:
        # The command is positional-only; all other arguments are keyword-only.
        ...