Coverage for src/ramses_tx/packet.py: 33%

115 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser. 

3 

4Decode/process a packet (packet that was received). 

5""" 

6 

7from __future__ import annotations 

8 

9from datetime import datetime as dt, timedelta as td 

10from typing import Any 

11 

12from . import exceptions as exc 

13from .command import Command 

14from .frame import Frame 

15from .logger import getLogger # overridden logger.getLogger 

16from .opentherm import PARAMS_DATA_IDS, SCHEMA_DATA_IDS, STATUS_DATA_IDS 

17from .ramses import CODES_SCHEMA, SZ_LIFESPAN 

18 

19from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

20 I_, 

21 RP, 

22 RQ, 

23 W_, 

24 Code, 

25) 

26 

27 

28# these trade memory for speed 

29_TD_SECS_000 = td(seconds=0) 

30_TD_SECS_003 = td(seconds=3) 

31_TD_SECS_360 = td(seconds=360) 

32_TD_MINS_005 = td(minutes=5) 

33_TD_MINS_060 = td(minutes=60) 

34_TD_MINS_360 = td(minutes=360) 

35_TD_DAYS_001 = td(minutes=60 * 24) 

36 

37 

38PKT_LOGGER = getLogger(f"{__name__}_log", pkt_log=True) 

39 

40 

41class Packet(Frame): 

42 """The Packet class (pkts that were received); will trap/log invalid pkts. 

43 

44 They have a datetime (when received) an RSSI, and other meta-fields. 

45 """ 

46 

47 _dtm: dt 

48 _rssi: str 

49 

50 def __init__(self, dtm: dt, frame: str, **kwargs: Any) -> None: 

51 """Create a packet from a raw frame string. 

52 

53 :param dtm: The timestamp when the packet was received 

54 :type dtm: dt 

55 :param frame: The raw frame string, typically including RSSI 

56 :type frame: str 

57 :param kwargs: Metadata including 'comment', 'err_msg', or 'raw_frame' 

58 :raises PacketInvalid: If the frame content is malformed. 

59 """ 

60 

61 super().__init__(frame[4:]) # remove RSSI 

62 

63 self._dtm: dt = dtm 

64 

65 self._rssi: str = frame[0:3] 

66 

67 self.comment: str = kwargs.get("comment", "") 

68 self.error_text: str = kwargs.get("err_msg", "") 

69 self.raw_frame: str = kwargs.get("raw_frame", "") 

70 

71 self._lifespan: bool | td = pkt_lifespan(self) or False 

72 

73 self._validate(strict_checking=False) 

74 

75 def _validate(self, *, strict_checking: bool = False) -> None: 

76 """Validate the packet, and parse the addresses if so (will log all packets). 

77 

78 Raise an exception InvalidPacketError (InvalidAddrSetError) if it is not valid. 

79 """ 

80 

81 try: 

82 if self.error_text: 

83 raise exc.PacketInvalid(self.error_text) 

84 

85 if not self._frame and self.comment: # log null pkts only if has a comment 

86 raise exc.PacketInvalid("Null packet") 

87 

88 super()._validate(strict_checking=strict_checking) # no RSSI 

89 

90 # FIXME: this is messy 

91 PKT_LOGGER.info("", extra=self.__dict__) # the packet.log line 

92 

93 except exc.PacketInvalid as err: # incl. InvalidAddrSetError 

94 if self._frame or self.error_text: 

95 PKT_LOGGER.warning("%s", err, extra=self.__dict__) 

96 raise err 

97 

98 def __repr__(self) -> str: 

99 """Return an unambiguous string representation of this object.""" 

100 # e.g.: RQ --- 18:000730 01:145038 --:------ 000A 002 0800 # 000A|RQ|01:145038|08 

101 try: 

102 hdr = f" # {self._hdr}{f' ({self._ctx})' if self._ctx else ''}" 

103 except (exc.PacketInvalid, NotImplementedError): 

104 hdr = "" 

105 try: 

106 dtm = self.dtm.isoformat(timespec="microseconds") 

107 except AttributeError: 

108 dtm = dt.min.isoformat(timespec="microseconds") 

109 return f"{dtm} ... {self}{hdr}" 

110 

111 def __str__(self) -> str: 

112 """Return a brief readable string representation of this object aka 'header'.""" 

113 # e.g.: 000A|RQ|01:145038|08 

114 return super().__repr__() # TODO: self._hdr 

115 

116 @property 

117 def dtm(self) -> dt: 

118 return self._dtm 

119 

120 @staticmethod 

121 def _partition(pkt_line: str) -> tuple[str, str, str]: # map[str] 

122 """Partition a packet line into its three parts. 

123 

124 Format: packet[ < parser-hint: ...][ * evofw3-err_msg][ # evofw3-comment] 

125 """ 

126 

127 fragment, _, comment = pkt_line.partition("#") 

128 fragment, _, err_msg = fragment.partition("*") 

129 pkt_str, _, _ = fragment.partition("<") # discard any parser hints 

130 return map(str.strip, (pkt_str, err_msg, comment)) # type: ignore[return-value] 

131 

132 @classmethod 

133 def _from_cmd(cls, cmd: Command, dtm: dt | None = None) -> Packet: 

134 """Create a Packet from a Command.""" 

135 if dtm is None: 

136 dtm = dt.now() 

137 return cls.from_port(dtm, f"... {cmd._frame}") 

138 

139 @classmethod 

140 def from_dict(cls, dtm: str, pkt_line: str) -> Packet: 

141 """Create a packet from a saved state (a curated dict).""" 

142 frame, _, comment = cls._partition(pkt_line) 

143 return cls(dt.fromisoformat(dtm), frame, comment=comment) 

144 

145 @classmethod 

146 def from_file(cls, dtm: str, pkt_line: str) -> Packet: 

147 """Create a packet from a log file line.""" 

148 frame, err_msg, comment = cls._partition(pkt_line) 

149 if not frame: 

150 raise ValueError(f"null frame: >>>{frame}<<<") 

151 return cls(dt.fromisoformat(dtm), frame, err_msg=err_msg, comment=comment) 

152 

153 @classmethod 

154 def from_port(cls, dtm: dt, pkt_line: str, raw_line: bytes | None = None) -> Packet: 

155 """Create a packet from a USB port (HGI80, evofw3).""" 

156 frame, err_msg, comment = cls._partition(pkt_line) 

157 if not frame: 

158 raise ValueError(f"null frame: >>>{frame}<<<") 

159 return cls(dtm, frame, err_msg=err_msg, comment=comment, raw_frame=raw_line) 

160 

161 

162# TODO: remove None as a possible return value 

163def pkt_lifespan(pkt: Packet) -> td: # import OtbGateway?? 

164 """Return the lifespan of a packet before it expires. 

165 

166 :param pkt: The packet instance to evaluate 

167 :type pkt: Packet 

168 :return: The duration the packet's data remains valid 

169 :rtype: td 

170 """ 

171 

172 if pkt.verb in (RQ, W_): 

173 return _TD_SECS_000 

174 

175 if pkt.code in (Code._0005, Code._000C): 

176 return _TD_DAYS_001 

177 

178 if pkt.code == Code._0006: 

179 return _TD_MINS_060 

180 

181 if pkt.code == Code._0404: # 0404 tombstoned by incremented 0006 

182 return _TD_DAYS_001 

183 

184 if pkt.code == Code._000A and pkt._has_array: 

185 return _TD_MINS_060 # sends I /1h 

186 

187 if pkt.code == Code._10E0: # but: what if valid pkt with a corrupt src_id 

188 return _TD_DAYS_001 

189 

190 if pkt.code == Code._1F09: # sends I /sync_cycle 

191 # can't do better than 300s with reading the payload 

192 return _TD_SECS_360 if pkt.verb == I_ else _TD_SECS_000 

193 

194 if pkt.code == Code._1FC9 and pkt.verb == RP: 

195 return _TD_DAYS_001 # TODO: check other verbs, they seem variable 

196 

197 if pkt.code in (Code._2309, Code._30C9) and pkt._has_array: # sends I /sync_cycle 

198 return _TD_SECS_360 

199 

200 if pkt.code == Code._3220: # FIXME: 2.1 means we can miss two packets 

201 # if pkt.payload[4:6] in WRITE_MSG_IDS: # and Write-Data: # TODO 

202 # return _TD_SECS_003 * 2.1 

203 if int(pkt.payload[4:6], 16) in SCHEMA_DATA_IDS: 

204 return _TD_MINS_360 * 2.1 

205 if int(pkt.payload[4:6], 16) in PARAMS_DATA_IDS: 

206 return _TD_MINS_060 * 2.1 

207 if int(pkt.payload[4:6], 16) in STATUS_DATA_IDS: 

208 return _TD_MINS_005 * 2.1 

209 return _TD_MINS_005 * 2.1 

210 

211 # if pkt.code in (Code._3B00, Code._3EF0, ): # TODO: 0008, 3EF0, 3EF1 

212 # return td(minutes=6.7) # TODO: WIP 

213 

214 if (code := CODES_SCHEMA.get(pkt.code)) and SZ_LIFESPAN in code: 

215 result: bool | td | None = CODES_SCHEMA[pkt.code][SZ_LIFESPAN] 

216 return result if isinstance(result, td) else _TD_MINS_060 

217 

218 return _TD_MINS_060 # applies to lots of HVAC packets