Coverage for tests/tests_rf/virtual_rf/__init__.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - A pseudo-mocked serial port used for testing.""" 

3 

4from typing import Any, Final 

5from unittest.mock import patch 

6 

7from ramses_rf import Gateway 

8from ramses_rf.const import DEV_TYPE_MAP, DevType 

9from ramses_rf.database import MessageIndex 

10from ramses_rf.schemas import SZ_CLASS, SZ_KNOWN_LIST 

11 

12from .const import HgiFwTypes 

13from .virtual_rf import VirtualRf 

14 

15__all__ = ["HgiFwTypes", "VirtualRf", "rf_factory"] 

16 

# patched constants
# (values this test package substitutes for library module-level constants)
# _DBG_DISABLE_IMPERSONATION_ALERTS = True  # # ramses_tx.protocol
# _DBG_DISABLE_QOS = False  # # ramses_tx.protocol
# Substituted for ramses_tx.transport.MIN_INTER_WRITE_GAP by the @patch decorator
# on rf_factory(), for the duration of each call to that factory.
MIN_INTER_WRITE_GAP = 0

# other constants
# Gateway device ids of the same form that _get_hgi_id_for_schema() constructs
# for port ordinals 0 and 1 (the ordinal digit repeated six times).
GWY_ID_0: Final = "18:000000"
GWY_ID_1: Final = "18:111111"

# Not referenced by the code in this module; presumably a baseline Gateway config
# for importing tests -- TODO confirm against callers.
_DEFAULT_GWY_CONFIG = {
    "config": {
        "disable_discovery": True,
        "enforce_known_list": False,
    }
}

32 

33 

def _get_hgi_id_for_schema(
    schema: dict[str, Any], port_idx: int
) -> tuple[str, str]:
    """Return the Gateway's device_id and firmware type name for a schema.

    Does not modify the schema.

    If a Gateway (18:) device is present in the schema's known_list, it must have
    a defined class of "HGI". Otherwise, the Gateway device_id is derived from the
    serial port ordinal (port_idx, 0-5).

    Args:
        schema: A gateway config/schema; only its known_list (if any) is read.
        port_idx: Ordinal of the serial port, used to construct a device_id when
            the schema does not declare a Gateway.

    Returns:
        A (device_id, fw_type) tuple. fw_type is the *name* of a firmware type (a
        str, e.g. "EVOFW3"), not an HgiFwTypes member: rf_factory() resolves it
        via HgiFwTypes.__members__[fw_type].

    Raises:
        TypeError: If the known_list declares more than one device of class HGI,
            or (when it declares none) contains a device with the Gateway id
            prefix but no explicit class.
    """

    default_fw_type = "EVOFW3"  # used whenever the schema does not say otherwise

    known_list: dict[str, Any] = schema.get(SZ_KNOWN_LIST, {})

    hgi_ids = [k for k, v in known_list.items() if v.get(SZ_CLASS) == DevType.HGI]

    if len(hgi_ids) > 1:
        raise TypeError("Multiple Gateways per schema are not supported")

    if hgi_ids:  # exactly one explicitly-classed Gateway: use it as-is
        hgi_id = hgi_ids[0]
        return hgi_id, known_list[hgi_id].get("_type", default_fw_type)

    # No classed Gateway was found: refuse to guess if a device merely looks like
    # one (has the Gateway id prefix) but has no class set.
    if any(
        k[:2] == DEV_TYPE_MAP.HGI and not v.get(SZ_CLASS)
        for k, v in known_list.items()
    ):
        raise TypeError("Any Gateway must have its class defined explicitly")

    # Nothing Gateway-like in the schema: construct an id from the port ordinal,
    # e.g. port_idx 1 -> "18:111111".
    return f"18:{str(port_idx) * 6}", default_fw_type

69 

70 

# For the duration of each call, the library's inter-write gap is replaced by this
# module's MIN_INTER_WRITE_GAP.
@patch("ramses_tx.transport.MIN_INTER_WRITE_GAP", MIN_INTER_WRITE_GAP)
async def rf_factory(
    schemas: list[dict[str, Any] | None], start_gwys: bool = True
) -> tuple[VirtualRf, list[Gateway]]:
    """Return the virtual network corresponding to a list of gateway schema/configs.

    Each dict entry will consist of a standard gateway config/schema (or None). Any
    serial port configs are ignored, and are instead allocated sequentially from the
    virtual RF pool.

    Args:
        schemas: One entry per virtual port. A None entry means no Gateway is
            created for that port, so the returned gateway list can be shorter
            than schemas.
        start_gwys: If True, each Gateway is started before being returned, its
            msg_db is replaced with MessageIndex(maintain=False), and the
            VirtualRf is stored in its transport's _extra under "virtual_rf".

    Returns:
        A (VirtualRf, gateways) tuple; gateways are in schema order, with the
        None entries omitted.

    Raises:
        TypeError: If more than MAX_PORTS schemas are supplied, or if
            _get_hgi_id_for_schema() rejects a schema.
    """

    MAX_PORTS = 6  # 18:666666 is not a valid device_id, but 18:000000 is OK

    if len(schemas) > MAX_PORTS:
        raise TypeError(f"Only a maximum of {MAX_PORTS} ports is supported")

    gwys: list[Gateway] = []

    rf = VirtualRf(len(schemas))

    for idx, schema in enumerate(schemas):
        if schema is None:  # assume no gateway device
            # rf._create_port(idx)  # REMOVED: Redundant and causes race condition
            continue

        # fw_type is the name of an HgiFwTypes member (a str), resolved below
        hgi_id, fw_type = _get_hgi_id_for_schema(schema, idx)

        # rf._create_port(idx)  # REMOVED: Redundant and causes race condition
        rf.set_gateway(rf.ports[idx], hgi_id, fw_type=HgiFwTypes.__members__[fw_type])

        # The library's comports lookup is replaced by the VirtualRf's own.
        # NOTE(review): the extent of this with-block is reconstructed from a
        # listing that lost its indentation -- confirm that gwy.start() is meant
        # to run while comports is patched.
        with patch("ramses_tx.transport.comports", rf.comports):
            gwy = Gateway(rf.ports[idx], **schema)
            gwys.append(gwy)

            if start_gwys:
                await gwy.start()
                gwy.msg_db = MessageIndex(maintain=False)
                assert gwy._transport is not None  # mypy
                gwy._transport._extra["virtual_rf"] = rf

    return rf, gwys