Coverage for little_loops / cli / loop / _helpers.py: 9%
268 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
1"""Shared helpers for ll-loop CLI subcommands."""
3from __future__ import annotations
5import argparse
6import signal
7import subprocess
8import sys
9import time
10from pathlib import Path
11from types import FrameType
12from typing import TYPE_CHECKING, Any
14from little_loops.cli.output import colorize, terminal_width
16if TYPE_CHECKING:
17 from little_loops.fsm.schema import FSMLoop
18 from little_loops.logger import Logger
20# Exit code mapping for terminated_by values
21EXIT_CODES: dict[str, int] = {
22 "terminal": 0,
23 "signal": 0,
24 "handoff": 0,
25 "max_iterations": 1,
26 "timeout": 1,
27}
29# Module-level shutdown state for signal handling
30_loop_shutdown_requested: bool = False
31_loop_executor: Any = None
32_loop_pid_file: Path | None = None
35def _loop_signal_handler(signum: int, frame: FrameType | None) -> None:
36 """Handle shutdown signals gracefully for ll-loop.
38 First signal: Set shutdown flag for graceful exit after current state.
39 Second signal: Force immediate exit.
40 """
41 global _loop_shutdown_requested
42 if _loop_shutdown_requested:
43 # Second signal - force exit
44 if _loop_pid_file is not None:
45 _loop_pid_file.unlink(missing_ok=True)
46 print("\nForce shutdown requested", file=sys.stderr)
47 sys.exit(1)
48 _loop_shutdown_requested = True
49 print("\nShutdown requested, will exit after current state...", file=sys.stderr)
50 if _loop_executor is not None:
51 _loop_executor.request_shutdown()
52 # Kill any child subprocess currently blocking in the action runner
53 inner = getattr(_loop_executor, "_executor", None)
54 if inner is not None:
55 runner = getattr(inner, "action_runner", None)
56 if runner is not None:
57 proc = getattr(runner, "_current_process", None)
58 if proc is not None:
59 proc.kill()
62def register_loop_signal_handlers(executor: Any, pid_file: Path | None = None) -> None:
63 """Register SIGINT/SIGTERM handlers for graceful loop shutdown.
65 Sets up signal handling so that Ctrl-C triggers a graceful shutdown
66 (calls executor.request_shutdown()) rather than raising KeyboardInterrupt.
67 A second Ctrl-C forces immediate exit with PID file cleanup.
69 Args:
70 executor: The PersistentExecutor instance to request shutdown on.
71 pid_file: Optional path to PID file to clean up on forced exit.
72 """
73 global _loop_shutdown_requested, _loop_executor, _loop_pid_file
74 _loop_shutdown_requested = False
75 _loop_executor = executor
76 _loop_pid_file = pid_file
77 signal.signal(signal.SIGINT, _loop_signal_handler)
78 signal.signal(signal.SIGTERM, _loop_signal_handler)
81def get_builtin_loops_dir() -> Path:
82 """Get the path to built-in loops bundled with the plugin."""
83 return Path(__file__).parent.parent.parent.parent.parent / "loops"
86def resolve_loop_path(name_or_path: str, loops_dir: Path) -> Path:
87 """Resolve loop name to file path."""
88 path = Path(name_or_path)
89 if path.exists():
90 return path
92 # Try <loops_dir>/<name>.fsm.yaml first (compiled FSM)
93 fsm_path = loops_dir / f"{name_or_path}.fsm.yaml"
94 if fsm_path.exists():
95 return fsm_path
97 # Fall back to <loops_dir>/<name>.yaml
98 loops_path = loops_dir / f"{name_or_path}.yaml"
99 if loops_path.exists():
100 return loops_path
102 # Fall back to built-in loops from plugin directory
103 builtin_path = get_builtin_loops_dir() / f"{name_or_path}.yaml"
104 if builtin_path.exists():
105 return builtin_path
107 raise FileNotFoundError(f"Loop not found: {name_or_path}")
110def load_loop(name_or_path: str, loops_dir: Path, logger: Logger) -> FSMLoop:
111 """Load and validate a loop.
113 Raises:
114 FileNotFoundError: If loop not found.
115 ValueError: If loop is invalid.
116 """
117 from little_loops.fsm.validation import load_and_validate
119 path = resolve_loop_path(name_or_path, loops_dir)
120 fsm, _ = load_and_validate(path)
121 return fsm
124def load_loop_with_spec(
125 name_or_path: str, loops_dir: Path, logger: Logger
126) -> tuple[FSMLoop, dict[str, Any]]:
127 """Load a loop and return both the FSMLoop and raw spec dict.
129 Used by commands that need access to raw YAML fields (e.g., description).
131 Raises:
132 FileNotFoundError: If loop not found.
133 ValueError: If loop is invalid.
134 """
135 import yaml
137 from little_loops.fsm.validation import load_and_validate
139 path = resolve_loop_path(name_or_path, loops_dir)
141 with open(path) as f:
142 spec = yaml.safe_load(f)
144 fsm, _ = load_and_validate(path)
145 return fsm, spec
148def print_execution_plan(fsm: FSMLoop) -> None:
149 """Print dry-run execution plan."""
150 tw = terminal_width()
151 print(colorize(f"Execution plan for: {fsm.name}", "1"))
152 print()
153 print("States:")
154 for name, state in fsm.states.items():
155 terminal_marker = colorize(" [TERMINAL]", "32") if state.terminal else ""
156 print(f" {colorize(f'[{name}]', '1')}{terminal_marker}")
157 if state.action:
158 if state.action_type == "prompt":
159 lines = state.action.strip().splitlines()
160 preview = "\n ".join(lines[:3])
161 if len(lines) > 3 or len(state.action) > 200:
162 preview += " ..."
163 print(f" action: |\n {preview}")
164 else:
165 max_action = tw - 16
166 action_display = (
167 state.action[:max_action] + "..."
168 if len(state.action) > max_action
169 else state.action
170 )
171 print(f" action: {action_display}")
172 if state.evaluate:
173 print(f" evaluate: {state.evaluate.type}")
174 if state.on_yes:
175 print(f" on_yes {colorize('->', '2')} {colorize(state.on_yes, '2')}")
176 if state.on_no:
177 print(f" on_no {colorize('->', '2')} {colorize(state.on_no, '2')}")
178 if state.on_error:
179 print(f" on_error {colorize('->', '2')} {colorize(state.on_error, '2')}")
180 if state.next:
181 print(f" next {colorize('->', '2')} {colorize(state.next, '2')}")
182 if state.route:
183 print(" route:")
184 for verdict, target in state.route.routes.items():
185 print(f" {verdict} {colorize('->', '2')} {colorize(target, '2')}")
186 if state.route.default:
187 print(f" _ {colorize('->', '2')} {colorize(state.route.default, '2')}")
188 print()
189 print(f"Initial state: {fsm.initial}")
190 print(f"Max iterations: {fsm.max_iterations}")
191 if fsm.timeout:
192 print(f"Timeout: {fsm.timeout}s")
193 if fsm.context:
194 print("Context:")
195 for key, value in fsm.context.items():
196 print(f" {key}: {value!r}")
199def run_background(
200 loop_name: str, args: argparse.Namespace, loops_dir: Path, subcommand: str = "run"
201) -> int:
202 """Launch loop as a detached background process.
204 Spawns a new process with start_new_session=True that re-executes
205 the loop with --foreground-internal. The parent writes the PID file
206 and returns immediately.
208 Args:
209 subcommand: The ll-loop subcommand to spawn ("run" or "resume").
211 Returns:
212 Exit code (0 = launched successfully).
213 """
214 running_dir = loops_dir / ".running"
215 running_dir.mkdir(parents=True, exist_ok=True)
217 pid_file = running_dir / f"{loop_name}.pid"
218 log_file = running_dir / f"{loop_name}.log"
220 # Build re-exec command with --foreground-internal instead of --background
221 cmd = [
222 sys.executable,
223 "-m",
224 "little_loops.cli.loop",
225 subcommand,
226 loop_name,
227 "--foreground-internal",
228 ]
230 # Forward relevant args
231 max_iter = getattr(args, "max_iterations", None)
232 if max_iter:
233 cmd.extend(["--max-iterations", str(max_iter)])
234 if getattr(args, "no_llm", False):
235 cmd.append("--no-llm")
236 llm_model = getattr(args, "llm_model", None)
237 if llm_model:
238 cmd.extend(["--llm-model", llm_model])
239 if getattr(args, "verbose", False):
240 cmd.append("--verbose")
241 if getattr(args, "show_diagrams", False):
242 cmd.append("--show-diagrams")
243 if getattr(args, "quiet", False):
244 cmd.append("--quiet")
245 if getattr(args, "queue", False):
246 cmd.append("--queue")
247 for kv in getattr(args, "context", None) or []:
248 cmd.extend(["--context", kv])
249 delay = getattr(args, "delay", None)
250 if delay is not None:
251 cmd.extend(["--delay", str(delay)])
252 handoff_threshold = getattr(args, "handoff_threshold", None)
253 if handoff_threshold is not None:
254 cmd.extend(["--handoff-threshold", str(handoff_threshold)])
256 with open(log_file, "w") as log_fh:
257 process = subprocess.Popen(
258 cmd,
259 start_new_session=True,
260 stdout=log_fh,
261 stderr=log_fh,
262 stdin=subprocess.DEVNULL,
263 )
265 pid_file.write_text(str(process.pid))
266 print(
267 f"Loop {colorize(loop_name, '1')} started in background (PID: {colorize(str(process.pid), '2')})"
268 )
269 print(f" Log: {colorize(str(log_file), '2')}")
270 print(f" Status: {colorize(f'll-loop status {loop_name}', '2')}")
271 print(f" Stop: {colorize(f'll-loop stop {loop_name}', '2')}")
272 return 0
275def run_foreground(
276 executor: Any, fsm: FSMLoop, args: argparse.Namespace, highlight_color: str = "32"
277) -> int:
278 """Run loop with progress display.
280 Args:
281 highlight_color: ANSI SGR code for the active FSM state highlight in verbose mode.
283 Returns:
284 Exit code (0 = success).
285 """
286 quiet = getattr(args, "quiet", False)
287 verbose = getattr(args, "verbose", False)
288 show_diagrams = getattr(args, "show_diagrams", False)
289 clear_screen = getattr(args, "clear", False)
290 if not quiet:
291 print(f"Running loop: {colorize(fsm.name, '1')}")
292 print(f"Max iterations: {colorize(str(fsm.max_iterations), '2')}")
293 print()
295 current_iteration = [0] # Use list to allow mutation in closure
296 loop_start_time = time.monotonic()
298 def display_progress(event: dict) -> None:
299 """Display progress for events."""
300 event_type = event.get("event")
301 tw = terminal_width()
302 max_line = tw - 8 # 8 chars for " " indent prefix
304 if event_type == "state_enter":
305 current_iteration[0] = event.get("iteration", 0)
306 state = event.get("state", "")
307 if not quiet:
308 elapsed_int = int(time.monotonic() - loop_start_time)
309 if elapsed_int < 60:
310 elapsed_str = f"{elapsed_int}s"
311 else:
312 elapsed_str = f"{elapsed_int // 60}m {elapsed_int % 60}s"
313 if clear_screen and sys.stdout.isatty():
314 print("\033[2J\033[H", end="", flush=True)
315 if show_diagrams:
316 from little_loops.cli.loop.layout import _render_fsm_diagram
318 diagram = _render_fsm_diagram(
319 fsm, highlight_state=state, highlight_color=highlight_color
320 )
321 print(diagram, flush=True)
322 if not quiet:
323 print(
324 f"[{current_iteration[0]}/{fsm.max_iterations}] {colorize(state, '1')} ({colorize(elapsed_str, '2')})",
325 end="",
326 flush=True,
327 )
329 elif event_type == "action_start":
330 if not quiet:
331 action = event.get("action", "")
332 is_prompt = event.get("is_prompt", False)
333 if is_prompt:
334 lines = action.strip().splitlines()
335 line_count = len(lines)
336 prompt_badge = "\u2726" # ✦
337 print(
338 f" -> {colorize(prompt_badge, '2')} {colorize(f'({line_count} lines)', '2')}",
339 flush=True,
340 )
341 show_count = line_count if verbose else min(5, line_count)
342 for line in lines[:show_count]:
343 display = line[:max_line] + "..." if len(line) > max_line else line
344 print(f" {display}", flush=True)
345 if line_count > show_count:
346 print(f" ... ({line_count - show_count} more lines)", flush=True)
347 else:
348 action_display = action[:max_line] + "..." if len(action) > max_line else action
349 print(f" -> {colorize(action_display, '2')}", flush=True)
351 elif event_type == "action_output":
352 if not quiet and verbose:
353 line = event.get("line", "")
354 if line.strip():
355 display = line[:max_line] + "..." if len(line) > max_line else line
356 print(f" {display}", flush=True)
358 elif event_type == "action_complete":
359 if not quiet:
360 duration_ms = event.get("duration_ms", 0)
361 exit_code = event.get("exit_code", 0)
362 output_preview = event.get("output_preview")
363 is_prompt = event.get("is_prompt", False)
364 duration_sec = duration_ms / 1000
365 if duration_sec < 60:
366 duration_str = f"{duration_sec:.1f}s"
367 else:
368 minutes = int(duration_sec // 60)
369 seconds = duration_sec % 60
370 duration_str = f"{minutes}m {seconds:.0f}s"
371 parts = [f" ({colorize(duration_str, '2')})"]
372 if exit_code == 124:
373 parts.append(colorize("timed out", "38;5;208"))
374 elif exit_code != 0:
375 parts.append(colorize(f"exit: {exit_code}", "38;5;208"))
376 print(" ".join(parts), flush=True)
377 # Skip output preview for prompt states (already streamed) and in verbose mode
378 # (lines already shown via action_output events). In non-verbose mode, show
379 # a tail summary for shell states.
380 if output_preview and not is_prompt and not verbose:
381 lines = [ln for ln in output_preview.splitlines() if ln.strip()]
382 show_lines = lines[-8:] if lines else []
383 for line in show_lines:
384 display = line[:max_line] + "..." if len(line) > max_line else line
385 print(f" {display}", flush=True)
387 elif event_type == "evaluate":
388 if not quiet:
389 verdict = event.get("verdict", "")
390 confidence = event.get("confidence")
391 reason = event.get("reason", "")
392 error = event.get("error", "")
393 if verdict in ("yes", "target", "progress"):
394 symbol = colorize("\u2713", "32") # green checkmark
395 verdict_colored = colorize(verdict, "32")
396 else:
397 symbol = colorize("\u2717", "38;5;208") # orange x mark
398 verdict_colored = (
399 colorize(verdict, "38;5;208")
400 if verdict in ("no", "error")
401 else colorize(verdict, "2")
402 )
403 # Build verdict line
404 if error and verdict == "error":
405 verdict_line = f"{symbol} {verdict_colored}: {error}"
406 elif confidence is not None:
407 verdict_line = (
408 f"{symbol} {verdict_colored} {colorize(f'({confidence:.2f})', '2')}"
409 )
410 else:
411 verdict_line = f"{symbol} {verdict_colored}"
412 print(f" {verdict_line}", flush=True)
413 # Show raw_preview for error verdicts to aid diagnosis
414 raw_preview = event.get("raw_preview", "")
415 if raw_preview and verdict == "error":
416 print(f" raw: {raw_preview[:200]}", flush=True)
417 # Show reason on a second line if present (and not already shown as error)
418 if reason and not (error and verdict == "error"):
419 reason_display = reason[:300] + "..." if len(reason) > 300 else reason
420 print(f" {reason_display}", flush=True)
422 elif event_type == "route":
423 if not quiet:
424 to_state = event.get("to", "")
425 print(f" {colorize('->', '2')} {colorize(to_state, '1')}", flush=True)
427 # Wire progress display via the proper observer slot on PersistentExecutor
428 if not quiet or show_diagrams:
429 executor._on_event = display_progress
431 result = executor.run()
433 if not quiet:
434 print()
435 duration_sec = result.duration_ms / 1000
436 if duration_sec < 60:
437 duration_str = f"{duration_sec:.1f}s"
438 else:
439 minutes = int(duration_sec // 60)
440 seconds = duration_sec % 60
441 duration_str = f"{minutes}m {seconds:.0f}s"
442 if result.terminated_by == "terminal":
443 state_colored = colorize(result.final_state, "32")
444 else:
445 state_colored = colorize(result.final_state, "38;5;208")
446 print(f"Loop completed: {state_colored} ({result.iterations} iterations, {duration_str})")
448 return EXIT_CODES.get(result.terminated_by, 1)