Coverage for src / sgn_gwframe / sources / framewatch.py: 93.7%
221 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 13:29 -0800
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-12 13:29 -0800
1"""A source element to watch directories for new frame files in real-time."""
3# Copyright (C) 2022 Ron Tapia
4# Copyright (C) 2024-2025 Becca Ewing, Yun-Jing Huang
6from __future__ import annotations
8import os
9import queue
10from dataclasses import dataclass
11from enum import Enum
12from typing import TYPE_CHECKING, ClassVar
14import gpstime
15import gwframe
16from sgnts.base import Offset, SeriesBuffer, TSResourceSource
17from watchdog.events import PatternMatchingEventHandler
18from watchdog.observers import Observer
20if TYPE_CHECKING:
21 import logging
23 from gwframe import TimeSeries
24 from sgn.base import SourcePad
25 from sgn.subprocess import WorkerContext
27from sgn_gwframe.base import from_T050017
class WorkerAction(Enum):
    """The single kind of output one worker iteration produces.

    DATA forwards buffers read from a frame file, GAP emits data-less buffers
    over missing time, and HEARTBEAT emits zero-length buffers so downstream
    elements know the source is still alive.
    """

    DATA = "data"  # send buffers built from a frame file
    GAP = "gap"  # send gap (no data) buffers over missing time
    HEARTBEAT = "heartbeat"  # send zero-length keep-alive buffers
class _FrameFileEventHandler(PatternMatchingEventHandler):
    """Watchdog event handler that queues newly written frame files.

    A file is reported when watchdog signals that it was closed (``on_closed``)
    or moved/renamed into the watched directory (``on_moved``, using the
    destination path). Accepted files are put on ``queue`` as
    ``(path, gps_start)`` tuples, with ``gps_start`` parsed from the T050017
    filename.

    Args:
        queue: queue that receives ``(path, gps_start)`` tuples.
        watch_suffix: filename suffix a file must end with to be accepted.
        current_watermark: GPS time; files starting before it are ignored.
    """

    def __init__(self, queue, watch_suffix, current_watermark):
        self.queue = queue
        self.watch_suffix = watch_suffix
        self.current_watermark = current_watermark
        super().__init__(patterns=[f"*{self.watch_suffix}"])

    def on_closed(self, event):
        """Queue a file that watchdog reports as closed."""
        if event.is_directory:
            return
        self._handle_event(event.src_path)

    def on_moved(self, event):
        """Queue a file moved into place (e.g. an atomic rename by a writer)."""
        if event.is_directory:
            return
        self._handle_event(event.dest_path)

    def _handle_event(self, path):
        """Filter ``path`` by suffix and start time, then enqueue it."""
        # Match the full suffix, consistent with the watchdog pattern above and
        # with _initialize_rates_from_sample. os.path.splitext only yields the
        # final extension, which would reject multi-part suffixes such as
        # "_llhoft.gwf".
        if not path.endswith(self.watch_suffix):
            return
        try:
            _, _, start, _ = from_T050017(path)
        except ValueError:
            # NOTE(review): assumes from_T050017 raises ValueError for names
            # that do not follow T050017 -- confirm. Skipping such files keeps
            # the exception from propagating into watchdog's dispatch thread.
            return
        # skip any files that start before the current watermark
        if start < self.current_watermark:
            return
        self.queue.put((path, start))
@dataclass(kw_only=True)
class FrameWatchSource(TSResourceSource):
    """Source element to watch a directory for new frame files in real-time.

    Monitors a directory for new frame files, automatically reading and sending
    data as files arrive. Handles discontinuities with gap detection, validates
    data integrity, and supports both live streaming (start=None) and replay
    from a specific GPS time.

    For multi-IFO coherent analysis, use multiple FrameWatchSource elements
    (one per IFO) and let the pipeline handle synchronization.

    Args:
        channels:
            list[str], channel names to read from frame files.
            Source pads will be automatically generated for each channel, with
            channel name as pad name.
        watch_dir:
            str, directory path to watch for new frame files.
        discont_wait_time:
            float, how far (in seconds) the expected data time may lag behind
            the current GPS time, while no file arrives, before gap buffers
            are sent. Default: 60
        queue_timeout:
            float, time to wait for next file from the queue, in seconds. Default: 1
        watch_suffix:
            str, filename suffix to watch for. Default: ".gwf"

    Example:
        Basic usage::

            from sgn_gwframe.sources import FrameWatchSource
            from sgn.apps import Pipeline

            src = FrameWatchSource(
                name="L1_data",
                channels=["L1:GDS-CALIB_STRAIN"],
                watch_dir="/data/frames/L1",
                duration=1,  # 1 second buffers
            )

            pipeline = Pipeline()
            pipeline.insert(src, ...)
            pipeline.run()

    Note:
        Requires frame files following T050017 naming convention:
        {site}-{description}-{gpstime}-{duration}.gwf
    """

    allow_dynamic_source_pads: ClassVar[bool] = False

    channels: list[str]
    watch_dir: str
    discont_wait_time: float = 60
    queue_timeout: float = 1
    watch_suffix: str = ".gwf"

    @property
    def static_source_pads(self) -> list[str]:  # type: ignore[override]
        """Define source pads from channels."""
        return self.channels

    def worker_process(  # type: ignore[override]
        self,
        context: WorkerContext,
        channels: list[str],
        watch_dir: str,
        srcs: dict[str, SourcePad],
        watch_suffix: str,
        queue_timeout: float,
        discont_wait_time: float,
        start: int | None,
        logger: logging.Logger,
    ) -> None:
        """Run one worker iteration: watch for frame files and emit buffers.

        On the first call this starts a watchdog observer on ``watch_dir`` and
        seeds ``context.state``. Each call then performs one action:

        * DATA: send the next contiguous frame read from disk.
        * GAP: send gap buffers (at most ``buffer_duration`` long, never
          extending past a held frame's start time), followed by that frame
          once the gap reaches it.
        * HEARTBEAT: send zero-length buffers while waiting for data.

        If the channel sample rates are unknown and cannot be learned from a
        file in ``watch_dir``, a timeout iteration returns without sending
        anything. Any other GAP/HEARTBEAT in that state raises RuntimeError.
        """
        # Initialize on first call
        if "initialized" not in context.state:
            # Get starting time
            if start is None:
                start_time = int(gpstime.gpsnow())
                logger.debug(
                    "No start time specified, using current GPS time: %d", start_time
                )
            else:
                start_time = start
                logger.debug("Starting at GPS time: %d", start_time)

            # Setup watchdog observer and queue
            file_queue: queue.Queue[tuple[str, int]] = queue.Queue()
            event_handler = _FrameFileEventHandler(file_queue, watch_suffix, start_time)
            observer = Observer()
            observer.schedule(event_handler, path=watch_dir)
            observer.daemon = True
            observer.start()

            # Store state
            context.state["initialized"] = True
            context.state["file_queue"] = file_queue
            context.state["observer"] = observer
            context.state["next_buffer_time"] = start_time
            context.state["last_file_time"] = None
            context.state["rates"] = {}
            context.state["buffer_duration"] = None
            context.state["pending_frame"] = None
            context.state["pending_file_time"] = None

        # Check if we should stop
        if context.should_stop():
            observer = context.state.get("observer")  # type: ignore[assignment]
            if observer is not None:
                observer.stop()
                observer.join()
            return

        # Get state
        file_queue = context.state["file_queue"]
        next_buffer_time = context.state["next_buffer_time"]
        last_file_time = context.state["last_file_time"]
        rates = context.state["rates"]
        buffer_duration = context.state["buffer_duration"]
        pending_frame = context.state["pending_frame"]
        pending_file_time = context.state["pending_file_time"]

        # Determine what action to take based on current state
        action: WorkerAction | None = None
        frame_data = None
        gap_start_time = None
        gap_end_time = None
        # Start time of frame_data whenever frame_data is set
        file_time = None

        # Case 1: keep filling a previously detected discontinuity
        if pending_frame is not None:
            gap_start_time = next_buffer_time
            # Never let the gap run past the held frame's start: the last gap
            # buffer must end exactly where the frame's data begins.
            gap_end_time = min(gap_start_time + buffer_duration, pending_file_time)
            action = WorkerAction.GAP

            # Gap now reaches the held frame - send the frame after it
            if gap_end_time >= pending_file_time:
                frame_data = pending_frame
                file_time = pending_file_time

        # Case 2: Try to get a file from the queue
        else:
            try:
                filepath, file_time = _drain_and_get_file(
                    file_queue, queue_timeout, next_buffer_time, last_file_time, logger
                )
            except queue.Empty:
                # Timeout case - need rates to send heartbeat or gap
                if not rates or buffer_duration is None:
                    # Try to initialize rates from a sample file in the directory
                    result = _initialize_rates_from_sample(
                        watch_dir, watch_suffix, channels, logger
                    )
                    if result is not None:
                        rates, buffer_duration = result
                        context.state["rates"] = rates
                        context.state["buffer_duration"] = buffer_duration
                    else:
                        # Can't send anything without knowing rates
                        return

                # Gap once expected data lags "now" by at least discont_wait_time
                time_waiting = gpstime.gpsnow() - next_buffer_time
                if time_waiting >= discont_wait_time:
                    logger.warning(
                        "No data received for %.1f seconds (>= %.1f second timeout), "
                        "sending gap buffer",
                        time_waiting,
                        discont_wait_time,
                    )
                    gap_start_time = next_buffer_time
                    gap_end_time = gap_start_time + buffer_duration
                    action = WorkerAction.GAP
                else:
                    action = WorkerAction.HEARTBEAT
            else:
                # Successfully got a file from queue - try to read it
                try:
                    frame_data = _read_and_validate_frame(
                        filepath,
                        channels,
                        rates,
                        buffer_duration,
                        file_time,
                    )
                except (FileNotFoundError, OSError, RuntimeError, ValueError):
                    logger.exception("Failed to read or validate file %s", filepath)
                    # Send gap if we know duration, otherwise heartbeat
                    if buffer_duration is not None:
                        gap_start_time = next_buffer_time
                        gap_end_time = gap_start_time + buffer_duration
                        action = WorkerAction.GAP
                    else:
                        action = WorkerAction.HEARTBEAT
                else:
                    # Successfully read frame - process it
                    # Initialize rates if this is the first file
                    if not rates:
                        rates, buffer_duration = _initialize_rates_and_duration(
                            frame_data, channels
                        )
                        context.state["rates"] = rates
                        context.state["buffer_duration"] = buffer_duration

                    # Check for discontinuity
                    if last_file_time is not None and file_time > next_buffer_time:
                        logger.warning(
                            "Discontinuity detected: expected time %d, got %d "
                            "(gap of %.1f seconds)",
                            next_buffer_time,
                            file_time,
                            file_time - next_buffer_time,
                        )
                        gap_start_time = next_buffer_time
                        # Cap at the frame start so the gap never overlaps data
                        gap_end_time = min(gap_start_time + buffer_duration, file_time)
                        action = WorkerAction.GAP
                        if gap_end_time < file_time:
                            # Hole is longer than one buffer: hold the frame and
                            # keep sending gaps on later iterations.
                            context.state["pending_frame"] = frame_data
                            context.state["pending_file_time"] = file_time
                            frame_data = None  # Don't send data yet
                        # Otherwise this gap closes the hole and the frame is
                        # sent right after it in this same iteration.
                    else:
                        # Continuous data
                        action = WorkerAction.DATA

        # Execute action based on determined state
        # Initialize rates if needed for GAP or HEARTBEAT actions
        if action in (WorkerAction.GAP, WorkerAction.HEARTBEAT) and (
            not rates or buffer_duration is None
        ):
            result = _initialize_rates_from_sample(
                watch_dir, watch_suffix, channels, logger
            )
            if result is not None:
                rates, buffer_duration = result
                context.state["rates"] = rates
                context.state["buffer_duration"] = buffer_duration
            else:
                msg = (
                    f"Cannot initialize FrameWatchSource: sample rates for "
                    f"requested channels unknown and no existing files exist "
                    f"in {watch_dir}"
                )
                raise RuntimeError(msg)

        if action == WorkerAction.DATA:
            assert frame_data is not None
            assert file_time is not None
            _send_data_buffers(context, frame_data, srcs)
            logger.debug(
                "Sent frame at time %d, delay=%.3f seconds",
                file_time,
                gpstime.gpsnow() - file_time,
            )
            context.state["next_buffer_time"] = int(file_time + buffer_duration)
            context.state["last_file_time"] = file_time

        elif action == WorkerAction.GAP:
            assert gap_start_time is not None
            assert gap_end_time is not None
            # Defensive: skip an empty gap rather than emit a zero-sample buffer
            if gap_end_time > gap_start_time:
                _send_gap_buffers(
                    context, channels, srcs, rates, gap_start_time, gap_end_time
                )
            context.state["next_buffer_time"] = gap_end_time

            # If the gap reached a held/just-read frame, send it now
            if frame_data is not None:
                assert file_time is not None
                _send_data_buffers(context, frame_data, srcs)
                logger.debug(
                    "Sent frame at time %d after gap, delay=%.3f seconds",
                    file_time,
                    gpstime.gpsnow() - file_time,
                )
                context.state["next_buffer_time"] = int(file_time + buffer_duration)
                context.state["last_file_time"] = file_time
                context.state["pending_frame"] = None
                context.state["pending_file_time"] = None

        elif action == WorkerAction.HEARTBEAT:
            _send_heartbeat_buffers(context, channels, srcs, rates, next_buffer_time)
346def _initialize_rates_and_duration(
347 frame: dict[str, TimeSeries], channels: list[str]
348) -> tuple[dict[str, int], float]:
349 """Extract sample rates and buffer duration from first frame.
351 Returns:
352 tuple of (rates dict, buffer_duration in seconds)
353 """
354 rates = {channel: int(series.sample_rate) for channel, series in frame.items()}
355 buffer_duration = frame[channels[0]].duration
356 return rates, buffer_duration
def _send_gap_buffers(
    context: WorkerContext,
    channels: list[str],
    srcs: dict[str, SourcePad],
    rates: dict[str, int],
    gap_start: int,
    gap_end: int,
) -> None:
    """Queue one data-less gap buffer per channel.

    Each buffer starts at ``gap_start`` and holds ``(gap_end - gap_start)``
    seconds worth of samples at that channel's rate, truncated to an int.
    """
    span_seconds = gap_end - gap_start
    for name in channels:
        target_pad = srcs[name]
        channel_rate = rates[name]
        n_samples = int(span_seconds * channel_rate)
        empty_buf = SeriesBuffer(
            offset=Offset.fromsec(gap_start),
            sample_rate=channel_rate,
            data=None,
            shape=(n_samples,),
        )
        context.output_queue.put((target_pad, empty_buf))
def _send_data_buffers(
    context: WorkerContext,
    frame: dict[str, TimeSeries],
    srcs: dict[str, SourcePad],
) -> None:
    """Queue one data buffer per channel present in ``frame``.

    Each buffer is placed at its series' own start time (``series.t0``),
    carries the series array, and uses the sample rate cast to int. Buffers
    are queued in ``frame`` iteration order.
    """
    for channel_name, series in frame.items():
        target_pad = srcs[channel_name]
        data_buf = SeriesBuffer(
            offset=Offset.fromsec(series.t0),
            sample_rate=int(series.sample_rate),
            data=series.array,
        )
        context.output_queue.put((target_pad, data_buf))
def _send_heartbeat_buffers(
    context: WorkerContext,
    channels: list[str],
    srcs: dict[str, SourcePad],
    rates: dict[str, int],
    offset_time: int,
) -> None:
    """Queue a zero-length, data-less buffer per channel at ``offset_time``.

    These carry no samples; they only signal that the source is alive while
    it waits for the next frame file.
    """
    for name in channels:
        target_pad = srcs[name]
        keepalive = SeriesBuffer(
            offset=Offset.fromsec(offset_time),
            sample_rate=rates[name],
            data=None,
            shape=(0,),
        )
        context.output_queue.put((target_pad, keepalive))
415def _drain_and_get_file(
416 file_queue: queue.Queue[tuple[str, int]],
417 queue_timeout: float,
418 next_buffer_time: int,
419 last_file_time: int | None,
420 logger: logging.Logger,
421) -> tuple[str, int]:
422 """Drain old files from queue and return current/future file.
424 Raises:
425 queue.Empty: If no file available within timeout
426 """
427 while True:
428 filepath, file_time = file_queue.get(timeout=queue_timeout)
430 # Discard files that are in the past
431 if last_file_time is not None and file_time < next_buffer_time:
432 logger.debug(
433 "Discarding old file at time %d (expected %d)",
434 file_time,
435 next_buffer_time,
436 )
437 continue # Get next file
439 # Found current/future file
440 return filepath, file_time
443def _read_and_validate_frame(
444 filepath: str,
445 channels: list[str],
446 rates: dict[str, int],
447 buffer_duration: float | None,
448 file_time: int,
449) -> dict[str, TimeSeries]:
450 """Read and validate frame file.
452 Returns:
453 Frame data dict
455 Raises:
456 FileNotFoundError: If file doesn't exist
457 OSError, RuntimeError, ValueError: If read or validation fails
458 """
459 # Check file exists
460 if not os.path.exists(filepath):
461 msg = f"File no longer exists: {filepath}"
462 raise FileNotFoundError(msg)
464 # Try reading with error recovery
465 frame = gwframe.read(filepath, channel=channels)
467 # Validate if we have expected rates/duration
468 if rates and buffer_duration is not None:
469 for channel, series in frame.items():
470 # Validate sample rate
471 actual_rate = int(series.sample_rate)
472 expected_rate = rates[channel]
473 if actual_rate != expected_rate:
474 msg = (
475 f"Sample rate mismatch for {channel}: "
476 f"expected {expected_rate}, got {actual_rate}"
477 )
478 raise ValueError(msg)
480 # Validate duration
481 if series.duration != buffer_duration:
482 msg = (
483 f"Duration mismatch for {channel}: "
484 f"expected {buffer_duration}, got {series.duration}"
485 )
486 raise ValueError(msg)
488 # Validate t0 matches filename
489 if series.t0 != file_time:
490 msg = (
491 f"Time mismatch for {channel}: "
492 f"filename suggests {file_time}, data has {series.t0}"
493 )
494 raise ValueError(msg)
496 return frame
499def _initialize_rates_from_sample(
500 watch_dir: str,
501 watch_suffix: str,
502 channels: list[str],
503 logger: logging.Logger,
504) -> tuple[dict[str, int], float] | None:
505 """Initialize rates and duration by reading a sample file from watch_dir.
507 Returns:
508 (rates dict, buffer_duration) if successful, None otherwise
509 """
510 files = sorted(
511 [f for f in os.listdir(watch_dir) if f.endswith(watch_suffix)],
512 reverse=True, # Get most recent file
513 )
514 if not files:
515 return None
517 sample_file = os.path.join(watch_dir, files[0])
518 try:
519 sample_frame = gwframe.read(sample_file, channel=channels)
520 rates, buffer_duration = _initialize_rates_and_duration(sample_frame, channels)
521 logger.debug(
522 "Initialized from sample file %s: rates=%s, duration=%s",
523 sample_file,
524 rates,
525 buffer_duration,
526 )
527 except (OSError, RuntimeError, ValueError, KeyError, IndexError) as e:
528 logger.warning("Could not read sample file %s: %s", sample_file, e)
529 return None
530 else:
531 return rates, buffer_duration