Coverage for little_loops / output_parsing.py: 8%

185 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Output parsing utilities for little-loops. 

2 

3Provides parsing functions for Claude CLI command outputs, 

4used by both issue_manager (ll-auto) and worker_pool (ll-parallel). 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from typing import Any 

11 

# Regex patterns for standardized output parsing.

# Section header: one to three '#' characters, optional whitespace, optional
# bold markers, a single \w+ name, optional bold markers, optional trailing
# whitespace. Handles: "## VERDICT", "###VERDICT", "## **VERDICT**".
SECTION_PATTERN = re.compile(
    r"^#{1,3}\s*\**(\w+)\**\s*$",
    re.MULTILINE,
)

# Table row with three cells: | name | status | details |.
# The first two cells must each be a single \w+ token, so separator rows such
# as |-----|-----|-----| never match; the third cell is free text.
TABLE_ROW_PATTERN = re.compile(r"\|\s*(\w+)\s*\|\s*(\w+)\s*\|\s*(.+?)\s*\|")

# Status bullet at the start of a line: "- name: VALUE".
STATUS_PATTERN = re.compile(r"^- (\w+): (\w+)", re.MULTILINE)

# Valid verdicts for ready-issue
VALID_VERDICTS = ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW", "CLOSE", "BLOCKED")

24 

25 

def _clean_verdict_content(content: str) -> str:
    """Clean verdict content by removing common formatting artifacts.

    Handles:
    - A code fence at the very start (with optional language tag) and at the
      very end of the content
    - Inline code backticks (`)
    - Markdown bold/italic (** and *)
    - Template brackets ([]) at either edge, even when padded with whitespace
    - Colons trailing the verdict
    - Leading/trailing whitespace

    Args:
        content: Raw verdict content from output

    Returns:
        Cleaned content ready for verdict extraction
    """
    # Drop an opening fence (plus any language tag) and a closing fence.
    content = re.sub(r"^```\w*\s*", "", content)
    content = re.sub(r"\s*```$", "", content)
    # Drop remaining inline backticks and every emphasis asterisk.
    content = content.replace("`", "").replace("*", "")
    # Trim the edges in a single pass: whitespace and brackets on both sides,
    # plus colons on the right. Doing this in one pass means whitespace
    # outside the brackets (" [READY] ") no longer shields them.
    return re.sub(r"^[\s\[\]]+|[\s\[\]:]+\Z", "", content)

52 

53 

# Verdict keywords in match-priority order. Compound verdicts come first so
# that "NOT READY" is never reported as READY. The words of a compound verdict
# may be joined by any run of underscores, whitespace or hyphens.
_VERDICT_KEYWORD_PATTERNS = tuple(
    (keyword, re.compile(r"\b" + keyword.replace("_", r"[\s_-]+") + r"\b"))
    for keyword in ("NOT_READY", "NEEDS_REVIEW", "CORRECTED", "BLOCKED", "READY", "CLOSE")
)

# Common Claude phrasings that imply a verdict without naming it. Tried in
# order, case-insensitively, only after the keyword pass finds nothing.
# ("not ready" needs no entry here: the keyword pass already covers it.)
_VERDICT_PHRASING_PATTERNS = tuple(
    (re.compile(phrase, re.IGNORECASE), implied)
    for phrase, implied in (
        # Patterns for READY
        (r"\bissue\s+is\s+ready\b", "READY"),
        (r"\bready\s+for\s+implementation\b", "READY"),
        (r"\bimplementation[\s-]ready\b", "READY"),
        (r"\bapproved\s+for\s+implementation\b", "READY"),
        (r"\bproceed\s+(to|with)\s+implementation\b", "READY"),
        # Patterns for CLOSE
        (r"\bshould\s+be\s+closed\b", "CLOSE"),
        (r"\bclose\s+this\s+issue\b", "CLOSE"),
        (r"\bmark\s+as\s+closed\b", "CLOSE"),
        (r"\balready\s+fixed\b", "CLOSE"),
        (r"\binvalid\s+reference\b", "CLOSE"),
        (r"\bmove.*to.*completed\b", "CLOSE"),  # "move this issue to the completed directory"
        (r"\bclosure\s+status\b", "CLOSE"),
        # Patterns for NOT_READY
        (r"\bneeds?\s+more\s+work\b", "NOT_READY"),
        (r"\brequires?\s+clarification\b", "NOT_READY"),
        (r"\bmissing\s+information\b", "NOT_READY"),
        # Patterns for CORRECTED
        (r"\bcorrections?\s+made\b", "CORRECTED"),
        (r"\bupdated?\s+and\s+ready\b", "CORRECTED"),
        (r"\bfixed?\s+and\s+ready\b", "CORRECTED"),
    )
)


def _extract_verdict_from_text(text: str) -> str | None:
    """Extract a valid verdict from arbitrary text.

    Two passes are made. The first looks for a verdict keyword as a whole
    word ("READY", "not ready", "NEEDS-REVIEW", ...), comparing against the
    upper-cased text. The second looks for common phrasings that imply a
    verdict ("should be closed", "corrections made", ...).

    Args:
        text: Text that may contain a verdict

    Returns:
        Verdict in canonical underscore form (e.g. "NOT_READY"), or None if
        neither pass matches
    """
    text_upper = text.upper()

    # Priority order decides ties, not position in the text: a string that
    # contains both NOT_READY and READY yields NOT_READY.
    for keyword, keyword_re in _VERDICT_KEYWORD_PATTERNS:
        if keyword_re.search(text_upper):
            return keyword

    for phrase_re, implied in _VERDICT_PHRASING_PATTERNS:
        if phrase_re.search(text):
            return implied

    return None

116 

117 

def parse_sections(output: str) -> dict[str, str]:
    """Parse output into sections by ## SECTION_NAME headers.

    The standardized slash command output format uses ## SECTION_NAME
    headers (uppercase with underscores) to delimit sections. A line counts
    as a header when SECTION_PATTERN matches it.

    Text before the first header is stored under "PREAMBLE"; that key is
    always present, even when empty. If a header name occurs more than once,
    the content of the last occurrence replaces the earlier one.

    Args:
        output: The stdout from a slash command

    Returns:
        dict mapping section names to their whitespace-stripped content
    """
    sections: dict[str, str] = {}
    section_name = "PREAMBLE"
    buffered: list[str] = []

    for line in output.split("\n"):
        header = SECTION_PATTERN.match(line)
        if header is None:
            buffered.append(line)
            continue
        # A new header closes the section collected so far.
        sections[section_name] = "\n".join(buffered).strip()
        section_name = header.group(1)
        buffered = []

    # Close whichever section was still open at end of input.
    sections[section_name] = "\n".join(buffered).strip()
    return sections

147 

148 

def parse_validation_table(section_content: str) -> dict[str, dict[str, str]]:
    """Parse a validation table from section content.

    Expects format:
        | Check | Status | Details |
        |-------|--------|---------|
        | Format | PASS | ... |

    Rows are found with TABLE_ROW_PATTERN. A later row with the same check
    name replaces the earlier one.

    Args:
        section_content: Content of the VALIDATION section

    Returns:
        dict mapping check names to {"status": upper-cased status,
        "details": stripped details text}
    """
    results: dict[str, dict[str, str]] = {}
    for row in TABLE_ROW_PATTERN.finditer(section_content):
        check_name, status, details = row.groups()
        # Skip header row indicators
        if check_name.lower() in ("check", "---", ""):
            continue
        results[check_name] = {
            "status": status.upper(),
            "details": details.strip(),
        }
    return results

174 

175 

def parse_status_lines(section_content: str) -> dict[str, str]:
    """Parse status lines from section content.

    Expects format:
        - tests: PASS
        - lint: PASS

    Lines are found with STATUS_PATTERN. A later line with the same item
    name replaces the earlier one.

    Args:
        section_content: Content of a section with status lines

    Returns:
        dict mapping item names to upper-cased status values
    """
    return {
        found.group(1): found.group(2).upper()
        for found in STATUS_PATTERN.finditer(section_content)
    }

193 

194 

def parse_ready_issue_output(output: str) -> dict[str, Any]:
    """Extract verdict and concerns from ready-issue output.

    The ready-issue command outputs structured sections with a VERDICT
    section containing READY, CORRECTED, NOT_READY, NEEDS_REVIEW, CLOSE,
    or BLOCKED.

    Supports both old format (VERDICT: READY) and new standardized format
    (## VERDICT\\nREADY) for backwards compatibility. Verdict detection runs
    a series of increasingly permissive strategies and stops at the first
    one that yields a verdict.

    Args:
        output: The stdout from the ready-issue command

    Returns:
        dict with keys:
        - verdict: str ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW",
          "CLOSE", "BLOCKED", or "UNKNOWN")
        - concerns: list[str] of concern messages
        - is_ready: bool indicating if issue is ready for implementation
        - was_corrected: bool indicating if corrections were made
        - should_close: bool indicating if issue should be closed
        - is_blocked: bool indicating if the verdict is BLOCKED
        - close_reason: str|None (e.g., "already_fixed", "invalid_ref")
        - close_status: str|None (e.g., "Closed - Already Fixed")
        - corrections: list[str] of corrections made
        - validated_file_path: str|None path to the file that was validated
        - sections: dict of parsed sections (if standardized format)
        - validation: dict of validation results (if standardized format)
    """

    def bullet_items(text: str) -> list[str]:
        # Collect "- item" lines without the prefix, ignoring "- None".
        stripped_lines = (raw.strip() for raw in text.split("\n"))
        return [
            entry[2:]
            for entry in stripped_lines
            if entry.startswith("- ") and entry != "- None"
        ]

    sections = parse_sections(output)
    verdict = "UNKNOWN"
    concerns: list[str] = []
    corrections: list[str] = []
    validation: dict[str, dict[str, str]] = {}
    close_reason: str | None = None
    close_status: str | None = None
    validated_file_path: str | None = None

    # Strategy 1: VERDICT section (new format with #, ## or ### header).
    # Try each non-empty line until one yields a verdict.
    if "VERDICT" in sections:
        for line in sections["VERDICT"].strip().split("\n"):
            cleaned = _clean_verdict_content(line.strip())
            if not cleaned:
                continue
            extracted = _extract_verdict_from_text(cleaned)
            if extracted:
                verdict = extracted
                break

    # Strategy 2: Old format (VERDICT: READY) anywhere in output
    if verdict == "UNKNOWN":
        verdict_match = re.search(
            r"VERDICT:\s*(READY|CORRECTED|NOT[_\s-]?READY|NEEDS[_\s-]?REVIEW|CLOSE|BLOCKED)",
            output,
            re.IGNORECASE,
        )
        if verdict_match:
            matched = verdict_match.group(1).upper()
            # The regex allows any single separator (or none) inside the
            # compound verdicts, so map them by prefix to guarantee the
            # canonical underscore form ("NOTREADY", "NOT\tREADY", ...).
            if matched.startswith("NOT"):
                verdict = "NOT_READY"
            elif matched.startswith("NEEDS"):
                verdict = "NEEDS_REVIEW"
            else:
                verdict = matched

    # Strategy 3: Look for verdict keywords on lines that mention "verdict"
    if verdict == "UNKNOWN":
        for line in output.split("\n"):
            if "verdict" not in line.lower():
                continue
            extracted = _extract_verdict_from_text(line)
            if extracted:
                verdict = extracted
                break

    # Strategy 4: Scan entire output for standalone verdict keywords
    # (last resort - may have false positives but better than UNKNOWN)
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(output)
        if extracted:
            verdict = extracted

    # Strategy 5: Clean the entire output and retry extraction
    # Handles cases where formatting artifacts (bold, backticks) break word boundaries
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(_clean_verdict_content(output))
        if extracted:
            verdict = extracted

    # Corrections are parsed before Strategy 6 because it depends on them.
    if "CORRECTIONS_MADE" in sections:
        corrections = bullet_items(sections["CORRECTIONS_MADE"])

    # Strategy 6: Infer from READY_FOR section
    # Handles: "Implementation: Yes", "**Implementation:** Yes", etc.
    if verdict == "UNKNOWN" and "READY_FOR" in sections:
        if re.search(r"implementation[\s:\*]*yes", sections["READY_FOR"], re.IGNORECASE):
            # If corrections were made, verdict is CORRECTED; otherwise READY
            verdict = "CORRECTED" if corrections else "READY"

    # Parse CONCERNS section (new format)
    if "CONCERNS" in sections:
        concerns = bullet_items(sections["CONCERNS"])

    # Fall back to old concern detection: any line containing an indicator.
    if not concerns:
        indicators = ("WARNING", "Concern:", "Issue:", "Missing:")
        for line in output.split("\n"):
            line_stripped = line.strip()
            if any(indicator in line_stripped for indicator in indicators):
                concerns.append(line_stripped)

    # Parse CLOSE_REASON section if present (for CLOSE verdict).
    # Accepts "- Reason: <value>" and "Reason: <value>".
    if "CLOSE_REASON" in sections:
        for line in sections["CLOSE_REASON"].split("\n"):
            # Strip whitespace and bold markers (**) that Claude sometimes adds
            line = line.strip().replace("**", "")
            if line.lower().startswith(("- reason:", "reason:")):
                reason_value = line.split(":", 1)[1].strip().lower()
                # Also strip backticks that may wrap the value
                close_reason = reason_value.strip("`").strip()
                break

    # Parse CLOSE_STATUS section if present: first non-empty, non-'#' line.
    if "CLOSE_STATUS" in sections:
        for line in sections["CLOSE_STATUS"].strip().split("\n"):
            line = line.strip()
            if line and not line.startswith("#"):
                close_status = line
                break

    # Parse VALIDATED_FILE section if present (for path validation)
    if "VALIDATED_FILE" in sections:
        for line in sections["VALIDATED_FILE"].strip().split("\n"):
            line = line.strip()
            # Skip empty lines, comments, and template placeholders
            if line and not line.startswith("#") and not line.startswith("["):
                # Strip markdown backticks that Claude sometimes wraps paths in
                validated_file_path = line.strip("`")
                break

    # Parse VALIDATION section if present
    if "VALIDATION" in sections:
        validation = parse_validation_table(sections["VALIDATION"])

    # Flags derived from the verdict (and from any listed corrections).
    return {
        "verdict": verdict,
        "concerns": concerns,
        "is_ready": verdict in ("READY", "CORRECTED"),
        "was_corrected": verdict == "CORRECTED" or len(corrections) > 0,
        "should_close": verdict == "CLOSE",
        "is_blocked": verdict == "BLOCKED",
        "close_reason": close_reason,
        "close_status": close_status,
        "corrections": corrections,
        "validated_file_path": validated_file_path,
        "sections": sections,
        "validation": validation,
    }

388 

389 

def parse_manage_issue_output(output: str) -> dict[str, Any]:
    """Extract structured data from manage-issue output.

    The manage-issue command outputs structured sections with metadata,
    files changed, commits, verification results, and final status.

    Args:
        output: The stdout from the manage-issue command

    Returns:
        dict with keys:
        - status: str, the upper-cased word following "Status:" in the
          RESULT section (e.g. "COMPLETED", "FAILED", "BLOCKED"), or
          "UNKNOWN" when no such line is found
        - files_changed: list[str] of modified files
        - files_created: list[str] of created files
        - commits: list[str] of commit hashes/messages
        - verification: dict of verification results
        - ooda_impact: dict of OODA impact status
        - sections: dict of all parsed sections
    """

    def bullet_items(section_name: str) -> list[str]:
        # Collect "- item" lines of a section without the prefix, ignoring
        # "- None"; a missing section yields an empty list.
        if section_name not in sections:
            return []
        stripped_lines = (raw.strip() for raw in sections[section_name].split("\n"))
        return [
            entry[2:]
            for entry in stripped_lines
            if entry.startswith("- ") and entry != "- None"
        ]

    sections = parse_sections(output)
    status = "UNKNOWN"
    verification: dict[str, str] = {}
    ooda_impact: dict[str, str] = {}

    # Parse RESULT section for status. Markdown emphasis or backticks around
    # the label or the value ("**Status:** COMPLETED", "Status: `FAILED`")
    # are tolerated.
    if "RESULT" in sections:
        status_match = re.search(r"Status[*`]*:[\s*`]*(\w+)", sections["RESULT"])
        if status_match:
            status = status_match.group(1).upper()

    files_changed = bullet_items("FILES_CHANGED")
    files_created = bullet_items("FILES_CREATED")
    commits = bullet_items("COMMITS")

    # Parse VERIFICATION section
    if "VERIFICATION" in sections:
        verification = parse_status_lines(sections["VERIFICATION"])

    # Parse OODA_IMPACT section: "- key: value" bullets, value upper-cased.
    if "OODA_IMPACT" in sections:
        for line in sections["OODA_IMPACT"].split("\n"):
            line = line.strip()
            if not line.startswith("- "):
                continue
            key, colon, value = line[2:].partition(":")
            if colon:
                ooda_impact[key.strip()] = value.strip().upper()

    return {
        "status": status,
        "files_changed": files_changed,
        "files_created": files_created,
        "commits": commits,
        "verification": verification,
        "ooda_impact": ooda_impact,
        "sections": sections,
    }