Coverage for src / infra / clients / review_output_parser.py: 26%
108 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Review output parsing for Cerberus review-gate.
3This module provides ReviewOutputParser for parsing JSON output from
4the review-gate CLI and mapping exit codes to domain results. It handles:
5- JSON decoding and validation
6- Issue object mapping (aggregated_findings to ReviewIssue)
7- Exit-code to ReviewResult mapping
8- Parse error extraction
10This is a low-level component extracted from DefaultReviewer to enable
11independent testing of parsing logic.
12"""
14from __future__ import annotations
16import json
17import logging
18from dataclasses import dataclass, field
19from pathlib import Path # noqa: TC003 (runtime import for get_type_hints compatibility)
20from typing import TYPE_CHECKING
22if TYPE_CHECKING:
23 from src.core.protocols import MalaEventSink
26@dataclass
27class ReviewIssue:
28 """A single issue found during external review.
30 Matches the Cerberus JSON schema for issues.
31 """
33 file: str
34 line_start: int
35 line_end: int
36 priority: int | None # 0=P0, 1=P1, 2=P2, 3=P3, or None
37 title: str
38 body: str
39 reviewer: str # Which reviewer found this issue
42@dataclass
43class ReviewResult:
44 """Result of a Cerberus review-gate review.
46 Satisfies the ReviewOutcome protocol in lifecycle.py.
47 """
49 passed: bool
50 issues: list[ReviewIssue] = field(default_factory=list)
51 parse_error: str | None = None
52 fatal_error: bool = False
53 review_log_path: Path | None = None
56class ReviewOutputParser:
57 """Parses Cerberus review-gate JSON output and maps exit codes to results.
59 This class encapsulates all JSON parsing and exit-code interpretation logic.
60 It is stateless and can be used as a singleton or instantiated per-call.
62 Usage:
63 parser = ReviewOutputParser()
65 # Parse JSON output
66 passed, issues, error = parser.parse_json(stdout)
68 # Map exit code to ReviewResult
69 result = parser.map_exit_code_to_result(exit_code, stdout, stderr)
70 """
72 def parse_json(self, output: str) -> tuple[bool, list[ReviewIssue], str | None]:
73 """Parse Cerberus review-gate JSON output.
75 Args:
76 output: JSON string from review-gate wait --json.
78 Returns:
79 Tuple of (passed, issues, parse_error).
80 If parse_error is not None, passed will be False and issues empty.
81 """
82 if not output or not output.strip():
83 return False, [], "Empty output from review-gate"
85 try:
86 data = json.loads(output)
87 except json.JSONDecodeError as e:
88 return False, [], f"JSON parse error: {e}"
90 if not isinstance(data, dict):
91 return False, [], "Root element is not an object"
93 # Check consensus verdict (top-level consensus_verdict field)
94 verdict = data.get("consensus_verdict")
95 if verdict not in ("PASS", "FAIL", "NEEDS_WORK", "no_reviewers", "ERROR"):
96 return False, [], f"Invalid verdict: {verdict}"
98 passed = verdict == "PASS"
100 # Parse issues from aggregated_findings (may be empty for PASS verdict)
101 raw_issues = data.get("aggregated_findings", [])
102 if not isinstance(raw_issues, list):
103 return False, [], "'aggregated_findings' field must be an array"
105 issues: list[ReviewIssue] = []
106 for i, item in enumerate(raw_issues):
107 if not isinstance(item, dict):
108 return False, [], f"Issue {i} is not an object"
110 reviewer = item.get("reviewer", "")
111 if not isinstance(reviewer, str):
112 return False, [], f"Issue {i}: 'reviewer' must be a string"
114 # Cerberus uses file_path (can be null for non-file-specific findings)
115 file_path = item.get("file_path")
116 if file_path is None:
117 file_path = ""
118 elif not isinstance(file_path, str):
119 return False, [], f"Issue {i}: 'file_path' must be a string or null"
121 # line_start and line_end can be null
122 line_start = item.get("line_start")
123 if line_start is None:
124 line_start = 0
125 elif not isinstance(line_start, int):
126 return False, [], f"Issue {i}: 'line_start' must be an integer or null"
128 line_end = item.get("line_end")
129 if line_end is None:
130 line_end = 0
131 elif not isinstance(line_end, int):
132 return False, [], f"Issue {i}: 'line_end' must be an integer or null"
134 priority = item.get("priority")
135 if priority is not None and not isinstance(priority, int):
136 return False, [], f"Issue {i}: 'priority' must be an integer or null"
138 title = item.get("title", "")
139 if not isinstance(title, str):
140 return False, [], f"Issue {i}: 'title' must be a string"
142 body = item.get("body", "")
143 if not isinstance(body, str):
144 return False, [], f"Issue {i}: 'body' must be a string"
146 issues.append(
147 ReviewIssue(
148 file=file_path,
149 line_start=line_start,
150 line_end=line_end,
151 priority=priority,
152 title=title,
153 body=body,
154 reviewer=reviewer,
155 )
156 )
158 return passed, issues, None
160 def map_exit_code_to_result(
161 self,
162 exit_code: int,
163 stdout: str,
164 stderr: str,
165 review_log_path: Path | None = None,
166 event_sink: MalaEventSink | None = None,
167 ) -> ReviewResult:
168 """Map Cerberus review-gate exit code to ReviewResult.
170 Exit codes:
171 0 - PASS: all reviewers agree, no issues
172 1 - FAIL/NEEDS_WORK: legitimate review failure
173 2 - Parse error: malformed reviewer output
174 3 - Timeout: reviewers didn't respond in time
175 4 - No reviewers: no reviewer CLIs available
176 5 - Internal error: unexpected failure
178 Args:
179 exit_code: Exit code from review-gate wait command.
180 stdout: Stdout from the command (JSON output).
181 stderr: Stderr from the command (error messages).
182 review_log_path: Optional path to review session logs.
183 event_sink: Optional event sink for emitting warnings.
185 Returns:
186 ReviewResult with appropriate fields set.
187 """
188 # Exit codes 4 and 5 are fatal errors
189 if exit_code == 4:
190 return ReviewResult(
191 passed=False,
192 issues=[],
193 parse_error="No reviewers available",
194 fatal_error=True,
195 review_log_path=review_log_path,
196 )
198 if exit_code == 5:
199 error_msg = stderr.strip() if stderr else "Internal error"
200 return ReviewResult(
201 passed=False,
202 issues=[],
203 parse_error=error_msg,
204 fatal_error=True,
205 review_log_path=review_log_path,
206 )
208 # Exit code 3 is timeout (retryable)
209 if exit_code == 3:
210 return ReviewResult(
211 passed=False,
212 issues=[],
213 parse_error="timeout",
214 fatal_error=False,
215 review_log_path=review_log_path,
216 )
218 # Exit code 2 is parse error (retryable)
219 if exit_code == 2:
220 # Try to extract error from JSON parse_errors array
221 parse_error_msg = "Parse error"
222 try:
223 data = json.loads(stdout)
224 if isinstance(data, dict):
225 parse_errors = data.get("parse_errors", [])
226 if isinstance(parse_errors, list) and parse_errors:
227 parse_error_msg = "; ".join(
228 str(e.get("error", e)) if isinstance(e, dict) else str(e)
229 for e in parse_errors
230 )
231 except (json.JSONDecodeError, TypeError):
232 if stderr:
233 parse_error_msg = stderr.strip()
234 return ReviewResult(
235 passed=False,
236 issues=[],
237 parse_error=parse_error_msg,
238 fatal_error=False,
239 review_log_path=review_log_path,
240 )
242 # Exit codes 0 and 1: parse JSON output
243 json_passed, issues, parse_error = self.parse_json(stdout)
245 if parse_error:
246 # JSON parsing failed - treat as parse error (exit code 2 equivalent)
247 return ReviewResult(
248 passed=False,
249 issues=[],
250 parse_error=parse_error,
251 fatal_error=False,
252 review_log_path=review_log_path,
253 )
255 # Derive passed status from exit code
256 exit_passed = exit_code == 0
258 # Warn if exit code and JSON verdict disagree
259 if json_passed != exit_passed:
260 message = (
261 f"Exit code ({exit_code}) and JSON verdict "
262 f"({'PASS' if json_passed else 'FAIL'}) disagree; "
263 f"fail-closed: requiring both to pass"
264 )
265 if event_sink is not None:
266 event_sink.on_review_warning(message)
267 else:
268 # Always log this critical diagnostic even without event_sink
269 logging.warning(message)
271 # Security: fail-closed - BOTH exit code AND JSON verdict must pass
272 # This prevents a review from passing when the consensus verdict is
273 # FAIL, NEEDS_WORK, or no_reviewers even if exit code is 0
274 final_passed = exit_passed and json_passed
276 return ReviewResult(
277 passed=final_passed,
278 issues=issues,
279 parse_error=None,
280 fatal_error=False,
281 review_log_path=review_log_path,
282 )
285# Module-level convenience functions for backward compatibility
286# These delegate to a shared parser instance
288_parser = ReviewOutputParser()
291def parse_cerberus_json(output: str) -> tuple[bool, list[ReviewIssue], str | None]:
292 """Parse Cerberus review-gate JSON output.
294 This is a convenience function that delegates to ReviewOutputParser.parse_json().
295 """
296 return _parser.parse_json(output)
299def map_exit_code_to_result(
300 exit_code: int,
301 stdout: str,
302 stderr: str,
303 review_log_path: Path | None = None,
304 event_sink: MalaEventSink | None = None,
305) -> ReviewResult:
306 """Map Cerberus review-gate exit code to ReviewResult.
308 This is a convenience function that delegates to ReviewOutputParser.map_exit_code_to_result().
309 """
310 return _parser.map_exit_code_to_result(
311 exit_code, stdout, stderr, review_log_path, event_sink
312 )