Coverage for tests/tests/test_eavesdrop_schema.py: 0%
42 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Test eavesdropping of a device class."""
4import json
5from pathlib import Path, PurePath
7import pytest
9from ramses_rf import Gateway
11from .helpers import TEST_DIR, assert_expected, shuffle_dict
13WORK_DIR = f"{TEST_DIR}/eavesdrop_schema"
16def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
17 def id_fnc(param: Path) -> str:
18 return PurePath(param).name
20 folders = [f for f in Path(WORK_DIR).iterdir() if f.is_dir() and f.name[:1] != "_"]
21 folders.sort()
22 metafunc.parametrize("dir_name", folders, ids=id_fnc)
25async def assert_schemas_equal(gwy: Gateway, expected_schema: dict) -> None:
26 """Check the gwy schema, then shuffle and test again."""
28 schema, packets = gwy.get_state(include_expired=True)
29 assert_expected(schema, expected_schema)
31 packets = shuffle_dict(packets)
32 await gwy._restore_cached_packets(packets)
33 assert_expected(gwy.schema, expected_schema)
36# duplicate in test_eavesdrop_dev_class
37async def test_eavesdrop_off(dir_name: Path) -> None:
38 """Check discovery of schema and known_list *without* eavesdropping."""
40 path = f"{dir_name}/packet.log"
41 gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": False})
42 await gwy.start()
44 with open(f"{dir_name}/schema_eavesdrop_off.json") as f:
45 await assert_schemas_equal(gwy, json.load(f))
47 try:
48 with open(f"{dir_name}/known_list_eavesdrop_off.json") as f:
49 assert_expected(gwy.known_list, json.load(f).get("known_list"))
50 except FileNotFoundError:
51 pass
53 await gwy.stop()
56# duplicate in test_eavesdrop_dev_class
57async def test_eavesdrop_on_(dir_name: Path) -> None:
58 """Check discovery of schema and known_list *with* eavesdropping."""
60 path = f"{dir_name}/packet.log"
61 gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": True})
62 await gwy.start()
64 with open(f"{dir_name}/schema_eavesdrop_on.json") as f:
65 await assert_schemas_equal(gwy, json.load(f))
67 try:
68 with open(f"{dir_name}/known_list_eavesdrop_on.json") as f:
69 assert_expected(gwy.known_list, json.load(f).get("known_list"))
70 except FileNotFoundError:
71 pass
73 await gwy.stop()