Coverage for src/infra/tools/command_runner.py: 86%
153 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Standardized subprocess execution with timeout and process-group termination.
3This module provides CommandRunner for consistent subprocess handling across:
4- src/beads_client.py
5- quality_gate.py
6- validation/legacy_runner.py
7- validation/spec_runner.py
8- validation/e2e.py
9- validation/worktree.py
11Key features:
12- Unified timeout handling with process-group termination
13- Both sync and async execution
14- Standardized output capture and tailing
15- Consistent error handling
16"""
18from __future__ import annotations
20import asyncio
21import os
22import signal
23import subprocess
24import sys
25import time
26from dataclasses import dataclass
27from typing import TYPE_CHECKING
29from src.core.protocols import CommandRunnerPort
31if TYPE_CHECKING:
32 from collections.abc import Mapping
33 from pathlib import Path
# Exit status reported for a command killed because it ran past its timeout.
# 124 is the same status GNU coreutils `timeout` returns in that situation.
TIMEOUT_EXIT_CODE: int = 124

# Seconds to wait after SIGTERM before escalating to SIGKILL.
DEFAULT_KILL_GRACE_SECONDS: float = 2.0
42def _tail(text: str, max_chars: int = 800, max_lines: int = 20) -> str:
43 """Truncate text to last N lines and M characters.
45 Args:
46 text: The text to truncate.
47 max_chars: Maximum number of characters to keep.
48 max_lines: Maximum number of lines to keep.
50 Returns:
51 The truncated text.
52 """
53 if not text:
54 return ""
55 lines = text.splitlines()
56 if len(lines) > max_lines:
57 lines = lines[-max_lines:]
58 clipped = "\n".join(lines)
59 if len(clipped) > max_chars:
60 return clipped[-max_chars:]
61 return clipped
64@dataclass
65class CommandResult:
66 """Result of a command execution.
68 Provides structured access to command output with helper methods
69 for truncating long output.
71 Attributes:
72 command: The command that was executed (list or shell string).
73 returncode: Exit code of the command.
74 stdout: Full stdout output.
75 stderr: Full stderr output.
76 duration_seconds: How long the command took.
77 timed_out: Whether the command was killed due to timeout.
78 """
80 command: list[str] | str
81 returncode: int
82 stdout: str = ""
83 stderr: str = ""
84 duration_seconds: float = 0.0
85 timed_out: bool = False
87 @property
88 def ok(self) -> bool:
89 """Whether the command succeeded (returncode == 0)."""
90 return self.returncode == 0
92 def stdout_tail(self, max_chars: int = 800, max_lines: int = 20) -> str:
93 """Get truncated stdout.
95 Args:
96 max_chars: Maximum number of characters.
97 max_lines: Maximum number of lines.
99 Returns:
100 Truncated stdout string.
101 """
102 return _tail(self.stdout, max_chars=max_chars, max_lines=max_lines)
104 def stderr_tail(self, max_chars: int = 800, max_lines: int = 20) -> str:
105 """Get truncated stderr.
107 Args:
108 max_chars: Maximum number of characters.
109 max_lines: Maximum number of lines.
111 Returns:
112 Truncated stderr string.
113 """
114 return _tail(self.stderr, max_chars=max_chars, max_lines=max_lines)
class CommandRunner(CommandRunnerPort):
    """Runs commands with standardized timeout and process-group handling.

    This class provides both sync and async execution methods with:
    - Configurable timeout
    - Process-group termination (kills child processes on timeout)
    - Consistent output capture (undecodable bytes are replaced, not raised)
    - Optional environment variable merging
    - Cleanup of the child when the wait is interrupted (e.g. KeyboardInterrupt
      in ``run``, task cancellation in ``run_async``)

    Example:
        runner = CommandRunner(cwd=Path("/my/repo"), timeout_seconds=60.0)
        result = runner.run(["pytest", "-v"])
        if not result.ok:
            print(f"Failed: {result.stderr_tail()}")
    """

    def __init__(
        self,
        cwd: Path,
        timeout_seconds: float | None = None,
        kill_grace_seconds: float = DEFAULT_KILL_GRACE_SECONDS,
    ):
        """Initialize CommandRunner.

        Args:
            cwd: Working directory for commands.
            timeout_seconds: Default timeout for commands. None means no timeout.
            kill_grace_seconds: Grace period after SIGTERM before SIGKILL.
        """
        self.cwd = cwd
        self.timeout_seconds = timeout_seconds
        self.kill_grace_seconds = kill_grace_seconds

    def run(
        self,
        cmd: list[str] | str,
        env: Mapping[str, str] | None = None,
        timeout: float | None = None,
        use_process_group: bool | None = None,
        shell: bool = False,
        cwd: Path | None = None,
    ) -> CommandResult:
        """Run a command synchronously.

        Uses Popen with process-group termination on timeout. When timeout
        occurs, sends SIGTERM to the process group, waits kill_grace_seconds,
        then sends SIGKILL if still running.

        Output is decoded in text mode with ``errors="replace"``, so bytes that
        are invalid in the default text encoding become U+FFFD instead of
        raising ``UnicodeDecodeError``.

        If anything other than the timeout interrupts the wait (for example
        ``KeyboardInterrupt``), the process (or its group) is killed before the
        exception propagates, so it is not left running in the background.

        Args:
            cmd: Command to run. Can be a list of strings (for non-shell mode)
                or a shell string when shell=True.
            env: Environment variables (merged with os.environ).
            timeout: Override default timeout for this command.
            use_process_group: Whether to use process group for termination.
                If None (default), uses process group on Unix, disabled on Windows.
                When True, creates a new session and kills the entire process group
                on timeout using os.killpg(). When False, only terminates the
                main process. Note: On Windows, process groups are not supported
                and this parameter is ignored (always treated as False).
            shell: If True, run command through shell (cmd should be a string).
                Defaults to False for backwards compatibility.
            cwd: Override working directory for this command. If None, uses self.cwd.

        Returns:
            CommandResult with execution details. On timeout, ``returncode`` is
            TIMEOUT_EXIT_CODE, ``timed_out`` is True, and stdout/stderr hold
            whatever could be collected while terminating the process.

        Raises:
            OSError: If ``subprocess.Popen`` cannot start the process (e.g. the
                executable does not exist).
        """
        effective_timeout = timeout if timeout is not None else self.timeout_seconds
        effective_cwd = cwd if cwd is not None else self.cwd
        merged_env = self._merge_env(env)
        effective_use_process_group = self._resolve_use_process_group(
            use_process_group
        )

        start = time.monotonic()
        # The context manager closes the pipe file descriptors on every exit
        # path, including when an exception escapes.
        with subprocess.Popen(
            cmd,
            cwd=effective_cwd,
            env=merged_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
            start_new_session=effective_use_process_group,
            shell=shell,
        ) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=effective_timeout)
            except subprocess.TimeoutExpired:
                # Measure before termination so the grace period is not counted.
                duration = time.monotonic() - start
                stdout, stderr = self._terminate_process_sync(
                    proc, effective_use_process_group
                )
                return CommandResult(
                    command=cmd,
                    returncode=TIMEOUT_EXIT_CODE,
                    stdout=stdout,
                    stderr=stderr,
                    duration_seconds=duration,
                    timed_out=True,
                )
            except BaseException:
                # A child started in its own session does not receive the
                # terminal's SIGINT, so on Ctrl-C it would otherwise keep
                # running after we exit. Kill it before re-raising.
                pgid = proc.pid if effective_use_process_group else None
                self._signal_process(proc, pgid, force=True)
                raise

            duration = time.monotonic() - start
            return CommandResult(
                command=cmd,
                returncode=proc.returncode,
                stdout=stdout or "",
                stderr=stderr or "",
                duration_seconds=duration,
                timed_out=False,
            )

    def _terminate_process_sync(
        self,
        proc: subprocess.Popen[str],
        use_process_group: bool,
    ) -> tuple[str, str]:
        """Terminate a process and all its children synchronously.

        Sends SIGTERM to the process group (or the process), waits up to
        kill_grace_seconds for it to finish, then sends SIGKILL if it has not.

        Args:
            proc: The Popen process to terminate. Its communication may already
                have been started by a timed-out ``communicate`` call; retrying
                ``communicate`` does not lose output gathered so far.
            use_process_group: Whether to kill the entire process group.

        Returns:
            Tuple of (stdout, stderr) captured before termination, or
            ("", "") if the pipes could not be drained.
        """
        pgid = proc.pid if use_process_group else None

        # Polite stop first.
        self._signal_process(proc, pgid, force=False)
        try:
            stdout, stderr = proc.communicate(timeout=self.kill_grace_seconds)
            return stdout or "", stderr or ""
        except subprocess.TimeoutExpired:
            pass

        # Still running after the grace period: force kill.
        self._signal_process(proc, pgid, force=True)

        # Wait for process to exit and capture any remaining output.
        # Use a bounded timeout to avoid hanging if children hold pipes open
        # (can happen when use_process_group=False and children survive).
        try:
            stdout, stderr = proc.communicate(timeout=self.kill_grace_seconds)
            return stdout or "", stderr or ""
        except subprocess.TimeoutExpired:
            # Children are holding pipes open; close them and give up on output.
            for pipe in (proc.stdout, proc.stderr):
                if pipe is not None:
                    pipe.close()
            proc.wait()
            return "", ""

    async def run_async(
        self,
        cmd: list[str] | str,
        env: Mapping[str, str] | None = None,
        timeout: float | None = None,
        use_process_group: bool | None = None,
        shell: bool = False,
        cwd: Path | None = None,
    ) -> CommandResult:
        """Run a command asynchronously.

        Uses asyncio subprocess for proper async timeout handling with
        process-group termination. On timeout, sends SIGTERM to the process
        group, waits kill_grace_seconds, then sends SIGKILL if still running.

        Output bytes are decoded as UTF-8 with ``errors="replace"``, so
        undecodable output never turns a successful run into a failure.

        If the awaiting task is cancelled (or any other exception interrupts
        the wait), the process is terminated before the exception propagates,
        so it is not orphaned.

        Args:
            cmd: Command to run. Can be a list of strings (for non-shell mode)
                or a shell string when shell=True.
            env: Environment variables (merged with os.environ).
            timeout: Override default timeout for this command.
            use_process_group: Whether to use process group for termination.
                If None (default), uses process group on Unix, disabled on Windows.
                When True, creates a new session (os.setsid) and kills the entire
                process group on timeout using os.killpg(). When False, only
                terminates the main process. Note: On Windows, process groups
                are not supported and this parameter is ignored (always treated
                as False).
            shell: If True, run command through shell (cmd should be a string).
                Defaults to False for backwards compatibility.
            cwd: Override working directory for this command. If None, uses self.cwd.

        Returns:
            CommandResult with execution details. On timeout, ``returncode`` is
            TIMEOUT_EXIT_CODE, ``timed_out`` is True and stdout/stderr are empty
            (output read before the timeout is not retained). Any ``Exception``
            (e.g. spawn failure, or a str ``cmd`` with shell=False) is reported
            as ``returncode=1`` with the exception text in ``stderr``.

        Raises:
            asyncio.CancelledError: Re-raised after terminating the process when
                the awaiting task is cancelled.
        """
        effective_timeout = timeout if timeout is not None else self.timeout_seconds
        effective_cwd = cwd if cwd is not None else self.cwd
        merged_env = self._merge_env(env)
        effective_use_process_group = self._resolve_use_process_group(
            use_process_group
        )

        start = time.monotonic()
        try:
            if shell:
                # create_subprocess_shell needs one string; list elements are
                # joined with single spaces and are NOT shell-quoted.
                if isinstance(cmd, list):
                    cmd = " ".join(cmd)
                proc = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=effective_cwd,
                    env=merged_env,
                    start_new_session=effective_use_process_group,
                )
            else:
                # For non-shell mode, use create_subprocess_exec
                if isinstance(cmd, str):
                    raise ValueError("cmd must be a list when shell=False")
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=effective_cwd,
                    env=merged_env,
                    start_new_session=effective_use_process_group,
                )

            try:
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=effective_timeout,
                )
            except asyncio.TimeoutError:
                # Before Python 3.11, asyncio.TimeoutError is distinct from the
                # builtin TimeoutError; the asyncio name works on all versions.
                duration = time.monotonic() - start
                await self._terminate_process(proc, effective_use_process_group)
                return CommandResult(
                    command=cmd,
                    returncode=TIMEOUT_EXIT_CODE,
                    stdout="",
                    stderr="",
                    duration_seconds=duration,
                    timed_out=True,
                )
            except BaseException:
                # Cancellation, KeyboardInterrupt or an error while waiting:
                # stop the child first so it is not left running. Ordinary
                # exceptions are then reported by the outer handler; BaseExceptions
                # such as CancelledError propagate to the caller.
                await self._terminate_process(proc, effective_use_process_group)
                raise

            duration = time.monotonic() - start
            return CommandResult(
                command=cmd,
                returncode=proc.returncode or 0,
                stdout=self._decode_output(stdout_bytes),
                stderr=self._decode_output(stderr_bytes),
                duration_seconds=duration,
                timed_out=False,
            )
        except Exception as e:
            duration = time.monotonic() - start
            return CommandResult(
                command=cmd,
                returncode=1,
                stdout="",
                stderr=str(e),
                duration_seconds=duration,
                timed_out=False,
            )

    async def _terminate_process(
        self,
        proc: asyncio.subprocess.Process,
        use_process_group: bool,
    ) -> None:
        """Terminate a process and all its children.

        Sends SIGTERM to the process group (or the process), waits up to
        kill_grace_seconds, then sends SIGKILL and waits for the process to
        be reaped. Does nothing if the process has already exited.

        Args:
            proc: The process to terminate.
            use_process_group: Whether to kill the entire process group.
        """
        if proc.returncode is not None:
            return

        pgid = proc.pid if use_process_group else None

        try:
            self._signal_process(proc, pgid, force=False)

            # Wait for grace period.
            try:
                await asyncio.wait_for(proc.wait(), timeout=self.kill_grace_seconds)
            except asyncio.TimeoutError:
                pass

            # The group is always SIGKILLed: even if the leader exited on
            # SIGTERM, other group members may still be alive. A lone process
            # is only killed if it is still running.
            if pgid is not None or proc.returncode is None:
                self._signal_process(proc, pgid, force=True)

            # Ensure process is fully reaped.
            if proc.returncode is None:
                await proc.wait()
        except ProcessLookupError:
            pass

    @staticmethod
    def _resolve_use_process_group(use_process_group: bool | None) -> bool:
        """Decide whether the child should run in its own process group.

        Args:
            use_process_group: Caller preference; None means "platform default"
                (enabled on Unix).

        Returns:
            False on Windows (``os.killpg`` is unavailable there), otherwise
            the caller's preference, with None treated as True.
        """
        if sys.platform == "win32":
            return False
        return True if use_process_group is None else bool(use_process_group)

    @staticmethod
    def _signal_process(
        proc: subprocess.Popen[str] | asyncio.subprocess.Process,
        pgid: int | None,
        *,
        force: bool,
    ) -> None:
        """Send a stop signal to a process group, or to a single process.

        Args:
            proc: The process to signal (sync ``Popen`` or asyncio ``Process``).
            pgid: Process-group id to signal via ``os.killpg``; when None only
                ``proc`` itself is signalled.
            force: True sends SIGKILL / ``proc.kill()``; False sends
                SIGTERM / ``proc.terminate()``.

        A process group that no longer exists, or that we may not signal, is
        ignored. Errors from ``proc.terminate()``/``proc.kill()`` propagate.
        """
        if pgid is not None:
            try:
                os.killpg(pgid, signal.SIGKILL if force else signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                pass
        elif force:
            proc.kill()
        else:
            proc.terminate()

    def _merge_env(self, env: Mapping[str, str] | None) -> dict[str, str]:
        """Merge provided env with os.environ.

        Args:
            env: Environment variables to merge; entries override os.environ.

        Returns:
            A new merged environment dictionary (os.environ is not modified).
        """
        if env is None:
            return dict(os.environ)
        return {**os.environ, **env}

    def _decode_output(self, data: bytes | str | None) -> str:
        """Decode subprocess output which may be bytes, str, or None.

        Args:
            data: Output data from subprocess.

        Returns:
            Decoded string; ``""`` for None. Bytes are decoded as UTF-8 with
            undecodable sequences replaced by U+FFFD rather than raising.
        """
        if data is None:
            return ""
        if isinstance(data, str):
            return data
        return data.decode(errors="replace")
def run_command(
    cmd: list[str] | str,
    cwd: Path,
    env: Mapping[str, str] | None = None,
    timeout_seconds: float | None = None,
    shell: bool = False,
) -> CommandResult:
    """Run one command synchronously without keeping a runner around.

    Builds a throwaway ``CommandRunner`` rooted at ``cwd`` and delegates to its
    ``run`` method. The runner uses its default kill grace period, and ``run``
    uses its platform-default process-group behavior.

    Args:
        cmd: Argument list (non-shell mode) or a shell string when shell=True.
        cwd: Directory the command runs in.
        env: Extra environment variables, layered over os.environ.
        timeout_seconds: Timeout for the command; None disables it.
        shell: When True, execute ``cmd`` through the shell (pass a string).
            Defaults to False for backwards compatibility.

    Returns:
        CommandResult describing the run.
    """
    return CommandRunner(cwd=cwd, timeout_seconds=timeout_seconds).run(
        cmd, env=env, shell=shell
    )
async def run_command_async(
    cmd: list[str] | str,
    cwd: Path,
    env: Mapping[str, str] | None = None,
    timeout_seconds: float | None = None,
    shell: bool = False,
) -> CommandResult:
    """Run one command asynchronously without keeping a runner around.

    Builds a throwaway ``CommandRunner`` rooted at ``cwd`` and awaits its
    ``run_async`` method. The runner uses its default kill grace period, and
    ``run_async`` uses its platform-default process-group behavior.

    Args:
        cmd: Argument list (non-shell mode) or a shell string when shell=True.
        cwd: Directory the command runs in.
        env: Extra environment variables, layered over os.environ.
        timeout_seconds: Timeout for the command; None disables it.
        shell: When True, execute ``cmd`` through the shell (pass a string).
            Defaults to False for backwards compatibility.

    Returns:
        CommandResult describing the run.
    """
    return await CommandRunner(
        cwd=cwd, timeout_seconds=timeout_seconds
    ).run_async(cmd, env=env, shell=shell)