Coverage for little_loops / fsm / executor.py: 17%

393 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""FSM Executor - Runtime engine for FSM loop execution. 

2 

3This module provides the execution engine that runs FSM loops: 

4- Executes actions (shell commands or slash commands) 

5- Evaluates results using appropriate evaluators 

6- Routes to next states based on verdicts 

7- Tracks iteration count and enforces limits 

8- Manages captured variables and context 

9""" 

10 

11from __future__ import annotations 

12 

13import json 

14import subprocess 

15import sys 

16import threading 

17import time 

18from collections.abc import Callable 

19from dataclasses import dataclass, field 

20from datetime import UTC, datetime 

21from pathlib import Path 

22from typing import Any, Protocol 

23 

24from little_loops.fsm.evaluators import ( 

25 EvaluationResult, 

26 evaluate, 

27 evaluate_exit_code, 

28 evaluate_llm_structured, 

29 evaluate_mcp_result, 

30) 

31from little_loops.fsm.handoff_handler import HandoffHandler 

32from little_loops.fsm.interpolation import ( 

33 InterpolationContext, 

34 InterpolationError, 

35 interpolate, 

36 interpolate_dict, 

37) 

38from little_loops.fsm.schema import FSMLoop, StateConfig 

39from little_loops.fsm.signal_detector import DetectedSignal, SignalDetector 

40from little_loops.session_log import get_current_session_jsonl 

41 

42 

43@dataclass 

44class ExecutionResult: 

45 """Result from FSM execution. 

46 

47 Attributes: 

48 final_state: Name of the state when execution stopped 

49 iterations: Total iterations executed 

50 terminated_by: Reason for termination (terminal, max_iterations, timeout, signal, error, handoff) 

51 duration_ms: Total execution time in milliseconds 

52 captured: All captured variable values 

53 error: Error message if terminated_by is "error" 

54 handoff: True if execution stopped due to handoff signal 

55 continuation_prompt: Continuation context from handoff signal 

56 """ 

57 

58 final_state: str 

59 iterations: int 

60 terminated_by: str # "terminal", "max_iterations", "timeout", "signal", "error", "handoff" 

61 duration_ms: int 

62 captured: dict[str, dict[str, Any]] 

63 error: str | None = None 

64 handoff: bool = False 

65 continuation_prompt: str | None = None 

66 

67 def to_dict(self) -> dict[str, Any]: 

68 """Convert to dictionary for JSON serialization.""" 

69 result: dict[str, Any] = { 

70 "final_state": self.final_state, 

71 "iterations": self.iterations, 

72 "terminated_by": self.terminated_by, 

73 "duration_ms": self.duration_ms, 

74 "captured": self.captured, 

75 } 

76 if self.error is not None: 

77 result["error"] = self.error 

78 if self.handoff: 

79 result["handoff"] = self.handoff 

80 if self.continuation_prompt is not None: 

81 result["continuation_prompt"] = self.continuation_prompt 

82 return result 

83 

84 

@dataclass
class ActionResult:
    """Outcome of executing a single action.

    Attributes:
        output: Text the action wrote to stdout
        stderr: Text the action wrote to stderr
        exit_code: Exit status reported for the action
        duration_ms: How long the action took, in milliseconds
    """

    output: str  # captured standard output
    stderr: str  # captured standard error
    exit_code: int  # process exit status (runners in this module use 124 for timeouts)
    duration_ms: int  # elapsed execution time

100 

101 

# Signature of observers that receive each emitted event dictionary.
EventCallback = Callable[[dict[str, Any]], None]

104 

105 

class ActionRunner(Protocol):
    """Structural interface for anything that can execute an FSM action."""

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Run *action* and report how it went.

        Args:
            action: Command text to execute
            timeout: Maximum run time, in seconds
            is_slash_command: True when the action is a slash command (leading "/")
            on_output_line: Optional hook invoked once per line of output

        Returns:
            ActionResult carrying output, stderr, exit_code and duration_ms
        """
        ...

128 

129 

class DefaultActionRunner:
    """Execute actions via subprocess or Claude CLI.

    Shell actions run under ``bash -c``; slash commands run through the
    ``claude`` CLI in print mode (``-p``). stdout is streamed line by line to an
    optional callback while stderr is drained on a background thread.
    """

    def __init__(self) -> None:
        # In-flight child process while run() is executing; reset to None when run() returns.
        self._current_process: subprocess.Popen[str] | None = None

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Execute action and return result, streaming output line by line.

        The timeout is a hard deadline for the whole run: a watchdog timer kills
        the child when it expires, even if it is still holding stdout open.

        Args:
            action: The command to execute
            timeout: Timeout in seconds
            is_slash_command: True if action starts with /
            on_output_line: Optional callback invoked for each stdout line
                (trailing whitespace stripped)

        Returns:
            ActionResult with execution details. On timeout the exit code is 124,
            stderr falls back to "Action timed out", and duration_ms is the
            timeout converted to milliseconds.
        """
        start = _now_ms()

        if is_slash_command:
            # Execute via Claude CLI
            cmd = [
                "claude",
                "--dangerously-skip-permissions",
                "--no-session-persistence",
                "-p",
                action,
            ]
        else:
            # Shell command
            cmd = ["bash", "-c", action]

        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        self._current_process = process
        output_chunks: list[str] = []
        stderr_chunks: list[str] = []
        timed_out = threading.Event()

        def _drain_stderr() -> None:
            # Read stderr concurrently so a chatty child cannot block on a full pipe.
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        def _kill_on_timeout() -> None:
            # Only report a timeout if the child is actually still running.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()
        # Watchdog: process.wait(timeout=...) alone would only start counting
        # after stdout reached EOF, letting a hung child block forever.
        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()

        try:
            for line in process.stdout:  # type: ignore[union-attr]
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            process.wait()
        finally:
            watchdog.cancel()
            if process.poll() is None:
                # Only reachable if streaming raised (e.g. the callback failed):
                # never leave an orphaned child process behind.
                process.kill()
                process.wait()
            self._current_process = None
            stderr_thread.join(timeout=5)

        if timed_out.is_set():
            return ActionResult(
                output="".join(output_chunks),
                stderr="".join(stderr_chunks) or "Action timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks),
            exit_code=process.returncode,
            duration_ms=_now_ms() - start,
        )

213 

214 

215@dataclass 

216class SimulationActionRunner: 

217 """Action runner for simulation mode - prompts user instead of executing. 

218 

219 This runner allows users to trace through FSM logic without executing 

220 real commands. It can either prompt interactively for results or use 

221 predefined scenarios. 

222 

223 Attributes: 

224 scenario: Predefined result pattern ("all-pass", "all-fail", "first-fail", "alternating") 

225 call_count: Number of actions simulated so far 

226 calls: List of all actions that would have been executed 

227 """ 

228 

229 scenario: str | None = None 

230 call_count: int = 0 

231 calls: list[str] = field(default_factory=list) 

232 

233 def run( 

234 self, 

235 action: str, 

236 timeout: int, 

237 is_slash_command: bool, 

238 on_output_line: Callable[[str], None] | None = None, 

239 ) -> ActionResult: 

240 """Prompt user for simulated result instead of executing. 

241 

242 Args: 

243 action: The command that would be executed 

244 timeout: Timeout (ignored in simulation) 

245 is_slash_command: Whether this is a slash command 

246 on_output_line: Ignored in simulation 

247 

248 Returns: 

249 ActionResult with simulated exit code 

250 """ 

251 del timeout, on_output_line # unused in simulation 

252 self.calls.append(action) 

253 self.call_count += 1 

254 

255 cmd_type = "slash command" if is_slash_command else "shell command" 

256 print(f" [SIMULATED] Would execute ({cmd_type}): {action}") 

257 

258 if self.scenario: 

259 exit_code = self._scenario_result() 

260 scenario_label = { 

261 "all-pass": "Success (scenario: all-pass)", 

262 "all-fail": "Failure (scenario: all-fail)", 

263 "all-error": "Error (scenario: all-error)", 

264 "first-fail": "Failure" if self.call_count == 1 else "Success", 

265 "alternating": "Failure" if self.call_count % 2 == 1 else "Success", 

266 }.get(self.scenario, "Success") 

267 print(f" [AUTO] Result: {scenario_label}") 

268 else: 

269 exit_code = self._prompt_result() 

270 

271 return ActionResult( 

272 output=f"[simulated output for: {action}]", 

273 stderr="", 

274 exit_code=exit_code, 

275 duration_ms=0, 

276 ) 

277 

278 def _scenario_result(self) -> int: 

279 """Return exit code based on scenario pattern. 

280 

281 Returns: 

282 0 for success, 1 for failure, 2 for error based on scenario logic 

283 """ 

284 if self.scenario == "all-pass": 

285 return 0 

286 elif self.scenario == "all-fail": 

287 return 1 

288 elif self.scenario == "all-error": 

289 return 2 

290 elif self.scenario == "first-fail": 

291 # First call fails, rest pass 

292 return 1 if self.call_count == 1 else 0 

293 elif self.scenario == "alternating": 

294 # Odd calls fail, even calls pass 

295 return 1 if self.call_count % 2 == 1 else 0 

296 return 0 

297 

298 def _prompt_result(self) -> int: 

299 """Prompt user for simulated exit code. 

300 

301 Returns: 

302 Exit code based on user selection 

303 """ 

304 print() 

305 print(" ? What should the simulated result be?") 

306 print(" 1) Success (exit 0) [default]") 

307 print(" 2) Failure (exit 1)") 

308 print(" 3) Error (exit 2)") 

309 

310 while True: 

311 try: 

312 sys.stdout.write(" > ") 

313 sys.stdout.flush() 

314 choice = sys.stdin.readline().strip() 

315 if choice in ("1", ""): 

316 return 0 

317 elif choice == "2": 

318 return 1 

319 elif choice == "3": 

320 return 2 

321 print(" Invalid choice. Enter 1, 2, or 3.") 

322 except (EOFError, KeyboardInterrupt): 

323 print() 

324 return 0 

325 

326 

327def _now_ms() -> int: 

328 """Get current time in milliseconds.""" 

329 return int(time.time() * 1000) 

330 

331 

332def _iso_now() -> str: 

333 """Get current time as ISO 8601 string.""" 

334 return datetime.now(UTC).isoformat() 

335 

336 

class FSMExecutor:
    """Execute an FSM loop.

    The executor runs an FSM from its initial state until:
    - A terminal state is reached (in maintain mode the loop restarts instead)
    - max_iterations is exceeded
    - A timeout occurs
    - A shutdown signal is received
    - A handoff or fatal-error signal is detected in action output
    - An unrecoverable error occurs

    Events are emitted via the callback for observability.
    """

    def __init__(
        self,
        fsm: FSMLoop,
        event_callback: EventCallback | None = None,
        action_runner: ActionRunner | None = None,
        signal_detector: SignalDetector | None = None,
        handoff_handler: HandoffHandler | None = None,
        loops_dir: Path | None = None,
    ):
        """Initialize the executor.

        Args:
            fsm: The FSM loop to execute
            event_callback: Optional callback for events
            action_runner: Optional custom action runner (for testing);
                defaults to DefaultActionRunner
            signal_detector: Optional signal detector for output parsing
            handoff_handler: Optional handler for handoff signals
            loops_dir: Base directory for resolving sub-loop references
        """
        self.fsm = fsm
        self.event_callback = event_callback or (lambda _: None)
        self.action_runner: ActionRunner = action_runner or DefaultActionRunner()
        self.signal_detector = signal_detector
        self.handoff_handler = handoff_handler
        self.loops_dir = loops_dir

        # Runtime state
        self.current_state = fsm.initial
        self.iteration = 0
        self.captured: dict[str, dict[str, Any]] = {}
        self.prev_result: dict[str, Any] | None = None
        self.started_at = ""
        self.start_time_ms = 0
        # Milliseconds from segments before the current run (set by PersistentExecutor on resume)
        self.elapsed_offset_ms = 0

        # Shutdown flag for graceful signal handling
        self._shutdown_requested = False

        # Pending handoff signal (set by _run_action, checked by main loop)
        self._pending_handoff: DetectedSignal | None = None

        # Pending error payload from FATAL_ERROR signal (set by _run_action, checked by main loop)
        self._pending_error: str | None = None

        # Per-state retry tracking for max_retries support.
        # _retry_counts[state_name] = number of consecutive re-entries into that state.
        # Incremented each time we enter the same state as the previous iteration.
        # Reset when a different state is entered, or after retry exhaustion.
        self._retry_counts: dict[str, int] = {}
        # State entered in the previous iteration (None on first iteration or after resume).
        self._prev_state: str | None = None

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Sets a flag that will be checked at the start of each iteration,
        allowing the loop to exit cleanly after the current state completes.
        The flag also cuts short any backoff sleep between iterations.
        """
        self._shutdown_requested = True

    def run(self) -> ExecutionResult:
        """Execute the FSM until terminal state or limits reached.

        Returns:
            ExecutionResult with final state and execution metadata. Exceptions
            raised while executing states are converted into a result with
            terminated_by="error" rather than propagated.
        """
        self.started_at = _iso_now()
        self.start_time_ms = _now_ms()

        self._emit("loop_start", {"loop": self.fsm.name})

        try:
            while True:
                # Check shutdown request (signal handling)
                if self._shutdown_requested:
                    return self._finish("signal")

                # Check iteration limit
                if self.iteration >= self.fsm.max_iterations:
                    return self._finish("max_iterations")

                # Check timeout (elapsed includes time from earlier resumed segments)
                if self.fsm.timeout:
                    elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
                    if elapsed > self.fsm.timeout * 1000:
                        return self._finish("timeout")

                # Get current state config. A route that resolved to a name not in
                # the FSM gets a readable error instead of a bare KeyError message.
                try:
                    state_config = self.fsm.states[self.current_state]
                except KeyError:
                    return self._finish(
                        "error", error=f"Unknown state '{self.current_state}'"
                    )

                # Update per-state retry tracking based on transition from previous iteration.
                # If re-entering the same state consecutively, increment retry count.
                # If entering a different state, clear the previous state's retry count.
                if self._prev_state is not None:
                    if self.current_state == self._prev_state:
                        self._retry_counts[self.current_state] = (
                            self._retry_counts.get(self.current_state, 0) + 1
                        )
                    else:
                        self._retry_counts.pop(self._prev_state, None)

                # Check terminal
                if state_config.terminal:
                    # Handle maintain mode - restart loop instead of terminating
                    if self.fsm.maintain:
                        self.iteration += 1
                        maintain_target = state_config.on_maintain or self.fsm.initial
                        self._emit(
                            "route",
                            {
                                "from": self.current_state,
                                "to": maintain_target,
                                "reason": "maintain",
                            },
                        )
                        self._prev_state = self.current_state
                        self.current_state = maintain_target
                        continue
                    return self._finish("terminal")

                # Check per-state retry limit. If the consecutive re-entry count exceeds
                # max_retries, skip execution and route to on_retry_exhausted instead.
                if state_config.max_retries is not None:
                    retry_count = self._retry_counts.get(self.current_state, 0)
                    if retry_count > state_config.max_retries:
                        # on_retry_exhausted is guaranteed non-None by validation when
                        # max_retries is set, but we fall back to an error if misconfigured.
                        exhausted_state: str = state_config.on_retry_exhausted or ""
                        if not exhausted_state:
                            return self._finish(
                                "error",
                                error=f"State '{self.current_state}' exceeded max_retries "
                                "but on_retry_exhausted is not set",
                            )
                        self._emit(
                            "retry_exhausted",
                            {
                                "state": self.current_state,
                                "retries": retry_count,
                                "next": exhausted_state,
                            },
                        )
                        self._retry_counts.pop(self.current_state, None)
                        self._prev_state = self.current_state
                        self.current_state = exhausted_state
                        continue

                self.iteration += 1
                self._emit(
                    "state_enter",
                    {
                        "state": self.current_state,
                        "iteration": self.iteration,
                    },
                )

                # Execute state
                next_state = self._execute_state(state_config)

                # Check for pending error signal (FATAL_ERROR)
                if self._pending_error is not None:
                    return self._finish("error", error=self._pending_error)

                # Check for pending handoff signal
                if self._pending_handoff:
                    return self._handle_handoff(self._pending_handoff)

                # A killed action (SIGKILL in _execute_state) or a stop signal sets the
                # shutdown flag. Honour it before maintain mode can mask it with a
                # fallback route to the initial state.
                if next_state is None and self._shutdown_requested:
                    return self._finish("signal")

                # Handle maintain mode
                if next_state is None and self.fsm.maintain:
                    next_state = state_config.on_maintain or self.fsm.initial

                if next_state is None:
                    return self._finish("error", error="No valid transition")

                # At this point next_state is guaranteed to be str
                resolved_next: str = next_state

                self._emit(
                    "route",
                    {
                        "from": self.current_state,
                        "to": resolved_next,
                    },
                )

                self._prev_state = self.current_state
                self.current_state = resolved_next

                # Interruptible backoff sleep between iterations
                if self.fsm.backoff and self.fsm.backoff > 0:
                    self._interruptible_sleep(self.fsm.backoff)

        except InterpolationError as exc:
            return self._finish(
                "error",
                error=(
                    f"Missing context variable in state '{self.current_state}': {exc}. "
                    f"Run with: ll-loop run {self.fsm.name} --context KEY=VALUE"
                ),
            )
        except Exception as exc:
            return self._finish("error", error=str(exc))

    def _interruptible_sleep(self, seconds: float) -> None:
        """Sleep up to *seconds*, waking early once shutdown is requested.

        Uses a monotonic clock so wall-clock adjustments cannot stretch or skip
        the wait, and never passes a negative duration to time.sleep (which
        would raise ValueError).
        """
        deadline = time.monotonic() + seconds
        while not self._shutdown_requested:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.1, remaining))

    def _execute_sub_loop(self, state: StateConfig, ctx: InterpolationContext) -> str | None:
        """Execute a sub-loop state by loading and running a child FSM.

        Args:
            state: The state configuration with loop field set
            ctx: Interpolation context for routing

        Returns:
            Next state name based on child loop verdict (on_yes when the child
            reached a terminal state, on_no otherwise), or None if that route
            is not configured
        """
        from little_loops.cli.loop._helpers import resolve_loop_path
        from little_loops.fsm.validation import load_and_validate

        assert state.loop is not None  # guarded by caller
        loop_path = resolve_loop_path(state.loop, self.loops_dir or Path(".loops"))
        child_fsm, _ = load_and_validate(loop_path)

        # Pass parent context to child if requested (child's own context wins on conflicts)
        if state.context_passthrough:
            child_fsm.context = {**self.fsm.context, **self.captured, **child_fsm.context}

        child_executor = FSMExecutor(
            child_fsm,
            action_runner=self.action_runner,
            loops_dir=self.loops_dir,
        )
        child_result = child_executor.run()

        # Merge child captures back into parent under the state name
        if state.context_passthrough and child_executor.captured:
            self.captured[self.current_state] = child_executor.captured

        # Route based on child termination reason
        if child_result.terminated_by == "terminal":
            return interpolate(state.on_yes, ctx) if state.on_yes else None
        # error, max_iterations, timeout, signal — all are failure
        return interpolate(state.on_no, ctx) if state.on_no else None

    def _execute_state(self, state: StateConfig) -> str | None:
        """Execute a single state and return next state name.

        Args:
            state: The state configuration to execute

        Returns:
            Next state name, or None if no valid transition (also None when a
            killed action requested shutdown)
        """
        # Build interpolation context
        ctx = self._build_context()

        # Dispatch to sub-loop handler if this is a sub-loop state
        if state.loop is not None:
            try:
                return self._execute_sub_loop(state, ctx)
            except (FileNotFoundError, ValueError):
                if state.on_error:
                    return interpolate(state.on_error, ctx)
                raise

        # Handle unconditional transition
        if state.next:
            if state.action:
                result = self._run_action(state.action, state, ctx)
                self.prev_result = {
                    "output": result.output,
                    "exit_code": result.exit_code,
                    "state": self.current_state,
                }
                if result.exit_code is not None and result.exit_code < 0:
                    # Process killed by signal — do not silently advance via next
                    if state.on_error:
                        return interpolate(state.on_error, ctx)
                    self.request_shutdown()
                    return None
            return interpolate(state.next, ctx)

        # Execute action if present
        action_result = None
        if state.action:
            action_result = self._run_action(state.action, state, ctx)

        # Evaluate
        eval_result = self._evaluate(state, action_result, ctx)
        self.prev_result = {
            "output": action_result.output if action_result else "",
            "exit_code": action_result.exit_code if action_result else 0,
            "state": self.current_state,
        }

        # Update context with result for routing interpolation
        if eval_result:
            ctx.result = {
                "verdict": eval_result.verdict,
                "details": eval_result.details,
            }

        # Route based on verdict (no action and no evaluation counts as "yes")
        verdict = eval_result.verdict if eval_result else "yes"
        return self._route(state, verdict, ctx)

    def _run_action(
        self,
        action_template: str,
        state: StateConfig,
        ctx: InterpolationContext,
    ) -> ActionResult:
        """Execute action and optionally capture result.

        Also scans the output with the signal detector (if configured) and
        records handoff, fatal-error and stop signals for the main loop.

        Args:
            action_template: Action string (may contain variables)
            state: State configuration
            ctx: Interpolation context

        Returns:
            ActionResult with output and exit code
        """
        action = interpolate(action_template, ctx)
        action_mode = self._action_mode(state)

        self._emit("action_start", {"action": action, "is_prompt": action_mode == "prompt"})

        def _on_line(line: str) -> None:
            self._emit("action_output", {"line": line})

        if action_mode == "mcp_tool":
            # Direct MCP tool call — bypass action_runner entirely
            interpolated_params = interpolate_dict(state.params, ctx) if state.params else {}
            cmd = ["mcp-call", action, json.dumps(interpolated_params)]
            result = self._run_subprocess(
                cmd,
                timeout=state.timeout or self.fsm.default_timeout or 30,
                on_output_line=_on_line,
            )
        else:
            result = self.action_runner.run(
                action,
                timeout=state.timeout or self.fsm.default_timeout or 3600,
                is_slash_command=action_mode == "prompt",
                on_output_line=_on_line,
            )

        # Keep only the tail of the output in the event payload
        preview = result.output[-2000:].strip() if result.output else None
        payload: dict[str, Any] = {
            "exit_code": result.exit_code,
            "duration_ms": result.duration_ms,
            "output_preview": preview,
            "is_prompt": action_mode == "prompt",
        }
        if action_mode == "prompt":
            session_jsonl = get_current_session_jsonl()
            payload["session_jsonl"] = str(session_jsonl) if session_jsonl else None
        self._emit("action_complete", payload)

        # Capture if requested
        if state.capture:
            self.captured[state.capture] = {
                "output": result.output,
                "stderr": result.stderr,
                "exit_code": result.exit_code,
                "duration_ms": result.duration_ms,
            }

        # Check for signals in output
        if self.signal_detector:
            signal = self.signal_detector.detect_first(result.output)
            if signal:
                if signal.signal_type == "handoff":
                    self._pending_handoff = signal
                elif signal.signal_type == "error":
                    self._pending_error = signal.payload
                elif signal.signal_type == "stop":
                    self.request_shutdown()

        return result

    def _run_subprocess(
        self,
        cmd: list[str],
        timeout: int,
        on_output_line: Callable[[str], None] | None = None,
    ) -> ActionResult:
        """Run a subprocess directly and return ActionResult.

        Streams stdout line by line while a background thread drains stderr.
        The timeout is a hard deadline for the whole run: a watchdog timer
        kills the child when it expires, even while stdout is still open.

        Args:
            cmd: Command and arguments to execute
            timeout: Timeout in seconds
            on_output_line: Optional callback for each stdout line
                (trailing whitespace stripped)

        Returns:
            ActionResult with output, stderr, exit_code, duration_ms. On timeout
            the exit code is 124 and stderr falls back to "MCP call timed out".
        """
        start = _now_ms()
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        output_chunks: list[str] = []
        stderr_chunks: list[str] = []
        timed_out = threading.Event()

        def _drain_stderr() -> None:
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        def _kill_on_timeout() -> None:
            # Only report a timeout if the child is actually still running.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()
        # Watchdog: process.wait(timeout=...) alone would only start counting
        # after stdout reached EOF, letting a hung child block forever.
        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()

        try:
            for line in process.stdout:  # type: ignore[union-attr]
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            process.wait()
        finally:
            watchdog.cancel()
            if process.poll() is None:
                # Only reachable if streaming raised: never orphan the child.
                process.kill()
                process.wait()
            stderr_thread.join(timeout=5)

        if timed_out.is_set():
            return ActionResult(
                output="".join(output_chunks),
                stderr="".join(stderr_chunks) or "MCP call timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks),
            exit_code=process.returncode,
            duration_ms=_now_ms() - start,
        )

    def _evaluate(
        self,
        state: StateConfig,
        action_result: ActionResult | None,
        ctx: InterpolationContext,
    ) -> EvaluationResult | None:
        """Evaluate action result.

        Without an explicit evaluate config, the evaluator is chosen by action
        mode: mcp_result for MCP tools, structured LLM evaluation for prompts,
        and exit code for shell commands.

        Args:
            state: State configuration
            action_result: Result from action execution (may be None)
            ctx: Interpolation context

        Returns:
            EvaluationResult, or None if no evaluation needed
        """
        if state.evaluate is None:
            # Default evaluation based on action type
            if action_result:
                action_mode = self._action_mode(state)

                if action_mode == "mcp_tool":
                    # MCP tool call: use mcp_result evaluator
                    result = evaluate_mcp_result(action_result.output, action_result.exit_code)
                elif action_mode == "prompt":
                    # Slash command or prompt: use LLM evaluation
                    if not self.fsm.llm.enabled:
                        result = EvaluationResult(
                            verdict="error",
                            details={"error": "LLM evaluation disabled via --no-llm"},
                        )
                    else:
                        result = evaluate_llm_structured(
                            action_result.output,
                            model=self.fsm.llm.model,
                            max_tokens=self.fsm.llm.max_tokens,
                            timeout=self.fsm.llm.timeout,
                        )
                else:
                    # Shell command: use exit code
                    result = evaluate_exit_code(action_result.exit_code)

                self._emit(
                    "evaluate",
                    {
                        "type": "default",
                        "verdict": result.verdict,
                        **result.details,
                    },
                )
                return result
            return None

        # Explicit evaluation config; an unresolvable source falls back to raw output
        raw_output = action_result.output if action_result else ""
        if state.evaluate.source:
            try:
                eval_input = interpolate(state.evaluate.source, ctx)
            except InterpolationError:
                eval_input = raw_output
        else:
            eval_input = raw_output

        if state.evaluate.type == "llm_structured" and not self.fsm.llm.enabled:
            result = EvaluationResult(
                verdict="error",
                details={"error": "LLM evaluation disabled via --no-llm"},
            )
        else:
            result = evaluate(
                config=state.evaluate,
                output=eval_input,
                exit_code=action_result.exit_code if action_result else 0,
                context=ctx,
            )

        self._emit(
            "evaluate",
            {
                "type": state.evaluate.type,
                "verdict": result.verdict,
                **result.details,
            },
        )

        return result

    def _route(
        self,
        state: StateConfig,
        verdict: str,
        ctx: InterpolationContext,
    ) -> str | None:
        """Determine next state from verdict.

        Resolution order:
        1. next (unconditional) - handled before this method
        2. route (full routing table): exact verdict match, then the table's
           error handler for "error" verdicts, then the table's default
        3. on_yes/on_no/on_error/on_partial/on_blocked (shorthand)
        4. terminal - handled in main loop
        5. None - the caller treats this as an error

        Args:
            state: State configuration
            verdict: Verdict string from evaluation
            ctx: Interpolation context

        Returns:
            Next state name, or None if no valid route
        """
        if state.route:
            routes = state.route.routes
            if verdict in routes:
                return self._resolve_route(routes[verdict], ctx)
            # The specific error handler must win over the catch-all default;
            # otherwise route.error is unreachable whenever a default exists.
            if verdict == "error" and state.route.error:
                return self._resolve_route(state.route.error, ctx)
            if state.route.default:
                return self._resolve_route(state.route.default, ctx)
            return None

        # Shorthand routing
        if verdict == "yes" and state.on_yes:
            return self._resolve_route(state.on_yes, ctx)
        if verdict == "no" and state.on_no:
            return self._resolve_route(state.on_no, ctx)
        if verdict == "error" and state.on_error:
            return self._resolve_route(state.on_error, ctx)
        if verdict == "partial" and state.on_partial:
            return self._resolve_route(state.on_partial, ctx)
        if verdict == "blocked" and state.on_blocked:
            return self._resolve_route(state.on_blocked, ctx)

        return None

    def _resolve_route(self, route: str, ctx: InterpolationContext) -> str:
        """Resolve route target, handling special tokens.

        Args:
            route: Route target string ("$current" means stay in this state)
            ctx: Interpolation context

        Returns:
            Resolved state name
        """
        if route == "$current":
            return self.current_state
        return interpolate(route, ctx)

    def _action_mode(self, state: StateConfig) -> str:
        """Return execution mode for the state: 'prompt', 'shell', or 'mcp_tool'.

        An explicit action_type wins; otherwise an action starting with "/" is
        treated as a slash command (prompt mode) and anything else as shell.
        """
        if state.action_type == "mcp_tool":
            return "mcp_tool"
        if state.action_type in ("prompt", "slash_command"):
            return "prompt"
        if state.action_type == "shell":
            return "shell"
        # Heuristic: / prefix = slash_command (prompt mode)
        if state.action is not None and state.action.startswith("/"):
            return "prompt"
        return "shell"

    def _build_context(self) -> InterpolationContext:
        """Build interpolation context for current state.

        Returns:
            InterpolationContext with all runtime values
        """
        return InterpolationContext(
            context=self.fsm.context,
            captured=self.captured,
            prev=self.prev_result,
            result=None,
            state_name=self.current_state,
            iteration=self.iteration,
            loop_name=self.fsm.name,
            started_at=self.started_at,
            elapsed_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
        )

    def _emit(self, event: str, data: dict[str, Any]) -> None:
        """Emit an event via the callback, tagged with its name and a UTC timestamp."""
        self.event_callback(
            {
                "event": event,
                "ts": _iso_now(),
                **data,
            }
        )

    def _finish(self, terminated_by: str, error: str | None = None) -> ExecutionResult:
        """Finalize execution: emit loop_complete and return the result."""
        self._emit(
            "loop_complete",
            {
                "final_state": self.current_state,
                "iterations": self.iteration,
                "terminated_by": terminated_by,
            },
        )

        return ExecutionResult(
            final_state=self.current_state,
            iterations=self.iteration,
            terminated_by=terminated_by,
            duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
            captured=self.captured,
            error=error,
        )

    def _handle_handoff(self, signal: DetectedSignal) -> ExecutionResult:
        """Handle a detected handoff signal.

        Emits a handoff_detected event and optionally invokes the handoff handler.

        Args:
            signal: The detected handoff signal

        Returns:
            ExecutionResult with handoff information
        """
        self._emit(
            "handoff_detected",
            {
                "state": self.current_state,
                "iteration": self.iteration,
                "continuation": signal.payload,
            },
        )

        # Invoke handler if configured
        if self.handoff_handler:
            result = self.handoff_handler.handle(self.fsm.name, signal.payload)
            if result.spawned_process is not None:
                self._emit(
                    "handoff_spawned",
                    {
                        "pid": result.spawned_process.pid,
                        "state": self.current_state,
                    },
                )

        return ExecutionResult(
            final_state=self.current_state,
            iterations=self.iteration,
            terminated_by="handoff",
            duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms,
            captured=self.captured,
            handoff=True,
            continuation_prompt=signal.payload,
        )