Coverage for arrakis/mux.py: 73.1%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-16 15:43 -0700

1# Copyright (c) 2022, California Institute of Technology and contributors 

2# 

3# You should have received a copy of the licensing terms for this 

4# software included in the file "LICENSE" located in the top-level 

5# directory of this package. If you did not, you can view a copy at 

6# https://git.ligo.org/ngdd/arrakis-server/-/raw/main/LICENSE 

7 

8import heapq 

9import warnings 

10from collections import defaultdict 

11from collections.abc import Iterable, Iterator, Mapping 

12from dataclasses import dataclass 

13from datetime import timedelta 

14from enum import Enum, auto 

15from typing import Generic, TypeVar 

16 

17import gpstime 

18 

19from .block import Time 

20 

# Generic type of the items being multiplexed.
T = TypeVar("T")

# Default ``timeout`` used by Muxer: how long to wait for missing streams.
DEFAULT_TIMEOUT = timedelta(milliseconds=1000)

24 

25 

class OnDrop(Enum):
    """Policy for handling an item that is too old to be muxed."""

    IGNORE = auto()  # discard the item silently
    RAISE = auto()  # raise a ValueError
    WARN = auto()  # emit a warning

30 

31 

@dataclass
class MuxedData(Mapping, Generic[T]):
    """Mapping of stream keys to data items associated with one timestamp.

    Parameters
    ----------
    time : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, T]
        The items, keyed by stream name.

    """

    time: int
    data: dict[str, T]

    def __len__(self) -> int:
        # number of streams present in ``data``
        return len(self.data)

    def __iter__(self) -> Iterator[str]:
        # iterate over the stream keys of the underlying dict
        return iter(self.data)

    def __getitem__(self, index: str) -> T:
        # delegate lookups of a single stream to the underlying dict
        return self.data[index]

56 

57 

58class Muxer(Generic[T]): 

59 """A data structure that multiplexes items from multiple named streams. 

60 

61 Given items from multiple named streams with monotonically increasing 

62 integer timestamps, this data structure can be used to pull out sets of 

63 synchronized items (items all with the same timestamp). 

64 

65 The oldest items will be held until either all named streams are 

66 available or until the timeout has been reached. If a start time has been 

67 set, any items with an older timestamp will be rejected. 

68 

69 Parameters 

70 ---------- 

71 keys : Iterable[str] 

72 Identifiers for the named streams to expect when adding items. 

73 start : int, optional 

74 The GPS time to start muxing items for. 

75 If not set, accept items from any time. 

76 timeout : timedelta or None, optional 

77 The maximum time to wait for messages from named streams, in seconds, 

78 before multiplexing. If None is specified, wait indefinitely. Default 

79 is 1 second. 

80 

81 """ 

82 

83 def __init__( 

84 self, 

85 keys: Iterable[str], 

86 start: int | None = None, 

87 timeout: timedelta | None = DEFAULT_TIMEOUT, 

88 ) -> None: 

89 self._keys = set(keys) 

90 self._items: dict[int, dict[str, T]] = defaultdict(lambda: defaultdict()) 

91 self._times: list[int] = [] 

92 self._last_time = None 

93 self._start = start 

94 self._last_time = start 

95 self._timeout = timeout 

96 

97 # track when processing started to handle lookback properly 

98 self._processing_start_time = int(gpstime.gpsnow() * Time.SECONDS) 

99 

100 def push(self, time: int, key: str, item: T, on_drop: str = "warn") -> None: 

101 """Push an item into the muxer. 

102 

103 Parameters 

104 ---------- 

105 time : int 

106 The timestamp associated with this item. 

107 key : str 

108 The key stream associated with this item. Must match a key provided 

109 at initialization. 

110 item : T 

111 The item to add. 

112 on_drop : str, optional 

113 Specifies behavior when the item would be dropped from the muxer, 

114 in the case that it was not provided to the muxer before the 

115 specified timeout. Options are 'ignore', 'raise', or 'warn'. 

116 Default is 'warn'. 

117 

118 """ 

119 if key not in self._keys: 

120 msg = f"{key} doesn't match keys provided at initialization" 

121 raise KeyError(msg) 

122 

123 if not self._last_time: 

124 self._last_time = time 

125 

126 # skip over items that have already been pulled 

127 if time < self._last_time: 

128 if self._start is not None and time < self._start: 

129 return 

130 msg = f"item's timestamp is too old: ({time} < {self._last_time})" 

131 match OnDrop[on_drop.lower()]: 

132 case OnDrop.IGNORE: 

133 return 

134 case OnDrop.RAISE: 

135 raise ValueError(msg) 

136 case OnDrop.WARN: 

137 warnings.warn(msg, stacklevel=2) 

138 

139 # add item 

140 if time in self._items: 

141 if key not in self._items[time]: 

142 self._items[time][key] = item 

143 else: 

144 heapq.heappush(self._times, time) 

145 self._items[time][key] = item 

146 

147 def pull(self) -> Iterator[MuxedData[T]]: 

148 """Pull monotonically increasing synchronized items from the muxer. 

149 

150 Yields 

151 ------ 

152 MuxedData[T] 

153 Synchronized items with a common timestamp, keyed by stream keys. 

154 

155 """ 

156 if not self._times: 

157 return 

158 

159 # yield items in monotonically increasing order as long 

160 # as conditions are met 

161 time = self._times[0] 

162 while self._has_all_items(time) or self._are_items_stale(time): 

163 yield MuxedData(time, self._items.pop(time)) 

164 self._last_time = heapq.heappop(self._times) 

165 if not self._times: 

166 break 

167 time = self._times[0] 

168 

169 def _has_all_items(self, time: int): 

170 """Check if a timestamp has all items requested.""" 

171 return len(self._items[time]) == len(self._keys) 

172 

173 def _are_items_stale(self, time): 

174 """Check if a timestamp is older than the latency cutoff.""" 

175 if self._timeout is None: 

176 return False 

177 

178 time_now = gpstime.gpsnow() 

179 dt_lookback = max(self._processing_start_time - time, 0) / float(Time.SECONDS) 

180 dt_timeout = self._timeout.total_seconds() 

181 oldest_time_allowed = time_now - dt_lookback - dt_timeout 

182 return time <= int(oldest_time_allowed * Time.SECONDS)