Coverage for src / infra / tools / command_runner.py: 86%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Standardized subprocess execution with timeout and process-group termination. 

2 

3This module provides CommandRunner for consistent subprocess handling across: 

4- src/beads_client.py 

5- quality_gate.py 

6- validation/legacy_runner.py 

7- validation/spec_runner.py 

8- validation/e2e.py 

9- validation/worktree.py 

10 

11Key features: 

12- Unified timeout handling with process-group termination 

13- Both sync and async execution 

14- Standardized output capture and tailing 

15- Consistent error handling 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import os 

22import signal 

23import subprocess 

24import sys 

25import time 

26from dataclasses import dataclass 

27from typing import TYPE_CHECKING 

28 

29from src.core.protocols import CommandRunnerPort 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Mapping 

33 from pathlib import Path 

34 

# Return code reported for a command that was stopped because it exceeded its
# timeout. 124 is the value the GNU coreutils `timeout` command uses.
TIMEOUT_EXIT_CODE = 124

# How long (in seconds) a process is given to exit after SIGTERM before it is
# sent SIGKILL, unless the caller supplies its own grace period.
DEFAULT_KILL_GRACE_SECONDS = 2.0

40 

41 

42def _tail(text: str, max_chars: int = 800, max_lines: int = 20) -> str: 

43 """Truncate text to last N lines and M characters. 

44 

45 Args: 

46 text: The text to truncate. 

47 max_chars: Maximum number of characters to keep. 

48 max_lines: Maximum number of lines to keep. 

49 

50 Returns: 

51 The truncated text. 

52 """ 

53 if not text: 

54 return "" 

55 lines = text.splitlines() 

56 if len(lines) > max_lines: 

57 lines = lines[-max_lines:] 

58 clipped = "\n".join(lines) 

59 if len(clipped) > max_chars: 

60 return clipped[-max_chars:] 

61 return clipped 

62 

63 

@dataclass
class CommandResult:
    """Outcome of one command execution.

    Holds the complete captured output together with the exit status, and
    offers helpers that return only the tail of each stream for compact
    reporting.

    Attributes:
        command: The command that was run, as an argument list or a shell
            string.
        returncode: The command's exit code.
        stdout: Everything captured from standard output.
        stderr: Everything captured from standard error.
        duration_seconds: Elapsed time of the command, in seconds.
        timed_out: True when the command was killed because it timed out.
    """

    command: list[str] | str
    returncode: int
    stdout: str = ""
    stderr: str = ""
    duration_seconds: float = 0.0
    timed_out: bool = False

    @property
    def ok(self) -> bool:
        """True when the command exited with return code 0."""
        return self.returncode == 0

    def stdout_tail(self, max_chars: int = 800, max_lines: int = 20) -> str:
        """Return the end of the captured stdout.

        Args:
            max_chars: Upper bound on the number of characters returned.
            max_lines: Upper bound on the number of lines returned.

        Returns:
            The trailing portion of stdout.
        """
        captured = self.stdout
        return _tail(captured, max_chars, max_lines)

    def stderr_tail(self, max_chars: int = 800, max_lines: int = 20) -> str:
        """Return the end of the captured stderr.

        Args:
            max_chars: Upper bound on the number of characters returned.
            max_lines: Upper bound on the number of lines returned.

        Returns:
            The trailing portion of stderr.
        """
        captured = self.stderr
        return _tail(captured, max_chars, max_lines)

115 

116 

class CommandRunner(CommandRunnerPort):
    """Runs commands with standardized timeout and process-group handling.

    This class provides both sync and async execution methods with:
    - Configurable timeout
    - Process-group termination (kills child processes on timeout)
    - Consistent output capture (undecodable bytes are replaced, not raised)
    - Optional environment variable merging
    - Cleanup of the child when the caller is interrupted or cancelled

    Example:
        runner = CommandRunner(cwd=Path("/my/repo"), timeout_seconds=60.0)
        result = runner.run(["pytest", "-v"])
        if not result.ok:
            print(f"Failed: {result.stderr_tail()}")
    """

    def __init__(
        self,
        cwd: Path,
        timeout_seconds: float | None = None,
        kill_grace_seconds: float = DEFAULT_KILL_GRACE_SECONDS,
    ):
        """Initialize CommandRunner.

        Args:
            cwd: Working directory for commands.
            timeout_seconds: Default timeout for commands. None means no timeout.
            kill_grace_seconds: Grace period after SIGTERM before SIGKILL.
        """
        self.cwd = cwd
        self.timeout_seconds = timeout_seconds
        self.kill_grace_seconds = kill_grace_seconds

    @staticmethod
    def _resolve_use_process_group(use_process_group: bool | None) -> bool:
        """Decide whether the child should run in its own process group.

        Args:
            use_process_group: The caller's preference, or None for the
                platform default.

        Returns:
            Always False on Windows (os.killpg is not available there).
            Elsewhere, the caller's preference, defaulting to True.
        """
        if sys.platform == "win32":
            return False
        return True if use_process_group is None else bool(use_process_group)

    def run(
        self,
        cmd: list[str] | str,
        env: Mapping[str, str] | None = None,
        timeout: float | None = None,
        use_process_group: bool | None = None,
        shell: bool = False,
        cwd: Path | None = None,
    ) -> CommandResult:
        """Run a command synchronously.

        Uses Popen with process-group termination on timeout. When timeout
        occurs, sends SIGTERM to the process group, waits kill_grace_seconds,
        then sends SIGKILL if still running.

        Args:
            cmd: Command to run. Can be a list of strings (for non-shell mode)
                or a shell string when shell=True.
            env: Environment variables (merged with os.environ).
            timeout: Override default timeout for this command.
            use_process_group: Whether to use process group for termination.
                If None (default), uses process group on Unix, disabled on Windows.
                When True, creates a new session and kills the entire process group
                on timeout using os.killpg(). When False, only terminates the
                main process. Note: On Windows, process groups are not supported
                and this parameter is ignored (always treated as False).
            shell: If True, run command through shell (cmd should be a string).
                Defaults to False for backwards compatibility.
            cwd: Override working directory for this command. If None, uses self.cwd.

        Returns:
            CommandResult with execution details. On timeout the result has
            returncode TIMEOUT_EXIT_CODE, timed_out=True, and whatever output
            could be collected while terminating the process.

        Raises:
            Exception: Whatever subprocess.Popen raises when the command cannot
                be started; unlike run_async, start-up errors are not converted
                into a CommandResult.
        """
        effective_timeout = timeout if timeout is not None else self.timeout_seconds
        effective_cwd = cwd if cwd is not None else self.cwd
        merged_env = self._merge_env(env)
        effective_use_process_group = self._resolve_use_process_group(
            use_process_group
        )

        start = time.monotonic()
        proc = subprocess.Popen(
            cmd,
            cwd=effective_cwd,
            env=merged_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Commands may emit bytes that are invalid in the locale encoding;
            # substitute them instead of failing the whole run.
            errors="replace",
            start_new_session=effective_use_process_group,
            shell=shell,
        )

        try:
            stdout, stderr = proc.communicate(timeout=effective_timeout)
        except subprocess.TimeoutExpired:
            # Duration is measured up to the timeout, excluding the time
            # spent terminating the process group.
            duration = time.monotonic() - start
            stdout, stderr = self._terminate_process_sync(
                proc, effective_use_process_group
            )
            return CommandResult(
                command=cmd,
                returncode=TIMEOUT_EXIT_CODE,
                stdout=stdout,
                stderr=stderr,
                duration_seconds=duration,
                timed_out=True,
            )
        except BaseException:
            # Interrupted while waiting (e.g. KeyboardInterrupt). A child in
            # its own session does not receive the terminal's SIGINT, so stop
            # it explicitly rather than leaving it running.
            if proc.poll() is None:
                self._terminate_process_sync(proc, effective_use_process_group)
            raise

        duration = time.monotonic() - start
        return CommandResult(
            command=cmd,
            returncode=proc.returncode,
            stdout=stdout or "",
            stderr=stderr or "",
            duration_seconds=duration,
            timed_out=False,
        )

    def _terminate_process_sync(
        self,
        proc: subprocess.Popen[str],
        use_process_group: bool,
    ) -> tuple[str, str]:
        """Terminate a process and all its children synchronously.

        Sends SIGTERM to the process group, waits kill_grace_seconds,
        then sends SIGKILL if still running.

        Args:
            proc: The Popen process to terminate.
            use_process_group: Whether to kill the entire process group.

        Returns:
            Tuple of (stdout, stderr) collected while the process shut down,
            or two empty strings if the pipes had to be abandoned.
        """
        # The child was started with start_new_session, so its pid doubles as
        # the process-group id.
        pgid = proc.pid if use_process_group else None

        # Send SIGTERM to process group
        if pgid is not None:
            try:
                os.killpg(pgid, signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                # Group already gone or not signalable: best effort only.
                pass
        else:
            proc.terminate()

        # Wait for grace period
        try:
            stdout, stderr = proc.communicate(timeout=self.kill_grace_seconds)
            return stdout or "", stderr or ""
        except subprocess.TimeoutExpired:
            pass

        # Force kill if still running
        if pgid is not None:
            try:
                os.killpg(pgid, signal.SIGKILL)
            except (ProcessLookupError, PermissionError):
                pass
        else:
            proc.kill()

        # Wait for process to exit and capture any remaining output
        # Use a bounded timeout to avoid hanging if children hold pipes open
        # (can happen when use_process_group=False and children survive)
        try:
            stdout, stderr = proc.communicate(timeout=self.kill_grace_seconds)
            return stdout or "", stderr or ""
        except subprocess.TimeoutExpired:
            # Children are holding pipes open; close them and give up on output
            if proc.stdout:
                proc.stdout.close()
            if proc.stderr:
                proc.stderr.close()
            proc.wait()
            return "", ""

    async def run_async(
        self,
        cmd: list[str] | str,
        env: Mapping[str, str] | None = None,
        timeout: float | None = None,
        use_process_group: bool | None = None,
        shell: bool = False,
        cwd: Path | None = None,
    ) -> CommandResult:
        """Run a command asynchronously.

        Uses asyncio subprocess for proper async timeout handling with
        process-group termination. On timeout, sends SIGTERM to the process
        group, waits kill_grace_seconds, then sends SIGKILL if still running.
        If the awaiting task is cancelled, the child is terminated the same
        way before the cancellation propagates.

        Args:
            cmd: Command to run. Can be a list of strings (for non-shell mode)
                or a shell string when shell=True. A list passed with
                shell=True is joined with single spaces, without quoting.
            env: Environment variables (merged with os.environ).
            timeout: Override default timeout for this command.
            use_process_group: Whether to use process group for termination.
                If None (default), uses process group on Unix, disabled on Windows.
                When True, creates a new session (os.setsid) and kills the entire
                process group on timeout using os.killpg(). When False, only
                terminates the main process. Note: On Windows, process groups
                are not supported and this parameter is ignored (always treated
                as False).
            shell: If True, run command through shell (cmd should be a string).
                Defaults to False for backwards compatibility.
            cwd: Override working directory for this command. If None, uses self.cwd.

        Returns:
            CommandResult with execution details. On timeout the result has
            returncode TIMEOUT_EXIT_CODE, timed_out=True and empty output.
            Any other exception (including failure to start the command, or a
            string cmd with shell=False) is reported as returncode 1 with the
            exception text in stderr.
        """
        effective_timeout = timeout if timeout is not None else self.timeout_seconds
        effective_cwd = cwd if cwd is not None else self.cwd
        merged_env = self._merge_env(env)
        effective_use_process_group = self._resolve_use_process_group(
            use_process_group
        )

        start = time.monotonic()
        try:
            if shell:
                # For shell mode, use create_subprocess_shell with command as string
                if isinstance(cmd, list):
                    cmd = " ".join(cmd)
                proc = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=effective_cwd,
                    env=merged_env,
                    start_new_session=effective_use_process_group,
                )
            else:
                # For non-shell mode, use create_subprocess_exec
                if isinstance(cmd, str):
                    raise ValueError("cmd must be a list when shell=False")
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=effective_cwd,
                    env=merged_env,
                    start_new_session=effective_use_process_group,
                )

            try:
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=effective_timeout,
                )
            except TimeoutError:
                # NOTE: asyncio.wait_for raises the builtin TimeoutError on
                # Python 3.11+, where asyncio.TimeoutError is an alias of it.
                duration = time.monotonic() - start
                await self._terminate_process(proc, effective_use_process_group)
                return CommandResult(
                    command=cmd,
                    returncode=TIMEOUT_EXIT_CODE,
                    stdout="",
                    stderr="",
                    duration_seconds=duration,
                    timed_out=True,
                )
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass, so without this
                # handler a cancelled caller would leave the child running.
                await self._terminate_process(proc, effective_use_process_group)
                raise

            duration = time.monotonic() - start
            return CommandResult(
                command=cmd,
                returncode=proc.returncode or 0,
                # Lenient decoding: invalid bytes in the output must not turn
                # a finished command into a reported failure.
                stdout=self._decode_output(stdout_bytes),
                stderr=self._decode_output(stderr_bytes),
                duration_seconds=duration,
                timed_out=False,
            )
        except Exception as e:
            duration = time.monotonic() - start
            return CommandResult(
                command=cmd,
                returncode=1,
                stdout="",
                stderr=str(e),
                duration_seconds=duration,
                timed_out=False,
            )

    async def _terminate_process(
        self,
        proc: asyncio.subprocess.Process,
        use_process_group: bool,
    ) -> None:
        """Terminate a process and all its children.

        Sends SIGTERM to the process group, waits for grace period,
        then sends SIGKILL if still running. Returns immediately when the
        process has already exited.

        Args:
            proc: The process to terminate.
            use_process_group: Whether to kill the entire process group.
        """
        if proc.returncode is not None:
            return

        # The child was started with start_new_session, so its pid doubles as
        # the process-group id.
        pgid = proc.pid if use_process_group else None

        try:
            # Send SIGTERM to process group
            if pgid is not None:
                try:
                    os.killpg(pgid, signal.SIGTERM)
                except (ProcessLookupError, PermissionError):
                    pass
            else:
                proc.terminate()

            # Wait for grace period
            try:
                await asyncio.wait_for(proc.wait(), timeout=self.kill_grace_seconds)
            except TimeoutError:
                pass

            # Force kill if still running. In process-group mode SIGKILL is
            # sent to the group regardless of whether the leader has exited.
            if pgid is not None:
                try:
                    os.killpg(pgid, signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    pass
            elif proc.returncode is None:
                proc.kill()

            # Ensure process is fully reaped
            if proc.returncode is None:
                await proc.wait()
        except ProcessLookupError:
            # The process vanished between checks; nothing left to stop.
            pass

    def _merge_env(self, env: Mapping[str, str] | None) -> dict[str, str]:
        """Merge provided env with os.environ.

        Args:
            env: Environment variables to merge. Entries here take precedence
                over os.environ entries with the same name.

        Returns:
            Merged environment dictionary (a new dict; os.environ is not
            modified).
        """
        if env is None:
            return dict(os.environ)
        return {**os.environ, **env}

    def _decode_output(self, data: bytes | str | None) -> str:
        """Decode subprocess output which may be bytes, str, or None.

        Args:
            data: Output data from subprocess.

        Returns:
            Decoded string. None becomes an empty string, str is returned
            unchanged, and bytes are decoded as UTF-8 with undecodable bytes
            replaced by U+FFFD instead of raising UnicodeDecodeError.
        """
        if data is None:
            return ""
        if isinstance(data, str):
            return data
        return data.decode(errors="replace")

478 

479 

def run_command(
    cmd: list[str] | str,
    cwd: Path,
    env: Mapping[str, str] | None = None,
    timeout_seconds: float | None = None,
    shell: bool = False,
) -> CommandResult:
    """Run one command synchronously with a throwaway CommandRunner.

    Args:
        cmd: The command, either an argument list (non-shell mode) or a
            shell string when shell=True.
        cwd: Directory in which the command runs.
        env: Extra environment variables, merged with os.environ.
        timeout_seconds: Time limit for the command.
        shell: When True the command is run through the shell (cmd should
            then be a string). False by default for backwards compatibility.

    Returns:
        A CommandResult describing the execution.
    """
    return CommandRunner(cwd=cwd, timeout_seconds=timeout_seconds).run(
        cmd,
        env=env,
        shell=shell,
    )

503 

504 

async def run_command_async(
    cmd: list[str] | str,
    cwd: Path,
    env: Mapping[str, str] | None = None,
    timeout_seconds: float | None = None,
    shell: bool = False,
) -> CommandResult:
    """Run one command asynchronously with a throwaway CommandRunner.

    Args:
        cmd: The command, either an argument list (non-shell mode) or a
            shell string when shell=True.
        cwd: Directory in which the command runs.
        env: Extra environment variables, merged with os.environ.
        timeout_seconds: Time limit for the command.
        shell: When True the command is run through the shell (cmd should
            then be a string). False by default for backwards compatibility.

    Returns:
        A CommandResult describing the execution.
    """
    one_shot = CommandRunner(cwd=cwd, timeout_seconds=timeout_seconds)
    outcome = await one_shot.run_async(cmd, env=env, shell=shell)
    return outcome