Coverage for little_loops / issue_parser.py: 25%
256 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Issue file parsing for little-loops.
3Parses issue markdown files to extract metadata like priority, ID, type, and title.
4"""
6from __future__ import annotations
8import logging
9import re
10from dataclasses import dataclass, field
11from pathlib import Path
12from typing import TYPE_CHECKING, Any
14from little_loops.frontmatter import parse_frontmatter
16if TYPE_CHECKING:
17 from little_loops.config import BRConfig
20logger = logging.getLogger(__name__)
# Issue IDs that open a markdown list item ("-" or "*" bullet).
# Examples that match: "- FEAT-001", "- BUG-123", "* ENH-005",
# "- FEAT-001 (some note)". Up to two leading asterisks are tolerated so
# bold items such as "- **ENH-1000**: description" are captured as well.
ISSUE_ID_PATTERN = re.compile(
    r"^[-*]\s+\*{0,2}([A-Z]+-\d+)",
    flags=re.MULTILINE,
)

# Canonical issue filename: priority P0-P5, a BUG/FEAT/ENH type token, an
# issue number of at least three digits, a lowercase slug, and a ".md" suffix.
_NORMALIZED_RE = re.compile(
    r"^P[0-5]-(BUG|FEAT|ENH)-[0-9]{3,}-[a-z0-9-]+\.md$"
)

# Hyphen-delimited type token (BUG, FEAT, or ENH) found anywhere in a filename.
_ISSUE_TYPE_RE = re.compile(r"-(BUG|FEAT|ENH)-")
def is_normalized(filename: str) -> bool:
    """Report whether an issue filename follows the naming convention.

    Args:
        filename: Basename of the issue file (e.g. 'P2-BUG-010-my-issue.md').

    Returns:
        True when the name matches
        ``^P[0-5]-(BUG|FEAT|ENH)-[0-9]{3,}-[a-z0-9-]+\\.md$``, False otherwise.
    """
    matched = _NORMALIZED_RE.match(filename)
    return matched is not None
def is_formatted(issue_path: Path, templates_dir: Path | None = None) -> bool:
    """Decide whether an issue file counts as formatted.

    Two independent criteria are tried in order; satisfying either is enough:

    1. ``/ll:format-issue`` is among the entries that ``parse_session_log``
       extracts from the file content.
    2. Every required, non-deprecated section of the issue type's template
       is present in the file as a ``##`` heading.

    Args:
        issue_path: Path to the issue markdown file.
        templates_dir: Optional override for the templates directory, passed
            through to ``load_issue_sections``.

    Returns:
        True if either criterion holds. False when the file cannot be read,
        when no BUG/FEAT/ENH type token is found in the filename, or when
        loading the template raises. A template with no required sections
        yields True.
    """
    from little_loops.issue_template import load_issue_sections
    from little_loops.session_log import parse_session_log

    try:
        text = issue_path.read_text(encoding="utf-8")
    except Exception:
        # Unreadable file: treated as "not formatted" rather than an error.
        return False

    # Criterion 1: the session log records a format-issue run.
    if "/ll:format-issue" in parse_session_log(text):
        return True

    # Criterion 2: structural check against the type template.
    found_type = _ISSUE_TYPE_RE.search(issue_path.name)
    if found_type is None:
        return False

    try:
        template = load_issue_sections(found_type.group(1), templates_dir)
    except Exception:
        return False

    # Common sections flag requirement with ``required: True``.
    needed: set[str] = {
        name
        for name, spec in template.get("common_sections", {}).items()
        if spec.get("required") is True and not spec.get("deprecated", False)
    }
    # Type-specific sections flag requirement with ``level: "required"``.
    needed |= {
        name
        for name, spec in template.get("type_sections", {}).items()
        if spec.get("level") == "required" and not spec.get("deprecated", False)
    }

    if not needed:
        return True

    present = {
        heading.strip() for heading in re.findall(r"^##\s+(.+)$", text, re.MULTILINE)
    }
    return needed <= present
def slugify(text: str) -> str:
    """Turn free text into a filename-friendly slug.

    Characters other than word characters, whitespace, and hyphens are
    dropped; each run of whitespace and/or hyphens becomes a single hyphen;
    leading and trailing hyphens are trimmed and the result is lowercased.

    Args:
        text: Text to convert

    Returns:
        Lowercase slug with hyphens
    """
    kept = re.sub(r"[^\w\s-]", "", text)
    hyphenated = re.sub(r"[-\s]+", "-", kept)
    return hyphenated.lower().strip("-")
def get_next_issue_number(config: BRConfig, category: str | None = None) -> int:
    """Return the next issue number, unique across every issue type.

    The completed directory, the deferred directory, and each category's
    issue directory are scanned for ``*.md`` files. The highest number that
    follows any configured prefix in a filename (``<PREFIX>-<digits>``) is
    found, and one more than that is returned.

    Args:
        config: Project configuration
        category: Unused, kept for backwards compatibility

    Returns:
        Next available issue number (1 when no prefixes are configured or
        no numbered files are found)
    """
    categories = config.issues.categories
    prefixes = [entry.prefix for entry in categories.values()]

    # Completed and deferred first, then every category directory.
    search_dirs = [config.get_completed_dir(), config.get_deferred_dir()]
    search_dirs.extend(config.get_issue_dir(name) for name in categories)

    if not prefixes:
        return 1

    # One alternation over all known prefixes, compiled once for the scan.
    alternation = "|".join(map(re.escape, prefixes))
    numbered = re.compile(r"(?:" + alternation + r")-(\d+)")

    highest = 0
    for directory in search_dirs:
        if not directory.exists():
            continue
        for entry in directory.glob("*.md"):
            hit = numbered.search(entry.name)
            if hit is not None:
                highest = max(highest, int(hit.group(1)))

    return highest + 1
@dataclass
class ProductImpact:
    """Product-level impact assessment attached to an issue.

    Attributes:
        goal_alignment: ID of the strategic priority this supports
        persona_impact: ID of the persona affected
        business_value: Business value assessment (high|medium|low)
        user_benefit: Description of how this helps the target user
    """

    goal_alignment: str | None = None
    persona_impact: str | None = None
    business_value: str | None = None  # high|medium|low
    user_benefit: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the assessment to a plain, JSON-friendly dictionary."""
        names = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: dict[str, Any] | None) -> ProductImpact | None:
        """Build a ProductImpact from a dictionary.

        Args:
            data: Dictionary with product impact fields, or None

        Returns:
            A ProductImpact populated from the known keys (missing keys
            become None), or None when ``data`` is None or empty
        """
        if not data:
            return None
        names = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        return cls(**{name: data.get(name) for name in names})
@dataclass
class IssueInfo:
    """Metadata parsed from a single issue file.

    Attributes:
        path: Path to the issue file
        issue_type: Type of issue (e.g., "bugs", "features")
        priority: Priority level (e.g., "P0", "P1")
        issue_id: Issue identifier (e.g., "BUG-123")
        title: Issue title from markdown header
        blocked_by: List of issue IDs that block this issue
        blocks: List of issue IDs that this issue blocks
        discovered_by: Source command/workflow that created this issue
        product_impact: Product impact assessment (optional)
        effort: Effort estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        impact: Impact estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        confidence_score: Readiness score (0-100) written by /ll:confidence-check, or None
        outcome_confidence: Outcome confidence (0-100) written by /ll:confidence-check, or None
        testable: Whether TDD phase should be applied; False skips TDD, None treated as testable
        session_commands: Distinct /ll:* commands found in the ## Session Log section
        session_command_counts: Per-command occurrence counts from the ## Session Log section
    """

    path: Path
    issue_type: str
    priority: str
    issue_id: str
    title: str
    blocked_by: list[str] = field(default_factory=list)
    blocks: list[str] = field(default_factory=list)
    discovered_by: str | None = None
    product_impact: ProductImpact | None = None
    effort: int | None = None
    impact: int | None = None
    confidence_score: int | None = None
    outcome_confidence: int | None = None
    testable: bool | None = None
    session_commands: list[str] = field(default_factory=list)
    session_command_counts: dict[str, int] = field(default_factory=dict)

    @property
    def priority_int(self) -> int:
        """Numeric form of the priority for sorting (lower = higher priority).

        A priority of the form ``P<digits>`` yields those digits as an int;
        anything else yields 99 so unknown priorities sort last.
        """
        parsed = re.match(r"^P(\d+)$", self.priority)
        return int(parsed.group(1)) if parsed else 99

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dictionary suitable for JSON output.

        The path is stringified and a present product impact is expanded
        through its own ``to_dict``; all other fields are copied as-is.
        """
        impact_payload = None
        if self.product_impact:
            impact_payload = self.product_impact.to_dict()

        return {
            "path": str(self.path),
            "issue_type": self.issue_type,
            "priority": self.priority,
            "issue_id": self.issue_id,
            "title": self.title,
            "blocked_by": self.blocked_by,
            "blocks": self.blocks,
            "discovered_by": self.discovered_by,
            "product_impact": impact_payload,
            "effort": self.effort,
            "impact": self.impact,
            "confidence_score": self.confidence_score,
            "outcome_confidence": self.outcome_confidence,
            "testable": self.testable,
            "session_commands": self.session_commands,
            "session_command_counts": self.session_command_counts,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> IssueInfo:
        """Rebuild an IssueInfo from a dictionary.

        ``path``, ``issue_type``, ``priority``, ``issue_id`` and ``title``
        are mandatory keys (a missing one raises KeyError); every other
        field falls back to its empty/None default when absent.
        """
        kwargs: dict[str, Any] = {"path": Path(data["path"])}
        for key in ("issue_type", "priority", "issue_id", "title"):
            kwargs[key] = data[key]

        kwargs["blocked_by"] = data.get("blocked_by", [])
        kwargs["blocks"] = data.get("blocks", [])
        kwargs["discovered_by"] = data.get("discovered_by")
        kwargs["product_impact"] = ProductImpact.from_dict(data.get("product_impact"))
        for key in ("effort", "impact", "confidence_score", "outcome_confidence", "testable"):
            kwargs[key] = data.get(key)
        kwargs["session_commands"] = data.get("session_commands", [])
        kwargs["session_command_counts"] = data.get("session_command_counts", {})

        return cls(**kwargs)
class IssueParser:
    """Parses issue files based on project configuration.

    Uses BRConfig to understand issue categories, prefixes, and priorities.
    """

    def __init__(self, config: BRConfig) -> None:
        """Initialize parser with project configuration.

        Args:
            config: Project configuration
        """
        self.config = config
        self._build_prefix_map()

    def _build_prefix_map(self) -> None:
        """Build mapping from issue prefixes to category names."""
        self._prefix_to_category: dict[str, str] = {}
        for category_name, category in self.config.issues.categories.items():
            self._prefix_to_category[category.prefix] = category_name

    @staticmethod
    def _coerce_int(raw: Any) -> int | None:
        """Convert a frontmatter value to a non-negative int, or None.

        Only values whose string form consists solely of digit characters
        are accepted, so None, booleans, negatives and decimals all yield
        None. ``str.isdigit`` also accepts characters such as superscript
        digits that ``int`` rejects; those yield None instead of raising.

        Args:
            raw: Raw frontmatter value (any type) or None

        Returns:
            Parsed integer, or None when the value is not a plain digit string
        """
        if raw is None:
            return None
        text = str(raw)
        if not text.isdigit():
            return None
        try:
            return int(text)
        except ValueError:
            return None

    def parse_file(self, issue_path: Path) -> IssueInfo:
        """Parse an issue file to extract metadata.

        Priority, type and ID come from the filename (with directory-based
        fallbacks); everything else comes from the file content, which is
        read once. An unreadable file is parsed as empty content.

        Args:
            issue_path: Path to the issue markdown file

        Returns:
            Parsed IssueInfo
        """
        filename = issue_path.name

        # Parse priority from filename prefix (e.g., P1-BUG-123-...)
        priority = self._parse_priority(filename)

        # Parse issue type and ID from filename
        issue_type, issue_id = self._parse_type_and_id(filename, issue_path)

        # Read content once for all content-based parsing
        content = self._read_content(issue_path)

        # Parse frontmatter for discovered_by, product impact, and numeric scores
        frontmatter = parse_frontmatter(content)
        discovered_by = frontmatter.get("discovered_by")
        product_impact = self._parse_product_impact(frontmatter)
        effort = self._coerce_int(frontmatter.get("effort"))
        impact = self._coerce_int(frontmatter.get("impact"))
        confidence_score = self._coerce_int(frontmatter.get("confidence_score"))
        outcome_confidence = self._coerce_int(frontmatter.get("outcome_confidence"))

        # A string is accepted only as "true"/"false" (any case); any other
        # string becomes None. Non-string values are passed through unchanged.
        testable_raw = frontmatter.get("testable")
        if isinstance(testable_raw, str):
            lowered = testable_raw.lower()
            testable_value: bool | None = (
                lowered == "true" if lowered in ("true", "false") else None
            )
        else:
            testable_value = testable_raw

        # Parse title and dependencies from file content
        title = self._parse_title_from_content(content, issue_path)
        blocked_by = self._parse_blocked_by(content)
        blocks = self._parse_blocks(content)

        # Parse session commands from ## Session Log section
        from little_loops.session_log import count_session_commands, parse_session_log

        session_commands = parse_session_log(content)
        session_command_counts = count_session_commands(content)

        return IssueInfo(
            path=issue_path,
            issue_type=issue_type,
            priority=priority,
            issue_id=issue_id,
            title=title,
            blocked_by=blocked_by,
            blocks=blocks,
            discovered_by=discovered_by,
            product_impact=product_impact,
            effort=effort,
            impact=impact,
            confidence_score=confidence_score,
            outcome_confidence=outcome_confidence,
            testable=testable_value,
            session_commands=session_commands,
            session_command_counts=session_command_counts,
        )

    def _parse_priority(self, filename: str) -> str:
        """Extract priority from filename.

        Args:
            filename: Issue filename

        Returns:
            The first configured priority that prefixes the filename as
            ``<priority>-``; otherwise the last configured priority, or
            "P3" when no priorities are configured
        """
        for priority in self.config.issue_priorities:
            if filename.startswith(f"{priority}-"):
                return priority
        # Default to lowest priority if not found
        return self.config.issue_priorities[-1] if self.config.issue_priorities else "P3"

    def _get_category_for_prefix(self, prefix: str) -> str:
        """Get category name from issue prefix.

        Args:
            prefix: Issue prefix (e.g., "BUG", "FEAT")

        Returns:
            Category name (e.g., "bugs", "features"), defaults to "bugs"
        """
        return self._prefix_to_category.get(prefix, "bugs")

    def _parse_type_and_id(self, filename: str, issue_path: Path) -> tuple[str, str]:
        """Extract issue type and ID from filename.

        Resolution order:
        1. A configured prefix followed by ``-<digits>`` anywhere in the filename.
        2. The parent directory name matching a category's configured dir,
           with an ID generated from the filename.
        3. Category "bugs" with the filename (minus a trailing ``.md``) as ID.

        Args:
            filename: Issue filename
            issue_path: Full path to issue file

        Returns:
            Tuple of (issue_type, issue_id)
        """
        # Try to match known prefixes (BUG, FEAT, ENH, etc.). The prefix is
        # escaped so configured prefixes are always matched literally.
        for prefix, category in self._prefix_to_category.items():
            match = re.search(rf"({re.escape(prefix)})-(\d+)", filename)
            if match:
                return category, f"{match.group(1)}-{match.group(2)}"

        # Fall back to inferring from directory
        parent_name = issue_path.parent.name
        for category_name, category_config in self.config.issues.categories.items():
            if parent_name == category_config.dir:
                # Generate ID from filename
                issue_id = self._generate_id_from_filename(filename, category_config.prefix)
                return category_name, issue_id

        # Last resort: use filename as ID, dropping only a trailing ".md"
        # (an interior ".md" is part of the name and must be kept).
        fallback_id = filename[: -len(".md")] if filename.endswith(".md") else filename
        return "bugs", fallback_id

    def _generate_id_from_filename(self, filename: str, prefix: str) -> str:
        """Generate an issue ID from filename when not explicitly present.

        Args:
            filename: Issue filename
            prefix: Issue prefix to use

        Returns:
            ``<prefix>-<n>`` where n is the first run of digits in the
            filename, or the next globally unique issue number (zero-padded
            to three digits) when the filename contains no digits
        """
        # Try to extract a number from the filename
        # NOTE(review): the first digit run may come from a leading priority
        # tag such as "P2-" when the filename has no other number - confirm
        # that this is intended.
        numbers = re.findall(r"\d+", filename)
        if numbers:
            return f"{prefix}-{numbers[0]}"
        # Use next sequential number instead of hash-based fallback
        # This ensures IDs are deterministic and don't collide with existing issues
        category = self._get_category_for_prefix(prefix)
        next_num = get_next_issue_number(self.config, category)
        return f"{prefix}-{next_num:03d}"

    def _read_content(self, issue_path: Path) -> str:
        """Read file content, returning empty string on error.

        Args:
            issue_path: Path to issue file

        Returns:
            File content or empty string on error (a warning is logged)
        """
        try:
            return issue_path.read_text(encoding="utf-8")
        except Exception as e:
            logger.warning("Failed to read %s: %s", issue_path.name, e)
            return ""

    def _parse_title_from_content(self, content: str, issue_path: Path) -> str:
        """Extract title from issue file content.

        Args:
            content: Pre-read file content
            issue_path: Path to issue file (for fallback)

        Returns:
            Issue title or filename stem as fallback
        """
        if content:
            # Look for markdown header: # ISSUE-ID: Title
            match = re.search(r"^#\s+[\w-]+:\s*(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
            # Try first header of any format
            match = re.search(r"^#\s+(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
        # Fall back to filename
        return issue_path.stem

    def _parse_section_items(self, content: str, section_name: str) -> list[str]:
        """Extract issue IDs from a markdown section.

        Finds section header (## Section Name) and extracts issue IDs
        from list items until the next section or end of file.
        Skips content inside code fences.

        Args:
            content: File content to parse
            section_name: Section name to find (e.g., "Blocked By")

        Returns:
            List of issue IDs found in the section
        """
        if not content:
            return []

        # Strip code fences to avoid matching sections in examples
        content_without_code = self._strip_code_fences(content)

        # Match section header case-insensitively
        section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
        match = re.search(section_pattern, content_without_code, re.MULTILINE | re.IGNORECASE)
        if not match:
            return []

        # Get content after section header until next ## header or end
        start = match.end()
        next_section = re.search(r"^##\s+", content_without_code[start:], re.MULTILINE)
        if next_section:
            section_content = content_without_code[start : start + next_section.start()]
        else:
            section_content = content_without_code[start:]

        # Extract issue IDs from list items
        return ISSUE_ID_PATTERN.findall(section_content)

    def _strip_code_fences(self, content: str) -> str:
        """Remove code fence blocks from content.

        Replaces content between ``` markers with empty lines to preserve
        line numbers while removing code fence content from parsing.

        Args:
            content: File content

        Returns:
            Content with code fence blocks replaced by empty lines
        """
        result = []
        in_fence = False
        for line in content.split("\n"):
            if line.startswith("```"):
                # A fence marker toggles the state and is itself blanked.
                in_fence = not in_fence
                result.append("")  # Preserve line count
            elif in_fence:
                result.append("")  # Replace fenced content with empty line
            else:
                result.append(line)
        return "\n".join(result)

    def _parse_blocked_by(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocked By section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that block this issue
        """
        return self._parse_section_items(content, "Blocked By")

    def _parse_blocks(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocks section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that this issue blocks
        """
        return self._parse_section_items(content, "Blocks")

    def _parse_product_impact(self, frontmatter: dict[str, Any]) -> ProductImpact | None:
        """Extract product impact from frontmatter.

        Args:
            frontmatter: Dictionary of frontmatter fields

        Returns:
            ProductImpact instance if any product field has a truthy value,
            None otherwise
        """
        # Check if any product fields are present
        product_fields = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        if not any(frontmatter.get(key) for key in product_fields):
            return None

        return ProductImpact(
            goal_alignment=frontmatter.get("goal_alignment"),
            persona_impact=frontmatter.get("persona_impact"),
            business_value=frontmatter.get("business_value"),
            user_benefit=frontmatter.get("user_benefit"),
        )
def find_issues(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: list[str] | set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> list[IssueInfo]:
    """Collect the active issues that satisfy the given filters.

    Files whose name also exists in the completed or deferred directory are
    ignored before parsing.

    Args:
        config: Project configuration
        category: Optional category to filter (e.g., "bugs"); a category not
            listed in ``config.issue_categories`` yields no results
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs. When a list,
            results are returned in list order (input sequence preserved).
            When a set, results are sorted by priority as usual.
        type_prefixes: If provided, only include issues whose ID starts with
            one of these prefixes (e.g., {"BUG", "ENH"})

    Returns:
        List of IssueInfo sorted by (priority, issue ID), or in only_ids list
        order when only_ids is a list
    """
    excluded = skip_ids or set()
    parser = IssueParser(config)
    found: list[IssueInfo] = []

    # Used to drop active files that duplicate a completed/deferred one.
    completed_dir = config.get_completed_dir()
    deferred_dir = config.get_deferred_dir()

    if not category:
        categories = config.issue_categories
    elif category in config.issue_categories:
        categories = [category]
    else:
        categories = []

    for cat in categories:
        issue_dir = config.get_issue_dir(cat)
        if not issue_dir.exists():
            continue

        for issue_file in issue_dir.glob("*.md"):
            name = issue_file.name
            if (completed_dir / name).exists() or (deferred_dir / name).exists():
                continue

            info = parser.parse_file(issue_file)
            if info.issue_id in excluded:
                continue
            if only_ids is not None and info.issue_id not in only_ids:
                continue
            if type_prefixes is not None and info.issue_id.split("-", 1)[0] not in type_prefixes:
                continue
            found.append(info)

    if isinstance(only_ids, list):
        # Caller-supplied ordering wins; unknown IDs would sort to the end.
        position = {issue_id: index for index, issue_id in enumerate(only_ids)}
        tail = len(only_ids)
        found.sort(key=lambda item: position.get(item.issue_id, tail))
    else:
        found.sort(key=lambda item: (item.priority_int, item.issue_id))
    return found
def find_highest_priority_issue(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> IssueInfo | None:
    """Return the first issue from ``find_issues`` for the given filters.

    Args:
        config: Project configuration
        category: Optional category to filter
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs
        type_prefixes: If provided, only include issues with these type prefixes

    Returns:
        Highest priority IssueInfo or None if no issues found
    """
    candidates = find_issues(config, category, skip_ids, only_ids, type_prefixes)
    if not candidates:
        return None
    return candidates[0]