Coverage for little_loops / output_parsing.py: 8%
185 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Output parsing utilities for little-loops.
3Provides parsing functions for Claude CLI command outputs,
4used by both issue_manager (ll-auto) and worker_pool (ll-parallel).
5"""
7from __future__ import annotations
9import re
10from typing import Any
# Regex patterns for standardized output parsing.
#
# Section headers: one to three leading "#" characters, optional whitespace,
# and optional bold markers around a single-word name. Examples that match:
# "## VERDICT", "###VERDICT", "## **VERDICT**".
SECTION_PATTERN = re.compile(r"^#{1,3}\s*\**(\w+)\**\s*$", re.MULTILINE)

# Table rows with three cells: | name | status | details |
# The first two cells must each be a single word.
TABLE_ROW_PATTERN = re.compile(
    r"\|\s*(\w+)\s*\|\s*(\w+)\s*\|\s*(.+?)\s*\|"
)

# Status bullets at the start of a line: "- name: VALUE"
STATUS_PATTERN = re.compile(r"^- (\w+): (\w+)", flags=re.MULTILINE)

# Valid verdicts for ready-issue
VALID_VERDICTS = (
    "READY",
    "CORRECTED",
    "NOT_READY",
    "NEEDS_REVIEW",
    "CLOSE",
    "BLOCKED",
)
26def _clean_verdict_content(content: str) -> str:
27 """Clean verdict content by removing common formatting artifacts.
29 Handles:
30 - Code block markers (``` and `)
31 - Markdown bold/italic (** and *)
32 - Template brackets ([])
33 - Leading/trailing whitespace
34 - Colons after verdict
36 Args:
37 content: Raw verdict content from output
39 Returns:
40 Cleaned content ready for verdict extraction
41 """
42 # Remove code fence markers (``` or ```)
43 content = re.sub(r"^```\w*\s*", "", content)
44 content = re.sub(r"\s*```$", "", content)
45 # Remove inline code backticks
46 content = content.replace("`", "")
47 # Remove markdown bold/italic
48 content = content.replace("**", "").replace("*", "")
49 # Remove template brackets
50 content = content.strip("[]")
51 return content.strip()
54def _extract_verdict_from_text(text: str) -> str | None:
55 """Extract a valid verdict from arbitrary text.
57 Searches for valid verdict keywords in the text, handling various
58 formats like "READY", "The verdict is READY", "NOT_READY", etc.
60 Args:
61 text: Text that may contain a verdict
63 Returns:
64 Valid verdict string or None if not found
65 """
66 text_upper = text.upper()
68 # Check each valid verdict (check NOT_READY before READY to avoid partial match)
69 # Order matters: check longer/compound verdicts first
70 for verdict in ("NOT_READY", "NEEDS_REVIEW", "CORRECTED", "BLOCKED", "READY", "CLOSE"):
71 # Match verdict as a word boundary (not part of another word)
72 # Handle both underscore and space variants
73 patterns = [
74 rf"\b{verdict}\b",
75 rf"\b{verdict.replace('_', ' ')}\b", # NOT READY, NEEDS REVIEW
76 rf"\b{verdict.replace('_', '-')}\b", # NOT-READY, NEEDS-REVIEW
77 ]
78 for pattern in patterns:
79 if re.search(pattern, text_upper):
80 # Normalize to underscore format
81 return verdict
83 # Try common Claude phrasings that map to verdicts
84 # Note: Using re.IGNORECASE since patterns are lowercase
85 phrasing_map = [
86 # Patterns for READY
87 (r"\bissue\s+is\s+ready\b", "READY"),
88 (r"\bready\s+for\s+implementation\b", "READY"),
89 (r"\bimplementation[\s-]ready\b", "READY"),
90 (r"\bapproved\s+for\s+implementation\b", "READY"),
91 (r"\bproceed\s+(to|with)\s+implementation\b", "READY"),
92 # Patterns for CLOSE
93 (r"\bshould\s+be\s+closed\b", "CLOSE"),
94 (r"\bclose\s+this\s+issue\b", "CLOSE"),
95 (r"\bmark\s+as\s+closed\b", "CLOSE"),
96 (r"\balready\s+fixed\b", "CLOSE"),
97 (r"\binvalid\s+reference\b", "CLOSE"),
98 (r"\bmove.*to.*completed\b", "CLOSE"), # "move this issue to the completed directory"
99 (r"\bclosure\s+status\b", "CLOSE"), # "closure status"
100 # Patterns for NOT_READY
101 (r"\bnot\s+ready\b", "NOT_READY"), # General "not ready" pattern
102 (r"\bneeds?\s+more\s+work\b", "NOT_READY"),
103 (r"\brequires?\s+clarification\b", "NOT_READY"),
104 (r"\bmissing\s+information\b", "NOT_READY"),
105 # Patterns for CORRECTED
106 (r"\bcorrections?\s+made\b", "CORRECTED"),
107 (r"\bupdated?\s+and\s+ready\b", "CORRECTED"),
108 (r"\bfixed?\s+and\s+ready\b", "CORRECTED"),
109 ]
111 for pattern, verdict in phrasing_map:
112 if re.search(pattern, text, re.IGNORECASE):
113 return verdict
115 return None
def parse_sections(output: str) -> dict[str, str]:
    """Split command output into named sections.

    A line matching SECTION_PATTERN (a markdown-style header such as
    ``## SECTION_NAME``) starts a new section. Text before the first header
    is stored under the key "PREAMBLE". If a section name occurs more than
    once, the later occurrence replaces the earlier one.

    Args:
        output: The stdout from a slash command

    Returns:
        dict mapping each section name to its whitespace-stripped content
    """
    parsed: dict[str, str] = {}
    name = "PREAMBLE"
    body_lines: list[str] = []

    for raw_line in output.split("\n"):
        header = SECTION_PATTERN.match(raw_line)
        if header is None:
            body_lines.append(raw_line)
            continue
        # A new header closes the section collected so far.
        parsed[name] = "\n".join(body_lines).strip()
        name, body_lines = header.group(1), []

    # Whatever is still buffered belongs to the last section seen.
    parsed[name] = "\n".join(body_lines).strip()
    return parsed
def parse_validation_table(section_content: str) -> dict[str, dict[str, str]]:
    """Parse a validation table from section content.

    Expects format:
        | Check | Status | Details |
        |-------|--------|---------|
        | Format | PASS | ... |

    Rows are matched one line at a time, anchored at the start of the
    (stripped) line. Lines that do not fit TABLE_ROW_PATTERN, such as the
    separator row or rows whose first two cells are not single words, are
    skipped.

    Args:
        section_content: Content of the VALIDATION section

    Returns:
        dict mapping check names to {status, details}
    """
    results: dict[str, dict[str, str]] = {}
    # Scanning the whole section with finditer lets the pattern's \s* cross
    # line breaks and lets a match begin in the middle of a row, so one
    # malformed row could yield a bogus entry and swallow the next row.
    # Matching each line separately keeps every row self-contained.
    for line in section_content.splitlines():
        match = TABLE_ROW_PATTERN.match(line.strip())
        if match is None:
            continue
        check_name, status, details = match.groups()
        # Skip the header row
        if check_name.lower() == "check":
            continue
        results[check_name] = {
            "status": status.upper(),
            "details": details.strip(),
        }
    return results
def parse_status_lines(section_content: str) -> dict[str, str]:
    """Collect ``- name: VALUE`` bullets from a section.

    Expects format:
        - tests: PASS
        - lint: PASS

    Args:
        section_content: Content of a section with status lines

    Returns:
        dict mapping item names to upper-cased status values; when a name
        repeats, the last occurrence wins
    """
    return {
        found.group(1): found.group(2).upper()
        for found in STATUS_PATTERN.finditer(section_content)
    }
def parse_ready_issue_output(output: str) -> dict[str, Any]:
    """Extract verdict and concerns from ready-issue output.

    The ready-issue command outputs structured sections with a VERDICT
    section containing READY, CORRECTED, NOT_READY, NEEDS_REVIEW, CLOSE,
    or BLOCKED.

    Supports both old format (VERDICT: READY) and new standardized format
    (## VERDICT\\nREADY) for backwards compatibility. Several fallback
    strategies are tried in order until one yields a verdict.

    Args:
        output: The stdout from the ready-issue command

    Returns:
        dict with keys:
        - verdict: str ("READY", "CORRECTED", "NOT_READY", "NEEDS_REVIEW",
          "CLOSE", "BLOCKED", or "UNKNOWN")
        - concerns: list[str] of concern messages
        - is_ready: bool indicating if issue is ready for implementation
        - was_corrected: bool indicating if corrections were made
        - should_close: bool indicating if issue should be closed
        - is_blocked: bool indicating if the verdict is BLOCKED
        - close_reason: str|None (e.g., "already_fixed", "invalid_ref")
        - close_status: str|None (e.g., "Closed - Already Fixed")
        - corrections: list[str] of corrections made
        - validated_file_path: str|None path to the file that was validated
        - sections: dict of parsed sections (if standardized format)
        - validation: dict of validation results (if standardized format)
    """
    # Try new standardized format first
    sections = parse_sections(output)
    verdict = "UNKNOWN"
    concerns: list[str] = []
    corrections: list[str] = []
    validation: dict[str, dict[str, str]] = {}
    close_reason: str | None = None
    close_status: str | None = None
    validated_file_path: str | None = None

    # Strategy 1: Check for VERDICT section (new format with # or ## header)
    if "VERDICT" in sections:
        # Try each non-empty line until we find a verdict
        for line in sections["VERDICT"].strip().split("\n"):
            line = line.strip()
            if not line:
                continue

            # Clean the line of formatting artifacts
            cleaned = _clean_verdict_content(line)
            if not cleaned:
                continue

            extracted = _extract_verdict_from_text(cleaned)
            if extracted:
                verdict = extracted
                break

    # Strategy 2: Old format (VERDICT: READY) anywhere in output
    if verdict == "UNKNOWN":
        verdict_match = re.search(
            r"VERDICT:\s*(READY|CORRECTED|NOT[_\s-]?READY|NEEDS[_\s-]?REVIEW|CLOSE|BLOCKED)",
            output,
            re.IGNORECASE,
        )
        if verdict_match:
            # The pattern accepts any single whitespace character, "_", "-",
            # or no separator at all between the words of a compound verdict.
            # Normalize every one of those to the underscore form so the
            # result is always one of the documented verdict strings.
            raw_verdict = verdict_match.group(1).upper()
            verdict = re.sub(r"^(NOT|NEEDS)[_\s-]?", r"\1_", raw_verdict)

    # Strategy 3: Look for verdict keywords near "verdict" mentions
    if verdict == "UNKNOWN":
        for line in output.split("\n"):
            if "verdict" in line.lower():
                extracted = _extract_verdict_from_text(line)
                if extracted:
                    verdict = extracted
                    break

    # Strategy 4: Scan entire output for standalone verdict keywords
    # (last resort - may have false positives but better than UNKNOWN)
    if verdict == "UNKNOWN":
        extracted = _extract_verdict_from_text(output)
        if extracted:
            verdict = extracted

    # Strategy 5: Clean the entire output and retry extraction
    # Handles cases where formatting artifacts (bold, backticks) break word boundaries
    if verdict == "UNKNOWN":
        cleaned_output = _clean_verdict_content(output)
        extracted = _extract_verdict_from_text(cleaned_output)
        if extracted:
            verdict = extracted

    # Parse CORRECTIONS_MADE before Strategy 6, which needs to know whether
    # any corrections were listed.
    if "CORRECTIONS_MADE" in sections:
        for line in sections["CORRECTIONS_MADE"].split("\n"):
            line = line.strip()
            if line.startswith("- ") and line != "- None":
                corrections.append(line[2:])  # Remove "- " prefix

    # Strategy 6: Infer from READY_FOR section
    # If "READY_FOR" section exists with "Implementation: Yes", infer verdict
    if verdict == "UNKNOWN" and "READY_FOR" in sections:
        # Handles: "Implementation: Yes", "**Implementation:** Yes", etc.
        if re.search(r"implementation[\s:\*]*yes", sections["READY_FOR"], re.IGNORECASE):
            # If corrections were made, verdict is CORRECTED; otherwise READY
            verdict = "CORRECTED" if corrections else "READY"

    # Parse CONCERNS section (new format)
    if "CONCERNS" in sections:
        for line in sections["CONCERNS"].split("\n"):
            line = line.strip()
            if line.startswith("- ") and line != "- None":
                concerns.append(line[2:])  # Remove "- " prefix

    # Fall back to old concern detection: any line mentioning an indicator
    if not concerns:
        for line in output.split("\n"):
            line_stripped = line.strip()
            if any(
                indicator in line_stripped
                for indicator in ["WARNING", "Concern:", "Issue:", "Missing:"]
            ):
                concerns.append(line_stripped)

    # Parse CLOSE_REASON section if present (for CLOSE verdict)
    if "CLOSE_REASON" in sections:
        for line in sections["CLOSE_REASON"].split("\n"):
            # Strip whitespace and bold markers (**) that Claude sometimes adds
            line = line.strip().replace("**", "")
            # Accept "- Reason: <value>" as well as "Reason: <value>"
            if line.lower().startswith(("- reason:", "reason:")):
                reason_value = line.split(":", 1)[1].strip().lower()
                # Also strip backticks that may wrap the value
                close_reason = reason_value.strip("`").strip()
                break

    # Parse CLOSE_STATUS section if present
    if "CLOSE_STATUS" in sections:
        # Take first non-empty, non-header line as the status
        for line in sections["CLOSE_STATUS"].strip().split("\n"):
            line = line.strip()
            if line and not line.startswith("#"):
                close_status = line
                break

    # Parse VALIDATED_FILE section if present (for path validation)
    if "VALIDATED_FILE" in sections:
        for line in sections["VALIDATED_FILE"].strip().split("\n"):
            line = line.strip()
            # Skip empty lines, comments, and template placeholders
            if line and not line.startswith("#") and not line.startswith("["):
                # Strip markdown backticks that Claude sometimes wraps paths in
                validated_file_path = line.strip("`")
                break

    # Parse VALIDATION section if present
    if "VALIDATION" in sections:
        validation = parse_validation_table(sections["VALIDATION"])

    # Determine flags based on verdict
    is_ready = verdict in ("READY", "CORRECTED")
    was_corrected = verdict == "CORRECTED" or len(corrections) > 0
    should_close = verdict == "CLOSE"
    is_blocked = verdict == "BLOCKED"

    return {
        "verdict": verdict,
        "concerns": concerns,
        "is_ready": is_ready,
        "was_corrected": was_corrected,
        "should_close": should_close,
        "is_blocked": is_blocked,
        "close_reason": close_reason,
        "close_status": close_status,
        "corrections": corrections,
        "validated_file_path": validated_file_path,
        "sections": sections,
        "validation": validation,
    }
def _parse_bullet_list(section_content: str) -> list[str]:
    """Return the text of each "- item" line, skipping the "- None" placeholder."""
    stripped_lines = (line.strip() for line in section_content.split("\n"))
    return [
        line[2:]  # Remove "- " prefix
        for line in stripped_lines
        if line.startswith("- ") and line != "- None"
    ]


def parse_manage_issue_output(output: str) -> dict[str, Any]:
    """Extract structured data from manage-issue output.

    The manage-issue command outputs structured sections with metadata,
    files changed, commits, verification results, and final status.

    Args:
        output: The stdout from the manage-issue command

    Returns:
        dict with keys:
        - status: str, the upper-cased word following "Status:" in the
          RESULT section (e.g. "COMPLETED", "FAILED", "BLOCKED"), or
          "UNKNOWN" when no status is found
        - files_changed: list[str] of modified files
        - files_created: list[str] of created files
        - commits: list[str] of commit hashes/messages
        - verification: dict of verification results
        - ooda_impact: dict of OODA impact status
        - sections: dict of all parsed sections
    """
    sections = parse_sections(output)
    status = "UNKNOWN"
    verification: dict[str, str] = {}
    ooda_impact: dict[str, str] = {}

    # Parse RESULT section for status. Markdown bold markers or backticks
    # around the label or the value (e.g. "**Status:** COMPLETED") are
    # tolerated, matching how other sections in this module are parsed.
    status_match = re.search(r"Status\**:[\s*`]*(\w+)", sections.get("RESULT", ""))
    if status_match:
        status = status_match.group(1).upper()

    # Bullet-list sections; a missing section yields an empty list
    files_changed = _parse_bullet_list(sections.get("FILES_CHANGED", ""))
    files_created = _parse_bullet_list(sections.get("FILES_CREATED", ""))
    commits = _parse_bullet_list(sections.get("COMMITS", ""))

    # Parse VERIFICATION section
    if "VERIFICATION" in sections:
        verification = parse_status_lines(sections["VERIFICATION"])

    # Parse OODA_IMPACT section: "- key: value" bullets, value upper-cased
    for line in sections.get("OODA_IMPACT", "").split("\n"):
        line = line.strip()
        if not line.startswith("- "):
            continue
        key, separator, value = line[2:].partition(":")
        if separator:  # ignore bullets that have no "key: value" shape
            ooda_impact[key.strip()] = value.strip().upper()

    return {
        "status": status,
        "files_changed": files_changed,
        "files_created": files_created,
        "commits": commits,
        "verification": verification,
        "ooda_impact": ooda_impact,
        "sections": sections,
    }