Coverage for src / infra / clients / cerberus_review.py: 46%
94 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Cerberus review-gate adapter for mala orchestrator.
3This module provides a thin adapter for the Cerberus review-gate CLI.
4It handles:
5- CLI orchestration (spawn, wait, resolve)
6- Issue formatting for follow-up prompts
8JSON parsing and exit-code mapping are delegated to ReviewOutputParser.
9Subprocess management is delegated to CerberusGateCLI.
10"""
12from __future__ import annotations
14import logging
15from dataclasses import dataclass, field
16from pathlib import Path
17from typing import TYPE_CHECKING
19from src.infra.clients.cerberus_gate_cli import CerberusGateCLI
20from src.infra.clients.review_output_parser import (
21 ReviewIssue, # noqa: TC001 (used at runtime in format_review_issues)
22 ReviewResult,
23 map_exit_code_to_result,
24)
25from src.infra.tools.command_runner import CommandRunner
27logger = logging.getLogger(__name__)
29if TYPE_CHECKING:
30 from collections.abc import Sequence
32 from src.core.protocols import MalaEventSink
35@dataclass
36class DefaultReviewer:
37 """Default CodeReviewer implementation using Cerberus review-gate CLI.
39 This class conforms to the CodeReviewer protocol and provides the default
40 behavior for the orchestrator. Tests can inject alternative implementations.
42 The reviewer spawns review-gate CLI processes and parses their output.
43 Extra args/env can be provided to customize review-gate behavior.
44 For initial review with an empty diff, short-circuits to PASS without spawning.
46 Subprocess management is delegated to CerberusGateCLI; this class handles
47 orchestration logic, stale gate recovery, and result mapping.
48 """
50 repo_path: Path
51 bin_path: Path | None = None
52 spawn_args: tuple[str, ...] = field(default_factory=tuple)
53 wait_args: tuple[str, ...] = field(default_factory=tuple)
54 env: dict[str, str] = field(default_factory=dict)
55 event_sink: MalaEventSink | None = None
57 def _get_cli(self) -> CerberusGateCLI:
58 """Get the CerberusGateCLI instance for subprocess operations."""
59 return CerberusGateCLI(
60 repo_path=self.repo_path,
61 bin_path=self.bin_path,
62 env=self.env,
63 )
65 def _validate_review_gate_bin(self) -> str | None:
66 """Validate that the review-gate binary exists and is executable.
68 Delegates to CerberusGateCLI.validate_binary().
70 Returns:
71 None if the binary is valid, or an error message if not.
72 """
73 return self._get_cli().validate_binary()
75 @staticmethod
76 def _extract_wait_timeout(args: tuple[str, ...]) -> int | None:
77 """Extract --timeout value from wait args if provided.
79 Delegates to CerberusGateCLI.extract_wait_timeout().
81 Args:
82 args: Tuple of command-line arguments to search.
84 Returns:
85 The timeout value as int if found and valid, None otherwise.
86 """
87 return CerberusGateCLI.extract_wait_timeout(args)
89 async def __call__(
90 self,
91 diff_range: str,
92 context_file: Path | None = None,
93 timeout: int = 300,
94 claude_session_id: str | None = None,
95 *,
96 commit_shas: Sequence[str] | None = None,
97 ) -> ReviewResult:
98 cli = self._get_cli()
100 # Validate review-gate binary exists and is executable before proceeding
101 validation_error = cli.validate_binary()
102 if validation_error is not None:
103 return ReviewResult(
104 passed=False,
105 issues=[],
106 parse_error=validation_error,
107 fatal_error=True,
108 review_log_path=None,
109 )
111 runner = CommandRunner(cwd=self.repo_path)
112 env = cli.build_env(claude_session_id)
113 if "CLAUDE_SESSION_ID" not in env:
114 return ReviewResult(
115 passed=False,
116 issues=[],
117 parse_error="CLAUDE_SESSION_ID missing; must be provided by agent session id",
118 fatal_error=True,
119 review_log_path=None,
120 )
122 use_commits = bool(commit_shas)
124 # Check for empty diff (short-circuit without spawning review-gate)
125 # This avoids parse errors or failures when there's nothing to review.
126 # Only applies to range-based reviews; commit lists are reviewed directly.
127 if not use_commits and await cli.check_diff_empty(diff_range, runner):
128 return ReviewResult(
129 passed=True,
130 issues=[],
131 parse_error=None,
132 fatal_error=False,
133 review_log_path=None,
134 )
136 # Spawn code review
137 spawn_result = await cli.spawn_code_review(
138 diff_range=diff_range,
139 runner=runner,
140 env=env,
141 timeout=timeout,
142 spawn_args=self.spawn_args,
143 context_file=context_file,
144 commit_shas=commit_shas,
145 )
147 if spawn_result.timed_out:
148 return ReviewResult(
149 passed=False,
150 issues=[],
151 parse_error="spawn timeout",
152 fatal_error=False,
153 )
155 if not spawn_result.success:
156 # Check for stale gate from a previous attempt in this session.
157 if spawn_result.already_active:
158 if self.event_sink is not None:
159 self.event_sink.on_review_warning(
160 "Resolving stale gate from previous attempt"
161 )
162 resolve_result = await cli.resolve_gate(runner, env)
163 if resolve_result.success:
164 logger.info("Stale gate resolved: diff_range=%s", diff_range)
165 # Retry spawn after resolving the stale gate
166 spawn_result = await cli.spawn_code_review(
167 diff_range=diff_range,
168 runner=runner,
169 env=env,
170 timeout=timeout,
171 spawn_args=self.spawn_args,
172 context_file=context_file,
173 commit_shas=commit_shas,
174 )
175 if spawn_result.timed_out:
176 return ReviewResult(
177 passed=False,
178 issues=[],
179 parse_error="spawn timeout",
180 fatal_error=False,
181 )
182 if not spawn_result.success:
183 # If still "already active", another session owns the gate
184 if spawn_result.already_active:
185 return ReviewResult(
186 passed=False,
187 issues=[],
188 parse_error=(
189 "Another review gate is active (not from this session). "
190 "Wait for it to finish or resolve manually with "
191 "`review-gate resolve`."
192 ),
193 fatal_error=True,
194 )
195 return ReviewResult(
196 passed=False,
197 issues=[],
198 parse_error=f"spawn failed: {spawn_result.error_detail}",
199 fatal_error=False,
200 )
201 # Successfully spawned after clearing gate - continue to wait phase
202 else:
203 return ReviewResult(
204 passed=False,
205 issues=[],
206 parse_error=f"spawn failed: {spawn_result.error_detail} (auto-resolve failed: {resolve_result.error_detail})",
207 fatal_error=False,
208 )
209 else:
210 return ReviewResult(
211 passed=False,
212 issues=[],
213 parse_error=f"spawn failed: {spawn_result.error_detail}",
214 fatal_error=False,
215 )
217 # spawn-code-review doesn't output JSON to stdout - it just spawns reviewers
218 # and writes state to disk. We use --session-id with the Claude session ID
219 # to let the wait command find the state file.
220 session_id = env["CLAUDE_SESSION_ID"]
222 # Check if wait_args already specifies --timeout to avoid duplicates
223 user_timeout = CerberusGateCLI.extract_wait_timeout(self.wait_args)
225 # Wait for review completion
226 wait_result = await cli.wait_for_review(
227 session_id=session_id,
228 runner=runner,
229 env=env,
230 cli_timeout=timeout,
231 wait_args=self.wait_args,
232 user_timeout=user_timeout,
233 )
235 if wait_result.timed_out:
236 logger.warning(
237 "Review timeout: diff_range=%s after=%ds", diff_range, timeout
238 )
239 return ReviewResult(
240 passed=False,
241 issues=[],
242 parse_error="timeout",
243 fatal_error=False,
244 )
246 result = map_exit_code_to_result(
247 wait_result.returncode,
248 wait_result.stdout,
249 wait_result.stderr,
250 review_log_path=wait_result.session_dir,
251 event_sink=self.event_sink,
252 )
253 logger.info(
254 "Review completed: diff_range=%s passed=%s issues=%d",
255 diff_range,
256 result.passed,
257 len(result.issues),
258 )
259 return result
262def _to_relative_path(file_path: str, base_path: Path | None = None) -> str:
263 """Convert an absolute file path to a relative path for display.
265 Strips the base path prefix to show paths relative to the repository root.
267 Args:
268 file_path: Absolute or relative file path.
269 base_path: Base path (typically repository root) for relativization.
270 If None, uses Path.cwd() as fallback.
272 Returns:
273 Relative path suitable for display. If relativization fails,
274 returns the original path to preserve directory context.
275 """
276 # If already relative, return as-is
277 if not file_path.startswith("/"):
278 return file_path
280 # Use provided base_path or fall back to cwd
281 base = base_path.resolve() if base_path else Path.cwd()
282 try:
283 abs_path = Path(file_path)
284 if abs_path.is_relative_to(base):
285 return str(abs_path.relative_to(base))
286 except (ValueError, OSError):
287 pass
289 # Preserve original path if relativization fails (don't strip to filename)
290 return file_path
293def format_review_issues(
294 issues: list[ReviewIssue], base_path: Path | None = None
295) -> str:
296 """Format review issues as a human-readable string for follow-up prompts.
298 Args:
299 issues: List of ReviewIssue objects to format.
300 base_path: Base path (typically repository root) for path relativization.
301 If None, uses Path.cwd() as fallback.
303 Returns:
304 Formatted string with issues grouped by file.
305 """
306 if not issues:
307 return "No specific issues found."
309 lines = []
310 current_file = None
312 # Sort by file, then by line, then by priority (lower = more important)
313 sorted_issues = sorted(
314 issues,
315 key=lambda x: (
316 x.file,
317 x.line_start,
318 x.priority if x.priority is not None else 4,
319 ),
320 )
322 for issue in sorted_issues:
323 # Convert absolute paths to relative for cleaner display
324 display_file = _to_relative_path(issue.file, base_path)
325 if display_file != current_file:
326 if current_file is not None:
327 lines.append("") # Blank line between files
328 current_file = display_file
329 lines.append(f"File: {display_file}")
331 loc = (
332 f"L{issue.line_start}-{issue.line_end}"
333 if issue.line_start != issue.line_end
334 else f"L{issue.line_start}"
335 )
336 # Include reviewer attribution
337 reviewer_tag = f"[{issue.reviewer}]" if issue.reviewer else ""
338 lines.append(f" {loc}: {reviewer_tag} {issue.title}")
339 if issue.body:
340 lines.append(f" {issue.body}")
342 return "\n".join(lines)