Coverage for tests/tests/test_schemas.py: 0%

28 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Test the Schema processor.""" 

3 

4import json 

5from pathlib import Path 

6 

7import pytest 

8 

9from ramses_rf import Gateway 

10from ramses_rf.helpers import shrink 

11from ramses_rf.schemas import load_schema 

12 

13from .helpers import ( 

14 TEST_DIR, 

15 gwy, # noqa: F401 

16 shuffle_dict, 

17) 

18 

# Root directory of the schema fixtures; the tests below read from its
# "log_files" and "jsn_files" sub-directories.
WORK_DIR = f"{TEST_DIR}/schemas"

20 

21 

@pytest.mark.parametrize(
    "f_name",
    # sorted: glob order is filesystem-dependent; keep test order/IDs stable
    sorted(f.stem for f in Path(f"{WORK_DIR}/log_files").glob("*.log")),
)
async def test_schema_discover_from_log(f_name: str) -> None:
    """Check the schema built from a packet log against its expected JSON.

    For each ``<f_name>.log`` in ``log_files`` a Gateway is started with that
    file as input, and its schema is compared (after ``shrink()``) with the
    sibling ``<f_name>.json``.  The gateway state is then exported, the packets
    are shuffled and restored, and the schema is checked a second time against
    the exported schema.

    Args:
        f_name: Stem (file name without suffix) of the log/JSON fixture pair.
    """
    path = f"{WORK_DIR}/log_files/{f_name}.log"
    gwy = Gateway(None, input_file=path, config={})  # noqa: F811
    await gwy.start()  # this is what we're testing

    try:
        with open(f"{WORK_DIR}/log_files/{f_name}.json", encoding="utf-8") as f:
            schema = json.load(f)

        assert shrink(gwy.schema) == shrink(schema)

        gwy.ser_name = "/dev/null"  # HACK: needed to pause engine
        schema, packets = gwy.get_state(include_expired=True)
        packets = shuffle_dict(packets)
        await gwy._restore_cached_packets(packets)

        assert shrink(gwy.schema) == shrink(schema)
    finally:
        # Always stop the started gateway, even when an assertion above fails.
        await gwy.stop()

43 

44 

@pytest.mark.parametrize(
    "f_name",
    # sorted: glob order is filesystem-dependent; keep test order/IDs stable
    sorted(f.stem for f in Path(f"{WORK_DIR}/jsn_files").glob("*.json")),
)
async def test_schema_load_from_json(gwy: Gateway, f_name: str) -> None:  # noqa: F811
    """Check that loading a JSON schema into a gateway reproduces that schema.

    Each ``<f_name>.json`` in ``jsn_files`` is parsed and passed to
    ``load_schema()``; the gateway's resulting schema must equal the input
    once both have been passed through ``shrink()``.

    Args:
        gwy: Gateway provided by the ``gwy`` fixture imported from ``.helpers``.
        f_name: Stem (file name without suffix) of the JSON schema fixture.
    """
    with open(f"{WORK_DIR}/jsn_files/{f_name}.json", encoding="utf-8") as f:
        schema = json.load(f)

    load_schema(gwy, **schema)

    assert shrink(gwy.schema) == shrink(schema)