Coverage for arrakis/mux.py: 73.1%
78 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
1# Copyright (c) 2022, California Institute of Technology and contributors
2#
3# You should have received a copy of the licensing terms for this
4# software included in the file "LICENSE" located in the top-level
5# directory of this package. If you did not, you can view a copy at
6# https://git.ligo.org/ngdd/arrakis-server/-/raw/main/LICENSE
8import heapq
9import warnings
10from collections import defaultdict
11from collections.abc import Iterable, Iterator, Mapping
12from dataclasses import dataclass
13from datetime import timedelta
14from enum import Enum, auto
15from typing import Generic, TypeVar
17import gpstime
19from .block import Time
# Generic type of the items carried through the muxer.
T = TypeVar("T")

# Default amount of time to wait for missing streams before multiplexing.
DEFAULT_TIMEOUT: timedelta = timedelta(seconds=1)
class OnDrop(Enum):
    """Policies for handling an item that has to be dropped."""

    # explicit values identical to what ``auto()`` assigns (1, 2, 3)
    IGNORE = 1
    RAISE = 2
    WARN = 3
@dataclass
class MuxedData(Mapping, Generic[T]):
    """Timestamped mapping from stream keys to their data.

    Lookups, iteration and ``len`` are all delegated to ``data``, so an
    instance can be used anywhere a read-only ``Mapping`` is expected.

    Parameters
    ----------
    time : int
        The timestamp shared by every entry of ``data``, in nanoseconds.
    data : dict[str, T]
        The per-key data.

    """

    time: int
    data: dict[str, T]

    def __len__(self) -> int:
        return len(self.data)

    def __iter__(self) -> Iterator[str]:
        return iter(self.data.keys())

    def __getitem__(self, index: str) -> T:
        entries = self.data
        return entries[index]
58class Muxer(Generic[T]):
59 """A data structure that multiplexes items from multiple named streams.
61 Given items from multiple named streams with monotonically increasing
62 integer timestamps, this data structure can be used to pull out sets of
63 synchronized items (items all with the same timestamp).
65 The oldest items will be held until either all named streams are
66 available or until the timeout has been reached. If a start time has been
67 set, any items with an older timestamp will be rejected.
69 Parameters
70 ----------
71 keys : Iterable[str]
72 Identifiers for the named streams to expect when adding items.
73 start : int, optional
74 The GPS time to start muxing items for.
75 If not set, accept items from any time.
76 timeout : timedelta or None, optional
77 The maximum time to wait for messages from named streams, in seconds,
78 before multiplexing. If None is specified, wait indefinitely. Default
79 is 1 second.
81 """
83 def __init__(
84 self,
85 keys: Iterable[str],
86 start: int | None = None,
87 timeout: timedelta | None = DEFAULT_TIMEOUT,
88 ) -> None:
89 self._keys = set(keys)
90 self._items: dict[int, dict[str, T]] = defaultdict(lambda: defaultdict())
91 self._times: list[int] = []
92 self._last_time = None
93 self._start = start
94 self._last_time = start
95 self._timeout = timeout
97 # track when processing started to handle lookback properly
98 self._processing_start_time = int(gpstime.gpsnow() * Time.SECONDS)
100 def push(self, time: int, key: str, item: T, on_drop: str = "warn") -> None:
101 """Push an item into the muxer.
103 Parameters
104 ----------
105 time : int
106 The timestamp associated with this item.
107 key : str
108 The key stream associated with this item. Must match a key provided
109 at initialization.
110 item : T
111 The item to add.
112 on_drop : str, optional
113 Specifies behavior when the item would be dropped from the muxer,
114 in the case that it was not provided to the muxer before the
115 specified timeout. Options are 'ignore', 'raise', or 'warn'.
116 Default is 'warn'.
118 """
119 if key not in self._keys:
120 msg = f"{key} doesn't match keys provided at initialization"
121 raise KeyError(msg)
123 if not self._last_time:
124 self._last_time = time
126 # skip over items that have already been pulled
127 if time < self._last_time:
128 if self._start is not None and time < self._start:
129 return
130 msg = f"item's timestamp is too old: ({time} < {self._last_time})"
131 match OnDrop[on_drop.lower()]:
132 case OnDrop.IGNORE:
133 return
134 case OnDrop.RAISE:
135 raise ValueError(msg)
136 case OnDrop.WARN:
137 warnings.warn(msg, stacklevel=2)
139 # add item
140 if time in self._items:
141 if key not in self._items[time]:
142 self._items[time][key] = item
143 else:
144 heapq.heappush(self._times, time)
145 self._items[time][key] = item
147 def pull(self) -> Iterator[MuxedData[T]]:
148 """Pull monotonically increasing synchronized items from the muxer.
150 Yields
151 ------
152 MuxedData[T]
153 Synchronized items with a common timestamp, keyed by stream keys.
155 """
156 if not self._times:
157 return
159 # yield items in monotonically increasing order as long
160 # as conditions are met
161 time = self._times[0]
162 while self._has_all_items(time) or self._are_items_stale(time):
163 yield MuxedData(time, self._items.pop(time))
164 self._last_time = heapq.heappop(self._times)
165 if not self._times:
166 break
167 time = self._times[0]
169 def _has_all_items(self, time: int):
170 """Check if a timestamp has all items requested."""
171 return len(self._items[time]) == len(self._keys)
173 def _are_items_stale(self, time):
174 """Check if a timestamp is older than the latency cutoff."""
175 if self._timeout is None:
176 return False
178 time_now = gpstime.gpsnow()
179 dt_lookback = max(self._processing_start_time - time, 0) / float(Time.SECONDS)
180 dt_timeout = self._timeout.total_seconds()
181 oldest_time_allowed = time_now - dt_lookback - dt_timeout
182 return time <= int(oldest_time_allowed * Time.SECONDS)