Coverage for src / infra / clients / cerberus_gate_cli.py: 29%
129 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""Cerberus review-gate CLI subprocess management.

This module provides CerberusGateCLI for managing subprocess interactions
with the review-gate CLI binary. It handles:
- Binary validation (exists and is executable)
- Environment variable merging (CLAUDE_SESSION_ID)
- Subprocess spawn/wait/resolve with timeout handling
- Exit code interpretation

This is a low-level component extracted from DefaultReviewer to enable
independent testing of subprocess logic.
"""
14from __future__ import annotations
16import json
17import os
18import shutil
19from dataclasses import dataclass, field
20from pathlib import Path
21from typing import TYPE_CHECKING
23if TYPE_CHECKING:
24 from collections.abc import Sequence
26 from src.infra.tools.command_runner import CommandRunner
@dataclass
class SpawnResult:
    """Outcome of a ``spawn-code-review`` invocation.

    Attributes:
        success: True when the spawn command completed successfully.
        timed_out: True when the spawn command hit its timeout.
        error_detail: Human-readable failure message; empty on success.
        already_active: True when the failure output indicated that a gate
            is already active.
    """

    success: bool
    timed_out: bool = False
    error_detail: str = ""
    already_active: bool = False
@dataclass
class WaitResult:
    """Outcome of a ``wait`` invocation.

    Attributes:
        returncode: Exit code reported by the wait command.
        stdout: Captured standard output (expected to be JSON).
        stderr: Captured standard error.
        timed_out: True when the wait command hit its timeout.
        session_dir: Session directory taken from the JSON output, or None
            when it could not be extracted.
    """

    returncode: int
    stdout: str = ""
    stderr: str = ""
    timed_out: bool = False
    session_dir: Path | None = None
@dataclass
class ResolveResult:
    """Outcome of a ``resolve`` invocation that clears an active gate.

    Attributes:
        success: True when the resolve command succeeded.
        error_detail: Human-readable failure message; empty on success.
    """

    success: bool
    error_detail: str = ""
@dataclass
class CerberusGateCLI:
    """Low-level CLI subprocess management for review-gate.

    Handles subprocess spawn/wait/resolve, binary validation, env merging,
    and timeout handling. Performs minimal JSON parsing to extract transport
    fields (e.g., session_dir) but does not map exit codes to domain results
    - that responsibility belongs to DefaultReviewer.

    Attributes:
        repo_path: Working directory for subprocess execution.
        bin_path: Optional path to directory containing review-gate binary.
            If None, uses bare "review-gate" from PATH.
        env: Additional environment variables to merge with os.environ.
    """

    repo_path: Path
    bin_path: Path | None = None
    env: dict[str, str] = field(default_factory=dict)

    def _review_gate_bin(self) -> str:
        """Return the command used to invoke the review-gate binary."""
        if self.bin_path is not None:
            return str(self.bin_path / "review-gate")
        return "review-gate"

    def validate_binary(self) -> str | None:
        """Validate that the review-gate binary exists and is executable.

        With ``bin_path`` set, the file ``bin_path / "review-gate"`` must be a
        regular file with execute permission. Otherwise the binary is looked
        up on a PATH made of ``self.env["PATH"]`` (if any) followed by the
        process PATH, so callers that inject PATH via ``env`` do not get
        false negatives.

        Returns:
            None if the binary is valid, or an error message if not.
        """
        if self.bin_path is not None:
            binary_path = self.bin_path / "review-gate"
            # is_file() rather than exists(): a directory with this name would
            # also satisfy os.access(..., X_OK) and be reported as valid.
            if not binary_path.is_file():
                return f"review-gate binary not found at {binary_path}"
            if not os.access(binary_path, os.X_OK):
                return f"review-gate binary at {binary_path} is not executable"
            return None

        # Join only non-empty parts: a leading/trailing separator would add an
        # empty PATH entry, which shutil.which treats as the current directory.
        search_path = os.pathsep.join(
            part
            for part in (self.env.get("PATH"), os.environ.get("PATH"))
            if part
        )
        if shutil.which("review-gate", path=search_path) is None:
            return "review-gate binary not found in PATH"
        return None

    def build_env(self, claude_session_id: str | None) -> dict[str, str]:
        """Build environment dict with CLAUDE_SESSION_ID merged.

        The session ID is taken from the first non-empty source among: the
        argument, ``self.env``, and ``os.environ``. ``self.env`` itself is
        not modified.

        Args:
            claude_session_id: Session ID to use, or None to use from env.

        Returns:
            Copy of ``self.env`` with CLAUDE_SESSION_ID set when any source
            provided one.
        """
        merged = dict(self.env)
        claude_session = (
            claude_session_id
            or merged.get("CLAUDE_SESSION_ID")
            or os.environ.get("CLAUDE_SESSION_ID")
        )
        if claude_session:
            merged["CLAUDE_SESSION_ID"] = claude_session
        return merged

    async def spawn_code_review(
        self,
        diff_range: str,
        runner: CommandRunner,
        env: dict[str, str],
        timeout: int,
        spawn_args: tuple[str, ...] = (),
        context_file: Path | None = None,
        commit_shas: Sequence[str] | None = None,
    ) -> SpawnResult:
        """Spawn a code review subprocess.

        Args:
            diff_range: Git diff range to review (e.g., "baseline..HEAD").
                Ignored when ``commit_shas`` is non-empty.
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command.
            timeout: Timeout in seconds.
            spawn_args: Additional arguments for spawn-code-review.
            context_file: Optional context file path.
            commit_shas: Optional list of commit SHAs for commit-based review.

        Returns:
            SpawnResult with success/failure status and error details.
        """
        spawn_cmd = [self._review_gate_bin(), "spawn-code-review"]
        # Always exclude .beads/ directory (auto-generated issue tracker files)
        spawn_cmd.extend(["--exclude", ".beads/"])
        if context_file is not None:
            spawn_cmd.extend(["--context-file", str(context_file)])
        if spawn_args:
            spawn_cmd.extend(spawn_args)
        if commit_shas:
            spawn_cmd.append("--commit")
            spawn_cmd.extend(commit_shas)
        else:
            # Diff range is a positional argument
            spawn_cmd.append(diff_range)

        result = await runner.run_async(spawn_cmd, env=env, timeout=timeout)
        if result.timed_out:
            return SpawnResult(
                success=False, timed_out=True, error_detail="spawn timeout"
            )

        if result.returncode != 0:
            stderr = result.stderr_tail()
            stdout = result.stdout_tail()
            detail = stderr or stdout or "spawn failed"
            # Check both streams: the "already active" marker is matched
            # case-insensitively wherever the CLI printed it.
            combined = f"{stderr or ''} {stdout or ''}".lower()
            return SpawnResult(
                success=False,
                timed_out=False,
                error_detail=detail,
                already_active="already active" in combined,
            )

        return SpawnResult(success=True)

    @staticmethod
    def _parse_session_dir(stdout: str | None) -> Path | None:
        """Extract a non-empty string ``session_dir`` from JSON output, if any."""
        try:
            wait_data = json.loads(stdout)
        except (json.JSONDecodeError, TypeError):
            # Non-JSON or missing output: no session directory to report.
            return None
        if not isinstance(wait_data, dict):
            return None
        raw_session_dir = wait_data.get("session_dir")
        if isinstance(raw_session_dir, str) and raw_session_dir:
            return Path(raw_session_dir)
        return None

    async def wait_for_review(
        self,
        session_id: str,
        runner: CommandRunner,
        env: dict[str, str],
        cli_timeout: int,
        wait_args: tuple[str, ...] = (),
        user_timeout: int | None = None,
    ) -> WaitResult:
        """Wait for review completion.

        Args:
            session_id: Session ID to wait for.
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command.
            cli_timeout: Timeout value to pass to CLI (if user_timeout is None).
            wait_args: Additional arguments for wait command.
            user_timeout: User-specified timeout (if already in wait_args).

        Returns:
            WaitResult with returncode, stdout, stderr, timeout status, and
            the session directory parsed from the JSON output (None if absent
            or unparseable).
        """
        wait_cmd = [
            self._review_gate_bin(),
            "wait",
            "--json",
            "--session-id",
            session_id,
        ]
        # Only add --timeout if not already specified in wait_args
        if user_timeout is None:
            wait_cmd.extend(["--timeout", str(cli_timeout)])
        if wait_args:
            wait_cmd.extend(wait_args)

        # Use CLI timeout + grace period for subprocess timeout
        effective_timeout = (
            user_timeout if user_timeout is not None else cli_timeout
        ) + 30
        result = await runner.run_async(wait_cmd, env=env, timeout=effective_timeout)

        return WaitResult(
            returncode=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            timed_out=result.timed_out,
            session_dir=self._parse_session_dir(result.stdout),
        )

    async def resolve_gate(
        self,
        runner: CommandRunner,
        env: dict[str, str],
        reason: str = "mala: auto-clearing stale gate for retry",
    ) -> ResolveResult:
        """Resolve (clear) an active gate.

        Never raises: any exception from the runner is converted into a
        failed ResolveResult.

        Args:
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command (must include CLAUDE_SESSION_ID).
            reason: Reason message for the resolve command.

        Returns:
            ResolveResult with success/failure status and error details.
        """
        resolve_cmd = [
            self._review_gate_bin(),
            "resolve",
            "--reason",
            reason,
        ]
        try:
            result = await runner.run_async(resolve_cmd, env=env, timeout=30)
            if result.returncode == 0:
                return ResolveResult(success=True)
            stderr = result.stderr.strip() if result.stderr else ""
            stdout = result.stdout.strip() if result.stdout else ""
            return ResolveResult(
                success=False, error_detail=stderr or stdout or "resolve failed"
            )
        except Exception as e:
            # Some exceptions (e.g. a bare TimeoutError) have an empty message;
            # fall back to the class name so the failure is never reported blank.
            return ResolveResult(
                success=False, error_detail=str(e) or type(e).__name__
            )

    async def check_diff_empty(self, diff_range: str, runner: CommandRunner) -> bool:
        """Check if the diff range has no changes.

        Uses git diff --stat to check if there are any changes in the range.
        If the command fails, returns False to proceed with review (fail-open).

        Args:
            diff_range: Git diff range to check (e.g., "baseline..HEAD").
            runner: CommandRunner instance to execute git command.

        Returns:
            True if the diff is empty (no changes), False otherwise.
        """
        try:
            result = await runner.run_async(
                ["git", "diff", "--stat", diff_range],
                timeout=30,
            )
        except Exception:
            # If diff check fails, proceed with review anyway (fail-open)
            return False
        return result.returncode == 0 and not result.stdout.strip()

    @staticmethod
    def extract_wait_timeout(args: tuple[str, ...]) -> int | None:
        """Extract --timeout value from wait args if provided.

        Parses args to detect if a --timeout flag is already specified.
        Supports both '--timeout VALUE' and '--timeout=VALUE' formats. Only
        the first --timeout occurrence that carries a value is considered.

        Args:
            args: Tuple of command-line arguments to search.

        Returns:
            The timeout value as int if found and valid, None otherwise.
        """
        for index, arg in enumerate(args):
            if arg.startswith("--timeout="):
                value = arg.partition("=")[2]
            elif arg == "--timeout" and index + 1 < len(args):
                value = args[index + 1]
            else:
                continue
            # isdecimal(), not isdigit(): isdigit() also accepts characters
            # such as superscript digits that int() rejects with ValueError.
            return int(value) if value.isdecimal() else None
        return None