Coverage for src/ramses_tx/typing.py: 79%

57 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Typing for RamsesProtocol & RamsesTransport.""" 

3 

4import asyncio 

5from collections.abc import Callable 

6from datetime import datetime as dt 

7from typing import Any, Protocol, TypeVar 

8 

9from serial import Serial # type: ignore[import-untyped] 

10 

11from .command import Command 

12from .const import ( 

13 DEFAULT_GAP_DURATION, 

14 DEFAULT_MAX_RETRIES, 

15 DEFAULT_NUM_REPEATS, 

16 DEFAULT_SEND_TIMEOUT, 

17 DEFAULT_WAIT_FOR_REPLY, 

18 Priority, 

19) 

20from .message import Message 

21from .packet import Packet 

22 

23ExceptionT = TypeVar("ExceptionT", bound=type[Exception]) 

24MsgFilterT = Callable[[Message], bool] 

25MsgHandlerT = Callable[[Message], None] 

26SerPortNameT = str 

27 

28 

29class QosParams: 

30 """A container for QoS attributes and state.""" 

31 

32 def __init__( 

33 self, 

34 *, 

35 max_retries: int | None = DEFAULT_MAX_RETRIES, 

36 timeout: float | None = DEFAULT_SEND_TIMEOUT, 

37 wait_for_reply: bool | None = DEFAULT_WAIT_FOR_REPLY, 

38 ) -> None: 

39 """Create a QosParams instance.""" 

40 

41 self._max_retries = DEFAULT_MAX_RETRIES if max_retries is None else max_retries 

42 self._timeout = timeout or DEFAULT_SEND_TIMEOUT 

43 self._wait_for_reply = wait_for_reply 

44 

45 self._echo_pkt: Packet | None = None 

46 self._rply_pkt: Packet | None = None 

47 

48 self._dt_cmd_sent: dt | None = None 

49 self._dt_echo_rcvd: dt | None = None 

50 self._dt_rply_rcvd: dt | None = None 

51 

52 @property 

53 def max_retries(self) -> int: 

54 return self._max_retries 

55 

56 @property 

57 def timeout(self) -> float: 

58 return self._timeout 

59 

60 @property 

61 def wait_for_reply(self) -> bool | None: 

62 return self._wait_for_reply 

63 

64 

65class SendParams: 

66 """A container for Send attributes and state.""" 

67 

68 def __init__( 

69 self, 

70 *, 

71 gap_duration: float | None = DEFAULT_GAP_DURATION, 

72 num_repeats: int | None = DEFAULT_NUM_REPEATS, 

73 priority: Priority | None = Priority.DEFAULT, 

74 ) -> None: 

75 """Create a SendParams instance.""" 

76 

77 self._gap_duration = gap_duration or DEFAULT_GAP_DURATION 

78 self._num_repeats = num_repeats or DEFAULT_NUM_REPEATS 

79 self._priority = priority or Priority.DEFAULT 

80 

81 self._dt_cmd_arrived: dt | None = None 

82 self._dt_cmd_queued: dt | None = None 

83 self._dt_cmd_sent: dt | None = None 

84 

85 @property 

86 def gap_duration(self) -> float: 

87 return self._gap_duration 

88 

89 @property 

90 def num_repeats(self) -> int: 

91 return self._num_repeats 

92 

93 @property 

94 def priority(self) -> Priority: 

95 return self._priority 

96 

97 

98class xRamsesTransportT(Protocol): 

99 """A typing.Protocol (i.e. a structural type) of asyncio.Transport.""" 

100 

101 _is_closing: bool 

102 # _is_reading: bool 

103 

104 def __init__( # type: ignore[no-any-unimported] 

105 self, 

106 protocol: asyncio.Protocol, 

107 pkt_source: Serial | dict[str, str] | str, 

108 loop: asyncio.AbstractEventLoop | None = None, 

109 extra: dict[str, Any] | None = None, 

110 **kwargs: Any, 

111 ) -> None: ... 

112 

113 def _dt_now(self) -> dt: ... 

114 

115 def _abort(self, exc: ExceptionT) -> None: # only in serial transport 

116 ... 

117 

118 def _close(self, exc: ExceptionT | None = None) -> None: ... 

119 

120 def close(self) -> None: 

121 """Close the transport gracefully. 

122 

123 Schedules a call to `transport._protocol.connection_lost(None)`.""" 

124 ... 

125 

126 def get_extra_info(self, name: str, default: Any | None = None) -> Any: ... 

127 

128 def is_closing(self) -> bool: ... 

129 

130 # NOTE this should not be included - maybe is a subclasses 

131 # @staticmethod 

132 # def is_hgi80(serial_port: SerPortName) -> None | bool: ... 

133 

134 def is_reading(self) -> bool: ... 

135 

136 def pause_reading(self) -> None: ... 

137 

138 def resume_reading(self) -> None: ... 

139 

140 def send_frame(self, frame: str) -> None: ... 

141 

142 # NOTE RamsesProtocol will not invoke write() directly 

143 def write(self, data: bytes) -> None: ... 

144 

145 

146class xRamsesProtocolT(Protocol): 

147 """A typing.Protocol (i.e. a structural type) of asyncio.Protocol.""" 

148 

149 _msg_handler: MsgHandlerT 

150 _pause_writing: bool 

151 _transport: xRamsesTransportT 

152 

153 def __init__(self, msg_handler: MsgHandlerT) -> None: ... 

154 

155 def add_handler( 

156 self, msg_handler: MsgHandlerT, /, *, msg_filter: MsgFilterT | None = None 

157 ) -> Callable[[], None]: ... 

158 

159 def connection_lost(self, err: ExceptionT | None) -> None: ... 

160 

161 @property 

162 def wait_connection_lost(self) -> asyncio.Future[ExceptionT | None]: ... 

163 

164 def connection_made(self, transport: xRamsesTransportT) -> None: ... 

165 

166 def pause_writing(self) -> None: ... 

167 

168 def pkt_received(self, pkt: Packet) -> None: ... 

169 

170 def resume_writing(self) -> None: ... 

171 

172 async def send_cmd( 

173 self, 

174 cmd: Command, 

175 /, 

176 *, 

177 gap_duration: float = DEFAULT_GAP_DURATION, 

178 num_repeats: int = DEFAULT_NUM_REPEATS, 

179 priority: Priority = Priority.DEFAULT, 

180 qos: QosParams | None = None, 

181 ) -> Packet | None: ...