Coverage for src/ramses_rf/helpers.py: 20%

56 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Helper functions.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7from collections.abc import Awaitable, Callable 

8from copy import deepcopy 

9from inspect import iscoroutinefunction 

10from typing import Any, TypeAlias 

11 

12_SchemaT: TypeAlias = dict[str, Any] 

13 

14 

15def is_subset(inner: _SchemaT, outer: _SchemaT) -> bool: 

16 """Return True is one dict (or list) is a subset of another.""" 

17 

18 def _is_subset( 

19 a: dict[str, Any] | list[Any] | Any, b: dict[str, Any] | list[Any] | Any 

20 ) -> bool: 

21 if isinstance(a, dict): 

22 return isinstance(b, dict) and all( 

23 k in b and _is_subset(v, b[k]) for k, v in a.items() 

24 ) 

25 if isinstance(a, list): 

26 return isinstance(b, list) and all( 

27 any(_is_subset(x, y) for y in b) for x in a 

28 ) 

29 return bool(a == b) 

30 

31 return _is_subset(inner, outer) 

32 

33 

34def deep_merge(src: _SchemaT, dst: _SchemaT, _dc: bool = False) -> _SchemaT: 

35 """Deep merge a src dict (precedent) into a dst dict and return the result. 

36 

37 run me with nosetests --with-doctest file.py 

38 

39 >>> s = {'data': {'rows': {'pass': 'dog', 'num': '1'}}} 

40 >>> d = {'data': {'rows': { 'fail': 'cat', 'num': '5'}}} 

41 >>> merge(s, d) == {'data': {'rows': {'pass': 'dog', 'fail': 'cat', 'num': '1'}}} 

42 True 

43 """ 

44 

45 new_dst = dst if _dc else deepcopy(dst) # start with copy of dst, merge src into it 

46 for key, value in src.items(): # values are only: dict, list, value or None 

47 if isinstance(value, dict): # is dict 

48 node = new_dst.setdefault(key, {}) # get node or create one 

49 deep_merge(value, node, _dc=True) 

50 

51 elif not isinstance(value, list): # is value 

52 new_dst[key] = value # src takes precedence, assert will fail 

53 

54 elif key not in new_dst or not isinstance(new_dst[key], list): # is list 

55 new_dst[key] = src[key] # not expected, but maybe 

56 

57 else: 

58 new_dst[key] = list(set(src[key] + new_dst[key])) # will sort 

59 

60 # assert _is_subset(shrink(src), shrink(new_dst)) 

61 return new_dst 

62 

63 

64def shrink( 

65 value: _SchemaT, keep_falsys: bool = False, keep_hints: bool = False 

66) -> _SchemaT: 

67 """Return a minimized dict, after removing all the meaningless items. 

68 

69 Specifically, removes items with: 

70 - unwanted keys (starting with '_') 

71 - falsey values 

72 """ 

73 

74 def walk(node: Any) -> Any: 

75 if isinstance(node, dict): 

76 return { 

77 k: walk(v) 

78 for k, v in node.items() 

79 if (keep_hints or k[:1] != "_") and (keep_falsys or walk(v)) 

80 } 

81 elif isinstance(node, list): 

82 try: 

83 return sorted([walk(x) for x in node if x]) 

84 except TypeError: # if a list of dicts 

85 return [walk(x) for x in node if x] 

86 else: 

87 return node 

88 

89 if not isinstance(value, dict): 

90 raise TypeError("value is not a dict") 

91 

92 result: _SchemaT = walk(value) 

93 return result 

94 

95 

def schedule_task(
    fnc: Awaitable[Any] | Callable[..., Any],
    *args: Any,
    delay: float | None = None,
    period: float | None = None,
    **kwargs: Any,
) -> asyncio.Task[Any]:
    """Create a task that runs fnc after delay seconds, optionally periodically.

    If fnc is callable it is invoked as fnc(*args, **kwargs) and, should that
    return an awaitable (e.g. fnc is a coroutine function), the result is
    awaited. If fnc is not callable it is taken to be an awaitable and is
    awaited directly (args and kwargs are then unused).

    If delay is truthy, the task first sleeps for that many seconds. If period
    is truthy, fnc is then executed repeatedly, sleeping period seconds after
    each execution, until the task is cancelled; otherwise it is executed once.
    Note that a bare coroutine object can be awaited only once, and so is
    unsuitable for use with period.

    The task is created via asyncio.create_task(), which requires a running
    event loop, and is named str(fnc).
    """

    from inspect import isawaitable  # local, so the file's imports are untouched

    async def execute_fnc(
        fnc: Awaitable[Any] | Callable[..., Any], *args: Any, **kwargs: Any
    ) -> Any:
        result = fnc(*args, **kwargs) if callable(fnc) else fnc
        if isawaitable(result):  # a coroutine, Future, Task, etc.
            return await result
        return result

    async def schedule_fnc(
        fnc: Awaitable[Any] | Callable[..., Any],
        delay: float | None,
        period: float | None,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        if delay:
            await asyncio.sleep(delay)

        if not period:
            await execute_fnc(fnc, *args, **kwargs)
            return

        while True:  # until the task is cancelled
            await execute_fnc(fnc, *args, **kwargs)
            await asyncio.sleep(period)

    return asyncio.create_task(  # do we need to pass in an event loop?
        schedule_fnc(fnc, delay, period, *args, **kwargs), name=str(fnc)
    )