Coverage for src / sgn_gwframe / sinks / frame.py: 89.9%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 13:29 -0800

1"""This module contains the FrameSink class. 

2 

3Writes time series data to .gwf files. 

4""" 

5 

6from __future__ import annotations 

7 

8import shutil 

9import time 

10from collections import deque 

11from dataclasses import dataclass, field 

12from pathlib import Path 

13from typing import TYPE_CHECKING, Optional, Sequence 

14 

15import gwframe 

16from sgnts.base import Offset, TSFrame, TSSink 

17 

18if TYPE_CHECKING: 

19 from sgn.base import SinkPad 

20 

# Placeholders that the output ``path`` template is required to contain;
# each one is filled in when a file name is generated.
FILENAME_PARAMS = ("instruments", "description", "gps_start_time", "duration")

28 

29 

@dataclass(kw_only=True)
class FrameSink(TSSink):
    """A sink element that writes time series data to frame files.

    Args:
        channels:
            Sequence[str], the channel names to write to the file
            (e.g. "H1:GDS-CAL..."). One static sink pad is exposed per
            channel.
        duration:
            int, the duration in seconds of each frame written to the
            file. Must be a positive integer.
        path:
            str, the path to write the frame files to. The file name
            must contain the following format parameters (in curly braces):
            - {instruments}, the sorted list of instruments inferred from
              the included channel names (e.g. "H1" for "H1:GDS-CAL...")
            - {description}, the description string for the frame
            - {gps_start_time}, the start time of the data in GPS seconds
            - {duration}, the duration of the data in seconds
            The extension on the path determines the output file
            type. Currently ".gwf" and ".hdf5" are supported.
            default: "{instruments}-{description}-{gps_start_time}-{duration}.gwf"
        force:
            bool, whether to overwrite existing files. Default: False
        description:
            str, description string to include in the filename. Default: "SGN"
        max_files:
            int or None, when set to a positive value, enables circular buffer
            mode that keeps only the N most recent frame files. Older files
            are automatically deleted. Default: None (disabled)
        retention_time:
            float or None, retention time in seconds. Files older than this
            will be automatically deleted. Can be combined with max_files.
            Default: None (keep forever)
        subdir_seconds:
            int, organize files into subdirectories based on time buckets.
            Files are grouped by GPS time divided by this value. For example,
            with subdir_seconds=100000, GPS time 1234567890 goes into
            subdirectory 12345/. When 0, files are written to a flat directory
            structure. Default: 0 (flat structure)
        frame_name:
            str or None, name field for the frame metadata. If None, inferred
            from the instrument list (e.g., "H1L1"). Default: None
        frame_history:
            dict mapping history entry names to comments. These are added as
            frame history metadata. Default: empty dict
        frame_duration:
            int, total duration in seconds for multi-frame files. Must be an
            integer multiple of duration. When 0 or equal to duration, writes
            one frame per file. When greater than duration, multiple frames
            are written to a single file. For example, with duration=1 and
            frame_duration=16, the file contains 16 one-second frames and has
            a total duration of 16 seconds. Default: 0 (single frame per file)
        tmpdir:
            str or None, directory for temporary files during atomic writes.
            If None, uses same directory as output. Default: None
        compression:
            int, compression scheme for frame files.
            Default: gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP
        compression_level:
            int, compression level 0-9. Default: 6

    """

    channels: Sequence[str]
    duration: int
    # NOTE(review): the docstring advertises ".hdf5" output, but the write
    # path in this class always goes through gwframe.FrameWriter and the
    # directory cleanup only matches ".gwf" -- confirm ".hdf5" is supported.
    path: str = "{instruments}-{description}-{gps_start_time}-{duration}.gwf"
    force: bool = False
    description: str = "SGN"
    max_files: Optional[int] = None
    retention_time: Optional[float] = None
    subdir_seconds: int = 0
    frame_name: Optional[str] = None
    frame_history: dict[str, str] = field(default_factory=dict)
    # Despite the name, this is the total duration of one output *file*;
    # ``duration`` above is the length of each individual frame.
    frame_duration: int = 0
    tmpdir: Optional[str] = None
    compression: int = gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP
    compression_level: int = 6

106 

@property
def static_sink_pads(self) -> list[str]:  # type: ignore[override]
    """Expose one static sink pad per configured channel name."""
    # Always hand back a fresh list so callers cannot mutate ``channels``.
    return [*self.channels]

111 

def configure(self) -> None:
    """Set up adapter alignment and reset the per-file bookkeeping."""
    # Ask the adapter for chunks of one frame (``duration`` seconds),
    # aligned to whole-second boundaries.
    frame_stride = Offset.fromsec(self.duration)
    self.adapter_config.alignment(
        stride=frame_stride, align_to=Offset.fromsec(1)
    )

    # Instrument prefix is whatever precedes the first ":" in each channel
    # name; duplicates are collapsed and the result is sorted, e.g. "H1L1".
    prefixes = {name.partition(":")[0] for name in self.channels}
    self._instruments_str: str = "".join(sorted(prefixes))

    # Paths of files written so far, oldest first; consumed by the
    # count/time based retention cleanup.
    self._file_cache: deque[str] = deque()

    # State of the file currently being written (None / 0 when no file
    # is open).
    self._current_writer: gwframe.FrameWriter | None = None
    self._current_file_path: Path | None = None
    self._temp_file_path: Path | None = None
    self._current_file_start: int | None = None
    self._file_duration: int = 0

133 

def validate(self) -> None:
    """Validate the FrameSink configuration.

    Raises:
        ValueError: if ``duration`` is not a positive integer, if
            ``frame_duration`` is not a non-negative integer multiple of
            ``duration``, or if ``path`` is missing one of the required
            format parameters listed in ``FILENAME_PARAMS``.
    """
    # Check valid duration
    if not isinstance(self.duration, int) or self.duration <= 0:
        msg = f"Duration must be an positive integer, got {self.duration}"
        raise ValueError(msg)

    # Check frame_duration is valid. It is substituted into the file name
    # and used in integer GPS arithmetic, so a float such as 16.0 must be
    # rejected rather than silently producing "...-16.0.gwf".
    if not isinstance(self.frame_duration, int):
        msg = f"frame_duration must be an integer, got {self.frame_duration!r}"
        raise ValueError(msg)
    if self.frame_duration < 0:
        msg = f"frame_duration must be non-negative, got {self.frame_duration}"
        raise ValueError(msg)
    if self.frame_duration > 0 and self.frame_duration % self.duration != 0:
        msg = (
            f"frame_duration ({self.frame_duration}) must be an integer "
            f"multiple of duration ({self.duration})"
        )
        raise ValueError(msg)

    # Check path contains every required format parameter
    for param in FILENAME_PARAMS:
        if f"{{{param}}}" not in self.path:
            msg = f"Path must contain parameter {{{param}}}"
            raise ValueError(msg)

157 

def _get_file_path(self, start: int, file_duration: int) -> Path:
    """Generate the output file path for a given start time and duration.

    The ``path`` template is filled in with the instrument string, the
    description, the zero-padded (10 digit) GPS start time and the file
    duration. When ``subdir_seconds`` is positive the file is placed in a
    subdirectory named ``start // subdir_seconds`` next to where the
    template points.

    As a side effect the directory that will hold the file is created if
    it does not exist yet, in both the flat and the subdirectory layout.

    Args:
        start: GPS start time (integer seconds)
        file_duration: Total file duration (integer seconds)

    Returns:
        Path object for the output file
    """
    # Format the base filename
    filename = self.path.format(
        instruments=self._instruments_str,
        gps_start_time=f"{start:0=10.0f}",
        duration=file_duration,
        description=self.description,
    )
    base_path = Path(filename)

    # Organize into subdirectories if enabled
    if self.subdir_seconds > 0:
        subdir_name = str(start // self.subdir_seconds)
        outpath = base_path.parent / subdir_name / base_path.name
    else:
        outpath = base_path

    # Previously only the subdirectory layout created its directory; a flat
    # path with a missing parent then failed later when the file was
    # written or moved into place.
    outpath.parent.mkdir(parents=True, exist_ok=True)

    return outpath

186 

187 def _remove_old_file( 

188 self, file_path: str | Path, current_time: float, retention_time: float 

189 ) -> bool: 

190 """Remove a file if it meets criteria for removal according to retention. 

191 

192 Args: 

193 file_path: Path to file to check 

194 current_time: Current time in seconds (from time.time()) 

195 retention_time: Retention time in seconds 

196 

197 Returns: 

198 True if file was old and removed (or didn't exist), False otherwise 

199 """ 

200 file_path = Path(file_path) 

201 

202 try: 

203 mod_time = file_path.stat().st_mtime 

204 except FileNotFoundError: 

205 return True 

206 

207 is_old = (current_time - mod_time) > retention_time 

208 if is_old: 

209 try: 

210 file_path.unlink() 

211 self.logger.debug("Removed old frame file: %s", file_path) 

212 except FileNotFoundError: 

213 pass 

214 except OSError: 

215 self.logger.exception("Error deleting file %s", file_path) 

216 

217 return is_old 

218 

def _cleanup_files(self) -> int:
    """Apply the time-based and count-based retention policies.

    Tracked files are visited oldest first. With ``retention_time`` set,
    entries are dropped from the front of the cache for as long as
    ``_remove_old_file`` reports them as expired (or missing). With a
    positive ``max_files``, the oldest entries are then deleted until at
    most ``max_files`` remain tracked.

    Returns:
        Number of files deleted
    """
    removed = 0
    now = time.time()
    tracked = self._file_cache

    # Time-based retention: stop at the first file that is still fresh.
    if self.retention_time:
        while tracked and self._remove_old_file(
            tracked[0], now, self.retention_time
        ):
            tracked.popleft()
            removed += 1

    # Count-based retention: trim the oldest entries beyond the limit.
    if self.max_files and self.max_files > 0:
        while len(tracked) > self.max_files:
            stale = tracked.popleft()
            try:
                Path(stale).unlink()
            except FileNotFoundError:
                # Already gone; not counted as a deletion.
                pass
            except OSError:
                self.logger.exception("Error deleting file %s", stale)
            else:
                removed += 1
                self.logger.debug("Deleted old frame file: %s", stale)

    return removed

253 

254 def _clean_old_frames(self, frame_dir: str | Path, retention_time: float) -> None: 

255 """Remove frames in directory older than retention time specified. 

256 

257 This is useful for cleaning up untracked files that may have been 

258 created by other processes or left over from crashes. 

259 

260 Args: 

261 frame_dir: Directory containing frame files 

262 retention_time: Retention time in seconds 

263 """ 

264 if not retention_time: 

265 return 

266 

267 frame_dir = Path(frame_dir) 

268 if not frame_dir.exists(): 

269 return 

270 

271 current_time = time.time() 

272 for entry in frame_dir.iterdir(): 

273 if entry.suffix == ".gwf": 

274 self._remove_old_file(entry, current_time, retention_time) 

275 

def _close_current_file(self) -> None:
    """Finalize the file being written and apply retention policies.

    Closes the open writer, moves the temporary file to its final path,
    records the file for retention tracking when a retention policy is
    active, and resets the per-file state. The per-file state is reset
    even if closing or moving fails, so that a later write starts a fresh
    file instead of reusing a broken writer. Without an open writer
    nothing is moved or tracked.
    """
    writer = self._current_writer
    final_path = self._current_file_path
    temp_path = self._temp_file_path

    try:
        if writer is None:
            return
        writer.__exit__(None, None, None)

        # Move temp file to final location. This is a rename when both
        # paths are on the same filesystem; shutil.move falls back to
        # copy-then-delete otherwise (e.g. a tmpdir on another device).
        if temp_path and final_path:
            shutil.move(str(temp_path), str(final_path))
            self.logger.info("Wrote file %s", final_path)
    finally:
        self._current_writer = None
        self._current_file_path = None
        self._temp_file_path = None
        self._current_file_start = None
        self._file_duration = 0

    # Track the file only when the cleanup step would actually act on it
    # (truthy retention_time or positive max_files). Tracking on
    # "is not None" alone let the cache grow without bound for
    # max_files=0 / retention_time=0, which never trigger any cleanup.
    retention_enabled = bool(self.retention_time) or bool(
        self.max_files and self.max_files > 0
    )
    if final_path is not None and retention_enabled:
        self._file_cache.append(str(final_path))
        deleted_count = self._cleanup_files()
        if deleted_count > 0:
            self.logger.info("Cleaned up %d old frame files", deleted_count)

300 

def _write_frame(self, frame: gwframe.Frame, start: float, duration: float) -> None:
    """Write a frame to file, opening and closing output files as needed.

    A new file is opened when none is open. Its span is
    ``frame_duration`` seconds (or ``duration`` when ``frame_duration`` is
    0), starting at ``start``. Data is written to a hidden temporary file
    that is moved into place when the file is closed. The file is closed
    once the written frame reaches the end of the file's span.

    Args:
        frame: Frame object to write
        start: GPS start time (floating point seconds)
        duration: Duration in seconds

    Raises:
        FileExistsError: if the output file already exists and ``force``
            is not set.
    """
    # Ensure integer GPS time and duration
    assert int(start) == start, f"start must be integer seconds: {start}"
    assert int(duration) == duration, (
        f"duration must be integer seconds: {duration}"
    )
    start_int = int(start)
    duration_int = int(duration)

    # If earlier frames were skipped (e.g. gaps) and this frame starts at
    # or beyond the end of the open file's span, finish that file first.
    # Otherwise the frame would be appended to a file whose name claims a
    # time range that does not contain it.
    if (
        self._current_writer is not None
        and self._current_file_start is not None
        and start_int >= self._current_file_start + self._file_duration
    ):
        self._close_current_file()

    # Check if we need to start a new file
    if self._current_writer is None:
        # Determine file duration
        file_duration = (
            self.frame_duration if self.frame_duration > 0 else self.duration
        )
        self._current_file_start = start_int
        self._file_duration = file_duration
        self._current_file_path = self._get_file_path(start_int, file_duration)

        if self._current_file_path.exists():
            if self.force:
                self._current_file_path.unlink()
            else:
                msg = f"output file exists: {self._current_file_path}"
                raise FileExistsError(msg)

        # Write to a hidden temporary file first, either in tmpdir or next
        # to the final location; it is moved into place on close.
        temp_name = f".{self._current_file_path.name}.tmp"
        if self.tmpdir:
            tmpdir_path = Path(self.tmpdir)
            tmpdir_path.mkdir(parents=True, exist_ok=True)
            self._temp_file_path = tmpdir_path / temp_name
        else:
            self._temp_file_path = self._current_file_path.parent / temp_name

        self._current_writer = gwframe.FrameWriter(
            str(self._temp_file_path),
            compression=self.compression,
            compression_level=self.compression_level,
        )
        self._current_writer.__enter__()

    # Write frame to current file
    self._current_writer.write_frame(frame)

    # Close file if this frame completes it
    # File spans [_current_file_start, _current_file_start + _file_duration)
    # After writing this frame, we have data up to start + duration
    assert self._current_file_start is not None, "File start should be set"
    if start_int + duration_int >= self._current_file_start + self._file_duration:
        self._close_current_file()

363 

def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
    """Process input frames and write to file.

    End-of-stream is recorded for every pad first. Then, if any pad
    delivers a gap or a first buffer with fewer samples than ``duration``
    seconds, nothing is written for this call. Otherwise one frame
    holding all channels is built and written.

    Args:
        input_frames:
            dict[SinkPad, TSFrame], mapping of sink pads to their input frames
    """
    # Record EOS before any early return below. Previously a gap frame
    # returned before the EOS check, so a pad whose last frame was a gap
    # (or any pad iterated after a gap) was never marked as finished.
    for pad, frame in input_frames.items():
        if frame.EOS:
            self.mark_eos(pad)
    # NOTE(review): when EOS arrives while a multi-frame file is only
    # partially filled, nothing here closes it, so the data appears to stay
    # in the temporary file -- confirm whether a flush at EOS is wanted.

    # Collect channel data
    channels = {}
    sample_rates = {}
    start: float | None = None

    for pad, frame in input_frames.items():
        channel = pad.pad_name
        if frame.is_gap:
            return

        # Load first buffer
        # TODO: fix this indexing to handle multiple buffers as multiple segments
        # This requires converting frame data to masked arrays using
        # the data_valid flag. Needs: sgn-ts MR 171 + gwframe masked array support
        data = frame.buffers[0]

        # TODO: check for above todo, For now, check if the buffer has enough data
        # for the duration, later we'll need to cumulate check across multiple
        # segments
        exp_samples = self.duration * data.sample_rate
        if data.samples < exp_samples:
            self.logger.warning(
                "Data does not contain enough samples for duration %d. Skipping",
                self.duration,
            )
            return

        # Compute GPS time from frame offset
        frame_start = Offset.offset_ref_start + Offset.tosec(frame.offset)
        frame_start = int(frame_start)  # Cast to int (aligned to second boundary)

        # Store start from first channel
        if start is None:
            start = frame_start

        # Store channel data and sample rate
        channels[channel] = data.data
        sample_rates[channel] = data.sample_rate

    # Create frame and add channels
    assert start is not None, "No valid frames to write"
    frame_name = self.frame_name if self.frame_name else self._instruments_str
    gwf = gwframe.Frame(t0=start, duration=self.duration, name=frame_name)

    # Add history metadata
    for name, comment in self.frame_history.items():
        gwf.add_history(name, comment)

    for channel, data_array in channels.items():
        gwf.add_channel(
            channel=channel,
            data=data_array,
            sample_rate=sample_rates[channel],
        )

    self._write_frame(gwf, start, self.duration)