Coverage for little_loops / parallel / output_parsing.py: 97%

184 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-02-15 15:23 -0600

1"""Output parsing utilities for automation tools. 

2 

3Provides parsing functions for Claude CLI command outputs, enabling both 

4sequential and parallel issue processors to interpret structured command 

5responses consistently. 

6""" 

7 

8from __future__ import annotations 

9 

10import re 

11from typing import Any 

12 

# Shared regular expressions for the standardized command-output format.
#
# SECTION_PATTERN: a markdown header of depth 1-3 whose title is a single
# word, optionally wrapped in asterisks, with optional whitespace around it.
# Examples that match: "## VERDICT", "###VERDICT", "## **VERDICT**".
SECTION_PATTERN = re.compile(r"^#{1,3}\s*\**(\w+)\**\s*$", flags=re.MULTILINE)

# TABLE_ROW_PATTERN: three pipe-delimited cells; the first two are single
# words, the third is free text up to the next pipe.
TABLE_ROW_PATTERN = re.compile(
    r"\|\s*(\w+)\s*\|\s*(\w+)\s*\|\s*(.+?)\s*\|",
)

# STATUS_PATTERN: a "- name: value" bullet at the start of a line.
STATUS_PATTERN = re.compile(r"^- (\w+): (\w+)", flags=re.MULTILINE)


# Verdict keywords that the ready-issue command may report.
VALID_VERDICTS = ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW", "CLOSE")

25 

26 

27def _clean_verdict_content(content: str) -> str: 

28 """Clean verdict content by removing common formatting artifacts. 

29 

30 Handles: 

31 - Code block markers (``` and `) 

32 - Markdown bold/italic (** and *) 

33 - Template brackets ([]) 

34 - Leading/trailing whitespace 

35 - Colons after verdict 

36 

37 Args: 

38 content: Raw verdict content from output 

39 

40 Returns: 

41 Cleaned content ready for verdict extraction 

42 """ 

43 # Remove code fence markers (``` or ```) 

44 content = re.sub(r"^```\w*\s*", "", content) 

45 content = re.sub(r"\s*```$", "", content) 

46 # Remove inline code backticks 

47 content = content.replace("`", "") 

48 # Remove markdown bold/italic 

49 content = content.replace("**", "").replace("*", "") 

50 # Remove template brackets 

51 content = content.strip("[]") 

52 return content.strip() 

53 

54 

55def _extract_verdict_from_text(text: str) -> str | None: 

56 """Extract a valid verdict from arbitrary text. 

57 

58 Searches for valid verdict keywords in the text, handling various 

59 formats like "READY", "The verdict is READY", "NOT_READY", etc. 

60 

61 Args: 

62 text: Text that may contain a verdict 

63 

64 Returns: 

65 Valid verdict string or None if not found 

66 """ 

67 text_upper = text.upper() 

68 

69 # Check each valid verdict (check NOT_READY before READY to avoid partial match) 

70 # Order matters: check longer/compound verdicts first 

71 for verdict in ("NOT_READY", "NEEDS_REVIEW", "CORRECTED", "READY", "CLOSE"): 

72 # Match verdict as a word boundary (not part of another word) 

73 # Handle both underscore and space variants 

74 patterns = [ 

75 rf"\b{verdict}\b", 

76 rf"\b{verdict.replace('_', ' ')}\b", # NOT READY, NEEDS REVIEW 

77 rf"\b{verdict.replace('_', '-')}\b", # NOT-READY, NEEDS-REVIEW 

78 ] 

79 for pattern in patterns: 

80 if re.search(pattern, text_upper): 

81 # Normalize to underscore format 

82 return verdict 

83 

84 # Try common Claude phrasings that map to verdicts 

85 # Note: Using re.IGNORECASE since patterns are lowercase 

86 phrasing_map = [ 

87 # Patterns for READY 

88 (r"\bissue\s+is\s+ready\b", "READY"), 

89 (r"\bready\s+for\s+implementation\b", "READY"), 

90 (r"\bimplementation[\s-]ready\b", "READY"), 

91 (r"\bapproved\s+for\s+implementation\b", "READY"), 

92 (r"\bproceed\s+(to|with)\s+implementation\b", "READY"), 

93 # Patterns for CLOSE 

94 (r"\bshould\s+be\s+closed\b", "CLOSE"), 

95 (r"\bclose\s+this\s+issue\b", "CLOSE"), 

96 (r"\bmark\s+as\s+closed\b", "CLOSE"), 

97 (r"\balready\s+fixed\b", "CLOSE"), 

98 (r"\binvalid\s+reference\b", "CLOSE"), 

99 (r"\bmove.*to.*completed\b", "CLOSE"), # "move this issue to the completed directory" 

100 (r"\bclosure\s+status\b", "CLOSE"), # "closure status" 

101 # Patterns for NOT_READY 

102 (r"\bnot\s+ready\b", "NOT_READY"), # General "not ready" pattern 

103 (r"\bneeds?\s+more\s+work\b", "NOT_READY"), 

104 (r"\brequires?\s+clarification\b", "NOT_READY"), 

105 (r"\bmissing\s+information\b", "NOT_READY"), 

106 # Patterns for CORRECTED 

107 (r"\bcorrections?\s+made\b", "CORRECTED"), 

108 (r"\bupdated?\s+and\s+ready\b", "CORRECTED"), 

109 (r"\bfixed?\s+and\s+ready\b", "CORRECTED"), 

110 ] 

111 

112 for pattern, verdict in phrasing_map: 

113 if re.search(pattern, text, re.IGNORECASE): 

114 return verdict 

115 

116 return None 

117 

118 

def parse_sections(output: str) -> dict[str, str]:
    """Split command output into named sections keyed by header title.

    A line matching SECTION_PATTERN starts a new section named after the
    header's captured title; every other line is added to the section
    currently open. Text before the first header is stored under "PREAMBLE",
    which is always present in the result. When a title occurs more than
    once, the later section replaces the earlier one.

    Args:
        output: The stdout from a slash command

    Returns:
        dict mapping section names to their whitespace-stripped content
    """
    parsed: dict[str, str] = {}
    active = "PREAMBLE"
    body_lines: list[str] = []

    for raw in output.split("\n"):
        header = SECTION_PATTERN.match(raw)
        if header is None:
            body_lines.append(raw)
            continue
        # A new header closes the section that was open until now.
        parsed[active] = "\n".join(body_lines).strip()
        active, body_lines = header.group(1), []

    # Whatever is still open when the input ends is the last section.
    parsed[active] = "\n".join(body_lines).strip()
    return parsed

148 

149 

def parse_validation_table(section_content: str) -> dict[str, dict[str, str]]:
    """Parse a markdown validation table from section content.

    Expects format:
        | Check | Status | Details |
        |-------|--------|---------|
        | Format | PASS | ... |

    Each line is parsed on its own, so a row can never absorb part of the
    following row. This matters for rows whose details cell is empty and
    for check names made of several words. Only lines that start with a
    pipe (after surrounding whitespace is removed) and have at least three
    cells are considered; cells beyond the third are ignored.

    Args:
        section_content: Content of the VALIDATION section

    Returns:
        dict mapping check names to {"status": <upper-cased status>,
        "details": <details text, possibly empty>}
    """
    results: dict[str, dict[str, str]] = {}
    for raw_line in section_content.splitlines():
        line = raw_line.strip()
        if not line.startswith("|"):
            continue

        # Drop exactly one delimiter from each edge so that an empty final
        # cell ("| a | b | |") survives as an empty string.
        inner = line[1:]
        if inner.endswith("|"):
            inner = inner[:-1]
        cells = [cell.strip() for cell in inner.split("|")]
        if len(cells) < 3:
            continue

        check_name, status, details = cells[:3]
        # Skip the separator row (|---|:--:|) and rows lacking a name or status
        if not check_name.strip("-:") or not status.strip("-:"):
            continue
        # Skip the header row
        if check_name.lower() == "check":
            continue

        results[check_name] = {
            "status": status.upper(),
            "details": details,
        }
    return results

175 

176 

def parse_status_lines(section_content: str) -> dict[str, str]:
    """Collect "- name: value" bullets from a section into a dict.

    Expects format:
        - tests: PASS
        - lint: PASS

    Lines are recognized with STATUS_PATTERN. Values are upper-cased; when a
    name is listed more than once the last occurrence is kept.

    Args:
        section_content: Content of a section with status lines

    Returns:
        dict mapping item names to upper-cased status values
    """
    return {
        found.group(1): found.group(2).upper()
        for found in STATUS_PATTERN.finditer(section_content)
    }

194 

195 

def _parse_bullet_list(content: str) -> list[str]:
    """Return the text of every "- " bullet in *content*, skipping "- None"."""
    stripped_lines = (line.strip() for line in content.split("\n"))
    return [
        line[2:]  # drop the "- " prefix
        for line in stripped_lines
        if line.startswith("- ") and line != "- None"
    ]


def parse_ready_issue_output(output: str) -> dict[str, Any]:
    """Extract verdict and concerns from ready-issue output.

    The ready-issue command outputs structured sections with a VERDICT
    section containing READY, CORRECTED, NOT_READY, NEEDS_REVIEW, or CLOSE.

    Supports both old format (VERDICT: READY) and new standardized format
    (## VERDICT\\nREADY) for backwards compatibility. The verdict is resolved
    by a chain of progressively looser strategies; the first one that yields
    a verdict wins.

    Args:
        output: The stdout from the ready-issue command

    Returns:
        dict with keys:
        - verdict: str ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW",
          "CLOSE", or "UNKNOWN")
        - concerns: list[str] of concern messages
        - is_ready: bool indicating if issue is ready for implementation
        - was_corrected: bool indicating if corrections were made
        - should_close: bool indicating if issue should be closed
        - close_reason: str|None (e.g., "already_fixed", "invalid_ref")
        - close_status: str|None (e.g., "Closed - Already Fixed")
        - corrections: list[str] of corrections made
        - validated_file_path: str|None path to the file that was validated
        - sections: dict of parsed sections (if standardized format)
        - validation: dict of validation results (if standardized format)
    """
    # Try new standardized format first
    sections = parse_sections(output)
    verdict = "UNKNOWN"
    concerns: list[str] = []
    corrections: list[str] = []
    validation: dict[str, dict[str, str]] = {}
    close_reason: str | None = None
    close_status: str | None = None
    validated_file_path: str | None = None

    # Strategy 1: Check for VERDICT section (new format with # or ## header)
    if "VERDICT" in sections:
        # Try each non-empty line until we find a verdict
        for line in sections["VERDICT"].strip().split("\n"):
            line = line.strip()
            if not line:
                continue

            # Clean the line of formatting artifacts
            cleaned = _clean_verdict_content(line)
            if not cleaned:
                continue

            extracted = _extract_verdict_from_text(cleaned)
            if extracted:
                verdict = extracted
                break

    # Strategy 2: Old format (VERDICT: READY) anywhere in output
    if verdict == "UNKNOWN":
        verdict_match = re.search(
            r"VERDICT:\s*(READY|CORRECTED|NOT[_\s-]?READY|NEEDS[_\s-]?REVIEW|CLOSE)",
            output,
            re.IGNORECASE,
        )
        if verdict_match:
            # The pattern allows any single whitespace character, "_", "-"
            # or nothing between the two words. Remove the separator and
            # map back to the canonical spelling so the result is always
            # one of the documented verdicts (e.g. "NOTREADY" -> "NOT_READY").
            compact = re.sub(r"[_\s-]", "", verdict_match.group(1).upper())
            canonical = {"NOTREADY": "NOT_READY", "NEEDSREVIEW": "NEEDS_REVIEW"}
            verdict = canonical.get(compact, compact)

    # Strategy 3: Look for verdict keywords near "verdict" mentions
    if verdict == "UNKNOWN":
        for line in output.split("\n"):
            if "verdict" in line.lower():
                extracted = _extract_verdict_from_text(line)
                if extracted:
                    verdict = extracted
                    break

    # Strategy 4: Scan entire output for standalone verdict keywords
    # (last resort - may have false positives but better than UNKNOWN)
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(output)
        if extracted:
            verdict = extracted

    # Strategy 5: Clean the entire output and retry extraction
    # Handles cases where formatting artifacts (bold, backticks) break word boundaries
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(_clean_verdict_content(output))
        if extracted:
            verdict = extracted

    # Parse CORRECTIONS_MADE before Strategy 6, which depends on it
    if "CORRECTIONS_MADE" in sections:
        corrections = _parse_bullet_list(sections["CORRECTIONS_MADE"])

    # Strategy 6: Infer from READY_FOR section
    # Handles: "Implementation: Yes", "**Implementation:** Yes", etc.
    if verdict == "UNKNOWN" and "READY_FOR" in sections:
        if re.search(r"implementation[\s:\*]*yes", sections["READY_FOR"], re.IGNORECASE):
            # If corrections were made, verdict is CORRECTED; otherwise READY
            verdict = "CORRECTED" if corrections else "READY"

    # Parse CONCERNS section (new format)
    if "CONCERNS" in sections:
        concerns = _parse_bullet_list(sections["CONCERNS"])

    # Fall back to old concern detection: any line carrying a known marker
    if not concerns:
        indicators = ("WARNING", "Concern:", "Issue:", "Missing:")
        for line in output.split("\n"):
            line_stripped = line.strip()
            if any(indicator in line_stripped for indicator in indicators):
                concerns.append(line_stripped)

    # Parse CLOSE_REASON section if present (for CLOSE verdict)
    if "CLOSE_REASON" in sections:
        for line in sections["CLOSE_REASON"].split("\n"):
            # Strip whitespace and bold markers (**) that Claude sometimes adds
            line = line.strip().replace("**", "")
            # Accept both "- Reason: <value>" and "Reason: <value>"
            if line.lower().startswith(("- reason:", "reason:")):
                reason_value = line.split(":", 1)[1].strip().lower()
                # Also strip backticks that may wrap the value
                close_reason = reason_value.strip("`").strip()
                break

    # Parse CLOSE_STATUS section if present
    if "CLOSE_STATUS" in sections:
        # Take first non-empty, non-header line as the status
        for line in sections["CLOSE_STATUS"].strip().split("\n"):
            line = line.strip()
            if line and not line.startswith("#"):
                close_status = line
                break

    # Parse VALIDATED_FILE section if present (for path validation)
    if "VALIDATED_FILE" in sections:
        for line in sections["VALIDATED_FILE"].strip().split("\n"):
            line = line.strip()
            # Skip empty lines, comments, and template placeholders
            if line and not line.startswith(("#", "[")):
                # Strip markdown backticks that Claude sometimes wraps paths in
                validated_file_path = line.strip("`")
                break

    # Parse VALIDATION section if present
    if "VALIDATION" in sections:
        validation = parse_validation_table(sections["VALIDATION"])

    return {
        "verdict": verdict,
        "concerns": concerns,
        "is_ready": verdict in ("READY", "CORRECTED"),
        "was_corrected": verdict == "CORRECTED" or bool(corrections),
        "should_close": verdict == "CLOSE",
        "close_reason": close_reason,
        "close_status": close_status,
        "corrections": corrections,
        "validated_file_path": validated_file_path,
        "sections": sections,
        "validation": validation,
    }

387 

388 

def parse_manage_issue_output(output: str) -> dict[str, Any]:
    """Extract structured data from manage-issue output.

    The manage-issue command outputs structured sections with metadata,
    files changed, commits, verification results, and final status.

    Args:
        output: The stdout from the manage-issue command

    Returns:
        dict with keys:
        - status: str, the upper-cased word following "Status:" in the
          RESULT section (e.g. "COMPLETED", "FAILED", "BLOCKED"), or
          "UNKNOWN" when the section or the line is missing
        - files_changed: list[str] of modified files
        - files_created: list[str] of created files
        - commits: list[str] of commit hashes/messages
        - verification: dict of verification results
        - ooda_impact: dict of OODA impact status
        - sections: dict of all parsed sections
    """
    sections = parse_sections(output)

    def _bullets(section_name: str) -> list[str]:
        """Return "- " bullet texts of a section, skipping "- None".

        A missing section yields an empty list.
        """
        stripped_lines = (
            line.strip() for line in sections.get(section_name, "").split("\n")
        )
        return [
            line[2:]
            for line in stripped_lines
            if line.startswith("- ") and line != "- None"
        ]

    # Parse RESULT section for status. Asterisks and backticks between the
    # label and the value are skipped so that "**Status:** COMPLETED" and
    # "Status: `COMPLETED`" parse the same as the plain form.
    status = "UNKNOWN"
    if "RESULT" in sections:
        status_match = re.search(r"Status:[\s*`]*(\w+)", sections["RESULT"])
        if status_match:
            status = status_match.group(1).upper()

    # Parse VERIFICATION section
    verification: dict[str, str] = {}
    if "VERIFICATION" in sections:
        verification = parse_status_lines(sections["VERIFICATION"])

    # Parse OODA_IMPACT section: "- key: value" bullets, value upper-cased
    ooda_impact: dict[str, str] = {}
    for line in sections.get("OODA_IMPACT", "").split("\n"):
        line = line.strip()
        if not line.startswith("- "):
            continue
        key, separator, value = line[2:].partition(":")
        if separator:
            ooda_impact[key.strip()] = value.strip().upper()

    return {
        "status": status,
        "files_changed": _bullets("FILES_CHANGED"),
        "files_created": _bullets("FILES_CREATED"),
        "commits": _bullets("COMMITS"),
        "verification": verification,
        "ooda_impact": ooda_impact,
        "sections": sections,
    }

464 }