Coverage for little_loops / fsm / executor.py: 17%
393 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
"""FSM Executor - Runtime engine for FSM loop execution.

This module provides the execution engine that runs FSM loops:
- Executes actions (shell commands or slash commands)
- Evaluates results using appropriate evaluators
- Routes to next states based on verdicts
- Tracks iteration count and enforces limits
- Manages captured variables and context
"""
11from __future__ import annotations
13import json
14import subprocess
15import sys
16import threading
17import time
18from collections.abc import Callable
19from dataclasses import dataclass, field
20from datetime import UTC, datetime
21from pathlib import Path
22from typing import Any, Protocol
24from little_loops.fsm.evaluators import (
25 EvaluationResult,
26 evaluate,
27 evaluate_exit_code,
28 evaluate_llm_structured,
29 evaluate_mcp_result,
30)
31from little_loops.fsm.handoff_handler import HandoffHandler
32from little_loops.fsm.interpolation import (
33 InterpolationContext,
34 InterpolationError,
35 interpolate,
36 interpolate_dict,
37)
38from little_loops.fsm.schema import FSMLoop, StateConfig
39from little_loops.fsm.signal_detector import DetectedSignal, SignalDetector
40from little_loops.session_log import get_current_session_jsonl
@dataclass
class ExecutionResult:
    """Outcome of one complete FSM run.

    Attributes:
        final_state: State the executor was in when it stopped.
        iterations: Number of iterations that were executed.
        terminated_by: Why the run stopped; one of "terminal",
            "max_iterations", "timeout", "signal", "error" or "handoff".
        duration_ms: Total run time in milliseconds.
        captured: Every captured variable value, keyed by capture name.
        error: Error message, set when terminated_by is "error".
        handoff: True when the run stopped because of a handoff signal.
        continuation_prompt: Continuation context carried by the handoff signal.
    """

    final_state: str
    iterations: int
    terminated_by: str  # "terminal", "max_iterations", "timeout", "signal", "error", "handoff"
    duration_ms: int
    captured: dict[str, dict[str, Any]]
    error: str | None = None
    handoff: bool = False
    continuation_prompt: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dictionary view of this result.

        The five required fields are always present. "error" and
        "continuation_prompt" are included only when not None, and "handoff"
        only when truthy.
        """
        always = ("final_state", "iterations", "terminated_by", "duration_ms", "captured")
        payload: dict[str, Any] = {name: getattr(self, name) for name in always}

        # (key, value, include?) in the order the keys must appear.
        optional = (
            ("error", self.error, self.error is not None),
            ("handoff", self.handoff, bool(self.handoff)),
            (
                "continuation_prompt",
                self.continuation_prompt,
                self.continuation_prompt is not None,
            ),
        )
        for key, value, include in optional:
            if include:
                payload[key] = value
        return payload
@dataclass
class ActionResult:
    """What a single executed action produced.

    Attributes:
        output: Text the action wrote to stdout.
        stderr: Text the action wrote to stderr.
        exit_code: The action's exit code.
        duration_ms: How long the action ran, in milliseconds.
    """

    output: str
    stderr: str
    exit_code: int
    duration_ms: int
# Type of the event callback: called with one event dictionary, returns nothing.
EventCallback = Callable[[dict[str, Any]], None]
class ActionRunner(Protocol):
    """Structural interface implemented by anything that can execute an action."""

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Run one action and report what happened.

        Args:
            action: Command to execute.
            timeout: Time limit in seconds.
            is_slash_command: True when the action is a slash command
                (starts with /).
            on_output_line: Optional callback invoked once per output line.

        Returns:
            ActionResult holding output, stderr, exit_code and duration_ms.
        """
        ...
class DefaultActionRunner:
    """Execute actions as child processes: bash for shell commands, the Claude CLI for slash commands."""

    def __init__(self) -> None:
        # The child started by the in-flight run() call; None when nothing is running.
        self._current_process: subprocess.Popen[str] | None = None

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Execute an action, streaming its stdout line by line.

        The timeout is enforced by a watchdog timer that kills the child once
        ``timeout`` seconds have passed. Waiting on the process only after
        stdout reaches EOF would never fire for a child that hangs while
        keeping its stdout open.

        Args:
            action: The command to execute.
            timeout: Timeout in seconds.
            is_slash_command: True to run the action through the Claude CLI,
                False to run it with ``bash -c``.
            on_output_line: Optional callback invoked for each stdout line
                (trailing whitespace stripped).

        Returns:
            ActionResult with the collected stdout/stderr. On timeout the exit
            code is 124, stderr falls back to "Action timed out" when empty,
            and duration_ms is reported as ``timeout * 1000``.
        """
        start = _now_ms()

        if is_slash_command:
            # Execute via Claude CLI
            cmd = [
                "claude",
                "--dangerously-skip-permissions",
                "--no-session-persistence",
                "-p",
                action,
            ]
        else:
            # Shell command
            cmd = ["bash", "-c", action]

        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        self._current_process = process
        output_chunks: list[str] = []
        stderr_chunks: list[str] = []

        def _drain_stderr() -> None:
            # Read stderr concurrently so a chatty child cannot fill the pipe and stall.
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()

        timed_out = threading.Event()

        def _kill_on_timeout() -> None:
            # Only flag a timeout if the child has not already finished on its own.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        # NOTE: kill() targets the direct child only. A descendant that inherited
        # the stdout pipe can still keep the read loop below alive.
        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()

        try:
            for line in process.stdout:  # type: ignore[union-attr]
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            # Bounded by the watchdog, which kills the child at the deadline.
            process.wait()
        finally:
            watchdog.cancel()
            self._current_process = None
            if process.poll() is None:
                # Reached only when an exception is propagating (e.g. the
                # callback raised): do not leave the child running.
                process.kill()
                process.wait()

        stderr_thread.join(timeout=5)
        if timed_out.is_set():
            return ActionResult(
                output="".join(output_chunks),
                stderr="".join(stderr_chunks) or "Action timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks),
            exit_code=process.returncode,
            duration_ms=_now_ms() - start,
        )
@dataclass
class SimulationActionRunner:
    """Action runner used in simulation mode: nothing is actually executed.

    Lets a user walk through FSM logic without running real commands. The
    result of each action comes either from an interactive prompt or from a
    predefined scenario.

    Attributes:
        scenario: Predefined result pattern ("all-pass", "all-fail",
            "all-error", "first-fail", "alternating"); None means prompt.
        call_count: How many actions have been simulated so far.
        calls: Every action that would have been executed, in order.
    """

    scenario: str | None = None
    call_count: int = 0
    calls: list[str] = field(default_factory=list)

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Record the action and return a simulated result for it.

        Args:
            action: The command that would be executed.
            timeout: Ignored in simulation.
            is_slash_command: Whether this is a slash command.
            on_output_line: Ignored in simulation.

        Returns:
            ActionResult carrying the simulated exit code.
        """
        del timeout, on_output_line  # unused in simulation
        self.calls.append(action)
        self.call_count += 1

        kind = "slash command" if is_slash_command else "shell command"
        print(f" [SIMULATED] Would execute ({kind}): {action}")

        if not self.scenario:
            simulated_code = self._prompt_result()
        else:
            simulated_code = self._scenario_result()
            fixed_labels = {
                "all-pass": "Success (scenario: all-pass)",
                "all-fail": "Failure (scenario: all-fail)",
                "all-error": "Error (scenario: all-error)",
            }
            # Scenarios whose outcome depends on how many calls have been made.
            fails_this_call = {
                "first-fail": self.call_count == 1,
                "alternating": self.call_count % 2 == 1,
            }
            if self.scenario in fixed_labels:
                label = fixed_labels[self.scenario]
            elif self.scenario in fails_this_call:
                label = "Failure" if fails_this_call[self.scenario] else "Success"
            else:
                label = "Success"
            print(f" [AUTO] Result: {label}")

        return ActionResult(
            output=f"[simulated output for: {action}]",
            stderr="",
            exit_code=simulated_code,
            duration_ms=0,
        )

    def _scenario_result(self) -> int:
        """Map the configured scenario and current call count to an exit code.

        Returns:
            0 for success, 1 for failure, 2 for error; unknown scenarios yield 0.
        """
        is_first_call = self.call_count == 1
        is_odd_call = self.call_count % 2 == 1
        exit_codes = {
            "all-pass": 0,
            "all-fail": 1,
            "all-error": 2,
            "first-fail": 1 if is_first_call else 0,  # first call fails, rest pass
            "alternating": 1 if is_odd_call else 0,  # odd calls fail, even calls pass
        }
        return exit_codes.get(self.scenario, 0)

    def _prompt_result(self) -> int:
        """Ask the user on stdin which exit code to simulate.

        Returns:
            Exit code for the selection; 0 on empty input, EOFError or
            KeyboardInterrupt.
        """
        print()
        for menu_line in (
            " ? What should the simulated result be?",
            " 1) Success (exit 0) [default]",
            " 2) Failure (exit 1)",
            " 3) Error (exit 2)",
        ):
            print(menu_line)

        code_for_choice = {"1": 0, "": 0, "2": 1, "3": 2}
        while True:
            try:
                sys.stdout.write(" > ")
                sys.stdout.flush()
                answer = sys.stdin.readline().strip()
                if answer in code_for_choice:
                    return code_for_choice[answer]
                print(" Invalid choice. Enter 1, 2, or 3.")
            except (EOFError, KeyboardInterrupt):
                print()
                return 0
327def _now_ms() -> int:
328 """Get current time in milliseconds."""
329 return int(time.time() * 1000)
332def _iso_now() -> str:
333 """Get current time as ISO 8601 string."""
334 return datetime.now(UTC).isoformat()
class FSMExecutor:
    """Execute an FSM loop.

    The executor runs an FSM from its initial state until one of:
    - A terminal state is reached (unless the loop is in maintain mode)
    - max_iterations is reached
    - The loop timeout elapses
    - A shutdown is requested
    - A handoff signal is detected in action output
    - An unrecoverable error occurs

    Events are emitted via the callback for observability.
    """

    def __init__(
        self,
        fsm: FSMLoop,
        event_callback: EventCallback | None = None,
        action_runner: ActionRunner | None = None,
        signal_detector: SignalDetector | None = None,
        handoff_handler: HandoffHandler | None = None,
        loops_dir: Path | None = None,
    ):
        """Initialize the executor.

        Args:
            fsm: The FSM loop to execute
            event_callback: Optional callback for events (defaults to a no-op)
            action_runner: Optional custom action runner (for testing);
                defaults to DefaultActionRunner
            signal_detector: Optional signal detector for output parsing
            handoff_handler: Optional handler for handoff signals
            loops_dir: Base directory for resolving sub-loop references
        """
        self.fsm = fsm
        self.event_callback = event_callback or (lambda _: None)
        self.action_runner: ActionRunner = action_runner or DefaultActionRunner()
        self.signal_detector = signal_detector
        self.handoff_handler = handoff_handler
        self.loops_dir = loops_dir

        # Runtime state
        self.current_state = fsm.initial
        self.iteration = 0
        self.captured: dict[str, dict[str, Any]] = {}
        self.prev_result: dict[str, Any] | None = None
        self.started_at = ""
        self.start_time_ms = 0
        self.elapsed_offset_ms = (
            0  # milliseconds from segments before current run (set by PersistentExecutor on resume)
        )

        # Shutdown flag for graceful signal handling
        self._shutdown_requested = False

        # Pending handoff signal (set by _run_action, checked by main loop)
        self._pending_handoff: DetectedSignal | None = None

        # Pending error payload from FATAL_ERROR signal (set by _run_action, checked by main loop)
        self._pending_error: str | None = None

        # Per-state retry tracking for max_retries support.
        # _retry_counts[state_name] = number of consecutive re-entries into that state.
        # Incremented each time we enter the same state as the previous iteration.
        # Reset when a different state is entered, or after retry exhaustion.
        self._retry_counts: dict[str, int] = {}
        # State entered in the previous iteration (None on first iteration or after resume).
        self._prev_state: str | None = None

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Sets a flag that will be checked at the start of each iteration,
        allowing the loop to exit cleanly after the current state completes.
        """
        self._shutdown_requested = True

    def run(self) -> ExecutionResult:
        """Execute the FSM until terminal state or limits reached.

        Any exception raised while executing a state is converted into an
        "error" result rather than propagated.

        Returns:
            ExecutionResult with final state and execution metadata
        """
        self.started_at = _iso_now()
        self.start_time_ms = _now_ms()

        self._emit("loop_start", {"loop": self.fsm.name})

        try:
            while True:
                # Check shutdown request (signal handling)
                if self._shutdown_requested:
                    return self._finish("signal")

                # Check iteration limit
                if self.iteration >= self.fsm.max_iterations:
                    return self._finish("max_iterations")

                # Check timeout
                if self.fsm.timeout:
                    elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
                    if elapsed > self.fsm.timeout * 1000:
                        return self._finish("timeout")

                # Get current state config
                state_config = self.fsm.states[self.current_state]

                # Update per-state retry tracking based on transition from previous iteration.
                # If re-entering the same state consecutively, increment retry count.
                # If entering a different state, clear the previous state's retry count.
                if self._prev_state is not None:
                    if self.current_state == self._prev_state:
                        self._retry_counts[self.current_state] = (
                            self._retry_counts.get(self.current_state, 0) + 1
                        )
                    else:
                        self._retry_counts.pop(self._prev_state, None)

                # Check terminal
                if state_config.terminal:
                    # Handle maintain mode - restart loop instead of terminating
                    if self.fsm.maintain:
                        self.iteration += 1
                        maintain_target = state_config.on_maintain or self.fsm.initial
                        self._emit(
                            "route",
                            {
                                "from": self.current_state,
                                "to": maintain_target,
                                "reason": "maintain",
                            },
                        )
                        self._prev_state = self.current_state
                        self.current_state = maintain_target
                        continue
                    return self._finish("terminal")

                # Check per-state retry limit. If the consecutive re-entry count exceeds
                # max_retries, skip execution and route to on_retry_exhausted instead.
                if state_config.max_retries is not None:
                    retry_count = self._retry_counts.get(self.current_state, 0)
                    if retry_count > state_config.max_retries:
                        # on_retry_exhausted is guaranteed non-None by validation when
                        # max_retries is set, but we fall back to an error if misconfigured.
                        exhausted_state: str = state_config.on_retry_exhausted or ""
                        if not exhausted_state:
                            return self._finish(
                                "error",
                                error=f"State '{self.current_state}' exceeded max_retries "
                                "but on_retry_exhausted is not set",
                            )
                        self._emit(
                            "retry_exhausted",
                            {
                                "state": self.current_state,
                                "retries": retry_count,
                                "next": exhausted_state,
                            },
                        )
                        self._retry_counts.pop(self.current_state, None)
                        self._prev_state = self.current_state
                        self.current_state = exhausted_state
                        continue

                self.iteration += 1
                self._emit(
                    "state_enter",
                    {
                        "state": self.current_state,
                        "iteration": self.iteration,
                    },
                )

                # Execute state
                next_state = self._execute_state(state_config)

                # Check for pending error signal (FATAL_ERROR)
                if self._pending_error is not None:
                    return self._finish("error", error=self._pending_error)

                # Check for pending handoff signal
                if self._pending_handoff:
                    return self._handle_handoff(self._pending_handoff)

                # Handle maintain mode
                if next_state is None and self.fsm.maintain:
                    next_state = state_config.on_maintain or self.fsm.initial

                # SIGKILL in _execute_state sets shutdown flag and returns None
                if next_state is None and self._shutdown_requested:
                    return self._finish("signal")

                if next_state is None:
                    return self._finish("error", error="No valid transition")

                # At this point next_state is guaranteed to be str
                resolved_next: str = next_state

                self._emit(
                    "route",
                    {
                        "from": self.current_state,
                        "to": resolved_next,
                    },
                )

                self._prev_state = self.current_state
                self.current_state = resolved_next

                # Interruptible backoff sleep between iterations
                self._sleep_backoff()

        except InterpolationError as exc:
            return self._finish(
                "error",
                error=(
                    f"Missing context variable in state '{self.current_state}': {exc}. "
                    f"Run with: ll-loop run {self.fsm.name} --context KEY=VALUE"
                ),
            )
        except Exception as exc:
            # Top-level boundary: report any failure as an "error" termination.
            return self._finish("error", error=str(exc))

    def _sleep_backoff(self) -> None:
        """Sleep for the configured backoff in short slices, waking early on shutdown.

        The remaining time is computed once per slice and checked before
        sleeping, so ``time.sleep`` is never handed a negative duration. A
        negative value raises ValueError, which would abort the loop with a
        spurious "error" result.
        """
        backoff = self.fsm.backoff
        if not backoff or backoff <= 0:
            return
        deadline = time.time() + backoff
        while not self._shutdown_requested:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(0.1, remaining))

    def _execute_sub_loop(self, state: StateConfig, ctx: InterpolationContext) -> str | None:
        """Execute a sub-loop state by loading and running a child FSM.

        The child runs in its own FSMExecutor that shares this executor's
        action runner and loops_dir.

        Args:
            state: The state configuration with loop field set
            ctx: Interpolation context for routing

        Returns:
            Next state name based on child loop verdict, or None
        """
        from little_loops.cli.loop._helpers import resolve_loop_path
        from little_loops.fsm.validation import load_and_validate

        assert state.loop is not None  # guarded by caller
        loop_path = resolve_loop_path(state.loop, self.loops_dir or Path(".loops"))
        child_fsm, _ = load_and_validate(loop_path)

        # Pass parent context to child if requested; on key clashes the child's
        # own context wins over parent captures, which win over parent context.
        if state.context_passthrough:
            child_fsm.context = {**self.fsm.context, **self.captured, **child_fsm.context}

        child_executor = FSMExecutor(
            child_fsm,
            action_runner=self.action_runner,
            loops_dir=self.loops_dir,
        )
        child_result = child_executor.run()

        # Merge child captures back into parent under the state name
        if state.context_passthrough and child_executor.captured:
            self.captured[self.current_state] = child_executor.captured

        # Route based on child termination reason
        if child_result.terminated_by == "terminal":
            return interpolate(state.on_yes, ctx) if state.on_yes else None
        else:
            # error, max_iterations, timeout, signal — all are failure
            return interpolate(state.on_no, ctx) if state.on_no else None

    def _execute_state(self, state: StateConfig) -> str | None:
        """Execute a single state and return next state name.

        Args:
            state: The state configuration to execute

        Returns:
            Next state name, or None if no valid transition
        """
        # Build interpolation context
        ctx = self._build_context()

        # Dispatch to sub-loop handler if this is a sub-loop state
        if state.loop is not None:
            try:
                return self._execute_sub_loop(state, ctx)
            except (FileNotFoundError, ValueError):
                if state.on_error:
                    return interpolate(state.on_error, ctx)
                raise

        # Handle unconditional transition
        if state.next:
            if state.action:
                result = self._run_action(state.action, state, ctx)
                self.prev_result = {
                    "output": result.output,
                    "exit_code": result.exit_code,
                    "state": self.current_state,
                }
                if result.exit_code is not None and result.exit_code < 0:
                    # Process killed by signal — do not silently advance via next
                    if state.on_error:
                        return interpolate(state.on_error, ctx)
                    self.request_shutdown()
                    return None
            return interpolate(state.next, ctx)

        # Execute action if present
        action_result = None
        if state.action:
            action_result = self._run_action(state.action, state, ctx)

        # Evaluate
        eval_result = self._evaluate(state, action_result, ctx)
        self.prev_result = {
            "output": action_result.output if action_result else "",
            "exit_code": action_result.exit_code if action_result else 0,
            "state": self.current_state,
        }

        # Update context with result for routing interpolation
        if eval_result:
            ctx.result = {
                "verdict": eval_result.verdict,
                "details": eval_result.details,
            }

        # Route based on verdict; with no evaluation the verdict defaults to "yes"
        verdict = eval_result.verdict if eval_result else "yes"
        return self._route(state, verdict, ctx)

    def _run_action(
        self,
        action_template: str,
        state: StateConfig,
        ctx: InterpolationContext,
    ) -> ActionResult:
        """Execute action and optionally capture result.

        Emits action_start, action_output (one per line) and action_complete
        events, stores the result under ``state.capture`` when set, and
        records any handoff/error/stop signal detected in the output.

        Args:
            action_template: Action string (may contain variables)
            state: State configuration
            ctx: Interpolation context

        Returns:
            ActionResult with output and exit code
        """
        action = interpolate(action_template, ctx)
        action_mode = self._action_mode(state)

        self._emit("action_start", {"action": action, "is_prompt": action_mode == "prompt"})

        def _on_line(line: str) -> None:
            self._emit("action_output", {"line": line})

        if action_mode == "mcp_tool":
            # Direct MCP tool call — bypass action_runner entirely
            interpolated_params = interpolate_dict(state.params, ctx) if state.params else {}
            cmd = ["mcp-call", action, json.dumps(interpolated_params)]
            result = self._run_subprocess(
                cmd,
                timeout=state.timeout or self.fsm.default_timeout or 30,
                on_output_line=_on_line,
            )
        else:
            result = self.action_runner.run(
                action,
                timeout=state.timeout or self.fsm.default_timeout or 3600,
                is_slash_command=action_mode == "prompt",
                on_output_line=_on_line,
            )

        # Only the tail of the output is included in the event payload.
        preview = result.output[-2000:].strip() if result.output else None
        payload: dict[str, Any] = {
            "exit_code": result.exit_code,
            "duration_ms": result.duration_ms,
            "output_preview": preview,
            "is_prompt": action_mode == "prompt",
        }
        if action_mode == "prompt":
            session_jsonl = get_current_session_jsonl()
            payload["session_jsonl"] = str(session_jsonl) if session_jsonl else None
        self._emit("action_complete", payload)

        # Capture if requested
        if state.capture:
            self.captured[state.capture] = {
                "output": result.output,
                "stderr": result.stderr,
                "exit_code": result.exit_code,
                "duration_ms": result.duration_ms,
            }

        # Check for signals in output
        if self.signal_detector:
            signal = self.signal_detector.detect_first(result.output)
            if signal:
                if signal.signal_type == "handoff":
                    self._pending_handoff = signal
                elif signal.signal_type == "error":
                    self._pending_error = signal.payload
                elif signal.signal_type == "stop":
                    self.request_shutdown()

        return result

    def _run_subprocess(
        self,
        cmd: list[str],
        timeout: int,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Run a subprocess directly and return ActionResult.

        stdout is streamed line by line while a helper thread drains stderr.
        The timeout is enforced by a watchdog timer that kills the child once
        ``timeout`` seconds have passed. Waiting on the process only after
        stdout reaches EOF would never fire for a child that hangs while
        keeping its stdout open.

        Args:
            cmd: Command and arguments to execute
            timeout: Timeout in seconds
            on_output_line: Optional callback for each stdout line

        Returns:
            ActionResult with output, stderr, exit_code, duration_ms. On
            timeout the exit code is 124, stderr falls back to
            "MCP call timed out" when empty, and duration_ms is
            ``timeout * 1000``.
        """
        start = _now_ms()
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        output_chunks: list[str] = []
        stderr_chunks: list[str] = []

        def _drain_stderr() -> None:
            # Read stderr concurrently so a chatty child cannot fill the pipe and stall.
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()

        timed_out = threading.Event()

        def _kill_on_timeout() -> None:
            # Only flag a timeout if the child has not already finished on its own.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()

        try:
            for line in process.stdout:  # type: ignore[union-attr]
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            # Bounded by the watchdog, which kills the child at the deadline.
            process.wait()
        finally:
            watchdog.cancel()
            if process.poll() is None:
                # Reached only when an exception is propagating (e.g. the
                # callback raised): do not leave the child running.
                process.kill()
                process.wait()

        stderr_thread.join(timeout=5)
        if timed_out.is_set():
            return ActionResult(
                output="".join(output_chunks),
                stderr="".join(stderr_chunks) or "MCP call timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks),
            exit_code=process.returncode,
            duration_ms=_now_ms() - start,
        )

    def _evaluate(
        self,
        state: StateConfig,
        action_result: ActionResult | None,
        ctx: InterpolationContext,
    ) -> EvaluationResult | None:
        """Evaluate action result.

        Without an explicit ``state.evaluate`` config the evaluator is chosen
        from the action mode (MCP result, LLM structured, or exit code).
        Every evaluation performed emits an "evaluate" event.

        Args:
            state: State configuration
            action_result: Result from action execution (may be None)
            ctx: Interpolation context

        Returns:
            EvaluationResult, or None if no evaluation needed
        """
        if state.evaluate is None:
            # Default evaluation based on action type
            if action_result:
                action_mode = self._action_mode(state)

                if action_mode == "mcp_tool":
                    # MCP tool call: use mcp_result evaluator
                    result = evaluate_mcp_result(action_result.output, action_result.exit_code)
                elif action_mode == "prompt":
                    # Slash command or prompt: use LLM evaluation
                    if not self.fsm.llm.enabled:
                        result = EvaluationResult(
                            verdict="error",
                            details={"error": "LLM evaluation disabled via --no-llm"},
                        )
                    else:
                        result = evaluate_llm_structured(
                            action_result.output,
                            model=self.fsm.llm.model,
                            max_tokens=self.fsm.llm.max_tokens,
                            timeout=self.fsm.llm.timeout,
                        )
                else:
                    # Shell command: use exit code
                    result = evaluate_exit_code(action_result.exit_code)

                self._emit(
                    "evaluate",
                    {
                        "type": "default",
                        "verdict": result.verdict,
                        **result.details,
                    },
                )
                return result
            return None

        # Explicit evaluation config
        raw_output = action_result.output if action_result else ""
        if state.evaluate.source:
            try:
                eval_input = interpolate(state.evaluate.source, ctx)
            except InterpolationError:
                # Fall back to the raw action output when the source cannot be resolved.
                eval_input = raw_output
        else:
            eval_input = raw_output

        if state.evaluate.type == "llm_structured" and not self.fsm.llm.enabled:
            result = EvaluationResult(
                verdict="error",
                details={"error": "LLM evaluation disabled via --no-llm"},
            )
        else:
            result = evaluate(
                config=state.evaluate,
                output=eval_input,
                exit_code=action_result.exit_code if action_result else 0,
                context=ctx,
            )

        self._emit(
            "evaluate",
            {
                "type": state.evaluate.type,
                "verdict": result.verdict,
                **result.details,
            },
        )

        return result

    def _route(
        self,
        state: StateConfig,
        verdict: str,
        ctx: InterpolationContext,
    ) -> str | None:
        """Determine next state from verdict.

        Resolution order:
            1. next (unconditional) - handled before this method
            2. route (full routing table): exact verdict match, then default,
               then the error entry for an "error" verdict
            3. on_yes/on_no/on_error/on_partial/on_blocked (shorthand)
            4. terminal - handled in main loop
            5. no route found (None)

        Args:
            state: State configuration
            verdict: Verdict string from evaluation
            ctx: Interpolation context

        Returns:
            Next state name, or None if no valid route
        """
        if state.route:
            routes = state.route.routes
            if verdict in routes:
                return self._resolve_route(routes[verdict], ctx)
            # NOTE(review): default is consulted before error, so an "error" verdict
            # reaches route.error only when no default is set — confirm this is intended.
            if state.route.default:
                return self._resolve_route(state.route.default, ctx)
            if verdict == "error" and state.route.error:
                return self._resolve_route(state.route.error, ctx)
            return None

        # Shorthand routing
        if verdict == "yes" and state.on_yes:
            return self._resolve_route(state.on_yes, ctx)
        if verdict == "no" and state.on_no:
            return self._resolve_route(state.on_no, ctx)
        if verdict == "error" and state.on_error:
            return self._resolve_route(state.on_error, ctx)
        if verdict == "partial" and state.on_partial:
            return self._resolve_route(state.on_partial, ctx)
        if verdict == "blocked" and state.on_blocked:
            return self._resolve_route(state.on_blocked, ctx)

        return None

    def _resolve_route(self, route: str, ctx: InterpolationContext) -> str:
        """Resolve route target, handling special tokens.

        Args:
            route: Route target string; "$current" means stay in the current state
            ctx: Interpolation context

        Returns:
            Resolved state name
        """
        if route == "$current":
            return self.current_state
        return interpolate(route, ctx)

    def _action_mode(self, state: StateConfig) -> str:
        """Return execution mode for the state: 'prompt', 'shell', or 'mcp_tool'.

        An explicit action_type wins; otherwise an action starting with "/"
        is treated as a prompt and anything else as a shell command.
        """
        if state.action_type == "mcp_tool":
            return "mcp_tool"
        if state.action_type in ("prompt", "slash_command"):
            return "prompt"
        if state.action_type == "shell":
            return "shell"
        # Heuristic: / prefix = slash_command (prompt mode)
        if state.action is not None and state.action.startswith("/"):
            return "prompt"
        return "shell"

    def _build_context(self) -> InterpolationContext:
        """Build interpolation context for current state.

        Returns:
            InterpolationContext with all runtime values
        """
        return InterpolationContext(
            context=self.fsm.context,
            captured=self.captured,
            prev=self.prev_result,
            result=None,
            state_name=self.current_state,
            iteration=self.iteration,
            loop_name=self.fsm.name,
            started_at=self.started_at,
            elapsed_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
        )

    def _emit(self, event: str, data: dict[str, Any]) -> None:
        """Emit an event via the callback, adding the event name and a timestamp."""
        self.event_callback(
            {
                "event": event,
                "ts": _iso_now(),
                **data,
            }
        )

    def _finish(self, terminated_by: str, error: str | None = None) -> ExecutionResult:
        """Emit loop_complete and build the final ExecutionResult.

        Args:
            terminated_by: Reason the run is ending
            error: Optional error message to attach to the result

        Returns:
            ExecutionResult describing the finished run
        """
        self._emit(
            "loop_complete",
            {
                "final_state": self.current_state,
                "iterations": self.iteration,
                "terminated_by": terminated_by,
            },
        )

        return ExecutionResult(
            final_state=self.current_state,
            iterations=self.iteration,
            terminated_by=terminated_by,
            duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
            captured=self.captured,
            error=error,
        )

    def _handle_handoff(self, signal: DetectedSignal) -> ExecutionResult:
        """Handle a detected handoff signal.

        Emits a handoff_detected event and optionally invokes the handoff
        handler; when the handler spawned a process a handoff_spawned event
        is emitted as well. No loop_complete event is emitted on this path.

        Args:
            signal: The detected handoff signal

        Returns:
            ExecutionResult with handoff information
        """
        self._emit(
            "handoff_detected",
            {
                "state": self.current_state,
                "iteration": self.iteration,
                "continuation": signal.payload,
            },
        )

        # Invoke handler if configured
        if self.handoff_handler:
            result = self.handoff_handler.handle(self.fsm.name, signal.payload)
            if result.spawned_process is not None:
                self._emit(
                    "handoff_spawned",
                    {
                        "pid": result.spawned_process.pid,
                        "state": self.current_state,
                    },
                )

        return ExecutionResult(
            final_state=self.current_state,
            iterations=self.iteration,
            terminated_by="handoff",
            duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
            captured=self.captured,
            handoff=True,
            continuation_prompt=signal.payload,
        )