Coverage for little_loops / cli / loop / _helpers.py: 9%

268 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:20 -0500

1"""Shared helpers for ll-loop CLI subcommands.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import signal 

7import subprocess 

8import sys 

9import time 

10from pathlib import Path 

11from types import FrameType 

12from typing import TYPE_CHECKING, Any 

13 

14from little_loops.cli.output import colorize, terminal_width 

15 

16if TYPE_CHECKING: 

17 from little_loops.fsm.schema import FSMLoop 

18 from little_loops.logger import Logger 

19 

# Process exit status for each ``terminated_by`` value of a finished loop run.
EXIT_CODES: dict[str, int] = {
    # Stops that are reported as success.
    **dict.fromkeys(("terminal", "signal", "handoff"), 0),
    # Stops caused by running into a limit.
    **dict.fromkeys(("max_iterations", "timeout"), 1),
}

# State shared between the signal handler and the function that registers it.
_loop_shutdown_requested: bool = False  # flipped to True by the first shutdown signal
_loop_executor: Any = None  # executor asked to shut down when a signal arrives
_loop_pid_file: Path | None = None  # PID file removed when exit is forced

33 

34 

def _loop_signal_handler(signum: int, frame: FrameType | None) -> None:
    """React to a shutdown signal received while ll-loop is running.

    The first signal marks shutdown as requested, asks the registered executor
    to shut down, and kills the child process the action runner is currently
    tracking, if there is one. Any later signal removes the registered PID file
    (when set) and exits the interpreter with status 1.

    Args:
        signum: Number of the delivered signal (not inspected).
        frame: Stack frame that was interrupted (not inspected).
    """
    global _loop_shutdown_requested

    if _loop_shutdown_requested:
        # Repeat signal: give up on the graceful path and leave immediately.
        if _loop_pid_file is not None:
            _loop_pid_file.unlink(missing_ok=True)
        print("\nForce shutdown requested", file=sys.stderr)
        sys.exit(1)

    _loop_shutdown_requested = True
    print("\nShutdown requested, will exit after current state...", file=sys.stderr)
    if _loop_executor is None:
        return

    _loop_executor.request_shutdown()

    # Follow executor -> _executor -> action_runner -> _current_process and
    # stop quietly as soon as any link in the chain is missing or None.
    target = _loop_executor
    for attr in ("_executor", "action_runner", "_current_process"):
        target = getattr(target, attr, None)
        if target is None:
            return
    # A child is still blocking the action runner; kill it so the loop can exit.
    target.kill()

60 

61 

def register_loop_signal_handlers(executor: Any, pid_file: Path | None = None) -> None:
    """Install the ll-loop shutdown handler for SIGINT and SIGTERM.

    Resets the module-level shutdown flag, records ``executor`` and
    ``pid_file`` for the handler to use, and routes both signals to
    ``_loop_signal_handler`` so that Ctrl-C requests a graceful shutdown
    instead of raising KeyboardInterrupt. A second signal forces an
    immediate exit with PID file cleanup.

    Args:
        executor: The PersistentExecutor instance to request shutdown on.
        pid_file: Optional path to the PID file to clean up on forced exit.
    """
    global _loop_shutdown_requested, _loop_executor, _loop_pid_file

    _loop_shutdown_requested, _loop_executor, _loop_pid_file = False, executor, pid_file
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _loop_signal_handler)

79 

80 

def get_builtin_loops_dir() -> Path:
    """Return the directory holding the built-in loops bundled with the plugin.

    The directory is named ``loops`` and sits five levels above this file.
    """
    ancestor = Path(__file__)
    # Step up one parent at a time; unlike ``parents[4]`` this never raises
    # when the file lives fewer than five levels deep.
    for _ in range(5):
        ancestor = ancestor.parent
    return ancestor / "loops"

84 

85 

def resolve_loop_path(name_or_path: str, loops_dir: Path) -> Path:
    """Resolve a loop name or explicit path to the loop's YAML file.

    Candidates are tried in this order and the first one that is an existing
    regular file is returned:

    1. ``name_or_path`` taken as a path.
    2. ``<loops_dir>/<name>.fsm.yaml`` (compiled FSM).
    3. ``<loops_dir>/<name>.yaml``.
    4. ``<builtin loops dir>/<name>.yaml``.

    Args:
        name_or_path: Loop name, or a path to a loop file.
        loops_dir: Directory searched for project loops.

    Returns:
        Path of the first matching file.

    Raises:
        FileNotFoundError: If no candidate is an existing file.
    """
    direct = Path(name_or_path)
    # is_file(), not exists(): a directory that merely shares the loop's name
    # (or an empty name, which resolves to ".") must not shadow the real file.
    if direct.is_file():
        return direct

    # Project loops: the compiled ``.fsm.yaml`` takes precedence over ``.yaml``.
    for suffix in (".fsm.yaml", ".yaml"):
        candidate = loops_dir / f"{name_or_path}{suffix}"
        if candidate.is_file():
            return candidate

    # Last resort: loops shipped with the plugin.
    builtin_path = get_builtin_loops_dir() / f"{name_or_path}.yaml"
    if builtin_path.is_file():
        return builtin_path

    raise FileNotFoundError(f"Loop not found: {name_or_path}")

108 

109 

def load_loop(name_or_path: str, loops_dir: Path, logger: Logger) -> FSMLoop:
    """Locate a loop file, then load and validate it.

    Args:
        name_or_path: Loop name, or a path to a loop file.
        loops_dir: Directory searched for project loops.
        logger: Accepted for interface compatibility; not used by this function.

    Returns:
        The validated loop.

    Raises:
        FileNotFoundError: If loop not found.
        ValueError: If loop is invalid.
    """
    # Imported here rather than at module top level.
    from little_loops.fsm.validation import load_and_validate

    loop_file = resolve_loop_path(name_or_path, loops_dir)
    # Only the first element of the returned pair is needed here.
    loaded, _ = load_and_validate(loop_file)
    return loaded

122 

123 

def load_loop_with_spec(
    name_or_path: str, loops_dir: Path, logger: Logger
) -> tuple[FSMLoop, dict[str, Any]]:
    """Load a loop and return both the FSMLoop and the raw spec dict.

    Used by commands that need access to raw YAML fields (e.g., description).

    Args:
        name_or_path: Loop name, or a path to a loop file.
        loops_dir: Directory searched for project loops.
        logger: Accepted for interface compatibility; not used by this function.

    Returns:
        ``(fsm, spec)`` where ``spec`` is the result of ``yaml.safe_load`` on
        the loop file.

    Raises:
        FileNotFoundError: If loop not found.
        ValueError: If loop is invalid.
    """
    import yaml

    from little_loops.fsm.validation import load_and_validate

    path = resolve_loop_path(name_or_path, loops_dir)

    # Decode explicitly as UTF-8: the locale default (e.g. cp1252 on Windows)
    # would garble or reject non-ASCII text in the YAML.
    with open(path, encoding="utf-8") as f:
        spec = yaml.safe_load(f)

    # The raw spec is read before validation, so a file that cannot be read or
    # parsed fails here rather than inside load_and_validate.
    fsm, _ = load_and_validate(path)
    return fsm, spec

146 

147 

def print_execution_plan(fsm: FSMLoop) -> None:
    """Print the dry-run execution plan for a loop.

    Lists every state with a preview of its action, its evaluator type and its
    outgoing transitions, then the initial state and iteration limit, plus the
    timeout and context when they are set.

    Args:
        fsm: Loop definition to describe.
    """
    # Width budget for a one-line (non-prompt) action. Clamp to at least 1: on
    # a terminal narrower than 17 columns ``tw - 16`` is zero or negative, and
    # a negative slice bound would cut from the END of the action string.
    max_action = max(terminal_width() - 16, 1)
    arrow = colorize("->", "2")

    print(colorize(f"Execution plan for: {fsm.name}", "1"))
    print()
    print("States:")
    for name, state in fsm.states.items():
        terminal_marker = colorize(" [TERMINAL]", "32") if state.terminal else ""
        print(f" {colorize(f'[{name}]', '1')}{terminal_marker}")

        if state.action:
            if state.action_type == "prompt":
                # Prompts are shown as a block preview of at most three lines.
                lines = state.action.strip().splitlines()
                preview = "\n ".join(lines[:3])
                if len(lines) > 3 or len(state.action) > 200:
                    preview += " ..."
                print(f" action: |\n {preview}")
            else:
                action_display = state.action
                if len(action_display) > max_action:
                    action_display = action_display[:max_action] + "..."
                print(f" action: {action_display}")

        if state.evaluate:
            print(f" evaluate: {state.evaluate.type}")

        # Simple transitions, printed only when set, in a fixed order.
        transitions = (
            ("on_yes", state.on_yes),
            ("on_no", state.on_no),
            ("on_error", state.on_error),
            ("next", state.next),
        )
        for label, target in transitions:
            if target:
                print(f" {label} {arrow} {colorize(target, '2')}")

        if state.route:
            print(" route:")
            for verdict, target in state.route.routes.items():
                print(f" {verdict} {arrow} {colorize(target, '2')}")
            if state.route.default:
                # "_" marks the fallback route.
                print(f" _ {arrow} {colorize(state.route.default, '2')}")
        print()

    print(f"Initial state: {fsm.initial}")
    print(f"Max iterations: {fsm.max_iterations}")
    if fsm.timeout:
        print(f"Timeout: {fsm.timeout}s")
    if fsm.context:
        print("Context:")
        for key, value in fsm.context.items():
            print(f" {key}: {value!r}")

197 

198 

def run_background(
    loop_name: str, args: argparse.Namespace, loops_dir: Path, subcommand: str = "run"
) -> int:
    """Start the loop in a detached child process and return immediately.

    The child re-executes ``python -m little_loops.cli.loop <subcommand>
    <loop_name> --foreground-internal`` in a new session, with stdout and
    stderr redirected to ``<loops_dir>/.running/<loop_name>.log``. This
    (parent) process writes the child's PID to
    ``<loops_dir>/.running/<loop_name>.pid``.

    Args:
        loop_name: Loop to run; also the stem of the PID and log file names.
        args: Parsed CLI options; the recognised ones are forwarded to the child.
        loops_dir: Directory under which ``.running`` is created.
        subcommand: The ll-loop subcommand to spawn ("run" or "resume").

    Returns:
        Exit code (0 = launched successfully).
    """
    state_dir = loops_dir / ".running"
    state_dir.mkdir(parents=True, exist_ok=True)
    pid_path = state_dir / f"{loop_name}.pid"
    log_path = state_dir / f"{loop_name}.log"

    def option(name: str, default: Any = None) -> Any:
        # Not every subcommand defines every option, so read them defensively.
        return getattr(args, name, default)

    # Same invocation as the user's, with --foreground-internal in place of
    # --background.
    child_cmd = [
        sys.executable,
        "-m",
        "little_loops.cli.loop",
        subcommand,
        loop_name,
        "--foreground-internal",
    ]

    # Forward the relevant options.
    iterations = option("max_iterations")
    if iterations:
        child_cmd += ["--max-iterations", str(iterations)]
    if option("no_llm", False):
        child_cmd.append("--no-llm")
    model = option("llm_model")
    if model:
        child_cmd += ["--llm-model", model]
    for attr, flag in (
        ("verbose", "--verbose"),
        ("show_diagrams", "--show-diagrams"),
        ("quiet", "--quiet"),
        ("queue", "--queue"),
    ):
        if option(attr, False):
            child_cmd.append(flag)
    for pair in option("context") or []:
        child_cmd += ["--context", pair]
    # These two are forwarded whenever they are set, including falsy values.
    for attr, flag in (("delay", "--delay"), ("handoff_threshold", "--handoff-threshold")):
        value = option(attr)
        if value is not None:
            child_cmd += [flag, str(value)]

    with open(log_path, "w") as log_stream:
        child = subprocess.Popen(
            child_cmd,
            start_new_session=True,
            stdout=log_stream,
            stderr=log_stream,
            stdin=subprocess.DEVNULL,
        )

    pid_text = str(child.pid)
    pid_path.write_text(pid_text)
    print(
        f"Loop {colorize(loop_name, '1')} started in background (PID: {colorize(pid_text, '2')})"
    )
    print(f" Log: {colorize(str(log_path), '2')}")
    print(f" Status: {colorize(f'll-loop status {loop_name}', '2')}")
    print(f" Stop: {colorize(f'll-loop stop {loop_name}', '2')}")
    return 0

273 

274 

def _format_duration(duration_sec: float) -> str:
    """Format seconds as ``12.3s`` below one minute, otherwise ``2m 5s``."""
    # Compare the value as it will be displayed, so 59.96 is not shown as "60.0s".
    if round(duration_sec, 1) < 60:
        return f"{duration_sec:.1f}s"
    # Round before splitting so the seconds part can never display as 60.
    minutes, seconds = divmod(round(duration_sec), 60)
    return f"{minutes}m {seconds}s"


def _clip(text: str, limit: int) -> str:
    """Return ``text`` cut to ``limit`` characters plus ``...`` when it is longer."""
    return text[:limit] + "..." if len(text) > limit else text


def run_foreground(
    executor: Any, fsm: FSMLoop, args: argparse.Namespace, highlight_color: str = "32"
) -> int:
    """Run the loop in the foreground, printing progress as events arrive.

    Args:
        executor: Object providing ``run()`` and an ``_on_event`` observer slot.
        fsm: Loop being run; its name and max_iterations are displayed.
        args: Parsed CLI options. ``quiet``, ``verbose``, ``show_diagrams`` and
            ``clear`` are read, each treated as False when absent.
        highlight_color: ANSI SGR code passed to the diagram renderer for the
            active state when ``show_diagrams`` is set.

    Returns:
        The EXIT_CODES entry for the run's ``terminated_by`` value, or 1 when
        that value is not listed (0 = success).
    """
    quiet = getattr(args, "quiet", False)
    verbose = getattr(args, "verbose", False)
    show_diagrams = getattr(args, "show_diagrams", False)
    clear_screen = getattr(args, "clear", False)
    if not quiet:
        print(f"Running loop: {colorize(fsm.name, '1')}")
        print(f"Max iterations: {colorize(str(fsm.max_iterations), '2')}")
        print()

    loop_start_time = time.monotonic()

    def display_progress(event: dict) -> None:
        """Render one executor event to stdout."""
        event_type = event.get("event")
        # 8 columns are reserved for the indent prefix of detail lines. Clamp
        # to at least 1 so a very narrow terminal cannot give a zero or
        # negative budget (a negative slice bound cuts from the end).
        # NOTE(review): confirm the reserved width against the printed prefix.
        max_line = max(terminal_width() - 8, 1)

        if event_type == "state_enter":
            iteration = event.get("iteration", 0)
            state = event.get("state", "")
            if not quiet:
                elapsed_int = int(time.monotonic() - loop_start_time)
                if elapsed_int < 60:
                    elapsed_str = f"{elapsed_int}s"
                else:
                    elapsed_str = f"{elapsed_int // 60}m {elapsed_int % 60}s"
            if clear_screen and sys.stdout.isatty():
                # ANSI: erase the screen and move the cursor to the top-left.
                print("\033[2J\033[H", end="", flush=True)
            if show_diagrams:
                from little_loops.cli.loop.layout import _render_fsm_diagram

                diagram = _render_fsm_diagram(
                    fsm, highlight_state=state, highlight_color=highlight_color
                )
                print(diagram, flush=True)
            if not quiet:
                # No newline: the action summary continues on the same line.
                print(
                    f"[{iteration}/{fsm.max_iterations}] {colorize(state, '1')} ({colorize(elapsed_str, '2')})",
                    end="",
                    flush=True,
                )

        elif event_type == "action_start":
            if not quiet:
                action = event.get("action", "")
                if event.get("is_prompt", False):
                    lines = action.strip().splitlines()
                    line_count = len(lines)
                    prompt_badge = "\u2726"  # ✦
                    print(
                        f" -> {colorize(prompt_badge, '2')} {colorize(f'({line_count} lines)', '2')}",
                        flush=True,
                    )
                    # Verbose shows the whole prompt, otherwise the first 5 lines.
                    show_count = line_count if verbose else min(5, line_count)
                    for line in lines[:show_count]:
                        print(f" {_clip(line, max_line)}", flush=True)
                    if line_count > show_count:
                        print(f" ... ({line_count - show_count} more lines)", flush=True)
                else:
                    print(f" -> {colorize(_clip(action, max_line), '2')}", flush=True)

        elif event_type == "action_output":
            if not quiet and verbose:
                line = event.get("line", "")
                if line.strip():
                    print(f" {_clip(line, max_line)}", flush=True)

        elif event_type == "action_complete":
            if not quiet:
                duration_ms = event.get("duration_ms", 0)
                exit_code = event.get("exit_code", 0)
                output_preview = event.get("output_preview")
                is_prompt = event.get("is_prompt", False)
                parts = [f" ({colorize(_format_duration(duration_ms / 1000), '2')})"]
                if exit_code == 124:
                    # Exit status 124 is reported as a timeout.
                    parts.append(colorize("timed out", "38;5;208"))
                elif exit_code != 0:
                    parts.append(colorize(f"exit: {exit_code}", "38;5;208"))
                print(" ".join(parts), flush=True)
                # Skip output preview for prompt states (already streamed) and in
                # verbose mode (lines already shown via action_output events). In
                # non-verbose mode, show a tail summary for shell states.
                if output_preview and not is_prompt and not verbose:
                    non_blank = [ln for ln in output_preview.splitlines() if ln.strip()]
                    for line in non_blank[-8:]:
                        print(f" {_clip(line, max_line)}", flush=True)

        elif event_type == "evaluate":
            if not quiet:
                verdict = event.get("verdict", "")
                confidence = event.get("confidence")
                reason = event.get("reason", "")
                error = event.get("error", "")
                if verdict in ("yes", "target", "progress"):
                    symbol = colorize("\u2713", "32")  # green checkmark
                    verdict_colored = colorize(verdict, "32")
                else:
                    symbol = colorize("\u2717", "38;5;208")  # orange x mark
                    verdict_colored = (
                        colorize(verdict, "38;5;208")
                        if verdict in ("no", "error")
                        else colorize(verdict, "2")
                    )
                is_error = bool(error) and verdict == "error"
                # Build verdict line
                if is_error:
                    verdict_line = f"{symbol} {verdict_colored}: {error}"
                elif confidence is not None:
                    verdict_line = (
                        f"{symbol} {verdict_colored} {colorize(f'({confidence:.2f})', '2')}"
                    )
                else:
                    verdict_line = f"{symbol} {verdict_colored}"
                print(f" {verdict_line}", flush=True)
                # Show raw_preview for error verdicts to aid diagnosis
                raw_preview = event.get("raw_preview", "")
                if raw_preview and verdict == "error":
                    print(f" raw: {raw_preview[:200]}", flush=True)
                # Show reason on a second line if present (and not already shown as error)
                if reason and not is_error:
                    print(f" {_clip(reason, 300)}", flush=True)

        elif event_type == "route":
            if not quiet:
                to_state = event.get("to", "")
                print(f" {colorize('->', '2')} {colorize(to_state, '1')}", flush=True)

    # Wire progress display via the observer slot on the executor. Quiet runs
    # still need it when diagrams are requested.
    if not quiet or show_diagrams:
        executor._on_event = display_progress

    result = executor.run()

    if not quiet:
        print()
        duration_str = _format_duration(result.duration_ms / 1000)
        # Green for a terminal-state finish, orange for any other stop.
        final_color = "32" if result.terminated_by == "terminal" else "38;5;208"
        state_colored = colorize(result.final_state, final_color)
        print(f"Loop completed: {state_colored} ({result.iterations} iterations, {duration_str})")

    return EXIT_CODES.get(result.terminated_by, 1)