Coverage for little_loops / cli / loop.py: 0%
663 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-13 16:40 -0600
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-13 16:40 -0600
1"""ll-loop: Execute FSM-based automation loops."""
3from __future__ import annotations
5import argparse
6from pathlib import Path
7from typing import Any
10def main_loop() -> int:
11 """Entry point for ll-loop command.
13 Execute FSM-based automation loops.
15 Returns:
16 Exit code (0 = success)
17 """
18 import yaml
20 from little_loops.config import BRConfig
21 from little_loops.fsm.compilers import compile_paradigm
22 from little_loops.fsm.concurrency import LockManager
23 from little_loops.fsm.persistence import (
24 PersistentExecutor,
25 StatePersistence,
26 get_loop_history,
27 list_running_loops,
28 )
29 from little_loops.fsm.schema import FSMLoop
30 from little_loops.fsm.validation import load_and_validate
32 # Load config for loops_dir
33 config = BRConfig(Path.cwd())
34 loops_dir = Path(config.loops.loops_dir)
36 # Check if first positional arg is a subcommand or a loop name
37 # This enables "ll-loop fix-types" shorthand for "ll-loop run fix-types"
38 known_subcommands = {
39 "run",
40 "compile",
41 "validate",
42 "list",
43 "status",
44 "stop",
45 "resume",
46 "history",
47 "test",
48 "simulate",
49 "install",
50 "show",
51 }
53 # Pre-process args: if first positional arg is not a subcommand, insert "run"
54 import sys as _sys
56 argv = _sys.argv[1:]
57 if argv and not argv[0].startswith("-") and argv[0] not in known_subcommands:
58 # First arg is a loop name, not a subcommand - insert "run"
59 argv = ["run"] + argv
61 parser = argparse.ArgumentParser(
62 prog="ll-loop",
63 description="Execute FSM-based automation loops",
64 formatter_class=argparse.RawDescriptionHelpFormatter,
65 epilog="""
66Examples:
67 %(prog)s fix-types # Run loop from .loops/fix-types.yaml
68 %(prog)s run fix-types --dry-run # Show execution plan
69 %(prog)s validate fix-types # Validate loop definition
70 %(prog)s test fix-types # Run single test iteration
71 %(prog)s simulate fix-types # Interactive simulation (dry-run with prompts)
72 %(prog)s compile paradigm.yaml # Compile paradigm to FSM
73 %(prog)s list # List available loops
74 %(prog)s list --running # List running loops
75 %(prog)s status fix-types # Show loop status
76 %(prog)s stop fix-types # Stop a running loop
77 %(prog)s resume fix-types # Resume interrupted loop
78 %(prog)s history fix-types # Show execution history
79""",
80 )
82 subparsers = parser.add_subparsers(dest="command")
84 # Run subcommand
85 run_parser = subparsers.add_parser("run", help="Run a loop")
86 run_parser.add_argument("loop", help="Loop name or path")
87 run_parser.add_argument("--max-iterations", "-n", type=int, help="Override iteration limit")
88 run_parser.add_argument("--no-llm", action="store_true", help="Disable LLM evaluation")
89 run_parser.add_argument("--llm-model", type=str, help="Override LLM model")
90 run_parser.add_argument(
91 "--dry-run", action="store_true", help="Show execution plan without running"
92 )
93 run_parser.add_argument(
94 "--background", "-b", action="store_true", help="Run as daemon (not yet implemented)"
95 )
96 run_parser.add_argument("--quiet", "-q", action="store_true", help="Suppress progress output")
97 run_parser.add_argument(
98 "--queue", action="store_true", help="Wait for conflicting loops to finish"
99 )
101 # Compile subcommand
102 compile_parser = subparsers.add_parser("compile", help="Compile paradigm to FSM")
103 compile_parser.add_argument("input", help="Input paradigm YAML file")
104 compile_parser.add_argument("-o", "--output", help="Output FSM YAML file")
106 # Validate subcommand
107 validate_parser = subparsers.add_parser("validate", help="Validate loop definition")
108 validate_parser.add_argument("loop", help="Loop name or path")
110 # List subcommand
111 list_parser = subparsers.add_parser("list", help="List loops")
112 list_parser.add_argument("--running", action="store_true", help="Only show running loops")
114 # Status subcommand
115 status_parser = subparsers.add_parser("status", help="Show loop status")
116 status_parser.add_argument("loop", help="Loop name")
118 # Stop subcommand
119 stop_parser = subparsers.add_parser("stop", help="Stop a running loop")
120 stop_parser.add_argument("loop", help="Loop name")
122 # Resume subcommand
123 resume_parser = subparsers.add_parser("resume", help="Resume an interrupted loop")
124 resume_parser.add_argument("loop", help="Loop name or path")
126 # History subcommand
127 history_parser = subparsers.add_parser("history", help="Show loop execution history")
128 history_parser.add_argument("loop", help="Loop name")
129 history_parser.add_argument(
130 "--tail", "-n", type=int, default=50, help="Last N events (default: 50)"
131 )
133 # Test subcommand
134 test_parser = subparsers.add_parser(
135 "test", help="Run a single test iteration to verify loop configuration"
136 )
137 test_parser.add_argument("loop", help="Loop name")
139 # Simulate subcommand
140 simulate_parser = subparsers.add_parser(
141 "simulate",
142 help="Trace loop execution interactively without running commands",
143 )
144 simulate_parser.add_argument("loop", help="Loop name or path")
145 simulate_parser.add_argument(
146 "--scenario",
147 choices=["all-pass", "all-fail", "first-fail", "alternating"],
148 help="Auto-select results based on pattern instead of prompting",
149 )
150 simulate_parser.add_argument(
151 "--max-iterations",
152 "-n",
153 type=int,
154 help="Override max iterations for simulation (default: min of loop config or 20)",
155 )
157 # Install subcommand
158 install_parser = subparsers.add_parser(
159 "install",
160 help="Copy a built-in loop to .loops/ for customization",
161 )
162 install_parser.add_argument("loop", help="Built-in loop name to install")
164 # Show subcommand
165 show_parser = subparsers.add_parser("show", help="Show loop details and structure")
166 show_parser.add_argument("loop", help="Loop name or path")
168 args = parser.parse_args(argv)
170 from little_loops.logger import Logger
172 logger = Logger(verbose=not getattr(args, "quiet", False))
174 def get_builtin_loops_dir() -> Path:
175 """Get the path to built-in loops bundled with the plugin."""
176 return Path(__file__).parent.parent.parent.parent / "loops"
178 def resolve_loop_path(name_or_path: str) -> Path:
179 """Resolve loop name to path, preferring compiled FSM over paradigm."""
180 path = Path(name_or_path)
181 if path.exists():
182 return path
184 # Try <loops_dir>/<name>.fsm.yaml first (compiled FSM)
185 fsm_path = loops_dir / f"{name_or_path}.fsm.yaml"
186 if fsm_path.exists():
187 return fsm_path
189 # Fall back to <loops_dir>/<name>.yaml (paradigm)
190 loops_path = loops_dir / f"{name_or_path}.yaml"
191 if loops_path.exists():
192 return loops_path
194 # Fall back to built-in loops from plugin directory
195 builtin_path = get_builtin_loops_dir() / f"{name_or_path}.yaml"
196 if builtin_path.exists():
197 return builtin_path
199 raise FileNotFoundError(f"Loop not found: {name_or_path}")
201 def print_execution_plan(fsm: FSMLoop) -> None:
202 """Print dry-run execution plan."""
203 print(f"Execution plan for: {fsm.name}")
204 print()
205 print("States:")
206 for name, state in fsm.states.items():
207 terminal_marker = " [TERMINAL]" if state.terminal else ""
208 print(f" [{name}]{terminal_marker}")
209 if state.action:
210 if len(state.action) > 70:
211 action_display = state.action[:70] + "..."
212 else:
213 action_display = state.action
214 print(f" action: {action_display}")
215 if state.evaluate:
216 print(f" evaluate: {state.evaluate.type}")
217 if state.on_success:
218 print(f" on_success -> {state.on_success}")
219 if state.on_failure:
220 print(f" on_failure -> {state.on_failure}")
221 if state.on_error:
222 print(f" on_error -> {state.on_error}")
223 if state.next:
224 print(f" next -> {state.next}")
225 if state.route:
226 print(" route:")
227 for verdict, target in state.route.routes.items():
228 print(f" {verdict} -> {target}")
229 if state.route.default:
230 print(f" _ -> {state.route.default}")
231 print()
232 print(f"Initial state: {fsm.initial}")
233 print(f"Max iterations: {fsm.max_iterations}")
234 if fsm.timeout:
235 print(f"Timeout: {fsm.timeout}s")
237 def run_foreground(executor: PersistentExecutor, fsm: FSMLoop) -> int:
238 """Run loop with progress display."""
239 if not getattr(args, "quiet", False):
240 print(f"Running loop: {fsm.name}")
241 print(f"Max iterations: {fsm.max_iterations}")
242 print()
244 current_iteration = [0] # Use list to allow mutation in closure
246 def display_progress(event: dict) -> None:
247 """Display progress for events."""
248 event_type = event.get("event")
250 if event_type == "state_enter":
251 current_iteration[0] = event.get("iteration", 0)
252 state = event.get("state", "")
253 print(f"[{current_iteration[0]}/{fsm.max_iterations}] {state}", end="")
255 elif event_type == "action_start":
256 action = event.get("action", "")
257 action_display = action[:60] + "..." if len(action) > 60 else action
258 print(f" -> {action_display}")
260 elif event_type == "evaluate":
261 verdict = event.get("verdict", "")
262 confidence = event.get("confidence")
263 if verdict in ("success", "target", "progress"):
264 symbol = "\u2713" # checkmark
265 else:
266 symbol = "\u2717" # x mark
267 if confidence is not None:
268 print(f" {symbol} {verdict} (confidence: {confidence:.2f})")
269 else:
270 print(f" {symbol} {verdict}")
272 elif event_type == "route":
273 to_state = event.get("to", "")
274 print(f" -> {to_state}")
276 # Create wrapper to combine persistence callback with progress display
277 original_handle = executor._handle_event
278 quiet = getattr(args, "quiet", False)
280 def combined_handler(event: dict) -> None:
281 original_handle(event)
282 if not quiet:
283 display_progress(event)
285 # Use object.__setattr__ to bypass method assignment check
286 object.__setattr__(executor, "_handle_event", combined_handler)
288 result = executor.run()
290 if not quiet:
291 print()
292 duration_sec = result.duration_ms / 1000
293 if duration_sec < 60:
294 duration_str = f"{duration_sec:.1f}s"
295 else:
296 minutes = int(duration_sec // 60)
297 seconds = duration_sec % 60
298 duration_str = f"{minutes}m {seconds:.0f}s"
299 print(
300 f"Loop completed: {result.final_state} "
301 f"({result.iterations} iterations, {duration_str})"
302 )
304 return 0 if result.terminated_by == "terminal" else 1
306 def cmd_run(loop_name: str) -> int:
307 """Run a loop."""
308 try:
309 path = resolve_loop_path(loop_name)
311 # Load the file to check format
312 with open(path) as f:
313 spec = yaml.safe_load(f)
315 # Auto-compile if it's a paradigm file (has 'paradigm' but no 'initial')
316 if "paradigm" in spec and "initial" not in spec:
317 logger.info(f"Auto-compiling paradigm file: {path}")
318 fsm = compile_paradigm(spec)
319 else:
320 fsm = load_and_validate(path)
321 except FileNotFoundError as e:
322 logger.error(str(e))
323 return 1
324 except ValueError as e:
325 logger.error(f"Validation error: {e}")
326 return 1
328 # Apply overrides
329 if args.max_iterations:
330 fsm.max_iterations = args.max_iterations
331 if args.no_llm:
332 fsm.llm.enabled = False
333 if args.llm_model:
334 fsm.llm.model = args.llm_model
336 # Dry run
337 if args.dry_run:
338 print_execution_plan(fsm)
339 return 0
341 # Background mode not implemented
342 if getattr(args, "background", False):
343 logger.warning("Background mode not yet implemented, running in foreground")
345 # Scope-based locking
346 lock_manager = LockManager(loops_dir)
347 scope = fsm.scope or ["."]
349 if not lock_manager.acquire(fsm.name, scope):
350 conflict = lock_manager.find_conflict(scope)
351 if conflict and getattr(args, "queue", False):
352 logger.info(f"Waiting for conflicting loop '{conflict.loop_name}' to finish...")
353 if not lock_manager.wait_for_scope(scope, timeout=3600):
354 logger.error("Timeout waiting for scope to become available")
355 return 1
356 # Re-acquire after waiting
357 if not lock_manager.acquire(fsm.name, scope):
358 logger.error("Failed to acquire lock after waiting")
359 return 1
360 elif conflict:
361 logger.error(f"Scope conflict with running loop: {conflict.loop_name}")
362 logger.info(f" Conflicting scope: {conflict.scope}")
363 logger.info(" Use --queue to wait for it to finish")
364 return 1
365 else:
366 # Unexpected: find_conflict returned None but acquire failed
367 logger.error("Failed to acquire scope lock (unknown reason)")
368 return 1
370 try:
371 executor = PersistentExecutor(fsm, loops_dir=loops_dir)
372 return run_foreground(executor, fsm)
373 finally:
374 lock_manager.release(fsm.name)
376 def cmd_compile() -> int:
377 """Compile paradigm YAML to FSM."""
378 input_path = Path(args.input)
379 if not input_path.exists():
380 logger.error(f"Input file not found: {input_path}")
381 return 1
383 try:
384 with open(input_path) as f:
385 spec = yaml.safe_load(f)
386 fsm = compile_paradigm(spec)
387 except ValueError as e:
388 logger.error(f"Compilation error: {e}")
389 return 1
390 except yaml.YAMLError as e:
391 logger.error(f"YAML parse error: {e}")
392 return 1
394 output_path = (
395 Path(args.output)
396 if args.output
397 else Path(str(input_path).replace(".yaml", ".fsm.yaml"))
398 )
400 # Convert FSMLoop to dict for YAML output
401 fsm_dict: dict[str, Any] = {
402 "name": fsm.name,
403 "paradigm": fsm.paradigm,
404 "initial": fsm.initial,
405 "states": {name: state.to_dict() for name, state in fsm.states.items()},
406 "max_iterations": fsm.max_iterations,
407 }
408 if fsm.context:
409 fsm_dict["context"] = fsm.context
410 if fsm.maintain:
411 fsm_dict["maintain"] = fsm.maintain
412 if fsm.backoff:
413 fsm_dict["backoff"] = fsm.backoff
414 if fsm.timeout:
415 fsm_dict["timeout"] = fsm.timeout
417 with open(output_path, "w") as f:
418 yaml.dump(fsm_dict, f, default_flow_style=False, sort_keys=False)
420 logger.success(f"Compiled to: {output_path}")
421 return 0
423 def cmd_validate(loop_name: str) -> int:
424 """Validate a loop definition."""
425 try:
426 path = resolve_loop_path(loop_name)
428 # Load the file to check format
429 with open(path) as f:
430 spec = yaml.safe_load(f)
432 # Auto-compile if it's a paradigm file (has 'paradigm' but no 'initial')
433 if "paradigm" in spec and "initial" not in spec:
434 logger.info(f"Compiling paradigm file for validation: {path}")
435 fsm = compile_paradigm(spec)
436 else:
437 fsm = load_and_validate(path)
439 logger.success(f"{loop_name} is valid")
440 print(f" States: {', '.join(fsm.states.keys())}")
441 print(f" Initial: {fsm.initial}")
442 print(f" Max iterations: {fsm.max_iterations}")
443 return 0
444 except FileNotFoundError as e:
445 logger.error(str(e))
446 return 1
447 except ValueError as e:
448 logger.error(f"{loop_name} is invalid: {e}")
449 return 1
451 def cmd_list() -> int:
452 """List loops."""
453 if getattr(args, "running", False):
454 states = list_running_loops(loops_dir)
455 if not states:
456 print("No running loops")
457 return 0
458 print("Running loops:")
459 for state in states:
460 print(f" {state.loop_name}: {state.current_state} (iteration {state.iteration})")
461 return 0
463 # Collect project loops
464 project_names: set[str] = set()
465 yaml_files: list[Path] = []
466 if loops_dir.exists():
467 yaml_files = list(loops_dir.glob("*.yaml"))
468 project_names = {p.stem for p in yaml_files}
470 # Collect built-in loops (excluding those overridden by project)
471 builtin_dir = get_builtin_loops_dir()
472 builtin_files: list[Path] = []
473 if builtin_dir.exists():
474 builtin_files = [
475 f for f in sorted(builtin_dir.glob("*.yaml")) if f.stem not in project_names
476 ]
478 if not yaml_files and not builtin_files:
479 print("No loops available")
480 return 0
482 print("Available loops:")
483 for path in sorted(yaml_files):
484 print(f" {path.stem}")
485 for path in builtin_files:
486 print(f" {path.stem} [built-in]")
487 return 0
489 def cmd_install(loop_name: str) -> int:
490 """Copy a built-in loop to .loops/ for customization."""
491 import shutil
493 builtin_dir = get_builtin_loops_dir()
494 source = builtin_dir / f"{loop_name}.yaml"
496 if not source.exists():
497 available = [f.stem for f in builtin_dir.glob("*.yaml")] if builtin_dir.exists() else []
498 logger.error(f"No built-in loop named '{loop_name}'")
499 if available:
500 print(f"Available built-in loops: {', '.join(sorted(available))}")
501 return 1
503 loops_dir.mkdir(exist_ok=True)
504 dest = loops_dir / f"{loop_name}.yaml"
506 if dest.exists():
507 logger.error(f"Loop already exists: {dest}")
508 print("Remove it first or edit it directly.")
509 return 1
511 shutil.copy2(source, dest)
512 print(f"Installed {loop_name} to {dest}")
513 print("You can now customize it by editing the file.")
514 return 0
516 def cmd_status(loop_name: str) -> int:
517 """Show loop status."""
518 persistence = StatePersistence(loop_name, loops_dir)
519 state = persistence.load_state()
521 if state is None:
522 logger.error(f"No state found for: {loop_name}")
523 return 1
525 print(f"Loop: {state.loop_name}")
526 print(f"Status: {state.status}")
527 print(f"Current state: {state.current_state}")
528 print(f"Iteration: {state.iteration}")
529 print(f"Started: {state.started_at}")
530 print(f"Updated: {state.updated_at}")
531 if state.continuation_prompt:
532 # Show truncated continuation context
533 prompt_preview = state.continuation_prompt[:200]
534 if len(state.continuation_prompt) > 200:
535 prompt_preview += "..."
536 print(f"Continuation context: {prompt_preview}")
537 return 0
539 def cmd_stop(loop_name: str) -> int:
540 """Stop a running loop."""
541 persistence = StatePersistence(loop_name, loops_dir)
542 state = persistence.load_state()
544 if state is None:
545 logger.error(f"No state found for: {loop_name}")
546 return 1
548 if state.status != "running":
549 logger.error(f"Loop not running: {loop_name} (status: {state.status})")
550 return 1
552 state.status = "interrupted"
553 persistence.save_state(state)
554 logger.success(f"Marked {loop_name} as interrupted")
555 return 0
557 def cmd_resume(loop_name: str) -> int:
558 """Resume an interrupted loop."""
559 try:
560 path = resolve_loop_path(loop_name)
562 # Load the file to check format
563 with open(path) as f:
564 spec = yaml.safe_load(f)
566 # Auto-compile if it's a paradigm file (has 'paradigm' but no 'initial')
567 if "paradigm" in spec and "initial" not in spec:
568 logger.info(f"Auto-compiling paradigm file: {path}")
569 fsm = compile_paradigm(spec)
570 else:
571 fsm = load_and_validate(path)
572 except FileNotFoundError as e:
573 logger.error(str(e))
574 return 1
575 except ValueError as e:
576 logger.error(f"Validation error: {e}")
577 return 1
579 # Check state before resuming to show context
580 persistence = StatePersistence(loop_name, loops_dir)
581 state = persistence.load_state()
582 if state and state.status == "awaiting_continuation":
583 print(f"Resuming from context handoff (iteration {state.iteration})...")
584 if state.continuation_prompt:
585 # Show truncated continuation context
586 prompt_preview = state.continuation_prompt[:500]
587 if len(state.continuation_prompt) > 500:
588 prompt_preview += "..."
589 print(f"Context: {prompt_preview}")
590 print()
592 executor = PersistentExecutor(fsm, loops_dir=loops_dir)
593 result = executor.resume()
595 if result is None:
596 logger.warning(f"Nothing to resume for: {loop_name}")
597 return 1
599 duration_sec = result.duration_ms / 1000
600 if duration_sec < 60:
601 duration_str = f"{duration_sec:.1f}s"
602 else:
603 minutes = int(duration_sec // 60)
604 seconds = duration_sec % 60
605 duration_str = f"{minutes}m {seconds:.0f}s"
607 logger.success(
608 f"Resumed and completed: {result.final_state} "
609 f"({result.iterations} iterations, {duration_str})"
610 )
611 return 0 if result.terminated_by == "terminal" else 1
613 def cmd_history(loop_name: str) -> int:
614 """Show loop history."""
615 events = get_loop_history(loop_name, loops_dir)
617 if not events:
618 print(f"No history for: {loop_name}")
619 return 0
621 # Show last N events
622 tail = getattr(args, "tail", 50)
623 for event in events[-tail:]:
624 ts = event.get("ts", "")[:19] # Truncate to seconds
625 event_type = event.get("event", "")
626 details = {k: v for k, v in event.items() if k not in ("event", "ts")}
627 print(f"{ts} {event_type}: {details}")
629 return 0
631 def cmd_test(loop_name: str) -> int:
632 """Run a single test iteration to verify loop configuration.
634 Executes the initial state's action and evaluation, then reports
635 what the loop would do without actually transitioning further.
636 """
637 from little_loops.fsm.evaluators import EvaluationResult, evaluate, evaluate_exit_code
638 from little_loops.fsm.executor import DefaultActionRunner
639 from little_loops.fsm.interpolation import InterpolationContext
641 try:
642 path = resolve_loop_path(loop_name)
644 # Load the file to check format
645 with open(path) as f:
646 spec = yaml.safe_load(f)
648 # Auto-compile if it's a paradigm file
649 if "paradigm" in spec and "initial" not in spec:
650 fsm = compile_paradigm(spec)
651 else:
652 fsm = load_and_validate(path)
653 except FileNotFoundError as e:
654 logger.error(str(e))
655 return 1
656 except ValueError as e:
657 logger.error(f"Validation error: {e}")
658 return 1
660 # Get initial state
661 initial = fsm.initial
662 state_config = fsm.states[initial]
664 print(f"## Test Iteration: {loop_name}")
665 print()
666 print(f"State: {initial}")
668 # If no action, report and exit
669 if not state_config.action:
670 print(f"Initial state '{initial}' has no action to test")
671 print()
672 print("\u2713 Loop structure is valid (no check action to execute)")
673 return 0
675 action = state_config.action
676 is_slash = action.startswith("/") or state_config.action_type in (
677 "prompt",
678 "slash_command",
679 )
681 print(f"Action: {action}")
682 print()
684 if is_slash:
685 print("Note: Slash commands require Claude CLI; skipping actual execution.")
686 print()
687 print("Verdict: SKIPPED (slash command)")
688 print()
689 print("\u2713 Loop structure is valid (slash command not executed)")
690 return 0
692 # Run the action
693 runner = DefaultActionRunner()
694 timeout = state_config.timeout or 120
695 result = runner.run(action, timeout=timeout, is_slash_command=False)
697 print(f"Exit code: {result.exit_code}")
699 # Truncate output for display
700 output_lines = result.output.strip().split("\n")
701 if len(output_lines) > 10:
702 extra = len(output_lines) - 10
703 output_preview = "\n".join(output_lines[:10]) + f"\n... ({extra} more lines)"
704 elif len(result.output) > 500:
705 output_preview = result.output[:500] + "..."
706 else:
707 output_preview = result.output.strip() if result.output.strip() else "(empty)"
709 print(f"Output:\n{output_preview}")
711 if result.stderr:
712 stderr_lines = result.stderr.strip().split("\n")
713 if len(stderr_lines) > 5:
714 extra = len(stderr_lines) - 5
715 stderr_preview = "\n".join(stderr_lines[:5]) + f"\n... ({extra} more lines)"
716 else:
717 stderr_preview = result.stderr.strip()
718 print(f"Stderr:\n{stderr_preview}")
720 print()
722 # Evaluate
723 ctx = InterpolationContext()
724 eval_result: EvaluationResult
726 if state_config.evaluate:
727 eval_result = evaluate(
728 config=state_config.evaluate,
729 output=result.output,
730 exit_code=result.exit_code,
731 context=ctx,
732 )
733 evaluator_type: str = state_config.evaluate.type
734 else:
735 # Default to exit_code evaluation
736 eval_result = evaluate_exit_code(result.exit_code)
737 evaluator_type = "exit_code (default)"
739 print(f"Evaluator: {evaluator_type}")
740 print(f"Verdict: {eval_result.verdict.upper()}")
742 if eval_result.details:
743 for key, value in eval_result.details.items():
744 if key != "exit_code" or evaluator_type != "exit_code (default)":
745 print(f" {key}: {value}")
747 # Determine next state based on verdict
748 verdict = eval_result.verdict
749 next_state = None
751 if state_config.route:
752 routes = state_config.route.routes
753 if verdict in routes:
754 next_state = routes[verdict]
755 elif state_config.route.default:
756 next_state = state_config.route.default
757 else:
758 if verdict == "success" and state_config.on_success:
759 next_state = state_config.on_success
760 elif verdict == "failure" and state_config.on_failure:
761 next_state = state_config.on_failure
762 elif verdict == "error" and state_config.on_error:
763 next_state = state_config.on_error
765 print()
766 if next_state:
767 print(f"Would transition: {initial} \u2192 {next_state}")
768 else:
769 print(f"Would transition: {initial} \u2192 (no route for '{verdict}')")
771 # Summary
772 print()
773 has_error = eval_result.verdict == "error" or "error" in eval_result.details
774 if has_error:
775 print("\u26a0 Loop has issues - review the error details above")
776 return 1
777 else:
778 print("\u2713 Loop appears to be configured correctly")
779 return 0
781 def cmd_simulate(loop_name: str) -> int:
782 """Run interactive simulation of loop execution.
784 Traces through loop logic without executing commands, allowing users
785 to verify state transitions and understand loop behavior.
786 """
787 from little_loops.fsm.executor import FSMExecutor, SimulationActionRunner
789 try:
790 path = resolve_loop_path(loop_name)
792 # Load the file to check format
793 with open(path) as f:
794 spec = yaml.safe_load(f)
796 # Auto-compile if it's a paradigm file
797 if "paradigm" in spec and "initial" not in spec:
798 fsm = compile_paradigm(spec)
799 else:
800 fsm = load_and_validate(path)
801 except FileNotFoundError as e:
802 logger.error(str(e))
803 return 1
804 except ValueError as e:
805 logger.error(f"Validation error: {e}")
806 return 1
808 # Apply CLI overrides
809 if args.max_iterations:
810 fsm.max_iterations = args.max_iterations
811 else:
812 # Limit iterations for simulation safety (cap at 20 unless overridden)
813 if fsm.max_iterations > 20:
814 logger.info(
815 f"Limiting simulation to 20 iterations (loop config: {fsm.max_iterations})"
816 )
817 fsm.max_iterations = 20
819 # Create simulation runner
820 sim_runner = SimulationActionRunner(scenario=args.scenario)
822 # Track simulation state
823 states_visited: list[str] = []
825 def simulation_callback(event: dict) -> None:
826 """Display simulation progress."""
827 event_type = event.get("event")
829 if event_type == "state_enter":
830 iteration = event.get("iteration", 0)
831 state = event.get("state", "")
832 states_visited.append(state)
833 print()
834 print(f"[{iteration}] State: {state}")
836 elif event_type == "action_start":
837 action = event.get("action", "")
838 action_display = action[:70] + "..." if len(action) > 70 else action
839 print(f" Action: {action_display}")
841 elif event_type == "evaluate":
842 evaluator = event.get("type", "exit_code")
843 verdict = event.get("verdict", "")
844 print(f" Evaluator: {evaluator}")
845 print(f" Result: {verdict.upper()}")
847 elif event_type == "route":
848 from_state = event.get("from", "")
849 to_state = event.get("to", "")
850 print(f" Transition: {from_state} \u2192 {to_state}")
852 # Print header
853 mode_str = f"scenario={args.scenario}" if args.scenario else "interactive"
854 print(f"=== SIMULATION: {fsm.name} ({mode_str}) ===")
856 # Run simulation
857 executor = FSMExecutor(
858 fsm,
859 event_callback=simulation_callback,
860 action_runner=sim_runner,
861 )
862 result = executor.run()
864 # Print summary
865 print()
866 print("=== Summary ===")
867 arrow = " \u2192 "
868 print(f"States visited: {arrow.join(states_visited)}")
869 print(f"Iterations: {result.iterations}")
870 print(f"Would have executed {len(sim_runner.calls)} commands")
871 print(f"Terminated by: {result.terminated_by}")
873 return 0
875 def cmd_show(loop_name: str) -> int:
876 """Show loop details and structure."""
877 try:
878 path = resolve_loop_path(loop_name)
880 with open(path) as f:
881 spec = yaml.safe_load(f)
883 # Auto-compile paradigm files
884 if "paradigm" in spec and "initial" not in spec:
885 fsm = compile_paradigm(spec)
886 else:
887 fsm = load_and_validate(path)
888 except FileNotFoundError as e:
889 logger.error(str(e))
890 return 1
891 except ValueError as e:
892 logger.error(f"Invalid loop: {e}")
893 return 1
895 # --- Metadata ---
896 print(f"Loop: {fsm.name}")
897 if fsm.paradigm:
898 print(f"Paradigm: {fsm.paradigm}")
899 description = spec.get("description", "").strip()
900 if description:
901 print(f"Description: {description}")
902 print(f"Max iterations: {fsm.max_iterations}")
903 if fsm.timeout:
904 print(f"Timeout: {fsm.timeout}s")
905 if fsm.backoff:
906 print(f"Backoff: {fsm.backoff}s")
907 if fsm.maintain:
908 print("Maintain: yes (restarts after completion)")
909 if fsm.context:
910 print(f"Context variables: {', '.join(fsm.context.keys())}")
911 if fsm.scope:
912 print(f"Scope: {', '.join(fsm.scope)}")
913 print(f"Source: {path}")
915 # --- States & Transitions ---
916 print()
917 print("States:")
918 for name, state in fsm.states.items():
919 terminal_marker = " [TERMINAL]" if state.terminal else ""
920 initial_marker = " [INITIAL]" if name == fsm.initial else ""
921 print(f" [{name}]{initial_marker}{terminal_marker}")
922 if state.action:
923 action_display = (
924 state.action[:70] + "..." if len(state.action) > 70 else state.action
925 )
926 print(f" action: {action_display}")
927 if state.action_type:
928 print(f" type: {state.action_type}")
929 if state.evaluate:
930 print(f" evaluate: {state.evaluate.type}")
931 if state.on_success:
932 print(f" on_success ──→ {state.on_success}")
933 if state.on_failure:
934 print(f" on_failure ──→ {state.on_failure}")
935 if state.on_error:
936 print(f" on_error ──→ {state.on_error}")
937 if state.next:
938 print(f" next ──→ {state.next}")
939 if state.route:
940 print(" route:")
941 for verdict, target in state.route.routes.items():
942 print(f" {verdict} ──→ {target}")
943 if state.route.default:
944 print(f" _ ──→ {state.route.default}")
946 # --- ASCII FSM Diagram ---
947 print()
948 print("Diagram:")
949 # Build adjacency for diagram
950 edges: list[tuple[str, str, str]] = [] # (from, to, label)
951 for name, state in fsm.states.items():
952 if state.on_success:
953 edges.append((name, state.on_success, "success"))
954 if state.on_failure:
955 edges.append((name, state.on_failure, "fail"))
956 if state.on_error:
957 edges.append((name, state.on_error, "error"))
958 if state.next:
959 edges.append((name, state.next, "next"))
960 if state.route:
961 for verdict, target in state.route.routes.items():
962 edges.append((name, target, verdict))
963 if state.route.default:
964 edges.append((name, state.route.default, "_"))
966 # Trace linear path from initial state for main flow
967 visited: set[str] = set()
968 main_path: list[str] = []
969 current = fsm.initial
970 while current and current not in visited:
971 visited.add(current)
972 main_path.append(current)
973 st = fsm.states.get(current)
974 if not st or st.terminal:
975 break
976 # Follow primary transition
977 nxt = st.on_success or st.next
978 if nxt:
979 current = nxt
980 elif st.route:
981 # Pick first route entry as primary
982 first_target = next(iter(st.route.routes.values()), None)
983 current = first_target or st.route.default or ""
984 else:
985 break
987 # Render main flow
988 if main_path:
989 flow_parts = [f"[{s}]" for s in main_path]
990 print(f" {' ──→ '.join(flow_parts)}")
992 # Render back-edges and alternate transitions
993 for src, dst, label in edges:
994 if src in visited and dst in visited:
995 # Skip edges already shown in main flow
996 src_idx = main_path.index(src) if src in main_path else -1
997 dst_idx = main_path.index(dst) if dst in main_path else -1
998 if dst_idx == src_idx + 1 and label in ("success", "next"):
999 continue
1000 print(f" [{src}] ──({label})──→ [{dst}]")
1002 # --- Run Command ---
1003 print()
1004 print("Run command:")
1005 print(f" ll-loop run {loop_name}")
1007 return 0
1009 # Dispatch commands
1010 if args.command == "run":
1011 return cmd_run(args.loop)
1012 elif args.command == "compile":
1013 return cmd_compile()
1014 elif args.command == "validate":
1015 return cmd_validate(args.loop)
1016 elif args.command == "list":
1017 return cmd_list()
1018 elif args.command == "status":
1019 return cmd_status(args.loop)
1020 elif args.command == "stop":
1021 return cmd_stop(args.loop)
1022 elif args.command == "resume":
1023 return cmd_resume(args.loop)
1024 elif args.command == "history":
1025 return cmd_history(args.loop)
1026 elif args.command == "test":
1027 return cmd_test(args.loop)
1028 elif args.command == "simulate":
1029 return cmd_simulate(args.loop)
1030 elif args.command == "install":
1031 return cmd_install(args.loop)
1032 elif args.command == "show":
1033 return cmd_show(args.loop)
1034 else:
1035 parser.print_help()
1036 return 1