Coverage for tests/tests/test_eavesdrop_schema.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Test eavesdropping of a device class.""" 

3 

4import json 

5from pathlib import Path, PurePath 

6 

7import pytest 

8 

9from ramses_rf import Gateway 

10 

11from .helpers import TEST_DIR, assert_expected, shuffle_dict 

12 

13WORK_DIR = f"{TEST_DIR}/eavesdrop_schema" 

14 

15 

16def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: 

17 def id_fnc(param: Path) -> str: 

18 return PurePath(param).name 

19 

20 folders = [f for f in Path(WORK_DIR).iterdir() if f.is_dir() and f.name[:1] != "_"] 

21 folders.sort() 

22 metafunc.parametrize("dir_name", folders, ids=id_fnc) 

23 

24 

25async def assert_schemas_equal(gwy: Gateway, expected_schema: dict) -> None: 

26 """Check the gwy schema, then shuffle and test again.""" 

27 

28 schema, packets = gwy.get_state(include_expired=True) 

29 assert_expected(schema, expected_schema) 

30 

31 packets = shuffle_dict(packets) 

32 await gwy._restore_cached_packets(packets) 

33 assert_expected(gwy.schema, expected_schema) 

34 

35 

36# duplicate in test_eavesdrop_dev_class 

37async def test_eavesdrop_off(dir_name: Path) -> None: 

38 """Check discovery of schema and known_list *without* eavesdropping.""" 

39 

40 path = f"{dir_name}/packet.log" 

41 gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": False}) 

42 await gwy.start() 

43 

44 with open(f"{dir_name}/schema_eavesdrop_off.json") as f: 

45 await assert_schemas_equal(gwy, json.load(f)) 

46 

47 try: 

48 with open(f"{dir_name}/known_list_eavesdrop_off.json") as f: 

49 assert_expected(gwy.known_list, json.load(f).get("known_list")) 

50 except FileNotFoundError: 

51 pass 

52 

53 await gwy.stop() 

54 

55 

56# duplicate in test_eavesdrop_dev_class 

57async def test_eavesdrop_on_(dir_name: Path) -> None: 

58 """Check discovery of schema and known_list *with* eavesdropping.""" 

59 

60 path = f"{dir_name}/packet.log" 

61 gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": True}) 

62 await gwy.start() 

63 

64 with open(f"{dir_name}/schema_eavesdrop_on.json") as f: 

65 await assert_schemas_equal(gwy, json.load(f)) 

66 

67 try: 

68 with open(f"{dir_name}/known_list_eavesdrop_on.json") as f: 

69 assert_expected(gwy.known_list, json.load(f).get("known_list")) 

70 except FileNotFoundError: 

71 pass 

72 

73 await gwy.stop()