Coverage for src / sgn_gwframe / sources / framewatch.py: 93.7%

221 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-12 13:29 -0800

1"""A source element to watch directories for new frame files in real-time.""" 

2 

3# Copyright (C) 2022 Ron Tapia 

4# Copyright (C) 2024-2025 Becca Ewing, Yun-Jing Huang 

5 

6from __future__ import annotations 

7 

8import os 

9import queue 

10from dataclasses import dataclass 

11from enum import Enum 

12from typing import TYPE_CHECKING, ClassVar 

13 

14import gpstime 

15import gwframe 

16from sgnts.base import Offset, SeriesBuffer, TSResourceSource 

17from watchdog.events import PatternMatchingEventHandler 

18from watchdog.observers import Observer 

19 

20if TYPE_CHECKING: 

21 import logging 

22 

23 from gwframe import TimeSeries 

24 from sgn.base import SourcePad 

25 from sgn.subprocess import WorkerContext 

26 

27from sgn_gwframe.base import from_T050017 

28 

29 

30class WorkerAction(Enum): 

31 """Actions that the worker can take in a single iteration.""" 

32 

33 DATA = "data" 

34 GAP = "gap" 

35 HEARTBEAT = "heartbeat" 

36 

37 

38class _FrameFileEventHandler(PatternMatchingEventHandler): 

39 """Custom watchdog event handler for tracking new frame files.""" 

40 

41 def __init__(self, queue, watch_suffix, current_watermark): 

42 self.queue = queue 

43 self.watch_suffix = watch_suffix 

44 self.current_watermark = current_watermark 

45 super().__init__(patterns=[f"*{self.watch_suffix}"]) 

46 

47 def on_closed(self, event): 

48 if event.is_directory: 

49 return 

50 self._handle_event(event.src_path) 

51 

52 def on_moved(self, event): 

53 if event.is_directory: 

54 return 

55 self._handle_event(event.dest_path) 

56 

57 def _handle_event(self, path): 

58 extension = os.path.splitext(path)[1] 

59 if extension != self.watch_suffix: 

60 return 

61 # skip any files past the current watermark 

62 _, _, start, _ = from_T050017(path) 

63 if start < self.current_watermark: 

64 return 

65 self.queue.put((path, start)) 

66 

67 

68@dataclass(kw_only=True) 

69class FrameWatchSource(TSResourceSource): 

70 """Source element to watch a directory for new frame files in real-time. 

71 

72 Monitors a directory for new frame files, automatically reading and sending 

73 data as files arrive. Handles discontinuities with gap detection, validates 

74 data integrity, and supports both live streaming (start=None) and replay 

75 from a specific GPS time. 

76 

77 For multi-IFO coherent analysis, use multiple FrameWatchSource elements 

78 (one per IFO) and let the pipeline handle synchronization. 

79 

80 Args: 

81 channels: 

82 list[str], channel names to read from frame files. 

83 Source pads will be automatically generated for each channel, with 

84 channel name as pad name. 

85 watch_dir: 

86 str, directory path to watch for new frame files. 

87 discont_wait_time: 

88 float, time to wait before sending a gap buffer when no data arrives, 

89 in seconds. Default: 60 

90 queue_timeout: 

91 float, time to wait for next file from the queue, in seconds. Default: 1 

92 watch_suffix: 

93 str, filename suffix to watch for. Default: ".gwf" 

94 

95 Example: 

96 Basic usage:: 

97 

98 from sgn_gwframe.sources import FrameWatchSource 

99 from sgn.apps import Pipeline 

100 

101 src = FrameWatchSource( 

102 name="L1_data", 

103 channels=["L1:GDS-CALIB_STRAIN"], 

104 watch_dir="/data/frames/L1", 

105 duration=1, # 1 second buffers 

106 ) 

107 

108 pipeline = Pipeline() 

109 pipeline.insert(src, ...) 

110 pipeline.run() 

111 

112 Note: 

113 Requires frame files following T050017 naming convention: 

114 {site}-{description}-{gpstime}-{duration}.gwf 

115 """ 

116 

117 allow_dynamic_source_pads: ClassVar[bool] = False 

118 

119 channels: list[str] 

120 watch_dir: str 

121 discont_wait_time: float = 60 

122 queue_timeout: float = 1 

123 watch_suffix: str = ".gwf" 

124 

125 @property 

126 def static_source_pads(self) -> list[str]: # type: ignore[override] 

127 """Define source pads from channels.""" 

128 return self.channels 

129 

130 def worker_process( # type: ignore[override] 

131 self, 

132 context: WorkerContext, 

133 channels: list[str], 

134 watch_dir: str, 

135 srcs: dict[str, SourcePad], 

136 watch_suffix: str, 

137 queue_timeout: float, 

138 discont_wait_time: float, 

139 start: int | None, 

140 logger: logging.Logger, 

141 ) -> None: 

142 """Worker process that watches directory and reads in frame data.""" 

143 # Initialize on first call 

144 if "initialized" not in context.state: 

145 # Get starting time 

146 if start is None: 

147 start_time = int(gpstime.gpsnow()) 

148 logger.debug( 

149 "No start time specified, using current GPS time: %d", start_time 

150 ) 

151 else: 

152 start_time = start 

153 logger.debug("Starting at GPS time: %d", start_time) 

154 

155 # Setup watchdog observer and queue 

156 file_queue: queue.Queue[tuple[str, int]] = queue.Queue() 

157 event_handler = _FrameFileEventHandler(file_queue, watch_suffix, start_time) 

158 observer = Observer() 

159 observer.schedule(event_handler, path=watch_dir) 

160 observer.daemon = True 

161 observer.start() 

162 

163 # Store state 

164 context.state["initialized"] = True 

165 context.state["file_queue"] = file_queue 

166 context.state["observer"] = observer 

167 context.state["next_buffer_time"] = start_time 

168 context.state["last_file_time"] = None 

169 context.state["rates"] = {} 

170 context.state["buffer_duration"] = None 

171 context.state["pending_frame"] = None 

172 context.state["pending_file_time"] = None 

173 

174 # Check if we should stop 

175 if context.should_stop(): 

176 observer = context.state.get("observer") # type: ignore[assignment] 

177 if observer is not None: 

178 observer.stop() 

179 observer.join() 

180 return 

181 

182 # Get state 

183 file_queue = context.state["file_queue"] 

184 next_buffer_time = context.state["next_buffer_time"] 

185 last_file_time = context.state["last_file_time"] 

186 rates = context.state["rates"] 

187 buffer_duration = context.state["buffer_duration"] 

188 pending_frame = context.state["pending_frame"] 

189 pending_file_time = context.state["pending_file_time"] 

190 

191 # Determine what action to take based on current state 

192 action: WorkerAction | None = None 

193 frame_data = None 

194 gap_start_time = None 

195 gap_end_time = None 

196 file_time = None 

197 

198 # Case 1: Handle pending discontinuity 

199 if pending_frame is not None: 

200 gap_start_time = next_buffer_time 

201 gap_end_time = gap_start_time + buffer_duration 

202 action = WorkerAction.GAP 

203 

204 # Check if gap is filled - then also send the pending data 

205 if gap_end_time >= pending_file_time: 

206 frame_data = pending_frame 

207 

208 # Case 2: Try to get a file from the queue 

209 else: 

210 try: 

211 filepath, file_time = _drain_and_get_file( 

212 file_queue, queue_timeout, next_buffer_time, last_file_time, logger 

213 ) 

214 except queue.Empty: 

215 # Timeout case - need rates to send heartbeat or gap 

216 if not rates or buffer_duration is None: 

217 # Try to initialize rates from a sample file in the directory 

218 result = _initialize_rates_from_sample( 

219 watch_dir, watch_suffix, channels, logger 

220 ) 

221 if result is not None: 

222 rates, buffer_duration = result 

223 context.state["rates"] = rates 

224 context.state["buffer_duration"] = buffer_duration 

225 else: 

226 # Can't send anything without knowing rates 

227 return 

228 

229 # Determine if we should send gap or heartbeat 

230 time_waiting = gpstime.gpsnow() - next_buffer_time 

231 if time_waiting >= discont_wait_time: 

232 logger.warning( 

233 "No data received for %.1f seconds (>= %.1f second timeout), " 

234 "sending gap buffer", 

235 time_waiting, 

236 discont_wait_time, 

237 ) 

238 gap_start_time = next_buffer_time 

239 gap_end_time = gap_start_time + buffer_duration 

240 action = WorkerAction.GAP 

241 else: 

242 action = WorkerAction.HEARTBEAT 

243 else: 

244 # Successfully got a file from queue - try to read it 

245 try: 

246 frame_data = _read_and_validate_frame( 

247 filepath, 

248 channels, 

249 rates, 

250 buffer_duration, 

251 file_time, 

252 ) 

253 except (FileNotFoundError, OSError, RuntimeError, ValueError): 

254 logger.exception("Failed to read or validate file %s", filepath) 

255 # Send gap if we know duration, otherwise heartbeat 

256 if buffer_duration is not None: 

257 gap_start_time = next_buffer_time 

258 gap_end_time = gap_start_time + buffer_duration 

259 action = WorkerAction.GAP 

260 else: 

261 action = WorkerAction.HEARTBEAT 

262 else: 

263 # Successfully read frame - process it 

264 # Initialize rates if this is the first file 

265 if not rates: 

266 rates, buffer_duration = _initialize_rates_and_duration( 

267 frame_data, channels 

268 ) 

269 context.state["rates"] = rates 

270 context.state["buffer_duration"] = buffer_duration 

271 

272 # Check for discontinuity 

273 if last_file_time is not None and file_time > next_buffer_time: 

274 logger.warning( 

275 "Discontinuity detected: expected time %d, got %d " 

276 "(gap of %.1f seconds)", 

277 next_buffer_time, 

278 file_time, 

279 file_time - next_buffer_time, 

280 ) 

281 # Store frame and send gap next iteration 

282 context.state["pending_frame"] = frame_data 

283 context.state["pending_file_time"] = file_time 

284 gap_start_time = next_buffer_time 

285 gap_end_time = gap_start_time + buffer_duration 

286 action = WorkerAction.GAP 

287 frame_data = None # Don't send data yet 

288 else: 

289 # Continuous data 

290 action = WorkerAction.DATA 

291 

292 # Execute action based on determined state 

293 # Initialize rates if needed for GAP or HEARTBEAT actions 

294 if action in (WorkerAction.GAP, WorkerAction.HEARTBEAT) and ( 

295 not rates or buffer_duration is None 

296 ): 

297 result = _initialize_rates_from_sample( 

298 watch_dir, watch_suffix, channels, logger 

299 ) 

300 if result is not None: 

301 rates, buffer_duration = result 

302 context.state["rates"] = rates 

303 context.state["buffer_duration"] = buffer_duration 

304 else: 

305 msg = ( 

306 f"Cannot initialize FrameWatchSource: sample rates for " 

307 f"requested channels unknown and no existing files exist " 

308 f"in {watch_dir}" 

309 ) 

310 raise RuntimeError(msg) 

311 

312 if action == WorkerAction.DATA: 

313 assert frame_data is not None 

314 assert file_time is not None 

315 _send_data_buffers(context, frame_data, srcs) 

316 logger.debug( 

317 "Sent frame at time %d, delay=%.3f seconds", 

318 file_time, 

319 gpstime.gpsnow() - file_time, 

320 ) 

321 context.state["next_buffer_time"] = int(file_time + buffer_duration) 

322 context.state["last_file_time"] = file_time 

323 

324 elif action == WorkerAction.GAP: 

325 assert gap_start_time is not None 

326 assert gap_end_time is not None 

327 _send_gap_buffers( 

328 context, channels, srcs, rates, gap_start_time, gap_end_time 

329 ) 

330 context.state["next_buffer_time"] = gap_end_time 

331 

332 # If we also have pending data that's now ready, send it 

333 if frame_data is not None: 

334 _send_data_buffers(context, frame_data, srcs) 

335 context.state["next_buffer_time"] = int( 

336 pending_file_time + buffer_duration 

337 ) 

338 context.state["last_file_time"] = pending_file_time 

339 context.state["pending_frame"] = None 

340 context.state["pending_file_time"] = None 

341 

342 elif action == WorkerAction.HEARTBEAT: 

343 _send_heartbeat_buffers(context, channels, srcs, rates, next_buffer_time) 

344 

345 

346def _initialize_rates_and_duration( 

347 frame: dict[str, TimeSeries], channels: list[str] 

348) -> tuple[dict[str, int], float]: 

349 """Extract sample rates and buffer duration from first frame. 

350 

351 Returns: 

352 tuple of (rates dict, buffer_duration in seconds) 

353 """ 

354 rates = {channel: int(series.sample_rate) for channel, series in frame.items()} 

355 buffer_duration = frame[channels[0]].duration 

356 return rates, buffer_duration 

357 

358 

359def _send_gap_buffers( 

360 context: WorkerContext, 

361 channels: list[str], 

362 srcs: dict[str, SourcePad], 

363 rates: dict[str, int], 

364 gap_start: int, 

365 gap_end: int, 

366) -> None: 

367 """Send gap buffers for all channels between gap_start and gap_end.""" 

368 for channel in channels: 

369 pad = srcs[channel] 

370 gap_samples = int((gap_end - gap_start) * rates[channel]) 

371 gap_buf = SeriesBuffer( 

372 offset=Offset.fromsec(gap_start), 

373 sample_rate=rates[channel], 

374 data=None, 

375 shape=(gap_samples,), 

376 ) 

377 context.output_queue.put((pad, gap_buf)) 

378 

379 

380def _send_data_buffers( 

381 context: WorkerContext, 

382 frame: dict[str, TimeSeries], 

383 srcs: dict[str, SourcePad], 

384) -> None: 

385 """Send data buffers for all channels from the frame.""" 

386 for channel, series in frame.items(): 

387 pad = srcs[channel] 

388 buf = SeriesBuffer( 

389 offset=Offset.fromsec(series.t0), 

390 sample_rate=int(series.sample_rate), 

391 data=series.array, 

392 ) 

393 context.output_queue.put((pad, buf)) 

394 

395 

396def _send_heartbeat_buffers( 

397 context: WorkerContext, 

398 channels: list[str], 

399 srcs: dict[str, SourcePad], 

400 rates: dict[str, int], 

401 offset_time: int, 

402) -> None: 

403 """Send zero-duration heartbeat buffers to keep pipeline alive.""" 

404 for channel in channels: 

405 pad = srcs[channel] 

406 heartbeat_buf = SeriesBuffer( 

407 offset=Offset.fromsec(offset_time), 

408 sample_rate=rates[channel], 

409 data=None, 

410 shape=(0,), 

411 ) 

412 context.output_queue.put((pad, heartbeat_buf)) 

413 

414 

415def _drain_and_get_file( 

416 file_queue: queue.Queue[tuple[str, int]], 

417 queue_timeout: float, 

418 next_buffer_time: int, 

419 last_file_time: int | None, 

420 logger: logging.Logger, 

421) -> tuple[str, int]: 

422 """Drain old files from queue and return current/future file. 

423 

424 Raises: 

425 queue.Empty: If no file available within timeout 

426 """ 

427 while True: 

428 filepath, file_time = file_queue.get(timeout=queue_timeout) 

429 

430 # Discard files that are in the past 

431 if last_file_time is not None and file_time < next_buffer_time: 

432 logger.debug( 

433 "Discarding old file at time %d (expected %d)", 

434 file_time, 

435 next_buffer_time, 

436 ) 

437 continue # Get next file 

438 

439 # Found current/future file 

440 return filepath, file_time 

441 

442 

443def _read_and_validate_frame( 

444 filepath: str, 

445 channels: list[str], 

446 rates: dict[str, int], 

447 buffer_duration: float | None, 

448 file_time: int, 

449) -> dict[str, TimeSeries]: 

450 """Read and validate frame file. 

451 

452 Returns: 

453 Frame data dict 

454 

455 Raises: 

456 FileNotFoundError: If file doesn't exist 

457 OSError, RuntimeError, ValueError: If read or validation fails 

458 """ 

459 # Check file exists 

460 if not os.path.exists(filepath): 

461 msg = f"File no longer exists: {filepath}" 

462 raise FileNotFoundError(msg) 

463 

464 # Try reading with error recovery 

465 frame = gwframe.read(filepath, channel=channels) 

466 

467 # Validate if we have expected rates/duration 

468 if rates and buffer_duration is not None: 

469 for channel, series in frame.items(): 

470 # Validate sample rate 

471 actual_rate = int(series.sample_rate) 

472 expected_rate = rates[channel] 

473 if actual_rate != expected_rate: 

474 msg = ( 

475 f"Sample rate mismatch for {channel}: " 

476 f"expected {expected_rate}, got {actual_rate}" 

477 ) 

478 raise ValueError(msg) 

479 

480 # Validate duration 

481 if series.duration != buffer_duration: 

482 msg = ( 

483 f"Duration mismatch for {channel}: " 

484 f"expected {buffer_duration}, got {series.duration}" 

485 ) 

486 raise ValueError(msg) 

487 

488 # Validate t0 matches filename 

489 if series.t0 != file_time: 

490 msg = ( 

491 f"Time mismatch for {channel}: " 

492 f"filename suggests {file_time}, data has {series.t0}" 

493 ) 

494 raise ValueError(msg) 

495 

496 return frame 

497 

498 

499def _initialize_rates_from_sample( 

500 watch_dir: str, 

501 watch_suffix: str, 

502 channels: list[str], 

503 logger: logging.Logger, 

504) -> tuple[dict[str, int], float] | None: 

505 """Initialize rates and duration by reading a sample file from watch_dir. 

506 

507 Returns: 

508 (rates dict, buffer_duration) if successful, None otherwise 

509 """ 

510 files = sorted( 

511 [f for f in os.listdir(watch_dir) if f.endswith(watch_suffix)], 

512 reverse=True, # Get most recent file 

513 ) 

514 if not files: 

515 return None 

516 

517 sample_file = os.path.join(watch_dir, files[0]) 

518 try: 

519 sample_frame = gwframe.read(sample_file, channel=channels) 

520 rates, buffer_duration = _initialize_rates_and_duration(sample_frame, channels) 

521 logger.debug( 

522 "Initialized from sample file %s: rates=%s, duration=%s", 

523 sample_file, 

524 rates, 

525 buffer_duration, 

526 ) 

527 except (OSError, RuntimeError, ValueError, KeyError, IndexError) as e: 

528 logger.warning("Could not read sample file %s: %s", sample_file, e) 

529 return None 

530 else: 

531 return rates, buffer_duration