Coverage for src/ramses_tx/schemas.py: 77%

132 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser. 

3 

4:term:`Schema` processor for protocol (lower) layer. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections.abc import Callable 

11from typing import Any, Final, Never, NewType, TypeAlias, TypedDict, TypeVar 

12 

13import voluptuous as vol 

14 

15from .const import ( 

16 DEFAULT_ECHO_TIMEOUT, 

17 DEFAULT_RPLY_TIMEOUT, 

18 DEV_TYPE_MAP, 

19 DEVICE_ID_REGEX, 

20 MAX_DUTY_CYCLE_RATE, 

21 MIN_INTER_WRITE_GAP, 

22) 

23 

24_LOGGER = logging.getLogger(__name__) 

25 

26 

27# 

28# 0/5: Packet source configuration 

29SZ_COMMS_PARAMS: Final = "comms_params" 

30SZ_DUTY_CYCLE_LIMIT: Final = "duty_cycle_limit" 

31SZ_GAP_BETWEEN_WRITES: Final = "gap_between_writes" 

32SZ_ECHO_TIMEOUT: Final = "echo_timeout" 

33SZ_RPLY_TIMEOUT: Final = "reply_timeout" 

34 

35SCH_COMMS_PARAMS = vol.Schema( 

36 { 

37 vol.Required(SZ_DUTY_CYCLE_LIMIT, default=MAX_DUTY_CYCLE_RATE): vol.All( 

38 float, vol.Range(min=0.005, max=0.2) 

39 ), 

40 vol.Required(SZ_GAP_BETWEEN_WRITES, default=MIN_INTER_WRITE_GAP): vol.All( 

41 float, vol.Range(min=0.05, max=1.0) 

42 ), 

43 vol.Required(SZ_ECHO_TIMEOUT, default=DEFAULT_ECHO_TIMEOUT): vol.All( 

44 float, vol.Range(min=0.01, max=1.0) 

45 ), 

46 vol.Required(SZ_RPLY_TIMEOUT, default=DEFAULT_RPLY_TIMEOUT): vol.All( 

47 float, vol.Range(min=0.01, max=1.0) 

48 ), 

49 }, 

50 extra=vol.PREVENT_EXTRA, 

51) 

52 

53# 

54# 1/5: Packet source configuration 

55SZ_INPUT_FILE: Final = "input_file" 

56SZ_PACKET_SOURCE: Final = "packet_source" 

57 

58 

59# 

60# 2/5: Packet log configuration 

61SZ_FILE_NAME: Final = "file_name" 

62SZ_PACKET_LOG: Final = "packet_log" 

63SZ_ROTATE_BACKUPS: Final = "rotate_backups" 

64SZ_ROTATE_BYTES: Final = "rotate_bytes" 

65 

66 

67class PktLogConfigT(TypedDict): 

68 file_name: str 

69 rotate_backups: int 

70 rotate_bytes: int | None 

71 

72 

73def sch_packet_log_dict_factory( 

74 default_backups: int = 0, 

75) -> dict[vol.Required, vol.Any]: 

76 """ 

77 :return: a packet log dict with a configurable default rotation policy. 

78 

79 Usage: 

80 

81 .. code-block:: 

82 

83 SCH_PACKET_LOG_7 = vol.Schema( 

84 packet_log_dict_factory(default_backups=7), extra=vol.PREVENT_EXTRA 

85 ) 

86 

87 """ 

88 

89 SCH_PACKET_LOG_CONFIG = vol.Schema( 

90 { 

91 vol.Optional(SZ_ROTATE_BACKUPS, default=default_backups): vol.Any( 

92 None, int 

93 ), 

94 vol.Optional(SZ_ROTATE_BYTES): vol.Any(None, int), 

95 }, 

96 extra=vol.PREVENT_EXTRA, 

97 ) 

98 

99 SCH_PACKET_LOG_NAME = str 

100 

101 def NormalisePacketLog(rotate_backups: int = 0) -> Callable[..., Any]: 

102 def normalise_packet_log(node_value: str | PktLogConfigT) -> PktLogConfigT: 

103 if isinstance(node_value, str): 

104 return { 

105 SZ_FILE_NAME: node_value, 

106 SZ_ROTATE_BACKUPS: rotate_backups, 

107 SZ_ROTATE_BYTES: None, 

108 } 

109 return node_value 

110 

111 return normalise_packet_log 

112 

113 return { # SCH_PACKET_LOG_DICT 

114 vol.Required(SZ_PACKET_LOG, default=None): vol.Any( 

115 None, 

116 vol.All( 

117 SCH_PACKET_LOG_NAME, 

118 NormalisePacketLog(rotate_backups=default_backups), 

119 ), 

120 SCH_PACKET_LOG_CONFIG.extend( 

121 {vol.Required(SZ_FILE_NAME): SCH_PACKET_LOG_NAME} 

122 ), 

123 ) 

124 } 

125 

126 

127SCH_PACKET_LOG = vol.Schema( 

128 sch_packet_log_dict_factory(default_backups=7), extra=vol.PREVENT_EXTRA 

129) 

130 

131# 

132# 3/5: Serial port configuration 

133SZ_PORT_CONFIG: Final = "port_config" 

134SZ_PORT_NAME: Final = "port_name" 

135SZ_SERIAL_PORT: Final = "serial_port" 

136 

137SZ_BAUDRATE: Final = "baudrate" 

138SZ_DSRDTR: Final = "dsrdtr" 

139SZ_RTSCTS: Final = "rtscts" 

140SZ_TIMEOUT: Final = "timeout" 

141SZ_XONXOFF: Final = "xonxoff" 

142 

143 

144SCH_SERIAL_PORT_CONFIG = vol.Schema( 

145 { 

146 vol.Optional(SZ_BAUDRATE, default=115200): vol.All( 

147 vol.Coerce(int), vol.Any(57600, 115200) 

148 ), # NB: HGI80 does not work, except at 115200 - so must be default 

149 vol.Optional(SZ_DSRDTR, default=False): bool, 

150 vol.Optional(SZ_RTSCTS, default=False): bool, 

151 vol.Optional(SZ_TIMEOUT, default=0): vol.Any(None, int), # default None? 

152 vol.Optional(SZ_XONXOFF, default=True): bool, # set True to remove \x11 

153 }, 

154 extra=vol.PREVENT_EXTRA, 

155) 

156 

157 

158class PortConfigT(TypedDict): 

159 baudrate: int # 57600, 115200 

160 dsrdtr: bool 

161 rtscts: bool 

162 timeout: int 

163 xonxoff: bool 

164 

165 

166def sch_serial_port_dict_factory() -> dict[vol.Required, vol.Any]: 

167 """Return a serial port dict. 

168 

169 Usage: 

170 

171 .. code-block:: 

172 

173 SCH_SERIAL_PORT = vol.Schema( 

174 sch_serial_port_dict_factory(), extra=vol.PREVENT_EXTRA 

175 ) 

176 """ 

177 

178 SCH_SERIAL_PORT_NAME = str 

179 

180 def NormaliseSerialPort() -> Callable[[str | PortConfigT], PortConfigT]: 

181 def normalise_serial_port(node_value: str | PortConfigT) -> PortConfigT: 

182 if isinstance(node_value, str): 

183 return {SZ_PORT_NAME: node_value} | SCH_SERIAL_PORT_CONFIG({}) # type: ignore[no-any-return] 

184 return node_value 

185 

186 return normalise_serial_port 

187 

188 return { # SCH_SERIAL_PORT_DICT 

189 vol.Required(SZ_SERIAL_PORT): vol.Any( 

190 vol.All( 

191 SCH_SERIAL_PORT_NAME, 

192 NormaliseSerialPort(), 

193 ), 

194 SCH_SERIAL_PORT_CONFIG.extend( 

195 {vol.Required(SZ_PORT_NAME): SCH_SERIAL_PORT_NAME} 

196 ), 

197 ) 

198 } 

199 

200 

201def extract_serial_port(ser_port_dict: dict[str, Any]) -> tuple[str, PortConfigT]: 

202 """Extract a serial port, port_config_dict tuple from a sch_serial_port_dict.""" 

203 port_name: str = ser_port_dict.get(SZ_PORT_NAME) # type: ignore[assignment] 

204 port_config = {k: v for k, v in ser_port_dict.items() if k != SZ_PORT_NAME} 

205 return port_name, port_config # type: ignore[return-value] 

206 

207 

208# 

209# 4/5: Traits (of devices) configuration (basic) 

210 

211_T = TypeVar("_T") 

212 

213 

214def ConvertNullToDict() -> Callable[[_T | None], _T | dict[Never, Never]]: 

215 def convert_null_to_dict(node_value: _T | None) -> _T | dict[Never, Never]: 

216 if node_value is None: 

217 return {} 

218 return node_value 

219 

220 return convert_null_to_dict 

221 

222 

223SZ_ALIAS: Final = "alias" 

224SZ_BOUND_TO: Final = "bound" 

225SZ_CLASS: Final = "class" 

226SZ_FAKED: Final = "faked" 

227SZ_SCHEME: Final = "scheme" 

228 

229SZ_BLOCK_LIST: Final = "block_list" 

230SZ_KNOWN_LIST: Final = "known_list" 

231 

232SCH_DEVICE_ID_ANY = vol.Match(DEVICE_ID_REGEX.ANY) 

233SCH_DEVICE_ID_SEN = vol.Match(DEVICE_ID_REGEX.SEN) 

234SCH_DEVICE_ID_CTL = vol.Match(DEVICE_ID_REGEX.CTL) 

235SCH_DEVICE_ID_DHW = vol.Match(DEVICE_ID_REGEX.DHW) 

236SCH_DEVICE_ID_HGI = vol.Match(DEVICE_ID_REGEX.HGI) 

237SCH_DEVICE_ID_APP = vol.Match(DEVICE_ID_REGEX.APP) 

238SCH_DEVICE_ID_BDR = vol.Match(DEVICE_ID_REGEX.BDR) 

239SCH_DEVICE_ID_UFC = vol.Match(DEVICE_ID_REGEX.UFC) 

240 

241_SCH_TRAITS_DOMAINS = ("heat", "hvac") 

242_SCH_TRAITS_HVAC_SCHEMES = ("itho", "nuaire", "orcon", "vasco", "climarad") 

243 

244 

245DeviceTraitsT = TypedDict( 

246 "DeviceTraitsT", 

247 { 

248 "alias": str | None, 

249 "faked": bool | None, 

250 "class": str | None, 

251 }, 

252) 

253 

254 

255def sch_global_traits_dict_factory( 

256 heat_traits: dict[vol.Optional, vol.Any] | None = None, 

257 hvac_traits: dict[vol.Optional, vol.Any] | None = None, 

258) -> tuple[dict[vol.Optional, vol.Any], vol.Any]: 

259 """Return a global traits dict with a configurable extra traits. 

260 

261 Usage: 

262 

263 .. code-block:: 

264 

265 SCH_GLOBAL_TRAITS = vol.Schema( 

266 sch_global_traits_dict(heat=traits), extra=vol.PREVENT_EXTRA 

267 ) 

268 """ 

269 

270 heat_traits = heat_traits or {} 

271 hvac_traits = hvac_traits or {} 

272 

273 SCH_TRAITS_BASE = vol.Schema( 

274 { 

275 vol.Optional(SZ_ALIAS, default=None): vol.Any(None, str), 

276 vol.Optional(SZ_FAKED, default=None): vol.Any(None, bool), 

277 vol.Optional(vol.Remove("_note")): str, # only for convenience, not used 

278 }, 

279 extra=vol.PREVENT_EXTRA, 

280 ) 

281 

282 # NOTE: voluptuous doesn't like StrEnums, hence str(s) 

283 # TIP: the _domain key can be used to force which traits schema to use 

284 heat_slugs = list( 

285 str(s) for s in DEV_TYPE_MAP.slugs() if s not in DEV_TYPE_MAP.HVAC_SLUGS 

286 ) 

287 SCH_TRAITS_HEAT = SCH_TRAITS_BASE.extend( 

288 { 

289 vol.Optional("_domain", default="heat"): "heat", 

290 vol.Optional(SZ_CLASS): vol.Any( 

291 None, *heat_slugs, *(str(DEV_TYPE_MAP[s]) for s in heat_slugs) 

292 ), 

293 } 

294 ) 

295 SCH_TRAITS_HEAT = SCH_TRAITS_HEAT.extend( 

296 heat_traits, 

297 extra=vol.PREVENT_EXTRA, # Always prevent extra keys 

298 ) 

299 

300 # NOTE: voluptuous doesn't like StrEnums, hence str(s) 

301 hvac_slugs = list(str(s) for s in DEV_TYPE_MAP.HVAC_SLUGS) 

302 SCH_TRAITS_HVAC = SCH_TRAITS_BASE.extend( 

303 { 

304 vol.Optional("_domain", default="hvac"): "hvac", 

305 vol.Optional(SZ_CLASS, default="HVC"): vol.Any( 

306 None, *hvac_slugs, *(str(DEV_TYPE_MAP[s]) for s in hvac_slugs) 

307 ), # TODO: consider removing None 

308 # Add 'bound' trait for FAN devices 

309 vol.Optional(SZ_BOUND_TO): vol.Any(None, vol.Match(DEVICE_ID_REGEX.ANY)), 

310 } 

311 ) 

312 SCH_TRAITS_HVAC = SCH_TRAITS_HVAC.extend( 

313 {vol.Optional(SZ_SCHEME): vol.Any(*_SCH_TRAITS_HVAC_SCHEMES)} 

314 ) 

315 SCH_TRAITS_HVAC = SCH_TRAITS_HVAC.extend( 

316 hvac_traits, 

317 extra=vol.PREVENT_EXTRA, # Always prevent extra keys 

318 ) 

319 

320 SCH_TRAITS = vol.Any( 

321 vol.All(None, ConvertNullToDict()), 

322 vol.Any(SCH_TRAITS_HEAT, SCH_TRAITS_HVAC), 

323 extra=vol.PREVENT_EXTRA, 

324 ) 

325 SCH_DEVICE = vol.Schema( 

326 {vol.Optional(SCH_DEVICE_ID_ANY): SCH_TRAITS}, 

327 extra=vol.PREVENT_EXTRA, 

328 ) 

329 

330 global_traits_dict = { # Filter lists with Device traits... 

331 vol.Optional(SZ_KNOWN_LIST, default={}): vol.Any( 

332 vol.All(None, ConvertNullToDict()), 

333 vol.All(SCH_DEVICE, vol.Length(min=0)), 

334 ), 

335 vol.Optional(SZ_BLOCK_LIST, default={}): vol.Any( 

336 vol.All(None, ConvertNullToDict()), 

337 vol.All(SCH_DEVICE, vol.Length(min=0)), 

338 ), 

339 } 

340 

341 return global_traits_dict, SCH_TRAITS 

342 

343 

344SCH_GLOBAL_TRAITS_DICT, SCH_TRAITS = sch_global_traits_dict_factory() 

345 

346# 

347# Device lists (Engine configuration) 

348 

349DeviceIdT = NewType("DeviceIdT", str) # TypeVar('DeviceIdT', bound=str) # 

350DevIndexT = NewType("DevIndexT", str) 

351DeviceListT: TypeAlias = dict[DeviceIdT, DeviceTraitsT] 

352 

353 

354def select_device_filter_mode( 

355 enforce_known_list: bool, 

356 known_list: DeviceListT, 

357 block_list: DeviceListT, 

358) -> bool: 

359 """ 

360 Determine which device filter to use, if any. 

361 

362 Either: 

363 - block if device_id in block_list (could be empty), otherwise 

364 - allow if device_id in known_list, or 

365 """ 

366 

367 # warn if not has_exactly_one_valid_hgi(known_list) 

368 

369 if enforce_known_list and not known_list: 

370 _LOGGER.warning( 

371 f"Best practice is to enforce a {SZ_KNOWN_LIST} (an allow list), " 

372 f"but it is empty, so it can't be used " 

373 ) 

374 enforce_known_list = False 

375 

376 if enforce_known_list: 

377 _LOGGER.info( 

378 f"A valid {SZ_KNOWN_LIST} was provided, " 

379 f"and will be enforced as a allow list: length = {len(known_list)}" 

380 ) 

381 _LOGGER.debug(f"known_list = {known_list}") 

382 

383 elif block_list: 

384 _LOGGER.info( 

385 f"A valid {SZ_BLOCK_LIST} was provided, " 

386 f"and will be used as a deny list: length = {len(block_list)}" 

387 ) 

388 _LOGGER.debug(f"block_list = {block_list}") 

389 

390 elif known_list: 

391 _LOGGER.warning( 

392 f"Best practice is to enforce the {SZ_KNOWN_LIST} as an allow list, " 

393 f"configure: {SZ_ENFORCE_KNOWN_LIST} = True" 

394 ) 

395 _LOGGER.debug(f"known_list = {known_list}") 

396 

397 else: 

398 _LOGGER.warning( 

399 f"Best practice is to provide a {SZ_KNOWN_LIST} and enforce it, " 

400 f"configure: {SZ_ENFORCE_KNOWN_LIST} = True" 

401 ) 

402 

403 return enforce_known_list 

404 

405 

406# 

407# 5/5: Gateway (engine) configuration 

408 

409SZ_DISABLE_SENDING: Final = "disable_sending" 

410SZ_DISABLE_QOS: Final = "disable_qos" 

411SZ_ENFORCE_KNOWN_LIST: Final[str] = f"enforce_{SZ_KNOWN_LIST}" 

412SZ_EVOFW_FLAG: Final = "evofw_flag" 

413SZ_SQLITE_INDEX: Final = ( 

414 "sqlite_index" # temporary 0.52.x SQLite dev config option in ramses_cc 

415) 

416SZ_LOG_ALL_MQTT: Final = "log_all_mqtt" 

417SZ_USE_REGEX: Final = "use_regex" 

418 

419SCH_ENGINE_DICT = { 

420 vol.Optional(SZ_DISABLE_SENDING, default=False): bool, 

421 vol.Optional(SZ_DISABLE_QOS, default=None): vol.Any( 

422 None, # None is selective QoS (e.g. QoS only for bindings, schedule, etc.) 

423 bool, 

424 ), # in the long term, this default to be True (not None) 

425 vol.Optional(SZ_ENFORCE_KNOWN_LIST, default=False): bool, 

426 vol.Optional(SZ_EVOFW_FLAG): vol.Any(None, str), 

427 # vol.Optional(SZ_PORT_CONFIG): SCH_SERIAL_PORT_CONFIG, 

428 vol.Optional( 

429 SZ_SQLITE_INDEX, default=False 

430 ): bool, # temporary 0.52.x dev config option 

431 vol.Optional( 

432 SZ_LOG_ALL_MQTT, default=False 

433 ): bool, # log all incoming MQTT traffic config option 

434 vol.Optional(SZ_USE_REGEX): dict, # vol.All(ConvertNullToDict(), dict), 

435 vol.Optional(SZ_COMMS_PARAMS): SCH_COMMS_PARAMS, 

436} 

437SCH_ENGINE_CONFIG = vol.Schema(SCH_ENGINE_DICT, extra=vol.REMOVE_EXTRA) 

438 

439SZ_INBOUND: Final = "inbound" # for use_regex (intentionally obscured) 

440SZ_OUTBOUND: Final = "outbound"