Coverage for src/ramses_tx/address.py: 44%
104 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser."""
4from __future__ import annotations
6from functools import lru_cache
7from typing import TYPE_CHECKING, Final
9from . import exceptions as exc
10from .const import DEV_TYPE_MAP as _DEV_TYPE_MAP, DEVICE_ID_REGEX, DevType
11from .schemas import DeviceIdT
13if TYPE_CHECKING:
14 from .schemas import DeviceIdT
17DEVICE_LOOKUP: dict[str, str] = {
18 k: _DEV_TYPE_MAP._hex(k)
19 for k in _DEV_TYPE_MAP.SLUGS
20 if k not in (DevType.JIM, DevType.JST)
21}
22DEVICE_LOOKUP |= {"NUL": "63", "---": "--"}
23DEV_TYPE_MAP: dict[str, str] = {v: k for k, v in DEVICE_LOOKUP.items()}
26HGI_DEVICE_ID: DeviceIdT = "18:000730" # type: ignore[assignment]
27NON_DEVICE_ID: DeviceIdT = "--:------" # type: ignore[assignment]
28ALL_DEVICE_ID: DeviceIdT = "63:262142" # type: ignore[assignment] # aka 'FFFFFE'
30#
31# NOTE: All debug flags should be False for deployment to end-users
32_DBG_DISABLE_STRICT_CHECKING: Final[bool] = False # a convenience for the test suite
33_DBG_DISABLE_DEV_HVAC = False
36class Address:
37 """The device Address class."""
39 _SLUG = None
41 def __init__(self, device_id: DeviceIdT) -> None:
42 """Create an address from a valid device ID.
44 :param device_id: The RAMSES II device ID (e.g., '01:123456')
45 :type device_id: DeviceIdT
46 :raises ValueError: If the device_id is not a valid format.
47 """
49 # if device_id is None:
50 # device_id = NON_DEVICE_ID
52 self.id = device_id # TODO: check is a valid id...
53 self.type = device_id[:2] # dex, drops 2nd part, incl. ":"
54 self._hex_id: str = None # type: ignore[assignment]
56 if not self.is_valid(device_id):
57 raise ValueError(f"Invalid device_id: {device_id}")
59 def __repr__(self) -> str:
60 return str(self.id)
62 def __str__(self) -> str:
63 return self._friendly(self.id).strip()
65 def __eq__(self, other: object) -> bool:
66 if not hasattr(other, "id"): # can compare Address with Device
67 return NotImplemented
68 return self.id == other.id # type: ignore[no-any-return]
70 @property
71 def hex_id(self) -> str:
72 if self._hex_id is not None:
73 return self._hex_id
74 self._hex_id = self.convert_to_hex(self.id) # type: ignore[unreachable]
75 return self._hex_id
77 @staticmethod
78 def is_valid(value: str) -> bool: # Union[str, Match[str], None]:
79 # if value[:2] not in DEV_TYPE_MAP:
80 # return False
82 return isinstance(value, str) and (
83 value == NON_DEVICE_ID or DEVICE_ID_REGEX.ANY.match(value)
84 )
86 @classmethod
87 def _friendly(cls, device_id: DeviceIdT) -> str:
88 """Convert (say) '01:145038' to 'CTL:145038'."""
90 if not cls.is_valid(device_id):
91 raise TypeError
93 _type, _tmp = device_id.split(":")
95 return f"{DEV_TYPE_MAP.get(_type, f'{_type:>3}')}:{_tmp}"
97 @classmethod
98 def convert_from_hex(cls, device_hex: str, friendly_id: bool = False) -> str:
99 """Convert a 6-character hex string to a device ID.
101 :param device_hex: The hex string to convert (e.g., '06368E')
102 :type device_hex: str
103 :param friendly_id: If True, returns a named ID (e.g., 'CTL:145038'), defaults to False
104 :type friendly_id: bool
105 :return: The formatted device ID string
106 :rtype: str
107 """
109 if device_hex == "FFFFFE": # aka '63:262142'
110 return ">null dev<" if friendly_id else ALL_DEVICE_ID
112 if not device_hex.strip(): # aka '--:------'
113 return f"{'':10}" if friendly_id else NON_DEVICE_ID
115 _tmp = int(device_hex, 16)
116 device_id: DeviceIdT = f"{(_tmp & 0xFC0000) >> 18:02d}:{_tmp & 0x03FFFF:06d}" # type: ignore[assignment]
118 return cls._friendly(device_id) if friendly_id else device_id
120 @classmethod
121 def convert_to_hex(cls, device_id: DeviceIdT) -> str:
122 """Convert (say) '01:145038' (or 'CTL:145038') to '06368E'."""
124 if not cls.is_valid(device_id):
125 raise TypeError
127 if len(device_id) == 9: # e.g. '01:123456'
128 dev_type = device_id[:2]
130 else: # len(device_id) == 10, e.g. 'CTL:123456', or ' 63:262142'
131 dev_type = DEVICE_LOOKUP.get(device_id[:3], device_id[1:3])
133 return f"{(int(dev_type) << 18) + int(device_id[-6:]):0>6X}" # no preceding 0x
135 # @classmethod
136 # def from_hex(cls, hex_id: DeviceIdT):
137 # """Call as: d = Address.from_hex('06368E')."""
139 # return cls(cls.convert_from_hex(hex_id))
142@lru_cache(maxsize=256)
143def id_to_address(device_id: DeviceIdT) -> Address:
144 """Factory method to cache & return device Address from device ID."""
145 return Address(device_id=device_id)
148HGI_DEV_ADDR = Address(HGI_DEVICE_ID) # 18:000730
149NON_DEV_ADDR = Address(NON_DEVICE_ID) # --:------
150ALL_DEV_ADDR = Address(ALL_DEVICE_ID) # 63:262142
153def dev_id_to_hex_id(device_id: DeviceIdT) -> str:
154 """Convert (say) '01:145038' (or 'CTL:145038') to '06368E'."""
156 if len(device_id) == 9: # e.g. '01:123456'
157 dev_type = device_id[:2]
159 elif len(device_id) == 10: # e.g. '01:123456'
160 dev_type = DEVICE_LOOKUP.get(device_id[:3], device_id[1:3])
162 else: # len(device_id) == 10, e.g. 'CTL:123456', or ' 63:262142'
163 raise ValueError(f"Invalid value: {device_id}, is not 9-10 characters long")
165 return f"{(int(dev_type) << 18) + int(device_id[-6:]):0>6X}"
168def hex_id_to_dev_id(device_hex: str, friendly_id: bool = False) -> DeviceIdT:
169 """Convert (say) '06368E' to '01:145038' (or 'CTL:145038')."""
170 if device_hex == "FFFFFE": # aka '63:262142'
171 return "NUL:262142" if friendly_id else ALL_DEVICE_ID # type: ignore[return-value]
173 if not device_hex.strip(): # aka '--:------'
174 return f"{'':10}" if friendly_id else NON_DEVICE_ID # type: ignore[return-value]
176 _tmp = int(device_hex, 16)
177 dev_type = f"{(_tmp & 0xFC0000) >> 18:02d}"
179 if friendly_id:
180 dev_type = DEV_TYPE_MAP.get(dev_type, f"{dev_type:<3}")
182 return f"{dev_type}:{_tmp & 0x03FFFF:06d}" # type: ignore[return-value]
185@lru_cache(maxsize=128)
186def is_valid_dev_id(value: str, dev_class: None | str = None) -> bool:
187 """Return True if a device_id is valid."""
189 if not isinstance(value, str) or not DEVICE_ID_REGEX.ANY.match(value):
190 return False
192 return not _DBG_DISABLE_DEV_HVAC or value.split(":", 1)[0] in DEV_TYPE_MAP
194 # if _DBG_DISABLE_DEV_HVAC and value.split(":", 1)[0] not in DEV_TYPE_MAP:
195 # return False
197 # # TODO: specify device type (for HVAC)
198 # # elif dev_type is not None and dev_type != value.split(":", maxsplit=1)[0]:
199 # # raise TypeError(f"The device type does not match '{dev_type}'")
201 # # assert value == hex_id_to_dev_id(dev_id_to_hex_id(value))
202 # return True
205@lru_cache(maxsize=256) # there is definite benefit in caching this
206def pkt_addrs(addr_fragment: str) -> tuple[Address, Address, Address, Address, Address]:
207 """Parse address fields from a 30-character address fragment.
209 :param addr_fragment: The 30-char fragment (e.g., '01:078710 --:------ 01:144246')
210 :type addr_fragment: str
211 :return: A tuple of (src_addr, dst_addr, addr_0, addr_1, addr_2)
212 :rtype: tuple[Address, Address, Address, Address, Address]
213 :raises PacketAddrSetInvalid: If the address fields are not valid.
214 """
215 # for debug: print(pkt_addrs.cache_info())
217 try:
218 addrs = tuple(id_to_address(addr_fragment[i : i + 9]) for i in range(0, 30, 10))
219 except ValueError as err:
220 raise exc.PacketAddrSetInvalid(
221 f"Invalid address set: {addr_fragment}: {err}"
222 ) from None
224 if not _DBG_DISABLE_STRICT_CHECKING and (
225 not (
226 # .I --- 01:145038 --:------ 01:145038 1F09 003 FF073F # valid
227 # .I --- 04:108173 --:------ 01:155341 2309 003 0001F4 # valid
228 addrs[0] not in (NON_DEV_ADDR, ALL_DEV_ADDR)
229 and addrs[1] == NON_DEV_ADDR
230 and addrs[2] != NON_DEV_ADDR
231 )
232 and not (
233 # .I --- 32:206250 30:082155 --:------ 22F1 003 00020A # valid
234 # .I --- 29:151550 29:237552 --:------ 22F3 007 00023C03040000 # valid
235 addrs[0] not in (NON_DEV_ADDR, ALL_DEV_ADDR)
236 and addrs[1] not in (NON_DEV_ADDR, addrs[0])
237 and addrs[2] == NON_DEV_ADDR
238 )
239 and not (
240 # .I --- --:------ --:------ 10:105624 1FD4 003 00AAD4 # valid
241 addrs[2] not in (NON_DEV_ADDR, ALL_DEV_ADDR)
242 and addrs[0] == NON_DEV_ADDR
243 and addrs[1] == NON_DEV_ADDR
244 )
245 ):
246 raise exc.PacketAddrSetInvalid(f"Invalid address set: {addr_fragment}")
248 device_addrs = list(filter(lambda a: a.type != "--", addrs)) # dex
249 src_addr = device_addrs[0]
250 dst_addr = device_addrs[1] if len(device_addrs) > 1 else NON_DEV_ADDR
252 if src_addr.id == dst_addr.id: # incl. HGI_DEV_ADDR == HGI_DEV_ADDR
253 src_addr = dst_addr
255 return src_addr, dst_addr, addrs[0], addrs[1], addrs[2]