Coverage for src/ramses_rf/helpers.py: 20%
56 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - Helper functions."""
4from __future__ import annotations
6import asyncio
7from collections.abc import Awaitable, Callable
8from copy import deepcopy
9from inspect import iscoroutinefunction
10from typing import Any, TypeAlias
12_SchemaT: TypeAlias = dict[str, Any]
15def is_subset(inner: _SchemaT, outer: _SchemaT) -> bool:
16 """Return True is one dict (or list) is a subset of another."""
18 def _is_subset(
19 a: dict[str, Any] | list[Any] | Any, b: dict[str, Any] | list[Any] | Any
20 ) -> bool:
21 if isinstance(a, dict):
22 return isinstance(b, dict) and all(
23 k in b and _is_subset(v, b[k]) for k, v in a.items()
24 )
25 if isinstance(a, list):
26 return isinstance(b, list) and all(
27 any(_is_subset(x, y) for y in b) for x in a
28 )
29 return bool(a == b)
31 return _is_subset(inner, outer)
34def deep_merge(src: _SchemaT, dst: _SchemaT, _dc: bool = False) -> _SchemaT:
35 """Deep merge a src dict (precedent) into a dst dict and return the result.
37 run me with nosetests --with-doctest file.py
39 >>> s = {'data': {'rows': {'pass': 'dog', 'num': '1'}}}
40 >>> d = {'data': {'rows': { 'fail': 'cat', 'num': '5'}}}
41 >>> merge(s, d) == {'data': {'rows': {'pass': 'dog', 'fail': 'cat', 'num': '1'}}}
42 True
43 """
45 new_dst = dst if _dc else deepcopy(dst) # start with copy of dst, merge src into it
46 for key, value in src.items(): # values are only: dict, list, value or None
47 if isinstance(value, dict): # is dict
48 node = new_dst.setdefault(key, {}) # get node or create one
49 deep_merge(value, node, _dc=True)
51 elif not isinstance(value, list): # is value
52 new_dst[key] = value # src takes precedence, assert will fail
54 elif key not in new_dst or not isinstance(new_dst[key], list): # is list
55 new_dst[key] = src[key] # not expected, but maybe
57 else:
58 new_dst[key] = list(set(src[key] + new_dst[key])) # will sort
60 # assert _is_subset(shrink(src), shrink(new_dst))
61 return new_dst
64def shrink(
65 value: _SchemaT, keep_falsys: bool = False, keep_hints: bool = False
66) -> _SchemaT:
67 """Return a minimized dict, after removing all the meaningless items.
69 Specifically, removes items with:
70 - unwanted keys (starting with '_')
71 - falsey values
72 """
74 def walk(node: Any) -> Any:
75 if isinstance(node, dict):
76 return {
77 k: walk(v)
78 for k, v in node.items()
79 if (keep_hints or k[:1] != "_") and (keep_falsys or walk(v))
80 }
81 elif isinstance(node, list):
82 try:
83 return sorted([walk(x) for x in node if x])
84 except TypeError: # if a list of dicts
85 return [walk(x) for x in node if x]
86 else:
87 return node
89 if not isinstance(value, dict):
90 raise TypeError("value is not a dict")
92 result: _SchemaT = walk(value)
93 return result
96def schedule_task(
97 fnc: Awaitable[Any] | Callable[..., Any],
98 *args: Any,
99 delay: float | None = None,
100 period: float | None = None,
101 **kwargs: Any,
102) -> asyncio.Task[Any]:
103 """Start a coro after delay seconds."""
105 async def execute_fnc(
106 fnc: Awaitable[Any] | Callable[..., Any], *args: Any, **kwargs: Any
107 ) -> Any:
108 if iscoroutinefunction(fnc): # Awaitable, else Callable
109 return await fnc(*args, **kwargs)
110 return fnc(*args, **kwargs) # type: ignore[operator]
112 async def schedule_fnc(
113 fnc: Awaitable[Any] | Callable[..., Any],
114 delay: float | None,
115 period: float | None,
116 *args: Any,
117 **kwargs: Any,
118 ) -> Any:
119 if delay:
120 await asyncio.sleep(delay)
122 if not period:
123 await execute_fnc(fnc, *args, **kwargs)
124 return
126 while period:
127 await execute_fnc(fnc, *args, **kwargs)
128 await asyncio.sleep(period)
130 return asyncio.create_task( # do we need to pass in an event loop?
131 schedule_fnc(fnc, delay, period, *args, **kwargs), name=str(fnc)
132 )