Coverage for arrakis/block.py: 90.6%

181 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-16 15:43 -0700

1# Copyright (c) 2022, California Institute of Technology and contributors 

2# 

3# You should have received a copy of the licensing terms for this 

4# software included in the file "LICENSE" located in the top-level 

5# directory of this package. If you did not, you can view a copy at 

6# https://git.ligo.org/ngdd/arrakis-python/-/raw/main/LICENSE 

7 

8"""Series block representation of timeseries data.""" 

9 

10from __future__ import annotations 

11 

12from collections import defaultdict 

13from dataclasses import dataclass, field 

14from enum import Enum 

15from functools import cached_property 

16from typing import TYPE_CHECKING, Generic, TypeVar 

17 

18import numpy 

19import pyarrow 

20import pyarrow.compute 

21 

22from .channel import Channel 

23 

24if TYPE_CHECKING: 

25 from collections.abc import Generator, Iterator, KeysView 

26 from numbers import Real 

27 

28 

# Type variable for channel metadata; bound to Channel so that Series and
# SeriesBlock can be parameterized by Channel or any subclass of it.
ChannelLike = TypeVar("ChannelLike", bound=Channel)

30 

31 

class Time(int, Enum):
    """Time units, valued by their length in nanoseconds.

    The short names (``s``, ``ms``, ``us``, ``ns``) carry the same values
    as the long names, which makes them enum aliases of the long-named
    members rather than distinct members.
    """

    # canonical members
    SECONDS = 1_000_000_000
    MILLISECONDS = 1_000_000
    MICROSECONDS = 1_000
    NANOSECONDS = 1

    # short-hand aliases; equal values resolve to the members defined above
    s = SECONDS
    ms = MILLISECONDS
    us = MICROSECONDS
    ns = NANOSECONDS

41 

42 

class Freq(Enum):
    """Frequency units, valued by their size in hertz.

    A member supports being the right-hand operand of a multiplication
    (``number * Freq.X``); see ``__rmul__`` for what that returns.
    """

    GHz = 1_000_000_000
    MHz = 1_000_000
    kHz = 1_000  # noqa: N815
    Hz = 1

    def __rmul__(self, other: Real) -> int:  # type: ignore
        """Return ``int((unit value / other) * Time.s)`` for ``other * unit``.

        For ``Freq.Hz`` the result is the period, in nanoseconds, of a
        signal sampled at ``other`` Hz.

        NOTE(review): for the larger units the unit value is divided by
        ``other`` instead of being multiplied with it, so the result is
        not the period of ``other`` kHz/MHz/GHz -- confirm this is intended.
        """
        ratio = self.value / other  # type: ignore
        return int(ratio * Time.s)

51 

52 

def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    The whole and fractional seconds are converted separately so that the
    whole-second part is scaled using integer arithmetic.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds. The sub-second part is
        truncated to a whole number of nanoseconds.

    """
    # divmod floors the quotient, so the fraction is never negative and the
    # two parts add up correctly for negative timestamps as well
    whole, fraction = divmod(time, 1)
    seconds = int(whole) * Time.s
    # scale the fraction *before* truncating to an integer; truncating
    # first would always discard the sub-second part
    nanoseconds = int(fraction * Time.s)
    return seconds + nanoseconds

70 

71 

@dataclass(frozen=True)
class Series(Generic[ChannelLike]):
    """Timeseries data for one channel, tagged with a timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : numpy.ndarray
        The timeseries data.
    channel : Channel
        Channel metadata associated with this timeseries.

    """

    time_ns: int
    data: numpy.ndarray
    channel: ChannelLike

    def __len__(self) -> int:
        """Number of samples in the series."""
        return len(self.data)

    # -- channel metadata --------------------------------------------------

    @property
    def name(self) -> str:
        """Channel name, i.e. the string form of the channel metadata."""
        return str(self.channel)

    @property
    def sample_rate(self) -> float:
        """Sample rate of the channel, in samples per second (Hz)."""
        return self.channel.sample_rate

    @property
    def dt(self) -> float:
        """Spacing between successive samples, in seconds."""
        return 1 / self.sample_rate

    @property
    def dtype(self) -> numpy.dtype:
        """Element type of the data array."""
        return self.data.dtype

    @property
    def data_type(self) -> numpy.dtype:
        """Element type of the data array (same value as ``dtype``)."""
        return self.data.dtype

    # -- time information --------------------------------------------------

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this data, converted to seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this data, in seconds (same as ``time``)."""
        return self.time

    @cached_property
    def duration_ns(self) -> int:
        """Length of the series in nanoseconds, truncated to an integer."""
        n_samples = len(self)
        return int((n_samples * Time.s) / self.sample_rate)

    @cached_property
    def duration(self) -> float:
        """Length of the series in seconds."""
        return self.duration_ns / Time.s

    @cached_property
    def times(self) -> numpy.ndarray:
        """Array holding the time, in seconds, of every sample in the series."""
        sample_index = numpy.arange(len(self))
        return self.time + self.dt * sample_index

143 

144 

@dataclass(frozen=True)
class SeriesBlock(Generic[ChannelLike]):
    """Series block containing timeseries for channels for a given timestamp.

    Parameters
    ----------
    time_ns : int
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, numpy.ndarray]
        Mapping between channels and timeseries.
    channels : dict[str, Channel]
        Channel metadata associated with this data block.

    Raises
    ------
    AssertionError
        If `data` and `channels` do not have the same keys, or if the
        timeseries do not all span the same duration.

    """

    time_ns: int
    data: dict[str, numpy.ndarray] | dict[str, numpy.ma.MaskedArray]
    channels: dict[str, ChannelLike] = field(default_factory=dict)
    _duration_ns: int = field(init=False, default=0)

    def __post_init__(self):
        # various validation checks
        #
        # check that the channel lists are consistent
        assert set(self.data) == set(self.channels), (
            "data and channels dicts have different keys"
        )
        # check that the duration of all Series are consistent
        #
        # The reference duration is tracked in a local that starts as None,
        # so that a legitimate duration of 0 (an empty series) is not
        # mistaken for "not set yet".
        block_duration_ns = None
        for channel, data in self.data.items():
            duration_ns = int((len(data) * Time.s) / self.channels[channel].sample_rate)
            if block_duration_ns is None:
                block_duration_ns = duration_ns
            assert duration_ns == block_duration_ns, "Series durations do not agree"
        if block_duration_ns is not None:
            # NOTE: this is a hacky way to set an attribute of a
            # frozen dataclass
            object.__setattr__(self, "_duration_ns", block_duration_ns)

    @cached_property
    def time(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time_ns / Time.s

    @property
    def t0(self) -> float:
        """Timestamp associated with this block, in seconds."""
        return self.time

    @cached_property
    def duration(self) -> float:
        """Duration of this block, in seconds."""
        return self._duration_ns / Time.s

    @property
    def duration_ns(self) -> int:
        """Duration of this block, in nanoseconds."""
        return self._duration_ns

    def __getitem__(self, channel: str) -> Series:
        """Return the data and metadata of one channel as a Series."""
        return Series(self.time_ns, self.data[channel], self.channels[channel])

    def __len__(self) -> int:
        """Number of channels in the block."""
        return len(self.data)

    def keys(self) -> KeysView[str]:
        """Return the channel names in the block."""
        return self.data.keys()

    def items(self) -> Generator[tuple[str, Series], None, None]:
        """Yield a (channel name, Series) pair for each channel."""
        for channel in self.keys():
            yield (channel, self[channel])

    def values(self) -> list[Series]:
        """Return a list with one Series per channel."""
        return [self[channel] for channel in self.keys()]

    def filter(self, channels: list[str] | None = None) -> SeriesBlock:
        """Restrict a block to a subset of its channels.

        Parameters
        ----------
        channels : list[str], optional
            If specified, keep only these channels. If None or empty, the
            block itself is returned unchanged.

        Returns
        -------
        SeriesBlock
            The filtered series.

        Raises
        ------
        KeyError
            If a requested channel is not part of this block.

        """
        if not channels:
            return self

        data = {channel: self.data[channel] for channel in channels}
        if self.channels:
            channel_dict = {channel: self.channels[channel] for channel in channels}
        else:
            channel_dict = self.channels

        return type(self)(self.time_ns, data, channel_dict)

    def to_column_batch(self) -> pyarrow.RecordBatch:
        """Create a column-based record batch from a series block.

        Returns
        -------
        pyarrow.RecordBatch
            A single-row record batch, with a 'time' column with the
            timestamp and one list-valued column per channel holding
            that channel's timeseries.

        """
        schema = self._generate_column_schema()
        return pyarrow.RecordBatch.from_arrays(
            [
                pyarrow.array([self.time_ns], type=schema.field("time").type),
                *[
                    pyarrow.array([series], type=schema.field(channel).type)
                    for channel, series in self.data.items()
                ],
            ],
            schema=schema,
        )

    def to_row_batches(
        self, partitions: dict
    ) -> Iterator[tuple[object, pyarrow.RecordBatch]]:
        """Create row-based record batches from a series block.

        Parameters
        ----------
        partitions : dict
            Mapping from channel name to partition ID. Channels of this
            block that have no entry in this mapping are skipped.

        Yields
        ------
        tuple
            Pairs of (partition ID, pyarrow.RecordBatch), one per
            partition. The record batches have one row per channel, with
            a 'time' column with the timestamp, a 'channel' column with
            the channel name, and a 'data' column containing the
            timeseries.

        """
        # group channels by partitions
        channels_by_part = defaultdict(list)
        for channel in self.keys():
            if channel in partitions:
                partition = partitions[channel]
                channels_by_part[partition].append(channel)

        # generate row-based record batches
        for partition_id, channels in channels_by_part.items():
            # all channels in a partition are taken to have the same data
            # type, so the type of the first one defines the schema
            dtype = self.channels[channels[0]].data_type
            schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
            series: list[numpy.ndarray] = [self.data[channel] for channel in channels]
            yield (
                partition_id,
                pyarrow.RecordBatch.from_arrays(
                    [
                        pyarrow.array(
                            numpy.full(len(channels), self.time_ns),
                            type=schema.field("time").type,
                        ),
                        pyarrow.array(channels, type=schema.field("channel").type),
                        pyarrow.array(series, type=schema.field("data").type),
                    ],
                    schema=schema,
                ),
            )

    @classmethod
    def from_column_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a column-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp
            and channel columns with all channels to publish. The
            'time' column must be the first column of the batch.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        time = batch.column("time").to_numpy()[0]
        fields: list[pyarrow.Field] = list(batch.schema)
        # the first field is the 'time' column; every other field is a channel
        channel_names = [schema_field.name for schema_field in fields[1:]]
        series_dict = {
            channel: pyarrow.compute.list_flatten(batch.column(channel)).to_numpy()
            for channel in channel_names
        }
        channel_dict = {channel: channels[channel] for channel in channel_names}
        return cls(time, series_dict, channel_dict)

    @classmethod
    def from_row_batch(
        cls,
        batch: pyarrow.RecordBatch,
        channels: dict[str, ChannelLike],
    ) -> SeriesBlock:
        """Create a series block from a row-based record batch.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            A record batch, with a 'time' column with the timestamp, a
            'channel' column with the channel name, and a 'data' column
            containing the timeseries.
        channels : dict[str, Channel]
            Channel metadata. The metadata for the channels defined
            in the batch will be extracted from this dictionary, so
            this dictionary may include metadata for additional
            channels not included in the batch.

        Returns
        -------
        SeriesBlock
            The block representation of the record batch.

        """
        # the timestamp of the first row is used for the whole block
        time = batch.column("time").to_numpy()[0]
        channel_names = batch.column("channel").to_pylist()
        data = batch.column("data")
        series_dict = {}
        channel_dict = {}
        for idx, channel in enumerate(channel_names):
            series_dict[channel] = pyarrow.array(data[idx]).to_numpy()
            channel_dict[channel] = channels[channel]
        return cls(time, series_dict, channel_dict)

    def _generate_column_schema(self) -> pyarrow.Schema:
        """Build the schema with a 'time' field and one list field per channel."""
        fields = [pyarrow.field("time", pyarrow.int64())]
        for channel, arr in self.data.items():
            dtype = pyarrow.from_numpy_dtype(arr.dtype)
            fields.append(pyarrow.field(channel, pyarrow.list_(dtype)))
        return pyarrow.schema(fields)

    def _generate_row_schema(self, dtype: pyarrow.DataType) -> pyarrow.Schema:
        """Build the 'time'/'channel'/'data' schema for list data of `dtype`."""
        return pyarrow.schema(
            [
                pyarrow.field("time", pyarrow.int64()),
                pyarrow.field("channel", pyarrow.string()),
                pyarrow.field("data", pyarrow.list_(dtype)),
            ]
        )

391 

392 

# Backwards compatibility: keep SeriesBlock importable under its previous
# name, DataBlock.
DataBlock = SeriesBlock

395 

396 

def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    The blocks must be given in time order, each one starting exactly
    where the previous one ends. The channel metadata and channel order
    of the result are taken from the first block.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block.

    Raises
    ------
    IndexError
        If no blocks are given.
    AssertionError
        If the blocks do not all contain the same set of channels, or if
        they are not sequential in time.

    """
    first = blocks[0]
    channel_dict = first.channels
    channel_set = set(channel_dict)
    start_time_ns = end_time_ns = first.time_ns
    for block in blocks:
        assert set(block.data.keys()) == channel_set, (
            "all blocks must contain the same channel sets"
        )
        assert block.time_ns == end_time_ns, (
            f"block start time ({block.time_ns}) does not match "
            f"concatenated block end time ({end_time_ns})"
        )
        # the next block is expected to start where this one ends
        end_time_ns += block.duration_ns

    series_dict: dict[str, numpy.ndarray] = {}
    # iterate the first block's channel mapping rather than a set, so the
    # channel order of the result does not depend on string hashing
    for channel in channel_dict:
        pieces = [block.data[channel] for block in blocks]
        if any(isinstance(piece, numpy.ma.MaskedArray) for piece in pieces):
            # numpy.concatenate does not preserve the masks of masked
            # arrays; the numpy.ma variant does
            series_dict[channel] = numpy.ma.concatenate(pieces)
        else:
            series_dict[channel] = numpy.concatenate(pieces)
    return SeriesBlock(start_time_ns, series_dict, channel_dict)

434 

435 

def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Merge SeriesBlocks covering the same time span into one SeriesBlock.

    No channel may appear in more than one block, and all blocks must
    share the same timestamp and duration; otherwise an AssertionError
    is raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.

    """
    # the first block sets the timestamp and duration all others must match
    reference = blocks[0]
    expected_time_ns = reference.time_ns
    expected_duration_ns = reference.duration_ns

    merged_data: dict[str, numpy.ndarray] = {}
    merged_channels: dict[str, Channel] = {}
    for block in blocks:
        assert block.time_ns == expected_time_ns, "all block times must agree"
        assert block.duration_ns == expected_duration_ns, (
            "all block durations must agree"
        )
        for channel, series in block.items():
            assert channel not in merged_data, (
                f"channel {channel} has already been included from another block"
            )
            merged_data[channel] = series.data
            merged_channels[channel] = series.channel

    return SeriesBlock(expected_time_ns, merged_data, merged_channels)