Coverage for little_loops / subprocess_utils.py: 21%
80 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Subprocess utilities for Claude CLI invocation.
3Provides shared functionality for running Claude CLI commands with
4real-time output streaming, timeout handling, and context handoff detection.
5"""
7from __future__ import annotations
9import logging
10import os
11import re
12import selectors
13import subprocess
14import time
15from collections.abc import Callable
16from pathlib import Path
# Module-level logger, named after this module.
18logger = logging.getLogger(__name__)
# Signature of the per-line streaming callback accepted by run_claude_command.
20# Callback type: (line: str, is_stderr: bool) -> None
21OutputCallback = Callable[[str, bool], None]
# Signature of the on_process_start / on_process_end hooks of run_claude_command.
23# Process lifecycle callback: (process: Popen) -> None
24ProcessCallback = Callable[[subprocess.Popen[str]], None]
# Searched (not anchored) by detect_context_handoff; any amount of whitespace,
# including none, may separate the marker from the message.
26# Context handoff detection pattern
27CONTEXT_HANDOFF_PATTERN = re.compile(r"CONTEXT_HANDOFF:\s*Ready for fresh session")
# Relative location of the continuation prompt; read_continuation_prompt joins
# it onto the repository root (or the current working directory).
28CONTINUATION_PROMPT_PATH = Path(".claude/ll-continue-prompt.md")
def detect_context_handoff(output: str) -> bool:
    """Report whether command output carries the context handoff marker.

    Args:
        output: Text produced by the command.

    Returns:
        True when CONTEXT_HANDOFF_PATTERN occurs anywhere in ``output``,
        False otherwise.
    """
    handoff_match = CONTEXT_HANDOFF_PATTERN.search(output)
    return handoff_match is not None
def read_continuation_prompt(repo_path: Path | None = None) -> str | None:
    """Read the continuation prompt file if it exists.

    Args:
        repo_path: Optional repository root path. Defaults to the current
            working directory.

    Returns:
        Contents of the continuation prompt decoded as UTF-8, or None if the
        file does not exist.
    """
    prompt_path = (repo_path or Path.cwd()) / CONTINUATION_PROMPT_PATH
    try:
        # Read directly instead of checking exists() first: the file may be
        # removed between the check and the read. The encoding is explicit so
        # the result does not depend on the platform's locale default.
        return prompt_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Missing file, or a parent path component that is not a directory.
        return None
def _kill_and_reap(process: subprocess.Popen[str]) -> None:
    """Kill ``process`` and wait up to 10 seconds for it to terminate."""
    process.kill()
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        logger.warning(
            "Process %s did not terminate within 10s after kill",
            process.pid,
        )


def _close_pipes(process: subprocess.Popen[str]) -> None:
    """Close the parent's ends of the child's stdout and stderr pipes."""
    for stream in (process.stdout, process.stderr):
        if stream is None:
            continue
        try:
            stream.close()
        except OSError:
            logger.debug(
                "Failed to close pipe of process %s",
                process.pid,
                exc_info=True,
            )


def run_claude_command(
    command: str,
    timeout: int = 3600,
    working_dir: Path | None = None,
    stream_callback: OutputCallback | None = None,
    on_process_start: ProcessCallback | None = None,
    on_process_end: ProcessCallback | None = None,
    idle_timeout: int = 0,
) -> subprocess.CompletedProcess[str]:
    """Invoke Claude CLI command with real-time output streaming.

    Args:
        command: Command to pass to Claude CLI
        timeout: Timeout in seconds (0 for no timeout)
        working_dir: Optional working directory for the command
        stream_callback: Optional callback for streaming output lines.
            Called with (line, is_stderr) for each line of output.
        on_process_start: Optional callback invoked after process starts.
            Receives the Popen object for tracking/management.
        on_process_end: Optional callback invoked after process completes.
            Receives the Popen object. Called in finally block, before the
            stdout/stderr pipes are closed.
        idle_timeout: Kill process if no output for this many seconds (0 to disable).

    Returns:
        CompletedProcess with stdout/stderr captured. Lines are joined with
        newlines; the return code is -9 if the process never reported one.

    Raises:
        subprocess.TimeoutExpired: If command exceeds timeout or idle timeout.
            When triggered by idle timeout, the output field is set to "idle_timeout".
    """
    cmd_args = ["claude", "--dangerously-skip-permissions", "-p", command]

    # Set environment to keep Claude in the project working directory (BUG-007)
    # This helps prevent file writes from leaking to the main repo in worktrees
    env = os.environ.copy()
    env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1"

    process = subprocess.Popen(
        cmd_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # Line buffered
        cwd=working_dir,
        env=env,
    )

    if on_process_start:
        on_process_start(process)

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []

    try:
        # Use selectors for non-blocking read from both streams
        with selectors.DefaultSelector() as sel:
            if process.stdout:
                sel.register(process.stdout, selectors.EVENT_READ)
            if process.stderr:
                sel.register(process.stderr, selectors.EVENT_READ)

            # Monotonic clock: elapsed-time checks must not be affected by
            # wall-clock adjustments.
            start_time = time.monotonic()
            last_output_time = start_time

            # Loop until both streams have reached EOF and been unregistered.
            while sel.get_map():
                now = time.monotonic()
                if timeout and (now - start_time) > timeout:
                    _kill_and_reap(process)
                    raise subprocess.TimeoutExpired(cmd_args, timeout)

                if idle_timeout and (now - last_output_time) > idle_timeout:
                    _kill_and_reap(process)
                    raise subprocess.TimeoutExpired(
                        cmd_args, idle_timeout, output="idle_timeout"
                    )

                # Wake at least once per second so the timeouts above are
                # re-evaluated even when the child is silent.
                for key, _ in sel.select(timeout=1.0):
                    # NOTE(review): readline() goes through the text wrapper's
                    # buffer, so it can block on a partial line, and select()
                    # cannot see lines that are already buffered. Confirm
                    # whether that matters for the CLI's output pattern.
                    line = key.fileobj.readline()  # type: ignore[union-attr]
                    if not line:
                        # Empty read means EOF on this stream.
                        sel.unregister(key.fileobj)
                        continue

                    last_output_time = time.monotonic()
                    line = line.rstrip("\n")
                    is_stderr = key.fileobj is process.stderr

                    if is_stderr:
                        stderr_lines.append(line)
                    else:
                        stdout_lines.append(line)

                    if stream_callback:
                        stream_callback(line, is_stderr)

        # Both streams are closed; give the process time to exit on its own.
        try:
            process.wait(timeout=30)
        except subprocess.TimeoutExpired:
            logger.warning(
                "Process %s did not exit within 30s after streams closed, killing",
                process.pid,
            )
            _kill_and_reap(process)
    finally:
        try:
            if on_process_end:
                on_process_end(process)
        finally:
            # Release the pipe file descriptors deterministically instead of
            # waiting for the Popen object to be garbage collected.
            _close_pipes(process)

    return subprocess.CompletedProcess(
        cmd_args,
        process.returncode if process.returncode is not None else -9,
        stdout="\n".join(stdout_lines),
        stderr="\n".join(stderr_lines),
    )