Coverage for src/infra/clients/cerberus_gate_cli.py: 29%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

"""Cerberus review-gate CLI subprocess management.

This module provides CerberusGateCLI for managing subprocess interactions
with the review-gate CLI binary. It handles:
- Binary validation (exists and is executable)
- Environment variable merging (CLAUDE_SESSION_ID)
- Subprocess spawn/wait/resolve with timeout handling
- Exit code interpretation

This is a low-level component extracted from DefaultReviewer to enable
independent testing of subprocess logic.
"""

13 

14from __future__ import annotations 

15 

16import json 

17import os 

18import shutil 

19from dataclasses import dataclass, field 

20from pathlib import Path 

21from typing import TYPE_CHECKING 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Sequence 

25 

26 from src.infra.tools.command_runner import CommandRunner 

27 

28 

@dataclass
class SpawnResult:
    """Outcome of a ``spawn-code-review`` invocation.

    Attributes:
        success: True when the spawn command exited with status 0.
        timed_out: True when the spawn command hit its timeout.
        error_detail: Human-readable failure description; empty on success.
        already_active: True when the failure output mentioned an
            "already active" gate.
    """

    success: bool
    timed_out: bool = False
    error_detail: str = ""
    already_active: bool = False

44 

45 

@dataclass
class WaitResult:
    """Outcome of a ``wait`` invocation.

    Attributes:
        returncode: Exit code reported for the wait command.
        stdout: Captured standard output (expected to be JSON).
        stderr: Captured standard error.
        timed_out: True when the wait command hit its timeout.
        session_dir: Session directory taken from the ``session_dir`` key of
            the JSON output, or None when it could not be extracted.
    """

    returncode: int
    stdout: str = ""
    stderr: str = ""
    timed_out: bool = False
    session_dir: Path | None = None

63 

64 

@dataclass
class ResolveResult:
    """Outcome of a ``resolve`` invocation (clearing an active gate).

    Attributes:
        success: True when the resolve command exited with status 0.
        error_detail: Human-readable failure description; empty on success.
    """

    success: bool
    error_detail: str = ""

76 

77 

@dataclass
class CerberusGateCLI:
    """Low-level CLI subprocess management for review-gate.

    Handles subprocess spawn/wait/resolve, binary validation, env merging,
    and timeout handling. Performs minimal JSON parsing to extract transport
    fields (e.g., session_dir) but does not map exit codes to domain results
    - that responsibility belongs to DefaultReviewer.

    Attributes:
        repo_path: Working directory for subprocess execution.
            NOTE(review): no method of this class reads it; presumably the
            caller or the runner applies it - confirm.
        bin_path: Optional path to directory containing review-gate binary.
            If None, uses bare "review-gate" from PATH.
        env: Additional environment variables to merge with os.environ.
    """

    repo_path: Path
    bin_path: Path | None = None
    env: dict[str, str] = field(default_factory=dict)

    def _review_gate_bin(self) -> str:
        """Return the command used to invoke review-gate.

        Returns:
            ``<bin_path>/review-gate`` when ``bin_path`` is set, otherwise the
            bare name ``"review-gate"`` (resolved through PATH at exec time).
        """
        if self.bin_path is not None:
            return str(self.bin_path / "review-gate")
        return "review-gate"

    def validate_binary(self) -> str | None:
        """Validate that the review-gate binary exists and is executable.

        With an explicit ``bin_path`` the file ``<bin_path>/review-gate`` is
        checked directly. Otherwise the binary is looked up on PATH, with
        ``self.env["PATH"]`` (if present) searched before the process PATH so
        that callers injecting PATH do not get false negatives.

        Returns:
            None if the binary is valid, or an error message if not.
        """
        if self.bin_path is not None:
            binary_path = self.bin_path / "review-gate"
            # is_file() rather than exists(): a directory named "review-gate"
            # would pass exists() and the X_OK check but can never be run.
            # This mirrors shutil.which below, which also ignores directories.
            if not binary_path.is_file():
                return f"review-gate binary not found at {binary_path}"
            if not os.access(binary_path, os.X_OK):
                return f"review-gate binary at {binary_path} is not executable"
            return None

        # Injected PATH entries take precedence over the inherited PATH.
        search_path = os.environ.get("PATH", "")
        if "PATH" in self.env:
            search_path = self.env["PATH"] + os.pathsep + search_path
        if shutil.which("review-gate", path=search_path) is None:
            return "review-gate binary not found in PATH"
        return None

    def build_env(self, claude_session_id: str | None) -> dict[str, str]:
        """Build an environment dict with CLAUDE_SESSION_ID filled in.

        The session ID is taken from the first non-empty source among: the
        ``claude_session_id`` argument, ``self.env``, then ``os.environ``.

        Args:
            claude_session_id: Session ID to use, or None to fall back to the
                configured/process environment.

        Returns:
            A copy of ``self.env`` with CLAUDE_SESSION_ID set when a value was
            found. ``self.env`` itself is not modified, and the rest of
            ``os.environ`` is not copied into the result.
        """
        merged = dict(self.env)
        session = (
            claude_session_id
            or merged.get("CLAUDE_SESSION_ID")
            or os.environ.get("CLAUDE_SESSION_ID")
        )
        if session:
            merged["CLAUDE_SESSION_ID"] = session
        return merged

    async def spawn_code_review(
        self,
        diff_range: str,
        runner: CommandRunner,
        env: dict[str, str],
        timeout: int,
        spawn_args: tuple[str, ...] = (),
        context_file: Path | None = None,
        commit_shas: Sequence[str] | None = None,
    ) -> SpawnResult:
        """Spawn a code review subprocess.

        Args:
            diff_range: Git diff range to review (e.g., "baseline..HEAD").
                Ignored when ``commit_shas`` is non-empty.
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command.
            timeout: Timeout in seconds.
            spawn_args: Additional arguments for spawn-code-review.
            context_file: Optional context file path.
            commit_shas: Optional list of commit SHAs for commit-based review.

        Returns:
            SpawnResult with success/failure status and error details.
        """
        spawn_cmd = [self._review_gate_bin(), "spawn-code-review"]
        # Always exclude .beads/ directory (auto-generated issue tracker files)
        spawn_cmd.extend(["--exclude", ".beads/"])
        if context_file is not None:
            spawn_cmd.extend(["--context-file", str(context_file)])
        if spawn_args:
            spawn_cmd.extend(spawn_args)
        if commit_shas:
            # Commit-based review: the SHAs replace the positional diff range.
            spawn_cmd.append("--commit")
            spawn_cmd.extend(commit_shas)
        else:
            # Diff range is a positional argument
            spawn_cmd.append(diff_range)

        result = await runner.run_async(spawn_cmd, env=env, timeout=timeout)
        if result.timed_out:
            return SpawnResult(
                success=False, timed_out=True, error_detail="spawn timeout"
            )

        if result.returncode != 0:
            stderr = result.stderr_tail()
            stdout = result.stdout_tail()
            detail = stderr or stdout or "spawn failed"
            # Both streams are searched because either one may carry the
            # "already active" message.
            combined = f"{stderr or ''} {stdout or ''}".lower()
            return SpawnResult(
                success=False,
                timed_out=False,
                error_detail=detail,
                already_active="already active" in combined,
            )

        return SpawnResult(success=True)

    @staticmethod
    def _parse_session_dir(stdout: str) -> Path | None:
        """Extract ``session_dir`` from the wait command's JSON output.

        Returns:
            The path when ``stdout`` is a JSON object whose ``session_dir``
            value is a non-empty string; None for anything else, including
            output that is not valid JSON.
        """
        try:
            wait_data = json.loads(stdout)
        except (json.JSONDecodeError, TypeError):
            # Best-effort extraction: missing or malformed output is not an
            # error at this layer.
            return None
        if not isinstance(wait_data, dict):
            return None
        raw_session_dir = wait_data.get("session_dir")
        if isinstance(raw_session_dir, str) and raw_session_dir:
            return Path(raw_session_dir)
        return None

    async def wait_for_review(
        self,
        session_id: str,
        runner: CommandRunner,
        env: dict[str, str],
        cli_timeout: int,
        wait_args: tuple[str, ...] = (),
        user_timeout: int | None = None,
    ) -> WaitResult:
        """Wait for review completion.

        Args:
            session_id: Session ID to wait for.
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command.
            cli_timeout: Timeout value to pass to CLI (if user_timeout is None).
            wait_args: Additional arguments for wait command.
            user_timeout: User-specified timeout (if already in wait_args).

        Returns:
            WaitResult with returncode, stdout, stderr, timeout status, and
            the session directory when it could be read from the JSON output.
        """
        wait_cmd = [
            self._review_gate_bin(),
            "wait",
            "--json",
            "--session-id",
            session_id,
        ]
        # Only add --timeout if not already specified in wait_args
        if user_timeout is None:
            wait_cmd.extend(["--timeout", str(cli_timeout)])
        if wait_args:
            wait_cmd.extend(wait_args)

        # Use CLI timeout + 30s grace period for the subprocess timeout, so
        # the CLI's own timeout fires before the runner's.
        cli_side_timeout = user_timeout if user_timeout is not None else cli_timeout
        effective_timeout = cli_side_timeout + 30
        result = await runner.run_async(wait_cmd, env=env, timeout=effective_timeout)

        return WaitResult(
            returncode=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            timed_out=result.timed_out,
            session_dir=self._parse_session_dir(result.stdout),
        )

    async def resolve_gate(
        self,
        runner: CommandRunner,
        env: dict[str, str],
        reason: str = "mala: auto-clearing stale gate for retry",
    ) -> ResolveResult:
        """Resolve (clear) an active gate.

        Never raises: any exception from the runner is converted into a failed
        ResolveResult carrying the exception text.

        Args:
            runner: CommandRunner instance for subprocess execution.
            env: Environment variables for the command (must include
                CLAUDE_SESSION_ID).
            reason: Reason message for the resolve command.

        Returns:
            ResolveResult with success/failure status and error details.
        """
        resolve_cmd = [
            self._review_gate_bin(),
            "resolve",
            "--reason",
            reason,
        ]
        try:
            result = await runner.run_async(resolve_cmd, env=env, timeout=30)
            if result.returncode == 0:
                return ResolveResult(success=True)
            stderr = result.stderr.strip() if result.stderr else ""
            stdout = result.stdout.strip() if result.stdout else ""
            return ResolveResult(
                success=False, error_detail=stderr or stdout or "resolve failed"
            )
        except Exception as e:
            # Deliberately broad: resolving is best-effort cleanup and the
            # failure is reported through the result rather than raised.
            return ResolveResult(success=False, error_detail=str(e))

    async def check_diff_empty(self, diff_range: str, runner: CommandRunner) -> bool:
        """Check if the diff range has no changes.

        Uses git diff --stat to check if there are any changes in the range.
        If the command fails, returns False to proceed with review (fail-open).

        Args:
            diff_range: Git diff range to check (e.g., "baseline..HEAD").
            runner: CommandRunner instance to execute git command.

        Returns:
            True if the diff is empty (no changes), False otherwise.
        """
        try:
            result = await runner.run_async(
                ["git", "diff", "--stat", diff_range],
                timeout=30,
            )
            if result.returncode == 0 and not result.stdout.strip():
                return True
        except Exception:
            # If diff check fails, proceed with review anyway (fail-open)
            pass
        return False

    @staticmethod
    def extract_wait_timeout(args: tuple[str, ...]) -> int | None:
        """Extract --timeout value from wait args if provided.

        Parses args to detect if a --timeout flag is already specified.
        Supports both '--timeout VALUE' and '--timeout=VALUE' formats. Only
        the first occurrence is considered; a trailing '--timeout' with no
        value after it is ignored.

        Args:
            args: Tuple of command-line arguments to search.

        Returns:
            The timeout value as int if found and valid, None otherwise.
        """
        for index, arg in enumerate(args):
            if arg.startswith("--timeout="):
                value = arg.split("=", 1)[1]
            elif arg == "--timeout" and index + 1 < len(args):
                value = args[index + 1]
            else:
                continue

            if not value.isdigit():
                return None
            try:
                return int(value)
            except ValueError:
                # str.isdigit() accepts characters such as superscript digits
                # that int() rejects; treat those as an invalid timeout.
                return None
        return None