Coverage for src / sgn_gwframe / sinks / frame.py: 89.9%
178 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 13:29 -0800
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 13:29 -0800
1"""This module contains the FrameSink class.
3Writes time series data to .gwf files.
4"""
6from __future__ import annotations
8import shutil
9import time
10from collections import deque
11from dataclasses import dataclass, field
12from pathlib import Path
13from typing import TYPE_CHECKING, Optional, Sequence
15import gwframe
16from sgnts.base import Offset, TSFrame, TSSink
18if TYPE_CHECKING:
19 from sgn.base import SinkPad
# Placeholders that every output path template must contain (written in
# curly braces); FrameSink.validate() rejects a path missing any of them.
FILENAME_PARAMS = ("instruments", "description", "gps_start_time", "duration")
@dataclass(kw_only=True)
class FrameSink(TSSink):
    """A sink element that writes time series data to frame files.

    Args:
        channels:
            Sequence[str], the channel names to write to the file
            (e.g. "H1:GDS-CALIB_STRAIN"). One sink pad is created per
            channel, and the instrument prefixes (the part before ":")
            are used to build the {instruments} filename parameter.
        duration:
            int, the duration in seconds of each frame written to the file
        path:
            str, the path to write the frame files to. The file name
            must contain the following format parameters (in curly braces):
            - {instruments}, the sorted list of instruments inferred from
              the included channel names (e.g. "H1" for "H1:GDS-CAL...")
            - {description}, the description string for the frame
            - {gps_start_time}, the start time of the data in GPS seconds
            - {duration}, the duration of the data in seconds
            The extension on the path determines the output file
            type. Currently ".gwf" and ".hdf5" are supported.
            NOTE(review): this class always writes through
            gwframe.FrameWriter -- confirm that ".hdf5" is really handled.
            default: "{instruments}-{description}-{gps_start_time}-{duration}.gwf"
        force:
            bool, whether to overwrite existing files. Default: False
        description:
            str, description string to include in the filename. Default: "SGN"
        max_files:
            int or None, when set to a positive value, enables circular buffer
            mode that keeps only the N most recent frame files. Older files
            are automatically deleted. Default: None (disabled)
        retention_time:
            float or None, retention time in seconds. Files older than this
            will be automatically deleted. Can be combined with max_files.
            Default: None (keep forever)
        subdir_seconds:
            int, organize files into subdirectories based on time buckets.
            Files are grouped by GPS time divided by this value. For example,
            with subdir_seconds=100000, GPS time 1234567890 goes into
            subdirectory 12345/. When 0, files are written to a flat directory
            structure. Default: 0 (flat structure)
        frame_name:
            str or None, name field for the frame metadata. If None, inferred
            from the instrument list (e.g., "H1L1"). Default: None
        frame_history:
            dict mapping history entry names to comments. These are added as
            frame history metadata. Default: empty dict
        frame_duration:
            int, total duration in seconds for multi-frame files. Must be an
            integer multiple of duration. When 0 or equal to duration, writes
            one frame per file. When greater than duration, multiple frames
            are written to a single file. For example, with duration=1 and
            frame_duration=16, the file contains 16 one-second frames and has
            a total duration of 16 seconds. Default: 0 (single frame per file)
        tmpdir:
            str or None, directory for temporary files during atomic writes.
            If None, uses same directory as output. Default: None
        compression:
            int, compression scheme for frame files.
            Default: gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP
        compression_level:
            int, compression level 0-9. Default: 6

    """

    channels: Sequence[str]
    duration: int
    path: str = "{instruments}-{description}-{gps_start_time}-{duration}.gwf"
    force: bool = False
    description: str = "SGN"
    max_files: Optional[int] = None
    retention_time: Optional[float] = None
    subdir_seconds: int = 0
    frame_name: Optional[str] = None
    frame_history: dict[str, str] = field(default_factory=dict)
    frame_duration: int = 0
    tmpdir: Optional[str] = None
    compression: int = gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP
    compression_level: int = 6

    @property
    def static_sink_pads(self) -> list[str]:  # type: ignore[override]
        """Return channels as static sink pads."""
        return list(self.channels)

    def configure(self) -> None:
        """Configure the FrameSink.

        Sets up the adapter alignment and initializes the internal state
        used for file tracking and multi-frame writing.
        """
        # setup the adapter config for the audioadapter
        # ensure data is aligned to second boundaries
        stride = Offset.fromsec(self.duration)
        self.adapter_config.alignment(stride=stride, align_to=Offset.fromsec(1))

        # Instrument prefix of every channel ("H1" for "H1:..."), deduplicated,
        # sorted and concatenated (e.g. "H1L1")
        self._instruments_str: str = "".join(
            sorted({chan.split(":")[0] for chan in self.channels})
        )

        # Initialize circular buffer tracking
        # Cache of created files: deque of filepaths in order of creation
        self._file_cache: deque[str] = deque()

        # Multi-frame file tracking
        self._current_writer: gwframe.FrameWriter | None = None
        self._current_file_path: Path | None = None
        self._temp_file_path: Path | None = None
        self._current_file_start: int | None = None
        self._file_duration: int = 0

    def validate(self) -> None:
        """Validate the FrameSink configuration.

        Raises:
            ValueError: if duration is not a positive integer, if
                frame_duration is negative or not an integer multiple of
                duration, or if path lacks one of FILENAME_PARAMS.
        """
        # Check valid duration
        if not isinstance(self.duration, int) or self.duration <= 0:
            msg = f"Duration must be an positive integer, got {self.duration}"
            raise ValueError(msg)

        # Check frame_duration is valid
        if self.frame_duration < 0:
            msg = f"frame_duration must be non-negative, got {self.frame_duration}"
            raise ValueError(msg)
        if self.frame_duration > 0 and self.frame_duration % self.duration != 0:
            msg = (
                f"frame_duration ({self.frame_duration}) must be an integer "
                f"multiple of duration ({self.duration})"
            )
            raise ValueError(msg)

        # Check path contains every required filename parameter
        for param in FILENAME_PARAMS:
            if f"{{{param}}}" not in self.path:
                msg = f"Path must contain parameter {{{param}}}"
                raise ValueError(msg)

    def _get_file_path(self, start: int, file_duration: int) -> Path:
        """Generate file path for given start time and duration.

        When subdir_seconds is positive, the subdirectory is created on
        disk as a side effect.

        Args:
            start: GPS start time (integer seconds)
            file_duration: Total file duration (integer seconds)

        Returns:
            Path object for the output file
        """
        # Format the base filename; the GPS start time is zero-padded to
        # 10 digits
        filename = self.path.format(
            instruments=self._instruments_str,
            gps_start_time=f"{start:0=10.0f}",
            duration=file_duration,
            description=self.description,
        )
        base_path = Path(filename)

        # Flat layout: use the formatted path as-is
        if self.subdir_seconds <= 0:
            return base_path

        # Organize into subdirectories named after the time bucket
        subdir_name = str(start // self.subdir_seconds)
        outpath = base_path.parent / subdir_name / base_path.name
        outpath.parent.mkdir(parents=True, exist_ok=True)
        return outpath

    def _remove_old_file(
        self, file_path: str | Path, current_time: float, retention_time: float
    ) -> bool:
        """Remove a file if it meets criteria for removal according to retention.

        Age is measured from the file's modification time. Deletion is
        best-effort: a failing unlink is logged, not raised.

        Args:
            file_path: Path to file to check
            current_time: Current time in seconds (from time.time())
            retention_time: Retention time in seconds

        Returns:
            True if the file was older than retention_time (removal was
            attempted) or did not exist, False otherwise
        """
        file_path = Path(file_path)

        try:
            mod_time = file_path.stat().st_mtime
        except FileNotFoundError:
            # Already gone: report it as handled so callers stop tracking it
            return True

        is_old = (current_time - mod_time) > retention_time
        if is_old:
            try:
                file_path.unlink()
                self.logger.debug("Removed old frame file: %s", file_path)
            except FileNotFoundError:
                # Removed by someone else between stat() and unlink()
                pass
            except OSError:
                self.logger.exception("Error deleting file %s", file_path)

        return is_old

    def _cleanup_files(self) -> int:
        """Clean up old files according to count and/or time-based retention policies.

        This function handles both count-based (keep only N most recent files) and
        time-based (delete files older than X seconds) retention policies.

        Returns:
            Number of files deleted. For the time-based policy this counts
            every entry dropped from the cache, including files that were
            already missing.
        """
        deleted_count = 0
        current_time = time.time()

        # Time-based retention: remove old files from front of deque.
        # The deque is in creation order, so stop at the first file that
        # is still within the retention window.
        if self.retention_time:
            while self._file_cache and self._remove_old_file(
                self._file_cache[0], current_time, self.retention_time
            ):
                self._file_cache.popleft()
                deleted_count += 1

        # Count-based retention: remove excess files from front of deque
        if self.max_files and self.max_files > 0:
            while len(self._file_cache) > self.max_files:
                filepath = self._file_cache.popleft()
                try:
                    Path(filepath).unlink()
                    deleted_count += 1
                    self.logger.debug("Deleted old frame file: %s", filepath)
                except FileNotFoundError:
                    pass
                except OSError:
                    self.logger.exception("Error deleting file %s", filepath)

        return deleted_count

    def _clean_old_frames(self, frame_dir: str | Path, retention_time: float) -> None:
        """Remove frames in directory older than retention time specified.

        This is useful for cleaning up untracked files that may have been
        created by other processes or left over from crashes. Only direct
        children of frame_dir with a ".gwf" suffix are considered.

        Args:
            frame_dir: Directory containing frame files
            retention_time: Retention time in seconds
        """
        if not retention_time:
            return

        frame_dir = Path(frame_dir)
        if not frame_dir.exists():
            return

        current_time = time.time()
        for entry in frame_dir.iterdir():
            if entry.suffix == ".gwf":
                self._remove_old_file(entry, current_time, retention_time)

    def _close_current_file(self) -> None:
        """Close the current file and perform cleanup.

        Closes the active writer, moves the temporary file to its final
        location, applies the retention policies and resets the per-file
        state.
        """
        if self._current_writer is not None:
            self._current_writer.__exit__(None, None, None)
            self._current_writer = None

            # Move temp file to final location so readers never see a
            # partially written file under the final name
            if self._temp_file_path and self._current_file_path:
                shutil.move(str(self._temp_file_path), str(self._current_file_path))
                self.logger.info("Wrote file %s", self._current_file_path)

        # Only track files when a retention policy is actually active.
        # These are the same conditions _cleanup_files uses; tracking
        # under a disabled policy (e.g. max_files=0) would grow the cache
        # without bound because nothing would ever prune it.
        count_policy = bool(self.max_files) and self.max_files > 0
        time_policy = bool(self.retention_time)
        if self._current_file_path is not None and (count_policy or time_policy):
            self._file_cache.append(str(self._current_file_path))
            deleted_count = self._cleanup_files()
            if deleted_count > 0:
                self.logger.info("Cleaned up %d old frame files", deleted_count)

        self._current_file_path = None
        self._temp_file_path = None
        self._current_file_start = None
        self._file_duration = 0

    def _write_frame(self, frame: gwframe.Frame, start: float, duration: float) -> None:
        """Write a frame to file.

        Opens a new output file when none is active, appends the frame,
        and closes the file once the frame reaches the end of the file's
        time span.

        Args:
            frame: Frame object to write
            start: GPS start time (floating point seconds)
            duration: Duration in seconds

        Raises:
            FileExistsError: if the output file already exists and force
                is False
        """
        # Ensure integer GPS time and duration
        assert int(start) == start, f"start must be integer seconds: {start}"
        assert int(duration) == duration, (
            f"duration must be integer seconds: {duration}"
        )
        start_int = int(start)
        duration_int = int(duration)

        # Determine file duration
        file_duration = (
            self.frame_duration if self.frame_duration > 0 else self.duration
        )

        # Check if we need to start a new file
        if self._current_writer is None:
            self._current_file_start = start_int
            self._file_duration = file_duration
            self._current_file_path = self._get_file_path(start_int, file_duration)

            if self._current_file_path.exists():
                if self.force:
                    self._current_file_path.unlink()
                else:
                    msg = f"output file exists: {self._current_file_path}"
                    raise FileExistsError(msg)

            # Write to a hidden temp file first; it is moved into place
            # when the file is closed
            temp_name = f".{self._current_file_path.name}.tmp"
            if self.tmpdir:
                tmpdir_path = Path(self.tmpdir)
                tmpdir_path.mkdir(parents=True, exist_ok=True)
                self._temp_file_path = tmpdir_path / temp_name
            else:
                self._temp_file_path = self._current_file_path.parent / temp_name

            self._current_writer = gwframe.FrameWriter(
                str(self._temp_file_path),
                compression=self.compression,
                compression_level=self.compression_level,
            )
            # The writer outlives this call, so the context manager is
            # entered here and exited in _close_current_file
            self._current_writer.__enter__()

        # Write frame to current file
        self._current_writer.write_frame(frame)

        # Close file if this frame completes it
        # File spans [_current_file_start, _current_file_start + _file_duration)
        # After writing this frame, we have data up to start + duration
        assert self._current_file_start is not None, "File start should be set"
        if start_int + duration_int >= self._current_file_start + self._file_duration:
            self._close_current_file()

    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        """Process input frames and write to file.

        Nothing is written for this call if any input frame is a gap or
        does not hold enough samples for one full duration.

        Args:
            input_frames:
                dict[SinkPad, TSFrame], mapping of sink pads to their input frames
        """
        # Mark EOS on every pad before any early return, so that a gap or
        # short frame on one pad cannot hide the end of stream on this or
        # another pad.
        # NOTE(review): a multi-frame file that is still open when the
        # stream ends is not finalized here -- confirm whether that is
        # handled elsewhere.
        for pad, frame in input_frames.items():
            if frame.EOS:
                self.mark_eos(pad)

        # A gap on any channel means no frame is written for this stride
        if any(frame.is_gap for frame in input_frames.values()):
            return

        # Collect channel data
        channels = {}
        sample_rates = {}
        start: float | None = None

        for pad, frame in input_frames.items():
            channel = pad.pad_name

            # Load first buffer
            # TODO: fix this indexing to handle multiple buffers as multiple segments
            # This requires converting frame data to masked arrays using
            # the data_valid flag. Needs: sgn-ts MR 171 + gwframe masked array support
            data = frame.buffers[0]

            # TODO: check for above todo, For now, check if the buffer has enough data
            # for the duration, later we'll need to cumulate check across multiple
            # segments
            exp_samples = self.duration * data.sample_rate
            if data.samples < exp_samples:
                self.logger.warning(
                    "Data does not contain enough samples for duration %d. Skipping",
                    self.duration,
                )
                return

            # Compute GPS time from frame offset
            frame_start = Offset.offset_ref_start + Offset.tosec(frame.offset)
            frame_start = int(frame_start)  # Cast to int (aligned to second boundary)

            # Store start from first channel
            if start is None:
                start = frame_start

            # Store channel data and sample rate
            channels[channel] = data.data
            sample_rates[channel] = data.sample_rate

        # Create frame and add channels
        assert start is not None, "No valid frames to write"
        frame_name = self.frame_name if self.frame_name else self._instruments_str
        gwf = gwframe.Frame(t0=start, duration=self.duration, name=frame_name)

        # Add history metadata
        for name, comment in self.frame_history.items():
            gwf.add_history(name, comment)

        for channel, data_array in channels.items():
            gwf.add_channel(
                channel=channel,
                data=data_array,
                sample_rate=sample_rates[channel],
            )

        self._write_frame(gwf, start, self.duration)