Coverage for little_loops / parallel / output_parsing.py: 97%
184 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-15 15:23 -0600
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-15 15:23 -0600
"""Output parsing utilities for automation tools.

Provides parsing functions for Claude CLI command outputs, enabling both
sequential and parallel issue processors to interpret structured command
responses consistently.
"""
8from __future__ import annotations
10import re
11from typing import Any
# Compiled patterns shared by the parsers in this module.
#
# A section header is one to three "#" characters, optional whitespace, and a
# single word that may be wrapped in bold markers, alone on its line, e.g.
# "## VERDICT", "###VERDICT" or "## **VERDICT**".
SECTION_PATTERN = re.compile(r"^#{1,3}\s*\**(\w+)\**\s*$", flags=re.MULTILINE)

# One table row with three cells: | name | status | details |
# The first two cells must each be a single word; the third is free text.
TABLE_ROW_PATTERN = re.compile(
    r"\|\s*(\w+)\s*\|\s*(\w+)\s*\|\s*(.+?)\s*\|",
)

# A "- name: VALUE" bullet that begins at the start of a line.
STATUS_PATTERN = re.compile(r"^- (\w+): (\w+)", flags=re.MULTILINE)

# Verdicts the ready-issue command may report.
VALID_VERDICTS = (
    "READY",
    "CORRECTED",
    "NOT_READY",
    "NEEDS_REVIEW",
    "CLOSE",
)
def _clean_verdict_content(content: str) -> str:
    """Strip formatting artifacts that commonly wrap a verdict.

    Removes, in this order:
    - a code fence at the very start of the text (three backticks plus an
      optional language tag) and a code fence at the very end
    - any remaining backticks
    - Markdown emphasis markers (** and *)
    - surrounding whitespace
    - template brackets ([ and ]) at either end, and any whitespace that was
      inside them

    Colons are not removed here; the verdict extraction in this module
    matches keywords on word boundaries, so a trailing colon does not
    prevent a match.

    Args:
        content: Raw verdict content from output

    Returns:
        Cleaned content ready for verdict extraction
    """
    # Fences are only recognised at the very start / very end of the text.
    content = re.sub(r"^```\w*\s*", "", content)
    content = re.sub(r"\s*```$", "", content)
    # Inline code backticks and bold/italic asterisks can appear anywhere.
    content = content.replace("`", "").replace("*", "")
    # Trim whitespace BEFORE the brackets: str.strip("[]") stops at the first
    # character that is not a bracket, so "[READY] " would otherwise keep its
    # closing bracket.
    return content.strip().strip("[]").strip()
# Keyword patterns, in priority order. Compound verdicts come first so that
# "NOT READY" is never reported as READY. The words of a compound verdict may
# be joined by any run of underscores, whitespace or hyphens.
_VERDICT_KEYWORD_PATTERNS = tuple(
    (verdict, re.compile(r"\b" + verdict.replace("_", r"[_\s-]+") + r"\b"))
    for verdict in ("NOT_READY", "NEEDS_REVIEW", "CORRECTED", "READY", "CLOSE")
)

# Free-form phrasings mapped to verdicts, tried in order after the keywords.
# NOTE(review): every entry whose phrase contains the bare word "ready" or
# "close" is shadowed by the keyword scan, so e.g. "updated and ready" yields
# READY rather than CORRECTED. The list is kept as-is to preserve results.
_VERDICT_PHRASINGS = tuple(
    (re.compile(phrase, re.IGNORECASE), verdict)
    for phrase, verdict in (
        # Patterns for READY
        (r"\bissue\s+is\s+ready\b", "READY"),
        (r"\bready\s+for\s+implementation\b", "READY"),
        (r"\bimplementation[\s-]ready\b", "READY"),
        (r"\bapproved\s+for\s+implementation\b", "READY"),
        (r"\bproceed\s+(to|with)\s+implementation\b", "READY"),
        # Patterns for CLOSE
        (r"\bshould\s+be\s+closed\b", "CLOSE"),
        (r"\bclose\s+this\s+issue\b", "CLOSE"),
        (r"\bmark\s+as\s+closed\b", "CLOSE"),
        (r"\balready\s+fixed\b", "CLOSE"),
        (r"\binvalid\s+reference\b", "CLOSE"),
        (r"\bmove.*to.*completed\b", "CLOSE"),  # "move this issue to the completed directory"
        (r"\bclosure\s+status\b", "CLOSE"),  # "closure status"
        # Patterns for NOT_READY
        (r"\bnot\s+ready\b", "NOT_READY"),
        (r"\bneeds?\s+more\s+work\b", "NOT_READY"),
        (r"\brequires?\s+clarification\b", "NOT_READY"),
        (r"\bmissing\s+information\b", "NOT_READY"),
        # Patterns for CORRECTED
        (r"\bcorrections?\s+made\b", "CORRECTED"),
        (r"\bupdated?\s+and\s+ready\b", "CORRECTED"),
        (r"\bfixed?\s+and\s+ready\b", "CORRECTED"),
    )
)


def _extract_verdict_from_text(text: str) -> str | None:
    """Extract a valid verdict from arbitrary text.

    The text is first scanned, case-insensitively, for the verdict keywords
    themselves as whole words. Compound verdicts are accepted with any run of
    underscores, whitespace or hyphens between their words ("NOT_READY",
    "not ready", "Not-Ready", "not  ready") and are checked before READY so
    they are not mistaken for it. If no keyword is present, a list of common
    phrasings ("approved for implementation", "should be closed", ...) is
    tried in order.

    Args:
        text: Text that may contain a verdict

    Returns:
        The verdict in canonical underscore form, or None if nothing matched
    """
    text_upper = text.upper()
    for verdict, keyword in _VERDICT_KEYWORD_PATTERNS:
        if keyword.search(text_upper):
            return verdict

    for phrasing, verdict in _VERDICT_PHRASINGS:
        if phrasing.search(text):
            return verdict

    return None
def parse_sections(output: str) -> dict[str, str]:
    """Split command output into named sections.

    A line that matches SECTION_PATTERN starts a new section named after the
    captured word. Text before the first header is stored under "PREAMBLE".
    Each section's content is the lines up to the next header, joined with
    newlines and stripped. If a header name repeats, the later content
    replaces the earlier one.

    Args:
        output: The stdout from a slash command

    Returns:
        dict mapping section names to their content
    """
    parsed: dict[str, str] = {}
    active_name = "PREAMBLE"
    body_lines: list[str] = []

    for raw_line in output.split("\n"):
        header = SECTION_PATTERN.match(raw_line)
        if header is None:
            body_lines.append(raw_line)
            continue
        # A header closes the section collected so far and opens the next.
        parsed[active_name] = "\n".join(body_lines).strip()
        active_name = header.group(1)
        body_lines = []

    # Whatever was collected after the last header is the final section.
    parsed[active_name] = "\n".join(body_lines).strip()
    return parsed
def parse_validation_table(section_content: str) -> dict[str, dict[str, str]]:
    """Read the rows of a three-column validation table.

    Expected shape:
        | Check | Status | Details |
        |-------|--------|---------|
        | Format | PASS | ... |

    Every match of TABLE_ROW_PATTERN contributes one entry, except rows whose
    first cell is a header indicator ("check", compared case-insensitively).
    When a check name repeats, the later row wins.

    Args:
        section_content: Content of the VALIDATION section

    Returns:
        dict mapping each check name to {"status": <uppercased status>,
        "details": <stripped details>}
    """
    header_indicators = ("check", "---", "")
    rows = (found.groups() for found in TABLE_ROW_PATTERN.finditer(section_content))
    return {
        name: {"status": state.upper(), "details": details.strip()}
        for name, state, details in rows
        if name.lower() not in header_indicators
    }
def parse_status_lines(section_content: str) -> dict[str, str]:
    """Collect "- name: VALUE" bullets into a mapping.

    Expected shape:
        - tests: PASS
        - lint: PASS

    Args:
        section_content: Content of a section with status lines

    Returns:
        dict mapping each item name to its status, uppercased; a repeated
        name keeps the last status seen
    """
    return {
        item: state.upper()
        for item, state in STATUS_PATTERN.findall(section_content)
    }
def _dash_list_items(section_text: str) -> list[str]:
    """Return the text of every "- item" bullet in *section_text*, skipping "- None"."""
    stripped_lines = (line.strip() for line in section_text.split("\n"))
    return [
        line[2:]
        for line in stripped_lines
        if line.startswith("- ") and line != "- None"
    ]


def parse_ready_issue_output(output: str) -> dict[str, Any]:
    """Extract verdict and concerns from ready-issue output.

    The verdict is looked for with progressively looser strategies, stopping
    at the first that succeeds:

    1. a VERDICT section (header format), line by line
    2. the old inline form "VERDICT: <value>" anywhere in the output
    3. any line that mentions "verdict"
    4. the whole output
    5. the whole output after stripping formatting artifacts
    6. a READY_FOR section saying implementation is "yes"

    Args:
        output: The stdout from the ready-issue command

    Returns:
        dict with keys:
        - verdict: str ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW",
          "CLOSE", or "UNKNOWN")
        - concerns: list[str] of concern messages
        - is_ready: bool, True for READY and CORRECTED
        - was_corrected: bool, True for CORRECTED or when corrections are listed
        - should_close: bool, True for CLOSE
        - close_reason: str|None, lowercased value of the "Reason:" line
        - close_status: str|None, first usable line of CLOSE_STATUS
        - corrections: list[str] of corrections made
        - validated_file_path: str|None path to the file that was validated
        - sections: dict of parsed sections
        - validation: dict of validation results from the VALIDATION table
    """
    sections = parse_sections(output)
    verdict = "UNKNOWN"
    concerns: list[str] = []
    corrections: list[str] = []
    validation: dict[str, dict[str, str]] = {}
    close_reason: str | None = None
    close_status: str | None = None
    validated_file_path: str | None = None

    # Strategy 1: VERDICT section; take the first line that yields a verdict.
    if "VERDICT" in sections:
        for line in sections["VERDICT"].split("\n"):
            cleaned = _clean_verdict_content(line.strip())
            if not cleaned:
                continue
            extracted = _extract_verdict_from_text(cleaned)
            if extracted:
                verdict = extracted
                break

    # Strategy 2: old inline format (VERDICT: READY) anywhere in the output.
    if verdict == "UNKNOWN":
        verdict_match = re.search(
            r"VERDICT:\s*(READY|CORRECTED|NOT[_\s-]?READY|NEEDS[_\s-]?REVIEW|CLOSE)",
            output,
            re.IGNORECASE,
        )
        if verdict_match:
            # The pattern allows an underscore, any whitespace character, a
            # hyphen or nothing at all between the words. Drop the separator
            # and map back to the canonical name so "NOTREADY" or a
            # tab-separated form cannot leak out as an unrecognised verdict.
            compact = re.sub(r"[_\s-]", "", verdict_match.group(1).upper())
            canonical = {"NOTREADY": "NOT_READY", "NEEDSREVIEW": "NEEDS_REVIEW"}
            verdict = canonical.get(compact, compact)

    # Strategy 3: verdict keywords on any line that mentions "verdict".
    if verdict == "UNKNOWN":
        for line in output.split("\n"):
            if "verdict" not in line.lower():
                continue
            extracted = _extract_verdict_from_text(line)
            if extracted:
                verdict = extracted
                break

    # Strategy 4: scan the entire output (last resort; may produce false
    # positives, which is preferred over UNKNOWN).
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(output)
        if extracted:
            verdict = extracted

    # Strategy 5: same scan after removing formatting artifacts.
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(_clean_verdict_content(output))
        if extracted:
            verdict = extracted

    # Corrections are read before Strategy 6 because it depends on them.
    if "CORRECTIONS_MADE" in sections:
        corrections = _dash_list_items(sections["CORRECTIONS_MADE"])

    # Strategy 6: infer from a READY_FOR section such as "Implementation: Yes"
    # or "**Implementation:** Yes".
    if verdict == "UNKNOWN" and "READY_FOR" in sections:
        if re.search(r"implementation[\s:\*]*yes", sections["READY_FOR"], re.IGNORECASE):
            verdict = "CORRECTED" if corrections else "READY"

    # Concerns: bullet list in the CONCERNS section (new format) ...
    if "CONCERNS" in sections:
        concerns = _dash_list_items(sections["CONCERNS"])

    # ... otherwise any line of the raw output carrying an old-style indicator.
    if not concerns:
        indicators = ("WARNING", "Concern:", "Issue:", "Missing:")
        for line in output.split("\n"):
            line_stripped = line.strip()
            if any(indicator in line_stripped for indicator in indicators):
                concerns.append(line_stripped)

    # CLOSE_REASON: first "- Reason: <value>" or "Reason: <value>" line.
    if "CLOSE_REASON" in sections:
        for line in sections["CLOSE_REASON"].split("\n"):
            # Bold markers around the label are dropped before matching.
            line = line.strip().replace("**", "")
            if line.lower().startswith(("- reason:", "reason:")):
                reason_value = line.split(":", 1)[1].strip().lower()
                # The value may itself be wrapped in backticks.
                close_reason = reason_value.strip("`").strip()
                break

    # CLOSE_STATUS: first non-empty line that is not a "#" line.
    if "CLOSE_STATUS" in sections:
        for line in sections["CLOSE_STATUS"].split("\n"):
            line = line.strip()
            if line and not line.startswith("#"):
                close_status = line
                break

    # VALIDATED_FILE: first non-empty line that is neither a "#" line nor a
    # "[...]" template placeholder; surrounding backticks are removed.
    if "VALIDATED_FILE" in sections:
        for line in sections["VALIDATED_FILE"].split("\n"):
            line = line.strip()
            if line and not line.startswith(("#", "[")):
                validated_file_path = line.strip("`")
                break

    if "VALIDATION" in sections:
        validation = parse_validation_table(sections["VALIDATION"])

    return {
        "verdict": verdict,
        "concerns": concerns,
        "is_ready": verdict in ("READY", "CORRECTED"),
        "was_corrected": verdict == "CORRECTED" or bool(corrections),
        "should_close": verdict == "CLOSE",
        "close_reason": close_reason,
        "close_status": close_status,
        "corrections": corrections,
        "validated_file_path": validated_file_path,
        "sections": sections,
        "validation": validation,
    }
def parse_manage_issue_output(output: str) -> dict[str, Any]:
    """Extract structured data from manage-issue output.

    The output is split into sections and the following are read:
    RESULT (status), FILES_CHANGED, FILES_CREATED, COMMITS (bullet lists),
    VERIFICATION ("- name: VALUE" status lines) and OODA_IMPACT
    ("- key: value" bullets).

    Args:
        output: The stdout from the manage-issue command

    Returns:
        dict with keys:
        - status: str, the uppercased word after "Status:" in RESULT, or
          "UNKNOWN" when RESULT is missing or has no status line
        - files_changed: list[str] of modified files
        - files_created: list[str] of created files
        - commits: list[str] of commit hashes/messages
        - verification: dict of verification results
        - ooda_impact: dict of OODA impact status (values uppercased)
        - sections: dict of all parsed sections
    """
    sections = parse_sections(output)

    def bullet_items(section_name: str) -> list[str]:
        # Text of each "- item" bullet in the named section; "- None" and a
        # missing section both give an empty list.
        stripped_lines = (
            line.strip() for line in sections.get(section_name, "").split("\n")
        )
        return [
            line[2:]
            for line in stripped_lines
            if line.startswith("- ") and line != "- None"
        ]

    # Bold markers or backticks may surround the label or the value
    # ("**Status:** COMPLETED", "Status: `FAILED`"); tolerate them so such
    # lines are not reported as UNKNOWN.
    status = "UNKNOWN"
    status_match = re.search(r"Status[*`]*:[\s*`]*(\w+)", sections.get("RESULT", ""))
    if status_match:
        status = status_match.group(1).upper()

    verification: dict[str, str] = {}
    if "VERIFICATION" in sections:
        verification = parse_status_lines(sections["VERIFICATION"])

    # OODA_IMPACT bullets are "- key: value"; bullets without a colon are ignored.
    ooda_impact: dict[str, str] = {}
    for line in sections.get("OODA_IMPACT", "").split("\n"):
        line = line.strip()
        if not line.startswith("- "):
            continue
        key, colon, value = line[2:].partition(":")
        if colon:
            ooda_impact[key.strip()] = value.strip().upper()

    return {
        "status": status,
        "files_changed": bullet_items("FILES_CHANGED"),
        "files_created": bullet_items("FILES_CREATED"),
        "commits": bullet_items("COMMITS"),
        "verification": verification,
        "ooda_impact": ooda_impact,
        "sections": sections,
    }