Coverage for little_loops / fsm / persistence.py: 25%

227 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""State persistence and event streaming for FSM loops. 

2 

3This module provides persistence capabilities for FSM loop execution: 

4- LoopState: Dataclass representing loop execution state 

5- StatePersistence: File I/O for state and events 

6- PersistentExecutor: Wrapper that persists state during execution 

7- Utility functions for listing running loops and reading history 

8 

9File structure: 

10 .loops/ 

11 ├── fix-types.yaml # Loop definition 

12 ├── .running/ # Runtime state (auto-managed) 

13 │ ├── fix-types.state.json 

14 │ └── fix-types.events.jsonl 

15 └── .history/ # Archived run logs (auto-populated) 

16 └── fix-types/ 

17 └── 2024-01-15T103000/ 

18 ├── state.json 

19 └── events.jsonl 

20""" 

21 

22from __future__ import annotations 

23 

24import json 

25import logging 

26import shutil 

27import time 

28from dataclasses import dataclass, field 

29from datetime import UTC, datetime 

30from pathlib import Path 

31from typing import Any 

32 

33from little_loops.fsm.executor import EventCallback, ExecutionResult, FSMExecutor 

34from little_loops.fsm.schema import FSMLoop 

35 

# Subdirectory of the loops dir holding live per-loop files
# (<loop_name>.state.json and <loop_name>.events.jsonl).
RUNNING_DIR = ".running"
# Subdirectory of the loops dir holding archived runs, laid out as
# <loop_name>/<run_id>/{state.json,events.jsonl}.
HISTORY_DIR = ".history"

# Module-level logger (used e.g. to report corrupted state files).
logger = logging.getLogger(__name__)

40 

41 

42def _iso_now() -> str: 

43 """Return current time as ISO 8601 string.""" 

44 return datetime.now(UTC).isoformat() 

45 

46 

47def _now_ms() -> int: 

48 """Return current time in milliseconds.""" 

49 return int(time.time() * 1000) 

50 

51 

@dataclass
class LoopState:
    """Persistent state for an FSM loop execution.

    This captures all runtime state needed to resume a loop:
    - Current state and iteration
    - Captured variables and previous result
    - Last evaluation result
    - Timestamps and status

    Attributes:
        loop_name: Name of the loop
        current_state: Current FSM state name
        iteration: Current iteration count (1-based)
        captured: Captured action outputs by variable name
        prev_result: Previous state's result (output, exit_code, state)
        last_result: Last evaluation result (verdict, details)
        started_at: ISO timestamp when loop started
        updated_at: ISO timestamp when state was last saved
        status: Execution status (running, completed, failed, interrupted, awaiting_continuation, timed_out)
        continuation_prompt: Continuation context from handoff signal (if status is awaiting_continuation)
        accumulated_ms: Total milliseconds elapsed across all segments up to this save (used to restore
            elapsed time correctly after resume, so duration_ms and ${loop.elapsed_ms} reflect the
            full loop lifetime rather than only the most recent segment)
        retry_counts: Per-state retry counters, keyed by state name
        active_sub_loop: Name of the currently executing sub-loop, if any (observability only)
    """

    loop_name: str
    current_state: str
    iteration: int
    captured: dict[str, dict[str, Any]]
    prev_result: dict[str, Any] | None
    last_result: dict[str, Any] | None
    started_at: str
    updated_at: str
    # One of: "running", "completed", "failed", "interrupted", "awaiting_continuation", "timed_out"
    status: str
    continuation_prompt: str | None = None
    accumulated_ms: int = 0  # total elapsed ms across all segments (for resume offset)
    retry_counts: dict[str, int] = field(default_factory=dict)  # per-state retry tracking
    active_sub_loop: str | None = None  # name of currently executing sub-loop (observability)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Optional fields (continuation_prompt, retry_counts, active_sub_loop) are
        omitted when unset/empty to keep state files compact.

        Returns:
            Dictionary suitable for ``json.dumps``.
        """
        result: dict[str, Any] = {
            "loop_name": self.loop_name,
            "current_state": self.current_state,
            "iteration": self.iteration,
            "captured": self.captured,
            "prev_result": self.prev_result,
            "last_result": self.last_result,
            "started_at": self.started_at,
            "updated_at": self.updated_at,
            "status": self.status,
            "accumulated_ms": self.accumulated_ms,
        }
        if self.continuation_prompt is not None:
            result["continuation_prompt"] = self.continuation_prompt
        if self.retry_counts:
            result["retry_counts"] = self.retry_counts
        if self.active_sub_loop is not None:
            result["active_sub_loop"] = self.active_sub_loop
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LoopState:
        """Create LoopState from dictionary.

        Args:
            data: Dictionary with loop state fields

        Returns:
            LoopState instance

        Raises:
            KeyError: If a required field (loop_name, current_state, iteration,
                started_at, status) is missing.
        """
        return cls(
            loop_name=data["loop_name"],
            current_state=data["current_state"],
            iteration=data["iteration"],
            # `or` fallbacks also cover explicit JSON nulls, which would otherwise
            # violate the field types (resume code copies retry_counts via dict()).
            captured=data.get("captured") or {},
            prev_result=data.get("prev_result"),
            last_result=data.get("last_result"),
            started_at=data["started_at"],
            updated_at=data.get("updated_at") or "",
            status=data["status"],
            continuation_prompt=data.get("continuation_prompt"),
            accumulated_ms=data.get("accumulated_ms") or 0,
            retry_counts=data.get("retry_counts") or {},
            active_sub_loop=data.get("active_sub_loop"),
        )

141 

142 

143class StatePersistence: 

144 """Manage loop state persistence and event streaming. 

145 

146 Handles file I/O for: 

147 - State file: JSON file with current execution state 

148 - Events file: JSONL file with execution events (append-only) 

149 

150 Files are stored in .loops/.running/<loop_name>.* 

151 """ 

152 

153 def __init__(self, loop_name: str, loops_dir: Path | None = None) -> None: 

154 """Initialize persistence for a loop. 

155 

156 Args: 

157 loop_name: Name of the loop 

158 loops_dir: Base directory for loops (default: .loops) 

159 """ 

160 self.loop_name = loop_name 

161 self.loops_dir = loops_dir or Path(".loops") 

162 self.running_dir = self.loops_dir / RUNNING_DIR 

163 self.state_file = self.running_dir / f"{loop_name}.state.json" 

164 self.events_file = self.running_dir / f"{loop_name}.events.jsonl" 

165 

166 def initialize(self) -> None: 

167 """Create running directory if needed.""" 

168 self.running_dir.mkdir(parents=True, exist_ok=True) 

169 

170 def save_state(self, state: LoopState) -> None: 

171 """Save current state to file. 

172 

173 Updates the updated_at timestamp before saving. 

174 

175 Args: 

176 state: LoopState to save 

177 """ 

178 state.updated_at = _iso_now() 

179 self.state_file.write_text(json.dumps(state.to_dict(), indent=2)) 

180 

181 def load_state(self) -> LoopState | None: 

182 """Load state from file, or None if not exists. 

183 

184 Returns: 

185 LoopState if file exists and is valid, None otherwise 

186 """ 

187 if not self.state_file.exists(): 

188 return None 

189 try: 

190 data = json.loads(self.state_file.read_text()) 

191 except json.JSONDecodeError: 

192 return None 

193 try: 

194 return LoopState.from_dict(data) 

195 except KeyError as e: 

196 logger.warning("Corrupted state file %s: missing key %s", self.state_file, e) 

197 return None 

198 

199 def clear_state(self) -> None: 

200 """Remove state file.""" 

201 if self.state_file.exists(): 

202 self.state_file.unlink() 

203 

204 def append_event(self, event: dict[str, Any]) -> None: 

205 """Append event to JSONL file. 

206 

207 Args: 

208 event: Event dictionary to append 

209 """ 

210 with open(self.events_file, "a", encoding="utf-8") as f: 

211 f.write(json.dumps(event) + "\n") 

212 

213 def read_events(self) -> list[dict[str, Any]]: 

214 """Read all events from file. 

215 

216 Returns: 

217 List of event dictionaries, empty if file doesn't exist 

218 """ 

219 if not self.events_file.exists(): 

220 return [] 

221 events: list[dict[str, Any]] = [] 

222 with open(self.events_file, encoding="utf-8") as f: 

223 for line in f: 

224 line = line.strip() 

225 if line: 

226 try: 

227 events.append(json.loads(line)) 

228 except json.JSONDecodeError: 

229 continue # Skip malformed lines 

230 return events 

231 

232 def clear_events(self) -> None: 

233 """Remove events file.""" 

234 if self.events_file.exists(): 

235 self.events_file.unlink() 

236 

237 def archive_run(self) -> Path | None: 

238 """Archive current run files to .history/ before clearing. 

239 

240 Reads the current state to derive the run timestamp, then copies 

241 both state.json and events.jsonl into: 

242 <loops_dir>/.history/<loop_name>/<run_id>/ 

243 

244 where run_id is a compact ISO timestamp derived from started_at 

245 (e.g. "2024-01-15T103000" from "2024-01-15T10:30:00.123456+00:00"). 

246 

247 Returns: 

248 Path to the archive directory if files were archived, None if 

249 there were no files to archive (fresh run). 

250 """ 

251 has_state = self.state_file.exists() 

252 has_events = self.events_file.exists() 

253 if not has_state and not has_events: 

254 return None 

255 

256 # Derive run ID from started_at in state file, or fall back to now 

257 state = self.load_state() 

258 if state is not None and state.started_at: 

259 # Compact ISO: strip colons, dots, plus signs; take first 19 chars 

260 # e.g. "2024-01-15T10:30:00.123+00:00" → "2024-01-15T103000" 

261 run_id = state.started_at.replace(":", "").replace(".", "").replace("+", "")[:17] 

262 else: 

263 run_id = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S") 

264 

265 archive_dir = self.loops_dir / HISTORY_DIR / self.loop_name / run_id 

266 archive_dir.mkdir(parents=True, exist_ok=True) 

267 

268 if has_state: 

269 shutil.copy2(self.state_file, archive_dir / "state.json") 

270 if has_events: 

271 shutil.copy2(self.events_file, archive_dir / "events.jsonl") 

272 

273 return archive_dir 

274 

275 def clear_all(self) -> None: 

276 """Archive current run files then clear state and events (for new run).""" 

277 self.archive_run() 

278 self.clear_state() 

279 self.clear_events() 

280 

281 

class PersistentExecutor:
    """FSM Executor with state persistence and event streaming.

    Wraps FSMExecutor to:
    - Save state after each state transition
    - Append events to JSONL file as they occur
    - Support resuming from saved state
    - Support graceful shutdown via signal handling
    """

    # Maps ExecutionResult.terminated_by to the status persisted by run();
    # any termination reason not listed here is recorded as "failed".
    _FINAL_STATUS_BY_TERMINATION = {
        "terminal": "completed",
        "max_iterations": "interrupted",
        "signal": "interrupted",
        "handoff": "awaiting_continuation",
        "timeout": "timed_out",
    }

    def __init__(
        self,
        fsm: FSMLoop,
        persistence: StatePersistence | None = None,
        loops_dir: Path | None = None,
        **executor_kwargs: Any,
    ) -> None:
        """Initialize persistent executor.

        Args:
            fsm: FSM loop definition
            persistence: Optional pre-configured persistence (for testing)
            loops_dir: Base directory for loops (default: .loops)
            **executor_kwargs: Additional kwargs for FSMExecutor
        """
        from little_loops.fsm.handoff_handler import HandoffBehavior, HandoffHandler
        from little_loops.fsm.signal_detector import SignalDetector

        self.fsm = fsm
        # Stored as given (possibly None); it is forwarded unchanged to FSMExecutor.
        self.loops_dir = loops_dir
        if persistence is None:
            persistence = StatePersistence(fsm.name, loops_dir or Path(".loops"))
        self.persistence = persistence
        self.persistence.initialize()

        # Create signal detector and handler based on FSM config
        signal_detector = SignalDetector()
        handoff_handler = HandoffHandler(HandoffBehavior(fsm.on_handoff))

        # Create base executor with event callback that persists
        self._executor = FSMExecutor(
            fsm,
            event_callback=self._handle_event,
            signal_detector=signal_detector,
            handoff_handler=handoff_handler,
            loops_dir=self.loops_dir,
            **executor_kwargs,
        )
        self._last_result: dict[str, Any] | None = None
        self._continuation_prompt: str | None = None
        # Optional secondary observer (e.g. progress display), called after persistence.
        self._on_event: EventCallback | None = None

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Delegates to the underlying FSMExecutor's request_shutdown method.
        The loop is expected to exit after the current state completes. run()
        then saves the final state, using status "interrupted" when the
        executor reports terminated_by == "signal".

        NOTE(review): resume() only resumes "running" and "awaiting_continuation"
        states, so an "interrupted" run is not picked up by resume() as written —
        confirm whether that is intended.
        """
        self._executor.request_shutdown()

    def _handle_event(self, event: dict[str, Any]) -> None:
        """Handle event: persist to file and save state.

        Args:
            event: Event dictionary from executor
        """
        self.persistence.append_event(event)

        # Save state after state transitions
        event_type = event.get("event")
        if event_type in ("state_enter", "loop_complete"):
            self._save_state()

        # Track evaluation results for state persistence
        if event_type == "evaluate":
            self._last_result = {
                "verdict": event.get("verdict"),
                "details": {
                    k: v for k, v in event.items() if k not in ("event", "ts", "type", "verdict")
                },
            }

        # Track handoff events for continuation prompt
        if event_type == "handoff_detected":
            self._continuation_prompt = event.get("continuation")

        # Delegate to secondary observer (e.g. progress display)
        if self._on_event is not None:
            self._on_event(event)

    def _save_state(self) -> None:
        """Save current executor state to file (status "running", or "completed" in a terminal state)."""
        status = "running"
        if self._executor.current_state:
            state_config = self.fsm.states.get(self._executor.current_state)
            if state_config and state_config.terminal:
                status = "completed"

        state = LoopState(
            loop_name=self.fsm.name,
            current_state=self._executor.current_state,
            iteration=self._executor.iteration,
            captured=self._executor.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",  # Will be set by save_state
            status=status,
            # Time in this segment plus time carried over from earlier segments.
            accumulated_ms=_now_ms()
            - self._executor.start_time_ms
            + self._executor.elapsed_offset_ms,
            retry_counts=dict(self._executor._retry_counts),
        )
        self.persistence.save_state(state)

    def run(self, clear_previous: bool = True) -> ExecutionResult:
        """Run the FSM with persistence.

        Args:
            clear_previous: If True, archive and clear previous state/events before running

        Returns:
            ExecutionResult from the execution
        """
        if clear_previous:
            self.persistence.clear_all()

        result = self._executor.run()

        # Update final state
        final_status = self._FINAL_STATUS_BY_TERMINATION.get(result.terminated_by, "failed")

        final_state = LoopState(
            loop_name=self.fsm.name,
            current_state=result.final_state,
            iteration=result.iterations,
            captured=result.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",
            status=final_status,
            continuation_prompt=self._continuation_prompt,
            accumulated_ms=result.duration_ms,
            # Keep per-state retry counters in the final save too; without this a
            # later resume() (e.g. after a handoff) would restore them as empty.
            retry_counts=dict(self._executor._retry_counts),
        )
        self.persistence.save_state(final_state)

        return result

    def resume(self) -> ExecutionResult | None:
        """Resume from saved state, or None if no resumable state.

        Resumable states are: "running" and "awaiting_continuation".

        Returns:
            ExecutionResult if resumed and completed, None if no resumable state
        """
        state = self.persistence.load_state()
        if state is None:
            return None

        if state.status not in ("running", "awaiting_continuation"):
            return None  # Already completed/failed

        # Restore executor state
        self._executor.current_state = state.current_state
        self._executor.iteration = state.iteration
        self._executor.captured = state.captured
        self._executor.prev_result = state.prev_result
        self._executor.started_at = state.started_at
        self._last_result = state.last_result
        self._executor._retry_counts = dict(state.retry_counts)

        # Restore accumulated elapsed time so duration_ms and ${loop.elapsed_ms} reflect
        # the full loop lifetime (all segments), not just the resumed segment.
        # FSMExecutor.run() will reset start_time_ms to _now_ms(), so we use elapsed_offset_ms
        # to carry forward the time already spent before this resume.
        self._executor.elapsed_offset_ms = state.accumulated_ms

        # Clear any pending signals from previous run
        self._executor._pending_handoff = None
        self._executor._pending_error = None

        # Emit resume event with continuation context if available
        resume_event: dict[str, Any] = {
            "event": "loop_resume",
            "ts": _iso_now(),
            "loop": self.fsm.name,
            "from_state": state.current_state,
            "iteration": state.iteration,
        }
        if state.status == "awaiting_continuation" and state.continuation_prompt:
            resume_event["from_handoff"] = True
            resume_event["continuation_prompt"] = state.continuation_prompt
        self.persistence.append_event(resume_event)

        # Continue execution (don't clear previous events)
        return self.run(clear_previous=False)

485 

486 

def list_running_loops(loops_dir: Path | None = None) -> list[LoopState]:
    """List all loops with saved state under the .running directory.

    Note that run() leaves its final state file in place, so this includes
    loops whose status is completed/failed/etc., not only ones still running.

    Args:
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all loops with readable state files,
        ordered by state-file path. Unreadable or malformed files are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    running_dir = base_dir / RUNNING_DIR

    if not running_dir.exists():
        return []

    states: list[LoopState] = []
    for state_file in sorted(running_dir.glob("*.state.json")):
        try:
            data = json.loads(state_file.read_text(encoding="utf-8"))
            states.append(LoopState.from_dict(data))
        except (OSError, ValueError, KeyError, TypeError):
            # OSError: file removed/unreadable between glob and read (e.g. a loop
            # clearing its state); ValueError: bad JSON or UTF-8; KeyError/TypeError:
            # JSON is not a complete state object.
            continue

    return states

511 

512 

def list_run_history(loop_name: str, loops_dir: Path | None = None) -> list[LoopState]:
    """List archived runs for a loop, newest first.

    Reads state files from .loops/.history/<loop_name>/*/state.json and returns
    them sorted by started_at descending (most recent run first).

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all archived runs, newest first.
        Returns an empty list if no history exists. Unreadable or malformed
        archives are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    history_loop_dir = base_dir / HISTORY_DIR / loop_name

    if not history_loop_dir.exists():
        return []

    states: list[LoopState] = []
    for state_file in history_loop_dir.glob("*/state.json"):
        try:
            data = json.loads(state_file.read_text(encoding="utf-8"))
            states.append(LoopState.from_dict(data))
        except (OSError, ValueError, KeyError, TypeError):
            # Unreadable file, bad JSON/UTF-8, or not a complete state object.
            continue

    # A null started_at in a hand-edited archive must not break the sort.
    states.sort(key=lambda s: s.started_at or "", reverse=True)
    return states

543 

544 

def get_archived_events(
    loop_name: str, run_id: str, loops_dir: Path | None = None
) -> list[dict[str, Any]]:
    """Return the events recorded for one archived run of a loop.

    Reads <loops_dir>/.history/<loop_name>/<run_id>/events.jsonl, ignoring
    blank lines and lines that fail to parse as JSON.

    NOTE(review): loop_name and run_id are joined into the path unchecked; if
    they can come from untrusted input, validate them before calling.

    Args:
        loop_name: Name of the loop
        run_id: The run directory name (compact timestamp)
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        Event dictionaries in file order; an empty list if the file is absent.
    """
    events_path = (loops_dir or Path(".loops")) / HISTORY_DIR / loop_name / run_id / "events.jsonl"
    if not events_path.exists():
        return []

    parsed: list[dict[str, Any]] = []
    with events_path.open(encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                parsed.append(json.loads(stripped))
            except json.JSONDecodeError:
                pass  # tolerate malformed lines
    return parsed

574 

575 

def get_loop_history(loop_name: str, loops_dir: Path | None = None) -> list[dict[str, Any]]:
    """Get the event history of a loop's current run.

    This reads the live <loops_dir>/.running/<loop_name>.events.jsonl file via
    StatePersistence; archived runs are not included (see get_archived_events).

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of event dictionaries (empty if no events file exists)
    """
    return StatePersistence(loop_name, loops_dir).read_events()