Coverage for src / infra / clients / review_output_parser.py: 26%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Review output parsing for Cerberus review-gate. 

2 

3This module provides ReviewOutputParser for parsing JSON output from 

4the review-gate CLI and mapping exit codes to domain results. It handles: 

5- JSON decoding and validation 

6- Issue object mapping (aggregated_findings to ReviewIssue) 

7- Exit-code to ReviewResult mapping 

8- Parse error extraction 

9 

10This is a low-level component extracted from DefaultReviewer to enable 

11independent testing of parsing logic. 

12""" 

13 

14from __future__ import annotations 

15 

16import json 

17import logging 

18from dataclasses import dataclass, field 

19from pathlib import Path # noqa: TC003 (runtime import for get_type_hints compatibility) 

20from typing import TYPE_CHECKING 

21 

22if TYPE_CHECKING: 

23 from src.core.protocols import MalaEventSink 

24 

25 

@dataclass
class ReviewIssue:
    """One finding reported by an external reviewer.

    Matches the Cerberus JSON schema for issues.
    """

    # Path of the file the finding refers to.
    file: str
    # First line of the affected range.
    line_start: int
    # Last line of the affected range.
    line_end: int
    # Severity bucket: 0=P0, 1=P1, 2=P2, 3=P3, or None when not provided.
    priority: int | None
    # Short headline for the finding.
    title: str
    # Full description of the finding.
    body: str
    # Which reviewer found this issue.
    reviewer: str

40 

41 

@dataclass
class ReviewResult:
    """Outcome of one Cerberus review-gate run.

    Satisfies the ReviewOutcome protocol in lifecycle.py.
    """

    # True when the review passed.
    passed: bool
    # Findings reported by the reviewers; each instance gets its own list.
    issues: list[ReviewIssue] = field(default_factory=list)
    # Human-readable error description, or None when no error occurred.
    parse_error: str | None = None
    # True when the failure is fatal rather than retryable.
    fatal_error: bool = False
    # Optional location of the review session logs.
    review_log_path: Path | None = None

54 

55 

class ReviewOutputParser:
    """Parse Cerberus review-gate JSON output and map exit codes to results.

    This class encapsulates all JSON parsing and exit-code interpretation
    logic. It keeps no per-instance state, so a single instance can be shared
    or a new one created per call.

    Usage:
        parser = ReviewOutputParser()

        # Parse JSON output
        passed, issues, error = parser.parse_json(stdout)

        # Map exit code to ReviewResult
        result = parser.map_exit_code_to_result(exit_code, stdout, stderr)
    """

    # Values accepted for the top-level ``consensus_verdict`` field.
    _VALID_VERDICTS = ("PASS", "FAIL", "NEEDS_WORK", "no_reviewers", "ERROR")

    @staticmethod
    def _is_int(value: object) -> bool:
        """Return True only for genuine integers.

        ``bool`` is a subclass of ``int``, so a JSON ``true``/``false`` would
        otherwise slip through an ``isinstance(value, int)`` check.
        """
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _stripped(text: str | None) -> str:
        """Return ``text`` without surrounding whitespace ("" for None/empty)."""
        return text.strip() if text else ""

    def _parse_issue(
        self, index: int, item: object
    ) -> tuple[ReviewIssue | None, str | None]:
        """Validate one ``aggregated_findings`` entry.

        Args:
            index: Position of the entry in the findings array (used in
                error messages).
            item: The decoded JSON value for the entry.

        Returns:
            ``(issue, None)`` on success or ``(None, error_message)`` when
            the entry is malformed.
        """
        if not isinstance(item, dict):
            return None, f"Issue {index} is not an object"

        reviewer = item.get("reviewer", "")
        if not isinstance(reviewer, str):
            return None, f"Issue {index}: 'reviewer' must be a string"

        # Cerberus uses file_path (can be null for non-file-specific findings)
        file_path = item.get("file_path")
        if file_path is None:
            file_path = ""
        elif not isinstance(file_path, str):
            return None, f"Issue {index}: 'file_path' must be a string or null"

        # line_start and line_end can be null; null is normalised to 0
        line_start = item.get("line_start")
        if line_start is None:
            line_start = 0
        elif not self._is_int(line_start):
            return None, f"Issue {index}: 'line_start' must be an integer or null"

        line_end = item.get("line_end")
        if line_end is None:
            line_end = 0
        elif not self._is_int(line_end):
            return None, f"Issue {index}: 'line_end' must be an integer or null"

        priority = item.get("priority")
        if priority is not None and not self._is_int(priority):
            return None, f"Issue {index}: 'priority' must be an integer or null"

        title = item.get("title", "")
        if not isinstance(title, str):
            return None, f"Issue {index}: 'title' must be a string"

        body = item.get("body", "")
        if not isinstance(body, str):
            return None, f"Issue {index}: 'body' must be a string"

        issue = ReviewIssue(
            file=file_path,
            line_start=line_start,
            line_end=line_end,
            priority=priority,
            title=title,
            body=body,
            reviewer=reviewer,
        )
        return issue, None

    def parse_json(self, output: str) -> tuple[bool, list[ReviewIssue], str | None]:
        """Parse Cerberus review-gate JSON output.

        Args:
            output: JSON string from review-gate wait --json.

        Returns:
            Tuple of (passed, issues, parse_error).
            If parse_error is not None, passed will be False and issues empty.
        """
        if not output or not output.strip():
            return False, [], "Empty output from review-gate"

        try:
            data = json.loads(output)
        except json.JSONDecodeError as e:
            return False, [], f"JSON parse error: {e}"

        if not isinstance(data, dict):
            return False, [], "Root element is not an object"

        # Check consensus verdict (top-level consensus_verdict field)
        verdict = data.get("consensus_verdict")
        if verdict not in self._VALID_VERDICTS:
            return False, [], f"Invalid verdict: {verdict}"

        passed = verdict == "PASS"

        # Parse issues from aggregated_findings (may be empty for PASS verdict)
        raw_issues = data.get("aggregated_findings", [])
        if not isinstance(raw_issues, list):
            return False, [], "'aggregated_findings' field must be an array"

        issues: list[ReviewIssue] = []
        for i, item in enumerate(raw_issues):
            issue, error = self._parse_issue(i, item)
            if error is not None:
                # One malformed entry invalidates the whole payload.
                return False, [], error
            issues.append(issue)

        return passed, issues, None

    def _parse_error_message(self, stdout: str, stderr: str) -> str:
        """Build the error message for exit code 2 (parse error).

        Prefers the ``parse_errors`` array in the JSON on stdout. When stdout
        cannot be decoded as JSON, the stripped stderr is used instead. The
        result is never empty: "Parse error" is the fallback.
        """
        default = "Parse error"
        try:
            data = json.loads(stdout)
        except (json.JSONDecodeError, TypeError):
            # Whitespace-only stderr must not yield an empty (falsy) message.
            return self._stripped(stderr) or default

        if isinstance(data, dict):
            parse_errors = data.get("parse_errors", [])
            if isinstance(parse_errors, list) and parse_errors:
                joined = "; ".join(
                    str(e.get("error", e)) if isinstance(e, dict) else str(e)
                    for e in parse_errors
                )
                if joined:
                    return joined
        return default

    def map_exit_code_to_result(
        self,
        exit_code: int,
        stdout: str,
        stderr: str,
        review_log_path: Path | None = None,
        event_sink: MalaEventSink | None = None,
    ) -> ReviewResult:
        """Map Cerberus review-gate exit code to ReviewResult.

        Exit codes:
            0 - PASS: all reviewers agree, no issues
            1 - FAIL/NEEDS_WORK: legitimate review failure
            2 - Parse error: malformed reviewer output
            3 - Timeout: reviewers didn't respond in time
            4 - No reviewers: no reviewer CLIs available
            5 - Internal error: unexpected failure

        Any other exit code is handled like 0/1: stdout is parsed as JSON,
        and the result can only pass when the exit code is exactly 0.

        Args:
            exit_code: Exit code from review-gate wait command.
            stdout: Stdout from the command (JSON output).
            stderr: Stderr from the command (error messages).
            review_log_path: Optional path to review session logs.
            event_sink: Optional event sink for emitting warnings.

        Returns:
            ReviewResult with appropriate fields set. For every failure
            reported through ``parse_error`` the message is non-empty.
        """
        # Exit codes 4 and 5 are fatal errors
        if exit_code == 4:
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error="No reviewers available",
                fatal_error=True,
                review_log_path=review_log_path,
            )

        if exit_code == 5:
            # Fall back to a fixed message when stderr is empty or blank so
            # callers testing ``if result.parse_error`` still see the error.
            error_msg = self._stripped(stderr) or "Internal error"
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error=error_msg,
                fatal_error=True,
                review_log_path=review_log_path,
            )

        # Exit code 3 is timeout (retryable)
        if exit_code == 3:
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error="timeout",
                fatal_error=False,
                review_log_path=review_log_path,
            )

        # Exit code 2 is parse error (retryable)
        if exit_code == 2:
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error=self._parse_error_message(stdout, stderr),
                fatal_error=False,
                review_log_path=review_log_path,
            )

        # Exit codes 0 and 1: parse JSON output
        json_passed, issues, parse_error = self.parse_json(stdout)

        if parse_error:
            # JSON parsing failed - treat as parse error (exit code 2 equivalent)
            return ReviewResult(
                passed=False,
                issues=[],
                parse_error=parse_error,
                fatal_error=False,
                review_log_path=review_log_path,
            )

        # Derive passed status from exit code
        exit_passed = exit_code == 0

        # Warn if exit code and JSON verdict disagree
        if json_passed != exit_passed:
            message = (
                f"Exit code ({exit_code}) and JSON verdict "
                f"({'PASS' if json_passed else 'FAIL'}) disagree; "
                f"fail-closed: requiring both to pass"
            )
            if event_sink is not None:
                event_sink.on_review_warning(message)
            else:
                # Always log this critical diagnostic even without event_sink
                logging.warning(message)

        # Security: fail-closed - BOTH exit code AND JSON verdict must pass
        # This prevents a review from passing when the consensus verdict is
        # FAIL, NEEDS_WORK, or no_reviewers even if exit code is 0
        final_passed = exit_passed and json_passed

        return ReviewResult(
            passed=final_passed,
            issues=issues,
            parse_error=None,
            fatal_error=False,
            review_log_path=review_log_path,
        )

283 

284 

# Shared parser instance used by the module-level convenience functions
# below, which exist for backward compatibility.

_parser: ReviewOutputParser = ReviewOutputParser()

289 

290 

def parse_cerberus_json(output: str) -> tuple[bool, list[ReviewIssue], str | None]:
    """Parse Cerberus review-gate JSON output.

    Convenience wrapper that forwards ``output`` to
    ReviewOutputParser.parse_json() on the shared module-level parser and
    returns its (passed, issues, parse_error) tuple unchanged.
    """
    parsed = _parser.parse_json(output)
    return parsed

297 

298 

def map_exit_code_to_result(
    exit_code: int,
    stdout: str,
    stderr: str,
    review_log_path: Path | None = None,
    event_sink: MalaEventSink | None = None,
) -> ReviewResult:
    """Map Cerberus review-gate exit code to ReviewResult.

    Convenience wrapper that forwards every argument to
    ReviewOutputParser.map_exit_code_to_result() on the shared module-level
    parser and returns its result unchanged.
    """
    return _parser.map_exit_code_to_result(
        exit_code=exit_code,
        stdout=stdout,
        stderr=stderr,
        review_log_path=review_log_path,
        event_sink=event_sink,
    )