Coverage for tests/tests_rf/virtual_rf/__init__.py: 0%
47 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - A pseudo-mocked serial port used for testing."""
4from typing import Any, Final
5from unittest.mock import patch
7from ramses_rf import Gateway
8from ramses_rf.const import DEV_TYPE_MAP, DevType
9from ramses_rf.database import MessageIndex
10from ramses_rf.schemas import SZ_CLASS, SZ_KNOWN_LIST
12from .const import HgiFwTypes
13from .virtual_rf import VirtualRf
15__all__ = ["HgiFwTypes", "VirtualRf", "rf_factory"]
# Constants that override library defaults while the virtual RF is in use.
# The two debug flags below are kept for reference only (both are disabled):
#   _DBG_DISABLE_IMPERSONATION_ALERTS = True  # ramses_tx.protocol
#   _DBG_DISABLE_QOS = False  # ramses_tx.protocol
# Applied to ramses_tx.transport.MIN_INTER_WRITE_GAP by the @patch on rf_factory.
MIN_INTER_WRITE_GAP = 0

# Other constants: two fixed gateway (18:) device ids.
GWY_ID_0: Final = "18:000000"
GWY_ID_1: Final = "18:111111"

# Baseline gateway config: discovery off, known_list not enforced.
_DEFAULT_GWY_CONFIG = {
    "config": {"disable_discovery": True, "enforce_known_list": False},
}
def _get_hgi_id_for_schema(
    schema: dict[str, Any], port_idx: int
) -> tuple[str, str]:
    """Return the Gateway's device_id and firmware-type name for a schema.

    Does not modify the schema.

    If a Gateway (18:) device is present in the schema's known_list, it must have
    a defined class of "HGI". Otherwise, the Gateway device_id is derived from the
    serial port ordinal (port_idx, 0-5) by repeating its digit six times.

    Args:
        schema: A gateway config/schema; only its known_list entry is read.
        port_idx: Ordinal of the serial port, used to construct a device_id when
            the schema does not name a Gateway.

    Returns:
        A (device_id, firmware_type_name) pair. The second item is the *name* of a
        firmware type (the entry's "_type" value, or "EVOFW3" by default), not an
        enum member: the caller resolves it to a member itself.

    Raises:
        TypeError: If the known_list has more than one device of class HGI, or has
            a device whose id prefix matches the HGI device type but has no class.
    """

    known_list: dict[str, Any] = schema.get(SZ_KNOWN_LIST, {})

    # Devices explicitly declared as gateways via their class.
    hgi_ids = [k for k, v in known_list.items() if v.get(SZ_CLASS) == DevType.HGI]

    if len(hgi_ids) > 1:
        raise TypeError("Multiple Gateways per schema are not supported")

    if hgi_ids:  # exactly one declared gateway: use it as-is
        hgi_id = hgi_ids[0]
        return hgi_id, known_list[hgi_id].get("_type", "EVOFW3")

    # No declared gateway: refuse any gateway-prefixed id that lacks a class,
    # rather than guessing whether it was meant to be the gateway.
    if any(
        k[:2] == DEV_TYPE_MAP.HGI and not v.get(SZ_CLASS)
        for k, v in known_list.items()
    ):
        raise TypeError("Any Gateway must have its class defined explicitly")

    # Nothing usable in the schema: synthesise an id from the port ordinal,
    # e.g. port_idx 1 -> "18:111111".
    return f"18:{str(port_idx) * 6}", "EVOFW3"
@patch("ramses_tx.transport.MIN_INTER_WRITE_GAP", MIN_INTER_WRITE_GAP)
async def rf_factory(
    schemas: list[dict[str, Any] | None], start_gwys: bool = True
) -> tuple[VirtualRf, list[Gateway]]:
    """Build a virtual RF network from a list of gateway schema/configs.

    Each entry is either a standard gateway config/schema, or None for a port
    that gets no gateway. Any serial port configs are ignored; ports are instead
    taken, in order, from the virtual RF pool.

    Args:
        schemas: One entry per virtual port (at most six).
        start_gwys: If true, each gateway is started, given a fresh
            MessageIndex(maintain=False) as its msg_db, and has the virtual RF
            stored in its transport's _extra["virtual_rf"].

    Returns:
        The VirtualRf and the list of Gateways created (None entries in schemas
        contribute no Gateway).

    Raises:
        TypeError: If more than six schemas are given; errors raised by
            _get_hgi_id_for_schema propagate unchanged.
    """

    MAX_PORTS = 6  # 18:666666 is not a valid device_id, but 18:000000 is OK

    port_count = len(schemas)
    if port_count > MAX_PORTS:
        raise TypeError(f"Only a maximum of {MAX_PORTS} ports is supported")

    rf = VirtualRf(port_count)
    gateways = []

    # NOTE: do not call rf._create_port(idx) in this loop; that call was removed
    # as redundant and as the cause of a race condition.
    for port_idx, gwy_schema in enumerate(schemas):
        if gwy_schema is None:  # assume no gateway device
            continue

        device_id, fw_name = _get_hgi_id_for_schema(gwy_schema, port_idx)
        firmware = HgiFwTypes.__members__[fw_name]
        rf.set_gateway(rf.ports[port_idx], device_id, fw_type=firmware)

        # The gateway is constructed with the virtual RF's comports patched in.
        with patch("ramses_tx.transport.comports", rf.comports):
            gateway = Gateway(rf.ports[port_idx], **gwy_schema)
        gateways.append(gateway)

        if start_gwys:
            await gateway.start()
            gateway.msg_db = MessageIndex(maintain=False)
            assert gateway._transport is not None  # mypy
            gateway._transport._extra["virtual_rf"] = rf

    return rf, gateways