Coverage for src/ramses_tx/address.py: 44%

104 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.""" 

3 

4from __future__ import annotations 

5 

6from functools import lru_cache 

7from typing import TYPE_CHECKING, Final 

8 

9from . import exceptions as exc 

10from .const import DEV_TYPE_MAP as _DEV_TYPE_MAP, DEVICE_ID_REGEX, DevType 

11from .schemas import DeviceIdT 

12 

13if TYPE_CHECKING: 

14 from .schemas import DeviceIdT 

15 

16 

17DEVICE_LOOKUP: dict[str, str] = { 

18 k: _DEV_TYPE_MAP._hex(k) 

19 for k in _DEV_TYPE_MAP.SLUGS 

20 if k not in (DevType.JIM, DevType.JST) 

21} 

22DEVICE_LOOKUP |= {"NUL": "63", "---": "--"} 

23DEV_TYPE_MAP: dict[str, str] = {v: k for k, v in DEVICE_LOOKUP.items()} 

24 

25 

26HGI_DEVICE_ID: DeviceIdT = "18:000730" # type: ignore[assignment] 

27NON_DEVICE_ID: DeviceIdT = "--:------" # type: ignore[assignment] 

28ALL_DEVICE_ID: DeviceIdT = "63:262142" # type: ignore[assignment] # aka 'FFFFFE' 

29 

30# 

31# NOTE: All debug flags should be False for deployment to end-users 

32_DBG_DISABLE_STRICT_CHECKING: Final[bool] = False # a convenience for the test suite 

33_DBG_DISABLE_DEV_HVAC = False 

34 

35 

36class Address: 

37 """The device Address class.""" 

38 

39 _SLUG = None 

40 

41 def __init__(self, device_id: DeviceIdT) -> None: 

42 """Create an address from a valid device ID. 

43 

44 :param device_id: The RAMSES II device ID (e.g., '01:123456') 

45 :type device_id: DeviceIdT 

46 :raises ValueError: If the device_id is not a valid format. 

47 """ 

48 

49 # if device_id is None: 

50 # device_id = NON_DEVICE_ID 

51 

52 self.id = device_id # TODO: check is a valid id... 

53 self.type = device_id[:2] # dex, drops 2nd part, incl. ":" 

54 self._hex_id: str = None # type: ignore[assignment] 

55 

56 if not self.is_valid(device_id): 

57 raise ValueError(f"Invalid device_id: {device_id}") 

58 

59 def __repr__(self) -> str: 

60 return str(self.id) 

61 

62 def __str__(self) -> str: 

63 return self._friendly(self.id).strip() 

64 

65 def __eq__(self, other: object) -> bool: 

66 if not hasattr(other, "id"): # can compare Address with Device 

67 return NotImplemented 

68 return self.id == other.id # type: ignore[no-any-return] 

69 

70 @property 

71 def hex_id(self) -> str: 

72 if self._hex_id is not None: 

73 return self._hex_id 

74 self._hex_id = self.convert_to_hex(self.id) # type: ignore[unreachable] 

75 return self._hex_id 

76 

77 @staticmethod 

78 def is_valid(value: str) -> bool: # Union[str, Match[str], None]: 

79 # if value[:2] not in DEV_TYPE_MAP: 

80 # return False 

81 

82 return isinstance(value, str) and ( 

83 value == NON_DEVICE_ID or DEVICE_ID_REGEX.ANY.match(value) 

84 ) 

85 

86 @classmethod 

87 def _friendly(cls, device_id: DeviceIdT) -> str: 

88 """Convert (say) '01:145038' to 'CTL:145038'.""" 

89 

90 if not cls.is_valid(device_id): 

91 raise TypeError 

92 

93 _type, _tmp = device_id.split(":") 

94 

95 return f"{DEV_TYPE_MAP.get(_type, f'{_type:>3}')}:{_tmp}" 

96 

97 @classmethod 

98 def convert_from_hex(cls, device_hex: str, friendly_id: bool = False) -> str: 

99 """Convert a 6-character hex string to a device ID. 

100 

101 :param device_hex: The hex string to convert (e.g., '06368E') 

102 :type device_hex: str 

103 :param friendly_id: If True, returns a named ID (e.g., 'CTL:145038'), defaults to False 

104 :type friendly_id: bool 

105 :return: The formatted device ID string 

106 :rtype: str 

107 """ 

108 

109 if device_hex == "FFFFFE": # aka '63:262142' 

110 return ">null dev<" if friendly_id else ALL_DEVICE_ID 

111 

112 if not device_hex.strip(): # aka '--:------' 

113 return f"{'':10}" if friendly_id else NON_DEVICE_ID 

114 

115 _tmp = int(device_hex, 16) 

116 device_id: DeviceIdT = f"{(_tmp & 0xFC0000) >> 18:02d}:{_tmp & 0x03FFFF:06d}" # type: ignore[assignment] 

117 

118 return cls._friendly(device_id) if friendly_id else device_id 

119 

120 @classmethod 

121 def convert_to_hex(cls, device_id: DeviceIdT) -> str: 

122 """Convert (say) '01:145038' (or 'CTL:145038') to '06368E'.""" 

123 

124 if not cls.is_valid(device_id): 

125 raise TypeError 

126 

127 if len(device_id) == 9: # e.g. '01:123456' 

128 dev_type = device_id[:2] 

129 

130 else: # len(device_id) == 10, e.g. 'CTL:123456', or ' 63:262142' 

131 dev_type = DEVICE_LOOKUP.get(device_id[:3], device_id[1:3]) 

132 

133 return f"{(int(dev_type) << 18) + int(device_id[-6:]):0>6X}" # no preceding 0x 

134 

135 # @classmethod 

136 # def from_hex(cls, hex_id: DeviceIdT): 

137 # """Call as: d = Address.from_hex('06368E').""" 

138 

139 # return cls(cls.convert_from_hex(hex_id)) 

140 

141 

142@lru_cache(maxsize=256) 

143def id_to_address(device_id: DeviceIdT) -> Address: 

144 """Factory method to cache & return device Address from device ID.""" 

145 return Address(device_id=device_id) 

146 

147 

148HGI_DEV_ADDR = Address(HGI_DEVICE_ID) # 18:000730 

149NON_DEV_ADDR = Address(NON_DEVICE_ID) # --:------ 

150ALL_DEV_ADDR = Address(ALL_DEVICE_ID) # 63:262142 

151 

152 

153def dev_id_to_hex_id(device_id: DeviceIdT) -> str: 

154 """Convert (say) '01:145038' (or 'CTL:145038') to '06368E'.""" 

155 

156 if len(device_id) == 9: # e.g. '01:123456' 

157 dev_type = device_id[:2] 

158 

159 elif len(device_id) == 10: # e.g. '01:123456' 

160 dev_type = DEVICE_LOOKUP.get(device_id[:3], device_id[1:3]) 

161 

162 else: # len(device_id) == 10, e.g. 'CTL:123456', or ' 63:262142' 

163 raise ValueError(f"Invalid value: {device_id}, is not 9-10 characters long") 

164 

165 return f"{(int(dev_type) << 18) + int(device_id[-6:]):0>6X}" 

166 

167 

168def hex_id_to_dev_id(device_hex: str, friendly_id: bool = False) -> DeviceIdT: 

169 """Convert (say) '06368E' to '01:145038' (or 'CTL:145038').""" 

170 if device_hex == "FFFFFE": # aka '63:262142' 

171 return "NUL:262142" if friendly_id else ALL_DEVICE_ID # type: ignore[return-value] 

172 

173 if not device_hex.strip(): # aka '--:------' 

174 return f"{'':10}" if friendly_id else NON_DEVICE_ID # type: ignore[return-value] 

175 

176 _tmp = int(device_hex, 16) 

177 dev_type = f"{(_tmp & 0xFC0000) >> 18:02d}" 

178 

179 if friendly_id: 

180 dev_type = DEV_TYPE_MAP.get(dev_type, f"{dev_type:<3}") 

181 

182 return f"{dev_type}:{_tmp & 0x03FFFF:06d}" # type: ignore[return-value] 

183 

184 

185@lru_cache(maxsize=128) 

186def is_valid_dev_id(value: str, dev_class: None | str = None) -> bool: 

187 """Return True if a device_id is valid.""" 

188 

189 if not isinstance(value, str) or not DEVICE_ID_REGEX.ANY.match(value): 

190 return False 

191 

192 return not _DBG_DISABLE_DEV_HVAC or value.split(":", 1)[0] in DEV_TYPE_MAP 

193 

194 # if _DBG_DISABLE_DEV_HVAC and value.split(":", 1)[0] not in DEV_TYPE_MAP: 

195 # return False 

196 

197 # # TODO: specify device type (for HVAC) 

198 # # elif dev_type is not None and dev_type != value.split(":", maxsplit=1)[0]: 

199 # # raise TypeError(f"The device type does not match '{dev_type}'") 

200 

201 # # assert value == hex_id_to_dev_id(dev_id_to_hex_id(value)) 

202 # return True 

203 

204 

205@lru_cache(maxsize=256) # there is definite benefit in caching this 

206def pkt_addrs(addr_fragment: str) -> tuple[Address, Address, Address, Address, Address]: 

207 """Parse address fields from a 30-character address fragment. 

208 

209 :param addr_fragment: The 30-char fragment (e.g., '01:078710 --:------ 01:144246') 

210 :type addr_fragment: str 

211 :return: A tuple of (src_addr, dst_addr, addr_0, addr_1, addr_2) 

212 :rtype: tuple[Address, Address, Address, Address, Address] 

213 :raises PacketAddrSetInvalid: If the address fields are not valid. 

214 """ 

215 # for debug: print(pkt_addrs.cache_info()) 

216 

217 try: 

218 addrs = tuple(id_to_address(addr_fragment[i : i + 9]) for i in range(0, 30, 10)) 

219 except ValueError as err: 

220 raise exc.PacketAddrSetInvalid( 

221 f"Invalid address set: {addr_fragment}: {err}" 

222 ) from None 

223 

224 if not _DBG_DISABLE_STRICT_CHECKING and ( 

225 not ( 

226 # .I --- 01:145038 --:------ 01:145038 1F09 003 FF073F # valid 

227 # .I --- 04:108173 --:------ 01:155341 2309 003 0001F4 # valid 

228 addrs[0] not in (NON_DEV_ADDR, ALL_DEV_ADDR) 

229 and addrs[1] == NON_DEV_ADDR 

230 and addrs[2] != NON_DEV_ADDR 

231 ) 

232 and not ( 

233 # .I --- 32:206250 30:082155 --:------ 22F1 003 00020A # valid 

234 # .I --- 29:151550 29:237552 --:------ 22F3 007 00023C03040000 # valid 

235 addrs[0] not in (NON_DEV_ADDR, ALL_DEV_ADDR) 

236 and addrs[1] not in (NON_DEV_ADDR, addrs[0]) 

237 and addrs[2] == NON_DEV_ADDR 

238 ) 

239 and not ( 

240 # .I --- --:------ --:------ 10:105624 1FD4 003 00AAD4 # valid 

241 addrs[2] not in (NON_DEV_ADDR, ALL_DEV_ADDR) 

242 and addrs[0] == NON_DEV_ADDR 

243 and addrs[1] == NON_DEV_ADDR 

244 ) 

245 ): 

246 raise exc.PacketAddrSetInvalid(f"Invalid address set: {addr_fragment}") 

247 

248 device_addrs = list(filter(lambda a: a.type != "--", addrs)) # dex 

249 src_addr = device_addrs[0] 

250 dst_addr = device_addrs[1] if len(device_addrs) > 1 else NON_DEV_ADDR 

251 

252 if src_addr.id == dst_addr.id: # incl. HGI_DEV_ADDR == HGI_DEV_ADDR 

253 src_addr = dst_addr 

254 

255 return src_addr, dst_addr, addrs[0], addrs[1], addrs[2]