Coverage for tests/tests/test_eavesdrop_dev_class.py: 0%

48 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Test eavesdropping of a device class.""" 

3 

4import json 

5from pathlib import Path, PurePath 

6 

7import pytest 

8 

9from ramses_rf import Gateway, Message 

10 

11from .helpers import TEST_DIR, assert_expected 

12 

# Root folder of this module's fixtures; each immediate sub-folder is one test case.
WORK_DIR = f"{TEST_DIR}/eavesdrop_dev_class"

14 

15 

def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    """Parametrize ``dir_name`` with every sub-directory found under WORK_DIR.

    Each immediate sub-directory of WORK_DIR becomes one ``dir_name`` parameter,
    and the directory's own name is used as the test id.
    """

    def id_fnc(param: Path) -> str:
        """Return the final path component, used as the readable test id."""
        return PurePath(param).name

    # iterdir() yields entries in arbitrary order; sort them so that the collection
    # order is reproducible across runs, platforms and parallel workers.
    folders = sorted(f for f in Path(WORK_DIR).iterdir() if f.is_dir())
    metafunc.parametrize("dir_name", folders, ids=id_fnc)

22 

23 

async def test_packets_from_log_file(dir_name: Path) -> None:
    """Check that each message's src device _SLUG is listed in its packet comment."""

    def proc_log_line(msg: Message) -> None:
        """Assert the source device's _SLUG is among those named in the comment."""
        slug = msg.src._SLUG
        # NOTE(review): eval() executes the packet's comment text as Python; this is
        # only acceptable because the log files are trusted, repo-local fixtures.
        permitted = eval(msg._pkt.comment)
        assert slug in permitted

    log_file = f"{dir_name}/packet.log"

    # Start with eavesdropping disabled, then switch it on via the config attribute,
    # so that setting the attribute after construction is what gets exercised.
    gateway = Gateway(None, input_file=log_file, config={"enable_eavesdrop": False})
    gateway.config.enable_eavesdrop = True

    gateway.add_msg_handler(proc_log_line)

    try:
        await gateway.start()
    finally:
        # Stop the gateway whether or not start() raised.
        await gateway.stop()

41 

42 

# duplicate in test_eavesdrop_schema
async def test_dev_eavesdrop_on_(dir_name: Path) -> None:
    """Check discovery of schema and known_list *with* eavesdropping.

    Starts a Gateway fed from ``packet.log`` in *dir_name* with eavesdropping enabled,
    then compares its known_list and schema against the JSON fixtures in that folder.
    The known_list fixture is required; the schema fixture is skipped when absent.
    """

    path = f"{dir_name}/packet.log"
    gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": True})

    # Stop the gateway even when an assertion fails or a fixture is missing.
    try:
        await gwy.start()

        with open(f"{dir_name}/known_list_eavesdrop_on.json", encoding="utf-8") as f:
            assert_expected(gwy.known_list, json.load(f).get("known_list"))

        # Keep the try body to the file read, so that only a missing fixture is
        # tolerated and nothing raised by the comparison itself is swallowed.
        try:
            with open(f"{dir_name}/schema_eavesdrop_on.json", encoding="utf-8") as f:
                expected_schema = json.load(f)
        except FileNotFoundError:
            pass  # no schema fixture for this folder
        else:
            assert_expected(gwy.schema, expected_schema)
    finally:
        await gwy.stop()

61 

62 

# duplicate in test_eavesdrop_schema
async def test_dev_eavesdrop_off(dir_name: Path) -> None:
    """Check discovery of schema and known_list *without* eavesdropping.

    Starts a Gateway fed from ``packet.log`` in *dir_name* with eavesdropping disabled,
    then compares its known_list and schema against the JSON fixtures in that folder.
    Both fixtures are optional; a comparison is skipped when its file is absent.
    """

    path = f"{dir_name}/packet.log"
    gwy = Gateway(None, input_file=path, config={"enable_eavesdrop": False})

    # Stop the gateway even when an assertion fails.
    try:
        await gwy.start()

        # Keep each try body to the file read, so that only a missing fixture is
        # tolerated and nothing raised by the comparison itself is swallowed.
        try:
            with open(f"{dir_name}/known_list_eavesdrop_off.json", encoding="utf-8") as f:
                expected_known_list = json.load(f).get("known_list")
        except FileNotFoundError:
            pass  # no known_list fixture for this folder
        else:
            assert_expected(gwy.known_list, expected_known_list)

        try:
            with open(f"{dir_name}/schema_eavesdrop_off.json", encoding="utf-8") as f:
                expected_schema = json.load(f)
        except FileNotFoundError:
            pass  # no schema fixture for this folder
        else:
            assert_expected(gwy.schema, expected_schema)
    finally:
        await gwy.stop()