Coverage for little_loops / subprocess_utils.py: 21%

80 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Subprocess utilities for Claude CLI invocation. 

2 

3Provides shared functionality for running Claude CLI commands with 

4real-time output streaming, timeout handling, and context handoff detection. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import os 

11import re 

12import selectors 

13import subprocess 

14import time 

15from collections.abc import Callable 

16from pathlib import Path 

17 

18logger = logging.getLogger(__name__) 

19 

20# Callback type: (line: str, is_stderr: bool) -> None 

21OutputCallback = Callable[[str, bool], None] 

22 

23# Process lifecycle callback: (process: Popen) -> None 

24ProcessCallback = Callable[[subprocess.Popen[str]], None] 

25 

26# Context handoff detection pattern 

27CONTEXT_HANDOFF_PATTERN = re.compile(r"CONTEXT_HANDOFF:\s*Ready for fresh session") 

28CONTINUATION_PROMPT_PATH = Path(".claude/ll-continue-prompt.md") 

29 

30 

31def detect_context_handoff(output: str) -> bool: 

32 """Check if output contains a context handoff signal. 

33 

34 Args: 

35 output: Command output to check 

36 

37 Returns: 

38 True if context handoff was signaled 

39 """ 

40 return bool(CONTEXT_HANDOFF_PATTERN.search(output)) 

41 

42 

43def read_continuation_prompt(repo_path: Path | None = None) -> str | None: 

44 """Read the continuation prompt file if it exists. 

45 

46 Args: 

47 repo_path: Optional repository root path 

48 

49 Returns: 

50 Contents of continuation prompt, or None if not found 

51 """ 

52 prompt_path = (repo_path or Path.cwd()) / CONTINUATION_PROMPT_PATH 

53 if prompt_path.exists(): 

54 return prompt_path.read_text() 

55 return None 

56 

57 

58def run_claude_command( 

59 command: str, 

60 timeout: int = 3600, 

61 working_dir: Path | None = None, 

62 stream_callback: OutputCallback | None = None, 

63 on_process_start: ProcessCallback | None = None, 

64 on_process_end: ProcessCallback | None = None, 

65 idle_timeout: int = 0, 

66) -> subprocess.CompletedProcess[str]: 

67 """Invoke Claude CLI command with real-time output streaming. 

68 

69 Args: 

70 command: Command to pass to Claude CLI 

71 timeout: Timeout in seconds (0 for no timeout) 

72 working_dir: Optional working directory for the command 

73 stream_callback: Optional callback for streaming output lines. 

74 Called with (line, is_stderr) for each line of output. 

75 on_process_start: Optional callback invoked after process starts. 

76 Receives the Popen object for tracking/management. 

77 on_process_end: Optional callback invoked after process completes. 

78 Receives the Popen object. Called in finally block. 

79 idle_timeout: Kill process if no output for this many seconds (0 to disable). 

80 

81 Returns: 

82 CompletedProcess with stdout/stderr captured 

83 

84 Raises: 

85 subprocess.TimeoutExpired: If command exceeds timeout or idle timeout. 

86 When triggered by idle timeout, the output field is set to "idle_timeout". 

87 """ 

88 cmd_args = ["claude", "--dangerously-skip-permissions", "-p", command] 

89 

90 # Set environment to keep Claude in the project working directory (BUG-007) 

91 # This helps prevent file writes from leaking to the main repo in worktrees 

92 env = os.environ.copy() 

93 env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1" 

94 

95 process = subprocess.Popen( 

96 cmd_args, 

97 stdout=subprocess.PIPE, 

98 stderr=subprocess.PIPE, 

99 text=True, 

100 bufsize=1, # Line buffered 

101 cwd=working_dir, 

102 env=env, 

103 ) 

104 

105 if on_process_start: 

106 on_process_start(process) 

107 

108 stdout_lines: list[str] = [] 

109 stderr_lines: list[str] = [] 

110 

111 # Use selectors for non-blocking read from both streams 

112 with selectors.DefaultSelector() as sel: 

113 if process.stdout: 

114 sel.register(process.stdout, selectors.EVENT_READ) 

115 if process.stderr: 

116 sel.register(process.stderr, selectors.EVENT_READ) 

117 

118 start_time = time.time() 

119 last_output_time = start_time 

120 

121 try: 

122 while sel.get_map(): 

123 now = time.time() 

124 if timeout and (now - start_time) > timeout: 

125 process.kill() 

126 try: 

127 process.wait(timeout=10) 

128 except subprocess.TimeoutExpired: 

129 logger.warning( 

130 "Process %s did not terminate within 10s after kill", 

131 process.pid, 

132 ) 

133 raise subprocess.TimeoutExpired(cmd_args, timeout) 

134 

135 if idle_timeout and (now - last_output_time) > idle_timeout: 

136 process.kill() 

137 try: 

138 process.wait(timeout=10) 

139 except subprocess.TimeoutExpired: 

140 logger.warning( 

141 "Process %s did not terminate within 10s after kill", 

142 process.pid, 

143 ) 

144 raise subprocess.TimeoutExpired(cmd_args, idle_timeout, output="idle_timeout") 

145 

146 ready = sel.select(timeout=1.0) 

147 for key, _ in ready: 

148 line = key.fileobj.readline() # type: ignore[union-attr] 

149 if not line: 

150 sel.unregister(key.fileobj) 

151 continue 

152 

153 last_output_time = time.time() 

154 line = line.rstrip("\n") 

155 is_stderr = key.fileobj is process.stderr 

156 

157 if is_stderr: 

158 stderr_lines.append(line) 

159 else: 

160 stdout_lines.append(line) 

161 

162 if stream_callback: 

163 stream_callback(line, is_stderr) 

164 

165 try: 

166 process.wait(timeout=30) 

167 except subprocess.TimeoutExpired: 

168 logger.warning( 

169 "Process %s did not exit within 30s after streams closed, killing", 

170 process.pid, 

171 ) 

172 process.kill() 

173 try: 

174 process.wait(timeout=10) 

175 except subprocess.TimeoutExpired: 

176 logger.warning( 

177 "Process %s did not terminate within 10s after kill", 

178 process.pid, 

179 ) 

180 finally: 

181 if on_process_end: 

182 on_process_end(process) 

183 

184 return subprocess.CompletedProcess( 

185 cmd_args, 

186 process.returncode if process.returncode is not None else -9, 

187 stdout="\n".join(stdout_lines), 

188 stderr="\n".join(stderr_lines), 

189 )