Coverage for src/infra/clients/cerberus_review.py: 46%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Cerberus review-gate adapter for mala orchestrator. 

2 

3This module provides a thin adapter for the Cerberus review-gate CLI. 

4It handles: 

5- CLI orchestration (spawn, wait, resolve) 

6- Issue formatting for follow-up prompts 

7 

8JSON parsing and exit-code mapping are delegated to ReviewOutputParser. 

9Subprocess management is delegated to CerberusGateCLI. 

10""" 

11 

12from __future__ import annotations 

13 

14import logging 

15from dataclasses import dataclass, field 

16from pathlib import Path 

17from typing import TYPE_CHECKING 

18 

19from src.infra.clients.cerberus_gate_cli import CerberusGateCLI 

20from src.infra.clients.review_output_parser import ( 

21 ReviewIssue, # noqa: TC001 (used at runtime in format_review_issues) 

22 ReviewResult, 

23 map_exit_code_to_result, 

24) 

25from src.infra.tools.command_runner import CommandRunner 

26 

27logger = logging.getLogger(__name__) 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Sequence 

31 

32 from src.core.protocols import MalaEventSink 

33 

34 

@dataclass
class DefaultReviewer:
    """Default CodeReviewer implementation using Cerberus review-gate CLI.

    This class conforms to the CodeReviewer protocol and provides the default
    behavior for the orchestrator. Tests can inject alternative implementations.

    The reviewer spawns review-gate CLI processes and parses their output.
    Extra args/env can be provided to customize review-gate behavior.
    For initial review with an empty diff, short-circuits to PASS without spawning.

    Subprocess management is delegated to CerberusGateCLI; this class handles
    orchestration logic, stale gate recovery, and result mapping.
    """

    repo_path: Path
    bin_path: Path | None = None
    spawn_args: tuple[str, ...] = field(default_factory=tuple)
    wait_args: tuple[str, ...] = field(default_factory=tuple)
    env: dict[str, str] = field(default_factory=dict)
    event_sink: MalaEventSink | None = None

    def _get_cli(self) -> CerberusGateCLI:
        """Get the CerberusGateCLI instance for subprocess operations."""
        return CerberusGateCLI(
            repo_path=self.repo_path,
            bin_path=self.bin_path,
            env=self.env,
        )

    def _validate_review_gate_bin(self) -> str | None:
        """Validate that the review-gate binary exists and is executable.

        Delegates to CerberusGateCLI.validate_binary().

        Returns:
            None if the binary is valid, or an error message if not.
        """
        return self._get_cli().validate_binary()

    @staticmethod
    def _extract_wait_timeout(args: tuple[str, ...]) -> int | None:
        """Extract --timeout value from wait args if provided.

        Delegates to CerberusGateCLI.extract_wait_timeout().

        Args:
            args: Tuple of command-line arguments to search.

        Returns:
            The timeout value as int if found and valid, None otherwise.
        """
        return CerberusGateCLI.extract_wait_timeout(args)

    @staticmethod
    def _failed(parse_error: str, *, fatal: bool = False) -> ReviewResult:
        """Build a failed ReviewResult with no issues and the given error text."""
        return ReviewResult(
            passed=False,
            issues=[],
            parse_error=parse_error,
            fatal_error=fatal,
        )

    async def _spawn_with_recovery(
        self,
        cli: CerberusGateCLI,
        runner: CommandRunner,
        env: dict[str, str],
        *,
        diff_range: str,
        context_file: Path | None,
        timeout: int,
        commit_shas: Sequence[str] | None,
    ) -> ReviewResult | None:
        """Spawn the code review, recovering once from an already-active gate.

        Args:
            cli: CLI adapter used for the spawn and resolve calls.
            runner: Command runner handed through to the CLI adapter.
            env: Environment handed through to the CLI adapter.
            diff_range: Diff range to review.
            context_file: Optional context file forwarded to the spawn call.
            timeout: Timeout forwarded to the spawn call.
            commit_shas: Optional commit list forwarded to the spawn call.

        Returns:
            None when a review was spawned (the caller proceeds to the wait
            phase), otherwise the failed ReviewResult to hand back.
        """

        async def spawn():
            # Single definition of the spawn invocation so the first attempt
            # and the post-resolve retry can never drift apart.
            return await cli.spawn_code_review(
                diff_range=diff_range,
                runner=runner,
                env=env,
                timeout=timeout,
                spawn_args=self.spawn_args,
                context_file=context_file,
                commit_shas=commit_shas,
            )

        spawn_result = await spawn()
        if spawn_result.timed_out:
            return self._failed("spawn timeout")
        if spawn_result.success:
            return None
        if not spawn_result.already_active:
            return self._failed(f"spawn failed: {spawn_result.error_detail}")

        # Check for stale gate from a previous attempt in this session.
        if self.event_sink is not None:
            self.event_sink.on_review_warning(
                "Resolving stale gate from previous attempt"
            )
        resolve_result = await cli.resolve_gate(runner, env)
        if not resolve_result.success:
            return self._failed(
                f"spawn failed: {spawn_result.error_detail} (auto-resolve failed: {resolve_result.error_detail})"
            )
        logger.info("Stale gate resolved: diff_range=%s", diff_range)

        # Retry spawn after resolving the stale gate
        spawn_result = await spawn()
        if spawn_result.timed_out:
            return self._failed("spawn timeout")
        if spawn_result.success:
            # Successfully spawned after clearing gate - continue to wait phase
            return None
        if spawn_result.already_active:
            # If still "already active", another session owns the gate
            return self._failed(
                "Another review gate is active (not from this session). "
                "Wait for it to finish or resolve manually with "
                "`review-gate resolve`.",
                fatal=True,
            )
        return self._failed(f"spawn failed: {spawn_result.error_detail}")

    async def __call__(
        self,
        diff_range: str,
        context_file: Path | None = None,
        timeout: int = 300,
        claude_session_id: str | None = None,
        *,
        commit_shas: Sequence[str] | None = None,
    ) -> ReviewResult:
        """Run one review-gate cycle (validate, spawn, wait) and map the outcome.

        Args:
            diff_range: Diff range to review; also used for the empty-diff
                check when no commit list is given.
            context_file: Optional context file forwarded to the spawn call.
            timeout: Passed to the spawn call as ``timeout`` and to the wait
                call as ``cli_timeout``.
            claude_session_id: Forwarded to ``build_env``; the review fails
                fatally if the resulting env has no ``CLAUDE_SESSION_ID``.
            commit_shas: Optional commit list. When non-empty, the empty-diff
                short-circuit is skipped.

        Returns:
            A ReviewResult. Binary validation failure, a missing session id and
            a gate still active after the retry are reported with
            ``fatal_error=True``; timeouts and other spawn failures are
            reported with ``fatal_error=False``.
        """
        cli = self._get_cli()

        # Validate review-gate binary exists and is executable before proceeding
        validation_error = cli.validate_binary()
        if validation_error is not None:
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error=validation_error,
                fatal_error=True,
                review_log_path=None,
            )

        runner = CommandRunner(cwd=self.repo_path)
        env = cli.build_env(claude_session_id)
        if "CLAUDE_SESSION_ID" not in env:
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error="CLAUDE_SESSION_ID missing; must be provided by agent session id",
                fatal_error=True,
                review_log_path=None,
            )

        use_commits = bool(commit_shas)

        # Check for empty diff (short-circuit without spawning review-gate)
        # This avoids parse errors or failures when there's nothing to review.
        # Only applies to range-based reviews; commit lists are reviewed directly.
        if not use_commits and await cli.check_diff_empty(diff_range, runner):
            return ReviewResult(
                passed=True,
                issues=[],
                parse_error=None,
                fatal_error=False,
                review_log_path=None,
            )

        # Spawn code review (with one automatic stale-gate recovery attempt)
        spawn_failure = await self._spawn_with_recovery(
            cli,
            runner,
            env,
            diff_range=diff_range,
            context_file=context_file,
            timeout=timeout,
            commit_shas=commit_shas,
        )
        if spawn_failure is not None:
            return spawn_failure

        # spawn-code-review doesn't output JSON to stdout - it just spawns reviewers
        # and writes state to disk. We use --session-id with the Claude session ID
        # to let the wait command find the state file.
        session_id = env["CLAUDE_SESSION_ID"]

        # Check if wait_args already specifies --timeout to avoid duplicates
        user_timeout = CerberusGateCLI.extract_wait_timeout(self.wait_args)

        # Wait for review completion
        wait_result = await cli.wait_for_review(
            session_id=session_id,
            runner=runner,
            env=env,
            cli_timeout=timeout,
            wait_args=self.wait_args,
            user_timeout=user_timeout,
        )

        if wait_result.timed_out:
            # NOTE(review): this logs `timeout` even when wait_args carries its
            # own --timeout; confirm which value wait_for_review enforces.
            logger.warning(
                "Review timeout: diff_range=%s after=%ds", diff_range, timeout
            )
            return self._failed("timeout")

        result = map_exit_code_to_result(
            wait_result.returncode,
            wait_result.stdout,
            wait_result.stderr,
            review_log_path=wait_result.session_dir,
            event_sink=self.event_sink,
        )
        logger.info(
            "Review completed: diff_range=%s passed=%s issues=%d",
            diff_range,
            result.passed,
            len(result.issues),
        )
        return result

260 

261 

def _to_relative_path(file_path: str, base_path: Path | None = None) -> str:
    """Convert an absolute file path to a relative path for display.

    Strips the base path prefix to show paths relative to the repository root.

    Args:
        file_path: Absolute or relative file path.
        base_path: Base path (typically repository root) for relativization.
            If None, uses Path.cwd() as fallback.

    Returns:
        Relative path suitable for display. If relativization fails for any
        reason (path outside the base, or the base cannot be determined),
        returns the original path to preserve directory context.
    """
    candidate = Path(file_path)

    # If already relative, return as-is
    if not candidate.is_absolute():
        return file_path

    try:
        if base_path is None:
            bases = (Path.cwd(),)
        else:
            # Try the resolved base first, then the base exactly as given:
            # file_path itself is never resolved, so a repository reached
            # through a symlink only matches the unresolved spelling.
            bases = (base_path.resolve(), base_path)
    except (OSError, RuntimeError):
        # cwd()/resolve() can fail (e.g. the working directory was removed);
        # honour the documented fallback instead of propagating.
        return file_path

    for base in bases:
        try:
            return str(candidate.relative_to(base))
        except ValueError:
            continue

    # Preserve original path if relativization fails (don't strip to filename)
    return file_path

291 

292 

def format_review_issues(
    issues: list[ReviewIssue], base_path: Path | None = None
) -> str:
    """Format review issues as a human-readable string for follow-up prompts.

    Each file gets exactly one ``File:`` header. Paths are relativized before
    sorting, so an absolute and a relative spelling of the same file end up in
    the same group.

    Args:
        issues: List of ReviewIssue objects to format.
        base_path: Base path (typically repository root) for path relativization.
            If None, uses Path.cwd() as fallback.

    Returns:
        Formatted string with issues grouped by file.
    """
    if not issues:
        return "No specific issues found."

    # Convert absolute paths to relative up front: grouping below keys on the
    # displayed path, so the sort has to use that same key or one file could
    # be split into several non-adjacent groups.
    keyed = [(_to_relative_path(issue.file, base_path), issue) for issue in issues]

    # Sort by file, then by line, then by priority (lower = more important)
    keyed.sort(
        key=lambda pair: (
            pair[0],
            pair[1].line_start,
            pair[1].priority if pair[1].priority is not None else 4,
        ),
    )

    lines: list[str] = []
    current_file: str | None = None

    for display_file, issue in keyed:
        if display_file != current_file:
            if current_file is not None:
                lines.append("")  # Blank line between files
            current_file = display_file
            lines.append(f"File: {display_file}")

        loc = (
            f"L{issue.line_start}-{issue.line_end}"
            if issue.line_start != issue.line_end
            else f"L{issue.line_start}"
        )
        # Include reviewer attribution only when present, so an anonymous
        # issue does not render with a doubled space after the colon.
        headline = (
            f"[{issue.reviewer}] {issue.title}" if issue.reviewer else f"{issue.title}"
        )
        lines.append(f" {loc}: {headline}")
        if issue.body:
            lines.append(f" {issue.body}")

    return "\n".join(lines)