Coverage for arrakis/block.py: 90.6%
181 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
1# Copyright (c) 2022, California Institute of Technology and contributors
2#
3# You should have received a copy of the licensing terms for this
4# software included in the file "LICENSE" located in the top-level
5# directory of this package. If you did not, you can view a copy at
6# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE
8"""Series block representation of timeseries data."""
10from __future__ import annotations
12from collections import defaultdict
13from dataclasses import dataclass, field
14from enum import Enum
15from functools import cached_property
16from typing import TYPE_CHECKING, Generic, TypeVar
18import numpy
19import pyarrow
20import pyarrow.compute
22from .channel import Channel
24if TYPE_CHECKING:
25 from collections.abc import Generator, Iterator, KeysView
26 from numbers import Real
# Type variable for the channel metadata carried by Series and SeriesBlock.
# It is bound to Channel, so it stands for Channel or any subclass of it.
ChannelLike = TypeVar("ChannelLike", bound=Channel)
class Time(int, Enum):
    """Time units, each valued as its length in nanoseconds.

    Every unit has a long name and a short alias. The long names are
    defined first, so they are the canonical members; the short names
    are aliases that resolve to the same member.
    """

    # canonical members
    SECONDS = 1_000_000_000
    MILLISECONDS = 1_000_000
    MICROSECONDS = 1_000
    NANOSECONDS = 1

    # short aliases of the members above
    s = SECONDS
    ms = MILLISECONDS
    us = MICROSECONDS
    ns = NANOSECONDS
class Freq(Enum):
    """Frequency units, each valued as a multiple of one hertz."""

    GHz = 1_000_000_000
    MHz = 1_000_000
    kHz = 1_000  # noqa: N815
    Hz = 1

    def __rmul__(self, other: Real) -> int:  # type: ignore
        """Evaluate ``other * unit`` as an integer number of nanoseconds.

        The result is ``int((unit.value / other) * Time.s)``.

        NOTE(review): for ``Hz`` this is the period, in nanoseconds, of a
        signal at ``other`` hertz. For the larger units the result grows
        with the unit's value instead of shrinking, so it is not the
        period of e.g. ``other`` kHz -- confirm the intended semantics.
        """
        ratio = self.value / other  # type: ignore
        return int(ratio * Time.s)
def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    The whole-second and sub-second parts are converted separately and
    the sub-second part is truncated to a whole number of nanoseconds.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds.

    """
    # divmod floors, so the fractional part is always in [0, 1). This keeps
    # negative timestamps correct (e.g. -1.5 -> -2 s + 0.5 s).
    whole, fraction = divmod(time, 1)
    # Scale the fraction to nanoseconds *before* truncating to an integer;
    # truncating first would always discard the sub-second part.
    return int(whole) * Time.s + int(fraction * Time.s)
@dataclass(frozen=True)
class Series(Generic[ChannelLike]):
    """Timeseries data for a single channel, starting at a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : numpy.ndarray
        The timeseries data.
    channel : Channel
        Channel metadata associated with this timeseries.

    """

    time_ns: int
    data: numpy.ndarray
    channel: ChannelLike

    def __len__(self) -> int:
        """Return the number of samples in the series."""
        return len(self.data)

    # -- channel-derived properties -------------------------------------

    @property
    def name(self) -> str:
        """The channel's string representation."""
        return str(self.channel)

    @property
    def sample_rate(self) -> float:
        """The channel's sample rate, in samples per second (Hz)."""
        return self.channel.sample_rate

    @property
    def dt(self) -> float:
        """Spacing between successive samples, in seconds."""
        return 1 / self.sample_rate

    # -- data-derived properties ----------------------------------------

    @property
    def data_type(self) -> numpy.dtype:
        """The dtype of the data array."""
        return self.data.dtype

    @property
    def dtype(self) -> numpy.dtype:
        """The dtype of the data array (same value as `data_type`)."""
        return self.data.dtype

    # -- time properties ------------------------------------------------

    @cached_property
    def time(self) -> float:
        """The series timestamp, converted from nanoseconds to seconds."""
        return self.time_ns / Time.SECONDS

    @property
    def t0(self) -> float:
        """The series timestamp in seconds (same value as `time`)."""
        return self.time

    @cached_property
    def duration_ns(self) -> int:
        """Length of the series in nanoseconds, truncated to an integer."""
        sample_count = len(self)
        return int((sample_count * Time.SECONDS) / self.sample_rate)

    @cached_property
    def duration(self) -> float:
        """Length of the series in seconds."""
        return self.duration_ns / Time.SECONDS

    @cached_property
    def times(self) -> numpy.ndarray:
        """Array holding the time, in seconds, of every sample in the series."""
        offsets = numpy.arange(len(self)) * self.dt
        return offsets + self.time
@dataclass(frozen=True)
class SeriesBlock(Generic[ChannelLike]):
    """Series block containing timeseries for channels for a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, numpy.ndarray]
        Mapping between channels and timeseries.
    channels : dict[str, Channel]
        Channel metadata associated with this data block.

    Raises
    ------
    AssertionError
        If `data` and `channels` do not have the same keys, or if the
        durations implied by the individual timeseries do not agree.

    """

    time_ns: int
    data: dict[str, numpy.ndarray] | dict[str, numpy.ma.MaskedArray]
    channels: dict[str, ChannelLike] = field(default_factory=dict)
    # derived in __post_init__ from the data; stays 0 for an empty block
    _duration_ns: int = field(init=False, default=0)

    def __post_init__(self):
        """Validate the block and record its duration."""
        # check that the channel lists are consistent
        assert set(self.data) == set(self.channels), (
            "data and channels dicts have different keys"
        )
        # check that the duration of all Series are consistent. The reference
        # duration is tracked with an explicit None sentinel so that a
        # zero-duration first channel cannot mask a later mismatch.
        expected_ns: int | None = None
        for channel, data in self.data.items():
            duration_ns = int((len(data) * Time.s) / self.channels[channel].sample_rate)
            if expected_ns is None:
                expected_ns = duration_ns
            assert duration_ns == expected_ns, "Series durations do not agree"
        if expected_ns is not None:
            # NOTE: the dataclass is frozen, so normal attribute assignment
            # is blocked; go through object.__setattr__ instead
            object.__setattr__(self, "_duration_ns", expected_ns)

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Duration of this block, in seconds."""
        return self._duration_ns / Time.s

    @property
    def duration_ns(self) -> int:
        """Duration of this block, in nanoseconds."""
        return self._duration_ns

    def __getitem__(self, channel: str) -> Series:
        """Return the Series for a single channel of this block."""
        return Series(self.time_ns, self.data[channel], self.channels[channel])

    def __len__(self) -> int:
        """Return the number of channels in this block."""
        return len(self.data)

    def keys(self) -> KeysView[str]:
        """Return the channel names in this block."""
        return self.data.keys()

    def items(self) -> Generator[tuple[str, Series], None, None]:
        """Iterate over (channel name, Series) pairs."""
        for channel in self.keys():
            yield (channel, self[channel])

    def values(self) -> list[Series]:
        """Return the Series for every channel in this block."""
        return [self[channel] for channel in self.keys()]

    def filter(self, channels: list[str] | None = None) -> SeriesBlock:
        """Filter a block, keeping only the requested channels.

        Parameters
        ----------
        channels : list[str], optional
            If specified, keep only these channels. If None or empty,
            the block itself is returned unchanged.

        Returns
        -------
        SeriesBlock
            The filtered series.

        Raises
        ------
        KeyError
            If a requested channel is not present in this block.

        """
        if not channels:
            return self

        data = {channel: self.data[channel] for channel in channels}
        if self.channels:
            channel_dict = {channel: self.channels[channel] for channel in channels}
        else:
            channel_dict = self.channels

        return type(self)(self.time_ns, data, channel_dict)

    def to_column_batch(self) -> pyarrow.RecordBatch:
        """Create a column-based record batch from a series block.

        Returns
        -------
        pyarrow.RecordBatch
            A single-row record batch, with a 'time' column with the
            timestamp and one list-typed column per channel holding
            that channel's timeseries.

        """
        schema = self._generate_column_schema()
        return pyarrow.RecordBatch.from_arrays(
            [
                pyarrow.array([self.time_ns], type=schema.field("time").type),
                *[
                    pyarrow.array([series], type=schema.field(channel).type)
                    for channel, series in self.data.items()
                ],
            ],
            schema=schema,
        )

    def to_row_batches(
        self, partitions: dict
    ) -> Iterator[tuple[object, pyarrow.RecordBatch]]:
        """Create row-based record batches from a series block.

        Channels are grouped by the partition they map to in `partitions`;
        channels with no entry in `partitions` are skipped.

        Parameters
        ----------
        partitions : dict
            Mapping from channel name to partition ID.

        Yields
        ------
        tuple
            Pairs of (partition ID, pyarrow.RecordBatch), one per
            partition. Each record batch has one row per channel, with a
            'time' column with the timestamp, a 'channel' column with
            the channel name, and a 'data' column containing the
            timeseries.

        """
        # group channels by partitions
        channels_by_part = defaultdict(list)
        for channel in self.keys():
            if channel in partitions:
                partition = partitions[channel]
                channels_by_part[partition].append(channel)

        # generate row-based record batches
        for partition_id, channels in channels_by_part.items():
            # the data type is taken from the first channel: all channels in
            # a partition are assumed to have the same data type
            dtype = self.channels[channels[0]].data_type
            schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
            series: list[numpy.ndarray] = [self.data[channel] for channel in channels]
            yield (
                partition_id,
                pyarrow.RecordBatch.from_arrays(
                    [
                        pyarrow.array(
                            numpy.full(len(channels), self.time_ns),
                            type=schema.field("time").type,
                        ),
                        pyarrow.array(channels, type=schema.field("channel").type),
                        pyarrow.array(series, type=schema.field("data").type),
                    ],
                    schema=schema,
                ),
            )

    @classmethod
    def from_column_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a column-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp
            and channel columns with all channels to publish. The
            'time' column is expected to be the first field.
        channels : dict[str, Channel]
            Channel metadata.  The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        time = batch.column("time").to_numpy()[0]
        fields: list[pyarrow.Field] = list(batch.schema)
        # every field after the leading 'time' field is a channel
        channel_names = [fld.name for fld in fields[1:]]
        series_dict = {
            channel: pyarrow.compute.list_flatten(batch.column(channel)).to_numpy()
            for channel in channel_names
        }
        channel_dict = {channel: channels[channel] for channel in channel_names}
        return cls(time, series_dict, channel_dict)

    @classmethod
    def from_row_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a row-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp, a
            'channel' column with the channel name, and a 'data' column
            containing the timeseries.
        channels : dict[str, Channel]
            Channel metadata.  The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        time = batch.column("time").to_numpy()[0]
        channel_names = batch.column("channel").to_pylist()
        data = batch.column("data")
        series_dict = {}
        channel_dict = {}
        for idx, channel in enumerate(channel_names):
            series_dict[channel] = pyarrow.array(data[idx]).to_numpy()
            channel_dict[channel] = channels[channel]
        return cls(time, series_dict, channel_dict)

    def _generate_column_schema(self) -> pyarrow.Schema:
        """Build the schema with a 'time' field and one list field per channel."""
        fields = [pyarrow.field("time", pyarrow.int64())]
        for channel, arr in self.data.items():
            dtype = pyarrow.from_numpy_dtype(arr.dtype)
            fields.append(pyarrow.field(channel, pyarrow.list_(dtype)))
        return pyarrow.schema(fields)

    def _generate_row_schema(self, dtype: pyarrow.DataType) -> pyarrow.Schema:
        """Build the 'time' / 'channel' / 'data' schema for the given data type."""
        return pyarrow.schema(
            [
                pyarrow.field("time", pyarrow.int64()),
                pyarrow.field("channel", pyarrow.string()),
                pyarrow.field("data", pyarrow.list_(dtype)),
            ]
        )
# Backwards compatibility: DataBlock is the previous name of SeriesBlock
# and is kept as an alias for it.
DataBlock = SeriesBlock
def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    The blocks must be given in time order and be contiguous: each block
    must start exactly where the previous one ends. If the SeriesBlock
    arguments are not sequential in time an AssertionError will be
    thrown.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block. It starts at the first block's
        timestamp, uses the first block's channel metadata, and keeps
        the first block's channel order.

    Raises
    ------
    ValueError
        If no blocks are given.
    AssertionError
        If the blocks do not all contain the same channels, or if they
        are not sequential in time.

    """
    if not blocks:
        raise ValueError("at least one block is required for concatenation")

    first = blocks[0]
    channel_dict = first.channels
    channel_set = set(channel_dict)
    end_time_ns = first.time_ns
    for block in blocks:
        assert set(block.data.keys()) == channel_set, (
            "all blocks must contain the same channel sets"
        )
        assert block.time_ns == end_time_ns, (
            f"block start time ({block.time_ns}) does not match "
            f"concatenated block end time ({end_time_ns})"
        )
        end_time_ns += block.duration_ns

    series_dict: dict[str, numpy.ndarray] = {}
    # iterate over the channel dict rather than the set so the channel
    # order of the result is deterministic
    for channel in channel_dict:
        arrays = [block.data[channel] for block in blocks]
        if any(isinstance(arr, numpy.ma.MaskedArray) for arr in arrays):
            # numpy.concatenate does not preserve the masks of masked arrays
            series_dict[channel] = numpy.ma.concatenate(arrays)
        else:
            series_dict[channel] = numpy.concatenate(arrays)
    return SeriesBlock(first.time_ns, series_dict, channel_dict)
def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Combine multiple SeriesBlocks from the same time into a single SeriesBlock.

    Each block must contain a distinct set of channels, and the time
    properties of each block must agree, otherwise an AssertionError
    will be thrown.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.

    Raises
    ------
    ValueError
        If no blocks are given.
    AssertionError
        If the block timestamps or durations disagree, or if a channel
        appears in more than one block.

    """
    if not blocks:
        raise ValueError("at least one block is required for combination")

    # the first block sets the timestamp and duration every block must match
    reference = blocks[0]
    time_ns = reference.time_ns
    duration_ns = reference.duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    channel_dict: dict[str, Channel] = {}
    for block in blocks:
        assert block.time_ns == time_ns, "all block times must agree"
        assert block.duration_ns == duration_ns, "all block durations must agree"
        for channel, series in block.items():
            assert channel not in series_dict, (
                f"channel {channel} has already been included from another block"
            )
            series_dict[channel] = series.data
            channel_dict[channel] = series.channel
    return SeriesBlock(time_ns, series_dict, channel_dict)