Coverage for little_loops / fsm / persistence.py: 25%
227 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""State persistence and event streaming for FSM loops.
3This module provides persistence capabilities for FSM loop execution:
4- LoopState: Dataclass representing loop execution state
5- StatePersistence: File I/O for state and events
6- PersistentExecutor: Wrapper that persists state during execution
7- Utility functions for listing running loops and reading history
9File structure:
10 .loops/
11 ├── fix-types.yaml # Loop definition
12 ├── .running/ # Runtime state (auto-managed)
13 │ ├── fix-types.state.json
14 │ └── fix-types.events.jsonl
15 └── .history/ # Archived run logs (auto-populated)
16 └── fix-types/
17 └── 2024-01-15T103000/
18 ├── state.json
19 └── events.jsonl
20"""
22from __future__ import annotations
24import json
25import logging
26import shutil
27import time
28from dataclasses import dataclass, field
29from datetime import UTC, datetime
30from pathlib import Path
31from typing import Any
33from little_loops.fsm.executor import EventCallback, ExecutionResult, FSMExecutor
34from little_loops.fsm.schema import FSMLoop
# Subdirectory of the loops directory that holds live per-loop files
# (<loop_name>.state.json and <loop_name>.events.jsonl).
RUNNING_DIR = ".running"
# Subdirectory of the loops directory that holds archived runs, laid out as
# <loop_name>/<run_id>/{state.json,events.jsonl}.
HISTORY_DIR = ".history"

logger = logging.getLogger(__name__)
42def _iso_now() -> str:
43 """Return current time as ISO 8601 string."""
44 return datetime.now(UTC).isoformat()
47def _now_ms() -> int:
48 """Return current time in milliseconds."""
49 return int(time.time() * 1000)
@dataclass
class LoopState:
    """Persistent state for an FSM loop execution.

    Holds everything needed to resume a loop: the current state and
    iteration, captured variables and the previous result, the last
    evaluation result, timestamps, and the execution status.

    Attributes:
        loop_name: Name of the loop
        current_state: Current FSM state name
        iteration: Current iteration count (1-based)
        captured: Captured action outputs by variable name
        prev_result: Previous state's result (output, exit_code, state)
        last_result: Last evaluation result (verdict, details)
        started_at: ISO timestamp when loop started
        updated_at: ISO timestamp when state was last saved
        status: Execution status (running, completed, failed, interrupted,
            awaiting_continuation, timed_out)
        continuation_prompt: Continuation context from a handoff signal (set when
            status is awaiting_continuation)
        accumulated_ms: Total milliseconds elapsed across all segments up to this
            save; restored on resume so duration_ms and ${loop.elapsed_ms} cover
            the full loop lifetime rather than only the most recent segment
        retry_counts: Per-state retry counters
        active_sub_loop: Name of the currently executing sub-loop, if any
    """

    loop_name: str
    current_state: str
    iteration: int
    captured: dict[str, dict[str, Any]]
    prev_result: dict[str, Any] | None
    last_result: dict[str, Any] | None
    started_at: str
    updated_at: str
    # One of: "running", "completed", "failed", "interrupted",
    # "awaiting_continuation", "timed_out"
    status: str
    continuation_prompt: str | None = None
    accumulated_ms: int = 0  # total elapsed ms across all segments (for resume offset)
    retry_counts: dict[str, int] = field(default_factory=dict)  # per-state retry tracking
    active_sub_loop: str | None = None  # name of currently executing sub-loop (observability)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-ready dictionary.

        The core fields are always present, in a fixed order. The optional
        fields are included only when set: ``continuation_prompt`` and
        ``active_sub_loop`` when not None, ``retry_counts`` when non-empty.
        """
        core_keys = (
            "loop_name",
            "current_state",
            "iteration",
            "captured",
            "prev_result",
            "last_result",
            "started_at",
            "updated_at",
            "status",
            "accumulated_ms",
        )
        payload: dict[str, Any] = {key: getattr(self, key) for key in core_keys}
        if self.continuation_prompt is not None:
            payload["continuation_prompt"] = self.continuation_prompt
        if self.retry_counts:
            payload["retry_counts"] = self.retry_counts
        if self.active_sub_loop is not None:
            payload["active_sub_loop"] = self.active_sub_loop
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LoopState:
        """Build a LoopState from a dictionary such as one produced by ``to_dict``.

        Args:
            data: Mapping of loop state fields. ``loop_name``, ``current_state``,
                ``iteration``, ``started_at`` and ``status`` are required. Every
                other key falls back to a default when absent.

        Returns:
            LoopState instance

        Raises:
            KeyError: If a required key is missing.
        """
        required = {
            key: data[key]
            for key in ("loop_name", "current_state", "iteration", "started_at", "status")
        }
        return cls(
            **required,
            captured=data.get("captured", {}),
            prev_result=data.get("prev_result"),
            last_result=data.get("last_result"),
            updated_at=data.get("updated_at", ""),
            continuation_prompt=data.get("continuation_prompt"),
            accumulated_ms=data.get("accumulated_ms", 0),
            retry_counts=data.get("retry_counts", {}),
            active_sub_loop=data.get("active_sub_loop"),
        )
class StatePersistence:
    """Manage loop state persistence and event streaming.

    Handles file I/O for:
    - State file: JSON file with current execution state
    - Events file: JSONL file with execution events (append-only)

    Files are stored in <loops_dir>/.running/<loop_name>.* (loops_dir defaults
    to ``.loops``).
    """

    def __init__(self, loop_name: str, loops_dir: Path | None = None) -> None:
        """Initialize persistence for a loop.

        Args:
            loop_name: Name of the loop
            loops_dir: Base directory for loops (default: .loops)
        """
        self.loop_name = loop_name
        self.loops_dir = loops_dir or Path(".loops")
        self.running_dir = self.loops_dir / RUNNING_DIR
        self.state_file = self.running_dir / f"{loop_name}.state.json"
        self.events_file = self.running_dir / f"{loop_name}.events.jsonl"

    def initialize(self) -> None:
        """Create running directory if needed."""
        self.running_dir.mkdir(parents=True, exist_ok=True)

    def save_state(self, state: LoopState) -> None:
        """Save current state to file atomically.

        Updates ``state.updated_at`` before saving. The JSON is written to a
        sibling temporary file first and then moved over the state file. A
        crash mid-write therefore cannot leave a truncated state file that
        ``load_state`` would reject, which would make the run unresumable.

        Args:
            state: LoopState to save
        """
        state.updated_at = _iso_now()
        payload = json.dumps(state.to_dict(), indent=2)
        # "<name>.state.json.tmp" does not match the "*.state.json" glob used
        # when listing running loops.
        tmp_file = self.state_file.with_name(self.state_file.name + ".tmp")
        tmp_file.write_text(payload, encoding="utf-8")
        # Path.replace -> os.replace: atomic rename on POSIX, overwrites on Windows.
        tmp_file.replace(self.state_file)

    def load_state(self) -> LoopState | None:
        """Load state from file, or None if not exists.

        Returns:
            LoopState if file exists and is valid, None otherwise (unreadable,
            non-JSON, non-object, or missing required keys; each case is logged).
        """
        if not self.state_file.exists():
            return None
        try:
            data = json.loads(self.state_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            # Removed between the exists() check and the read (e.g. cleared concurrently).
            return None
        except ValueError as e:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Corrupted state file %s: %s", self.state_file, e)
            return None
        try:
            return LoopState.from_dict(data)
        except KeyError as e:
            logger.warning("Corrupted state file %s: missing key %s", self.state_file, e)
            return None
        except TypeError as e:
            # Valid JSON that is not an object (e.g. a list or null).
            logger.warning("Corrupted state file %s: %s", self.state_file, e)
            return None

    def clear_state(self) -> None:
        """Remove state file (no-op if it does not exist)."""
        self.state_file.unlink(missing_ok=True)

    def append_event(self, event: dict[str, Any]) -> None:
        """Append event to JSONL file.

        Args:
            event: Event dictionary to append
        """
        with open(self.events_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def read_events(self) -> list[dict[str, Any]]:
        """Read all events from file.

        Blank lines and lines that are not valid JSON are skipped.

        Returns:
            List of event dictionaries, empty if file doesn't exist
        """
        if not self.events_file.exists():
            return []
        events: list[dict[str, Any]] = []
        with open(self.events_file, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    events.append(json.loads(line))
                except json.JSONDecodeError:
                    continue  # Skip malformed lines
        return events

    def clear_events(self) -> None:
        """Remove events file (no-op if it does not exist)."""
        self.events_file.unlink(missing_ok=True)

    def archive_run(self) -> Path | None:
        """Archive current run files to .history/ before clearing.

        Reads the current state to derive the run timestamp, then copies
        both state.json and events.jsonl into:
            <loops_dir>/.history/<loop_name>/<run_id>/

        where run_id is a compact ISO timestamp derived from started_at
        (e.g. "2024-01-15T103000" from "2024-01-15T10:30:00.123456+00:00").

        Returns:
            Path to the archive directory if files were archived, None if
            there were no files to archive (fresh run).
        """
        has_state = self.state_file.exists()
        has_events = self.events_file.exists()
        if not has_state and not has_events:
            return None

        # Derive run ID from started_at in state file, or fall back to now
        state = self.load_state()
        if state is not None and state.started_at:
            # Compact ISO: strip colons, dots and plus signs, then keep the first
            # 17 chars ("YYYY-MM-DDTHHMMSS"), matching the fallback format below.
            # e.g. "2024-01-15T10:30:00.123+00:00" -> "2024-01-15T103000"
            run_id = state.started_at.replace(":", "").replace(".", "").replace("+", "")[:17]
        else:
            run_id = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S")

        archive_dir = self.loops_dir / HISTORY_DIR / self.loop_name / run_id
        archive_dir.mkdir(parents=True, exist_ok=True)

        if has_state:
            shutil.copy2(self.state_file, archive_dir / "state.json")
        if has_events:
            shutil.copy2(self.events_file, archive_dir / "events.jsonl")

        return archive_dir

    def clear_all(self) -> None:
        """Archive current run files then clear state and events (for new run)."""
        self.archive_run()
        self.clear_state()
        self.clear_events()
class PersistentExecutor:
    """FSM Executor with state persistence and event streaming.

    Wraps FSMExecutor to:
    - Save state after each state transition
    - Append events to JSONL file as they occur
    - Support resuming from saved state
    - Support graceful shutdown via signal handling
    """

    def __init__(
        self,
        fsm: FSMLoop,
        persistence: StatePersistence | None = None,
        loops_dir: Path | None = None,
        **executor_kwargs: Any,
    ) -> None:
        """Initialize persistent executor.

        Args:
            fsm: FSM loop definition
            persistence: Optional pre-configured persistence (for testing)
            loops_dir: Base directory for loops (default: .loops)
            **executor_kwargs: Additional kwargs for FSMExecutor
        """
        # NOTE(review): kept as function-local imports; possibly to avoid an
        # import cycle. Confirm before hoisting to module level.
        from little_loops.fsm.handoff_handler import HandoffBehavior, HandoffHandler
        from little_loops.fsm.signal_detector import SignalDetector

        self.fsm = fsm
        self.loops_dir = loops_dir
        self.persistence = persistence or StatePersistence(fsm.name, loops_dir or Path(".loops"))
        self.persistence.initialize()

        # Create signal detector and handler based on FSM config
        signal_detector = SignalDetector()
        handoff_handler = HandoffHandler(HandoffBehavior(fsm.on_handoff))

        # Create base executor with event callback that persists
        self._executor = FSMExecutor(
            fsm,
            event_callback=self._handle_event,
            signal_detector=signal_detector,
            handoff_handler=handoff_handler,
            loops_dir=self.loops_dir,
            **executor_kwargs,
        )
        # Latest "evaluate" event, persisted as LoopState.last_result.
        self._last_result: dict[str, Any] | None = None
        # Continuation text from the latest "handoff_detected" event.
        self._continuation_prompt: str | None = None
        # Optional secondary observer that receives every event after it is persisted.
        self._on_event: EventCallback | None = None

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Delegates to the underlying FSMExecutor's request_shutdown method.
        The loop will exit cleanly after the current state completes,
        saving state as "interrupted".

        NOTE(review): resume() only accepts "running" and
        "awaiting_continuation", so resume() as written does not pick up an
        "interrupted" run. Confirm how interrupted runs are meant to be
        continued.
        """
        self._executor.request_shutdown()

    def _handle_event(self, event: dict[str, Any]) -> None:
        """Handle event: persist to file, update tracked state, and forward.

        The handler takes these steps in order:
        - Append every event to the events file.
        - On "state_enter" and "loop_complete", also save a state snapshot.
        - On "evaluate", record the verdict and remaining fields as the last result.
        - On "handoff_detected", record the continuation prompt.
        - Forward the event to ``self._on_event`` if one is set.

        Args:
            event: Event dictionary from executor
        """
        self.persistence.append_event(event)

        event_type = event.get("event")

        # Save state after state transitions
        if event_type in ("state_enter", "loop_complete"):
            self._save_state()

        # Track evaluation results for state persistence
        if event_type == "evaluate":
            self._last_result = {
                "verdict": event.get("verdict"),
                "details": {
                    k: v for k, v in event.items() if k not in ("event", "ts", "type", "verdict")
                },
            }

        # Track handoff events for continuation prompt
        if event_type == "handoff_detected":
            self._continuation_prompt = event.get("continuation")

        # Delegate to secondary observer (e.g. progress display)
        if self._on_event is not None:
            self._on_event(event)

    def _save_state(self) -> None:
        """Save a snapshot of the executor's live state to the state file.

        Status is "completed" when the current state is marked terminal in the
        FSM definition, otherwise "running".
        """
        status = "running"
        if self._executor.current_state:
            state_config = self.fsm.states.get(self._executor.current_state)
            if state_config and state_config.terminal:
                status = "completed"

        state = LoopState(
            loop_name=self.fsm.name,
            current_state=self._executor.current_state,
            iteration=self._executor.iteration,
            captured=self._executor.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",  # Will be set by save_state
            status=status,
            # Time in this segment plus time carried over from earlier segments.
            accumulated_ms=(
                _now_ms() - self._executor.start_time_ms + self._executor.elapsed_offset_ms
            ),
            retry_counts=dict(self._executor._retry_counts),
        )
        self.persistence.save_state(state)

    def run(self, clear_previous: bool = True) -> ExecutionResult:
        """Run the FSM with persistence.

        Args:
            clear_previous: If True, archive and clear previous state/events before running

        Returns:
            ExecutionResult from the execution
        """
        if clear_previous:
            self.persistence.clear_all()

        result = self._executor.run()

        # Map the executor's termination reason to the persisted status.
        # Any reason not listed here is recorded as "failed".
        status_by_reason = {
            "terminal": "completed",
            "max_iterations": "interrupted",
            "signal": "interrupted",
            "handoff": "awaiting_continuation",
            "timeout": "timed_out",
        }
        final_status = status_by_reason.get(result.terminated_by, "failed")

        final_state = LoopState(
            loop_name=self.fsm.name,
            current_state=result.final_state,
            iteration=result.iterations,
            captured=result.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",
            status=final_status,
            continuation_prompt=self._continuation_prompt,
            accumulated_ms=result.duration_ms,
            # resume() restores retry counters from this final save. Without
            # them, an "awaiting_continuation" run would lose its per-state
            # retry counts when resumed.
            retry_counts=dict(self._executor._retry_counts),
        )
        self.persistence.save_state(final_state)

        return result

    def resume(self) -> ExecutionResult | None:
        """Resume from saved state, or None if no resumable state.

        Resumable states are: "running" and "awaiting_continuation".

        Returns:
            ExecutionResult if resumed and completed, None if no resumable state
        """
        state = self.persistence.load_state()
        if state is None:
            return None

        if state.status not in ("running", "awaiting_continuation"):
            return None  # Already completed/failed

        # Restore executor state
        self._executor.current_state = state.current_state
        self._executor.iteration = state.iteration
        self._executor.captured = state.captured
        self._executor.prev_result = state.prev_result
        self._executor.started_at = state.started_at
        self._last_result = state.last_result
        self._executor._retry_counts = dict(state.retry_counts)

        # Restore accumulated elapsed time so duration_ms and ${loop.elapsed_ms} reflect
        # the full loop lifetime (all segments), not just the resumed segment.
        # FSMExecutor.run() will reset start_time_ms to _now_ms(), so we use elapsed_offset_ms
        # to carry forward the time already spent before this resume.
        self._executor.elapsed_offset_ms = state.accumulated_ms

        # Clear any pending signals from previous run
        self._executor._pending_handoff = None
        self._executor._pending_error = None

        # Emit resume event with continuation context if available
        resume_event: dict[str, Any] = {
            "event": "loop_resume",
            "ts": _iso_now(),
            "loop": self.fsm.name,
            "from_state": state.current_state,
            "iteration": state.iteration,
        }
        if state.status == "awaiting_continuation" and state.continuation_prompt:
            resume_event["from_handoff"] = True
            resume_event["continuation_prompt"] = state.continuation_prompt
        self.persistence.append_event(resume_event)

        # Continue execution (don't clear previous events)
        return self.run(clear_previous=False)
def list_running_loops(loops_dir: Path | None = None) -> list[LoopState]:
    """List all loops with saved state.

    Unreadable or malformed state files are skipped.

    Args:
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all loops with state files, ordered by
        state-file path.
    """
    base_dir = loops_dir or Path(".loops")
    running_dir = base_dir / RUNNING_DIR

    if not running_dir.exists():
        return []

    states: list[LoopState] = []
    # sorted() gives a deterministic order; raw glob order is filesystem-dependent.
    for state_file in sorted(running_dir.glob("*.state.json")):
        try:
            data = json.loads(state_file.read_text(encoding="utf-8"))
            states.append(LoopState.from_dict(data))
        except (OSError, ValueError, KeyError, TypeError):
            # Skip these cases:
            # - a file removed mid-scan (e.g. another loop cleared its state);
            # - invalid UTF-8 or JSON (both ValueError subclasses);
            # - missing required keys;
            # - JSON that is not an object.
            continue

    return states
def list_run_history(loop_name: str, loops_dir: Path | None = None) -> list[LoopState]:
    """List archived runs for a loop, newest first.

    Reads state files from <loops_dir>/.history/<loop_name>/*/state.json and
    returns them sorted by started_at descending (most recent run first).
    Unreadable or malformed archived state files are skipped.

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all archived runs, newest first.
        Returns an empty list if no history exists.
    """
    base_dir = loops_dir or Path(".loops")
    history_loop_dir = base_dir / HISTORY_DIR / loop_name

    if not history_loop_dir.exists():
        return []

    states: list[LoopState] = []
    for state_file in history_loop_dir.glob("*/state.json"):
        try:
            data = json.loads(state_file.read_text(encoding="utf-8"))
            states.append(LoopState.from_dict(data))
        except (OSError, ValueError, KeyError, TypeError):
            # Skip these cases:
            # - an unreadable or vanished file;
            # - invalid UTF-8 or JSON (both ValueError subclasses);
            # - missing required keys;
            # - JSON that is not an object.
            continue

    # started_at is an ISO string, so lexical order matches chronological
    # order when all timestamps share the same offset format.
    states.sort(key=lambda s: s.started_at, reverse=True)
    return states
def get_archived_events(
    loop_name: str, run_id: str, loops_dir: Path | None = None
) -> list[dict[str, Any]]:
    """Return the events recorded for one archived run of a loop.

    Reads <loops_dir>/.history/<loop_name>/<run_id>/events.jsonl line by line.
    Blank lines and lines that are not valid JSON are ignored.

    Args:
        loop_name: Name of the loop
        run_id: The run directory name (compact timestamp)
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of event dictionaries, empty if not found.
    """
    events_path = (loops_dir or Path(".loops")) / HISTORY_DIR / loop_name / run_id / "events.jsonl"
    if not events_path.exists():
        return []

    parsed: list[dict[str, Any]] = []
    with events_path.open(encoding="utf-8") as handle:
        for raw in handle:
            text = raw.strip()
            if not text:
                continue
            try:
                parsed.append(json.loads(text))
            except json.JSONDecodeError:
                continue
    return parsed
def get_loop_history(loop_name: str, loops_dir: Path | None = None) -> list[dict[str, Any]]:
    """Return the events stored in a loop's live events file.

    Delegates to StatePersistence.read_events, which reads
    <loops_dir>/.running/<loop_name>.events.jsonl (loops_dir defaults to
    ``.loops``).

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of event dictionaries (empty when the events file does not exist)
    """
    return StatePersistence(loop_name, loops_dir).read_events()