Coverage for tests/tests/test_schemas.py: 0%
28 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Test the Schema processor."""
4import json
5from pathlib import Path
7import pytest
9from ramses_rf import Gateway
10from ramses_rf.helpers import shrink
11from ramses_rf.schemas import load_schema
13from .helpers import (
14 TEST_DIR,
15 gwy, # noqa: F401
16 shuffle_dict,
17)
# Root of the schema fixtures; the tests below read from its
# log_files/ and jsn_files/ subdirectories.
WORK_DIR = f"{TEST_DIR}/schemas"
@pytest.mark.parametrize(
    "f_name",
    # sorted() keeps the collection order stable regardless of filesystem order
    sorted(f.stem for f in Path(f"{WORK_DIR}/log_files").glob("*.log")),
)
async def test_schema_discover_from_log(f_name: str) -> None:
    """Check the schema a Gateway builds from a packet log.

    For each ``<f_name>.log`` under ``log_files/`` a Gateway is started with
    that file as its input, and its (shrunk) schema is compared with the
    sibling ``<f_name>.json``.  The gateway's state is then captured, the
    packets are passed through ``shuffle_dict`` and restored, and the schema is
    compared with the one returned by ``get_state``.

    Args:
        f_name: Stem (file name without extension) of the log/JSON fixture
            pair; supplied by the parametrization above.
    """
    log_dir = f"{WORK_DIR}/log_files"

    # A distinct local name avoids shadowing the imported ``gwy`` fixture.
    gateway = Gateway(None, input_file=f"{log_dir}/{f_name}.log", config={})
    await gateway.start()  # this is what we're testing

    try:
        with open(f"{log_dir}/{f_name}.json", encoding="utf-8") as f:
            expected_schema = json.load(f)

        assert shrink(gateway.schema) == shrink(expected_schema)

        gateway.ser_name = "/dev/null"  # HACK: needed to pause engine
        cached_schema, packets = gateway.get_state(include_expired=True)
        # NOTE(review): presumably randomises packet order so the restore is
        # shown to be order-independent - confirm against helpers.shuffle_dict
        packets = shuffle_dict(packets)
        await gateway._restore_cached_packets(packets)

        assert shrink(gateway.schema) == shrink(cached_schema)
    finally:
        # Always stop the started gateway, even when an assertion fails.
        await gateway.stop()
@pytest.mark.parametrize(
    "f_name",
    # sorted() keeps the collection order stable regardless of filesystem order
    sorted(f.stem for f in Path(f"{WORK_DIR}/jsn_files").glob("*.json")),
)
async def test_schema_load_from_json(gwy: Gateway, f_name: str) -> None:  # noqa: F811
    """Check that loading a JSON schema into a Gateway round-trips.

    Each ``<f_name>.json`` under ``jsn_files/`` is loaded with ``load_schema``
    and the gateway's resulting (shrunk) schema must equal the (shrunk) input.

    Args:
        gwy: Gateway instance provided by the ``gwy`` fixture.
        f_name: Stem (file name without extension) of the JSON fixture;
            supplied by the parametrization above.
    """
    with open(f"{WORK_DIR}/jsn_files/{f_name}.json", encoding="utf-8") as f:
        schema = json.load(f)

    load_schema(gwy, **schema)

    assert shrink(gwy.schema) == shrink(schema)