Coverage for little_loops / dependency_mapper.py: 88%
576 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-15 15:23 -0600
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-15 15:23 -0600
1"""Cross-issue dependency discovery and mapping.
3Analyzes active issues to discover potential dependencies based on
4file overlap and validates existing dependency references for integrity.
6Complements dependency_graph.py:
7- dependency_graph.py = execution ordering (existing, unchanged)
8- dependency_mapper.py = discovery and proposal of new relationships (new)
9"""
11from __future__ import annotations
13import logging
14import re
15from dataclasses import dataclass, field
16from pathlib import Path
17from typing import TYPE_CHECKING
19from little_loops.dependency_graph import DependencyGraph
20from little_loops.text_utils import extract_file_paths
22if TYPE_CHECKING:
23 from little_loops.issue_parser import IssueInfo
25logger = logging.getLogger(__name__)
27_CODE_FENCE = re.compile(r"```[\s\S]*?```", re.MULTILINE)
29# Semantic target extraction patterns
30_PASCAL_CASE = re.compile(r"\b([A-Z][a-z]+(?:[A-Z][a-z]+)+)\b")
31_FUNCTION_REF = re.compile(r"`(\w+)\(\)`")
32_COMPONENT_SCOPE = re.compile(
33 r"(?:component|module|class|widget|section)[:\s]+[`\"']?([a-zA-Z0-9_./\-]{3,})[`\"']?",
34 re.IGNORECASE,
35)
37# UI region / section keywords mapped to canonical names
38_SECTION_KEYWORDS: dict[str, frozenset[str]] = {
39 "header": frozenset({"header", "heading", "title bar", "top bar", "nav", "navbar", "toolbar"}),
40 "body": frozenset({"body", "content", "main", "droppable", "list", "table", "grid"}),
41 "footer": frozenset({"footer", "bottom", "status bar", "action bar"}),
42 "sidebar": frozenset({"sidebar", "side panel", "drawer", "menu"}),
43 "card": frozenset({"card", "tile", "item", "row", "entry"}),
44 "modal": frozenset({"modal", "dialog", "popup", "overlay", "sheet"}),
45 "form": frozenset({"form", "input", "field", "editor", "picker"}),
46}
48# Modification type classification keywords
49_MODIFICATION_TYPES: dict[str, frozenset[str]] = {
50 "structural": frozenset(
51 {
52 "extract",
53 "split",
54 "refactor",
55 "restructure",
56 "reorganize",
57 "create new component",
58 "break out",
59 "separate",
60 "decompose",
61 }
62 ),
63 "infrastructure": frozenset(
64 {
65 "enable",
66 "hook",
67 "handler",
68 "event",
69 "listener",
70 "provider",
71 "context",
72 "store",
73 "state management",
74 "routing",
75 "middleware",
76 "dragging",
77 "drag",
78 "drop",
79 "dnd",
80 }
81 ),
82 "enhancement": frozenset(
83 {
84 "add button",
85 "add field",
86 "add column",
87 "add stats",
88 "add icon",
89 "add toggle",
90 "display",
91 "show",
92 "render",
93 "style",
94 "format",
95 "empty state",
96 "placeholder",
97 "tooltip",
98 "badge",
99 }
100 ),
101}
104@dataclass
105class DependencyProposal:
106 """A proposed dependency relationship between two issues.
108 Attributes:
109 source_id: Issue that would be blocked
110 target_id: Issue that would block (the blocker)
111 reason: Category of discovery method
112 confidence: Score from 0.0 to 1.0
113 rationale: Human-readable explanation
114 overlapping_files: Files referenced by both issues
115 conflict_score: Semantic conflict score from 0.0 to 1.0
116 """
118 source_id: str
119 target_id: str
120 reason: str
121 confidence: float
122 rationale: str
123 overlapping_files: list[str] = field(default_factory=list)
124 conflict_score: float = 0.5
127@dataclass
128class ParallelSafePair:
129 """A pair of issues that share files but can safely run in parallel.
131 Attributes:
132 issue_a: First issue ID
133 issue_b: Second issue ID
134 shared_files: Files referenced by both issues
135 conflict_score: Semantic conflict score (< 0.4)
136 reason: Why these are parallel-safe
137 """
139 issue_a: str
140 issue_b: str
141 shared_files: list[str] = field(default_factory=list)
142 conflict_score: float = 0.0
143 reason: str = ""
146@dataclass
147class ValidationResult:
148 """Result of validating existing dependency references.
150 Attributes:
151 broken_refs: (issue_id, missing_ref_id) pairs
152 missing_backlinks: (issue_id, should_have_backlink_from) pairs
153 cycles: Cycle paths from DependencyGraph.detect_cycles()
154 stale_completed_refs: (issue_id, completed_ref_id) pairs
155 """
157 broken_refs: list[tuple[str, str]] = field(default_factory=list)
158 missing_backlinks: list[tuple[str, str]] = field(default_factory=list)
159 cycles: list[list[str]] = field(default_factory=list)
160 stale_completed_refs: list[tuple[str, str]] = field(default_factory=list)
162 @property
163 def has_issues(self) -> bool:
164 """Return True if any validation problems were found."""
165 return bool(
166 self.broken_refs or self.missing_backlinks or self.cycles or self.stale_completed_refs
167 )
170@dataclass
171class DependencyReport:
172 """Complete dependency analysis report.
174 Attributes:
175 proposals: Proposed new dependency relationships
176 parallel_safe: Pairs of issues that share files but can run in parallel
177 validation: Validation results for existing dependencies
178 issue_count: Total issues analyzed
179 existing_dep_count: Number of existing dependency edges
180 """
182 proposals: list[DependencyProposal] = field(default_factory=list)
183 parallel_safe: list[ParallelSafePair] = field(default_factory=list)
184 validation: ValidationResult = field(default_factory=ValidationResult)
185 issue_count: int = 0
186 existing_dep_count: int = 0
189def _extract_semantic_targets(content: str) -> set[str]:
190 """Extract component and function references from issue content.
192 Identifies PascalCase component names, function references,
193 and explicitly mentioned component/module scopes.
195 Args:
196 content: Issue file content
198 Returns:
199 Set of normalized semantic target names
200 """
201 if not content:
202 return set()
204 stripped = _CODE_FENCE.sub("", content)
205 targets: set[str] = set()
207 for match in _PASCAL_CASE.finditer(stripped):
208 targets.add(match.group(1).lower())
210 for match in _FUNCTION_REF.finditer(stripped):
211 targets.add(match.group(1).lower())
213 for match in _COMPONENT_SCOPE.finditer(stripped):
214 targets.add(match.group(1).lower())
216 return targets
219def _extract_section_mentions(content: str) -> set[str]:
220 """Extract UI region/section references from issue content.
222 Maps keywords like "header", "body", "sidebar" to canonical
223 section names using word-boundary matching.
225 Args:
226 content: Issue file content
228 Returns:
229 Set of canonical section names mentioned
230 """
231 if not content:
232 return set()
234 content_lower = content.lower()
235 sections: set[str] = set()
237 for section_name, keywords in _SECTION_KEYWORDS.items():
238 for keyword in keywords:
239 # Use word boundary for single words, substring for multi-word phrases
240 if " " in keyword:
241 if keyword in content_lower:
242 sections.add(section_name)
243 break
244 else:
245 if re.search(rf"\b{re.escape(keyword)}\b", content_lower):
246 sections.add(section_name)
247 break
249 return sections
252def _classify_modification_type(content: str) -> str:
253 """Classify the modification type of an issue.
255 Returns one of: "structural", "infrastructure", "enhancement".
256 Falls back to "enhancement" if no clear match.
258 Args:
259 content: Issue file content
261 Returns:
262 Modification type classification string
263 """
264 if not content:
265 return "enhancement"
267 content_lower = content.lower()
269 for mod_type in ("structural", "infrastructure", "enhancement"):
270 keywords = _MODIFICATION_TYPES[mod_type]
271 for keyword in keywords:
272 if keyword in content_lower:
273 return mod_type
275 return "enhancement"
278def compute_conflict_score(
279 content_a: str,
280 content_b: str,
281) -> float:
282 """Compute semantic conflict score between two issues.
284 Combines three signals:
285 - Semantic target overlap (component/function names): weight 0.5
286 - Section mention overlap (UI regions): weight 0.3
287 - Modification type match: weight 0.2
289 Args:
290 content_a: First issue's file content
291 content_b: Second issue's file content
293 Returns:
294 Conflict score from 0.0 (parallel-safe) to 1.0 (definite conflict)
295 """
296 targets_a = _extract_semantic_targets(content_a)
297 targets_b = _extract_semantic_targets(content_b)
299 sections_a = _extract_section_mentions(content_a)
300 sections_b = _extract_section_mentions(content_b)
302 type_a = _classify_modification_type(content_a)
303 type_b = _classify_modification_type(content_b)
305 # Signal 1: Semantic target overlap (0.0 - 1.0)
306 if targets_a and targets_b:
307 target_union = len(targets_a | targets_b)
308 target_score = len(targets_a & targets_b) / target_union if target_union > 0 else 0.0
309 else:
310 target_score = 0.5 # Unknown — default to moderate
312 # Signal 2: Section overlap (0.0 or 1.0)
313 if sections_a and sections_b:
314 section_score = 1.0 if sections_a & sections_b else 0.0
315 else:
316 section_score = 0.5 # Unknown
318 # Signal 3: Modification type match (0.0 or 1.0)
319 type_score = 1.0 if type_a == type_b else 0.0
321 return round(target_score * 0.5 + section_score * 0.3 + type_score * 0.2, 2)
324def find_file_overlaps(
325 issues: list[IssueInfo],
326 issue_contents: dict[str, str],
327) -> tuple[list[DependencyProposal], list[ParallelSafePair]]:
328 """Find issues that reference overlapping files and propose dependencies.
330 For each pair of issues where both reference the same file(s), computes
331 a semantic conflict score. High-conflict pairs get dependency proposals;
332 low-conflict pairs are reported as parallel-safe.
334 Pairs that already have a dependency relationship are skipped.
336 Args:
337 issues: List of parsed issue objects
338 issue_contents: Mapping from issue_id to file content
340 Returns:
341 Tuple of (proposed dependencies, parallel-safe pairs)
342 """
343 # Build existing dependency set for skip check
344 existing_deps: set[tuple[str, str]] = set()
345 for issue in issues:
346 for blocker_id in issue.blocked_by:
347 existing_deps.add((issue.issue_id, blocker_id))
349 # Extract file paths per issue
350 issue_paths: dict[str, set[str]] = {}
351 for issue in issues:
352 content = issue_contents.get(issue.issue_id, "")
353 paths = extract_file_paths(content)
354 if paths:
355 issue_paths[issue.issue_id] = paths
357 proposals: list[DependencyProposal] = []
358 parallel_safe: list[ParallelSafePair] = []
359 issue_ids = sorted(issue_paths.keys())
361 _type_order = {"structural": 0, "infrastructure": 1, "enhancement": 2}
363 for i, id_a in enumerate(issue_ids):
364 for id_b in issue_ids[i + 1 :]:
365 overlap = issue_paths[id_a] & issue_paths[id_b]
366 if not overlap:
367 continue
369 # Skip if dependency already exists (in either direction)
370 if (id_a, id_b) in existing_deps or (id_b, id_a) in existing_deps:
371 continue
373 content_a = issue_contents.get(id_a, "")
374 content_b = issue_contents.get(id_b, "")
375 conflict = compute_conflict_score(content_a, content_b)
377 overlap_list = sorted(overlap)
379 # Low-conflict pairs are parallel-safe
380 if conflict < 0.4:
381 sections_a = _extract_section_mentions(content_a)
382 sections_b = _extract_section_mentions(content_b)
383 if sections_a and sections_b:
384 reason = (
385 f"Different sections ({', '.join(sorted(sections_a))}"
386 f" vs {', '.join(sorted(sections_b))})"
387 )
388 else:
389 reason = "Low semantic conflict score"
390 parallel_safe.append(
391 ParallelSafePair(
392 issue_a=id_a,
393 issue_b=id_b,
394 shared_files=overlap_list,
395 conflict_score=conflict,
396 reason=reason,
397 )
398 )
399 continue
401 # Determine direction for high-conflict pairs
402 issue_a = next(iss for iss in issues if iss.issue_id == id_a)
403 issue_b = next(iss for iss in issues if iss.issue_id == id_b)
405 confidence_modifier = 1.0
407 if issue_a.priority_int != issue_b.priority_int:
408 # Different priorities: higher priority blocks lower
409 if issue_a.priority_int < issue_b.priority_int:
410 target_id, source_id = id_a, id_b
411 else:
412 target_id, source_id = id_b, id_a
413 else:
414 # Same priority: use modification type ordering
415 type_a = _classify_modification_type(content_a)
416 type_b = _classify_modification_type(content_b)
417 order_a = _type_order.get(type_a, 2)
418 order_b = _type_order.get(type_b, 2)
420 if order_a != order_b:
421 if order_a < order_b:
422 target_id, source_id = id_a, id_b
423 else:
424 target_id, source_id = id_b, id_a
425 else:
426 # Fall back to ID ordering with reduced confidence
427 if id_a < id_b:
428 target_id, source_id = id_a, id_b
429 else:
430 target_id, source_id = id_b, id_a
431 confidence_modifier = 0.5
433 min_paths = min(len(issue_paths[id_a]), len(issue_paths[id_b]))
434 confidence = len(overlap) / min_paths if min_paths > 0 else 0.0
435 confidence *= confidence_modifier
437 rationale = (
438 f"{source_id} and {target_id} both reference "
439 f"{', '.join(overlap_list[:3])}"
440 f"{' and more' if len(overlap_list) > 3 else ''}. "
441 f"{target_id} has higher priority and should be completed first."
442 )
444 proposals.append(
445 DependencyProposal(
446 source_id=source_id,
447 target_id=target_id,
448 reason="file_overlap",
449 confidence=round(confidence, 2),
450 rationale=rationale,
451 overlapping_files=overlap_list,
452 conflict_score=conflict,
453 )
454 )
456 # Sort by confidence descending
457 proposals.sort(key=lambda p: -p.confidence)
458 return proposals, parallel_safe
461def validate_dependencies(
462 issues: list[IssueInfo],
463 completed_ids: set[str] | None = None,
464 all_known_ids: set[str] | None = None,
465) -> ValidationResult:
466 """Validate existing dependency references for integrity.
468 Checks:
469 - Broken refs: blocked_by entries referencing nonexistent issues
470 - Missing backlinks: A blocks B but B doesn't list A in blocked_by
471 - Cycles: circular dependency chains
472 - Stale completed refs: blocked_by entries referencing completed issues
474 Args:
475 issues: List of parsed issue objects
476 completed_ids: Set of completed issue IDs
477 all_known_ids: Set of all issue IDs that exist on disk (across all
478 categories and completed). When provided, references to issues
479 in this set are not flagged as broken even if they are not in
480 the working ``issues`` list.
482 Returns:
483 ValidationResult with all detected problems
484 """
485 completed = completed_ids or set()
486 result = ValidationResult()
488 active_ids = {issue.issue_id for issue in issues}
489 all_known = active_ids | completed
490 if all_known_ids:
491 all_known = all_known | all_known_ids
493 # Build lookup maps
494 blocked_by_map: dict[str, set[str]] = {}
495 blocks_map: dict[str, set[str]] = {}
496 for issue in issues:
497 blocked_by_map[issue.issue_id] = set(issue.blocked_by)
498 blocks_map[issue.issue_id] = set(issue.blocks)
500 for issue in issues:
501 for ref_id in issue.blocked_by:
502 if ref_id not in all_known:
503 result.broken_refs.append((issue.issue_id, ref_id))
504 elif ref_id in completed:
505 result.stale_completed_refs.append((issue.issue_id, ref_id))
507 # Check backlinks: if A.blocked_by contains B, then B.blocks should contain A
508 for ref_id in issue.blocked_by:
509 if ref_id in active_ids:
510 target_blocks = blocks_map.get(ref_id, set())
511 if issue.issue_id not in target_blocks:
512 result.missing_backlinks.append((issue.issue_id, ref_id))
514 # Cycle detection using DependencyGraph
515 graph = DependencyGraph.from_issues(issues, completed, all_known_ids=all_known_ids)
516 result.cycles = graph.detect_cycles()
518 return result
521def analyze_dependencies(
522 issues: list[IssueInfo],
523 issue_contents: dict[str, str],
524 completed_ids: set[str] | None = None,
525 all_known_ids: set[str] | None = None,
526) -> DependencyReport:
527 """Run full dependency analysis: discovery and validation.
529 Args:
530 issues: List of parsed issue objects
531 issue_contents: Mapping from issue_id to file content
532 completed_ids: Set of completed issue IDs
533 all_known_ids: Set of all issue IDs that exist on disk
535 Returns:
536 Comprehensive dependency report
537 """
538 proposals, parallel_safe = find_file_overlaps(issues, issue_contents)
539 validation = validate_dependencies(issues, completed_ids, all_known_ids)
541 existing_dep_count = sum(len(issue.blocked_by) for issue in issues)
543 return DependencyReport(
544 proposals=proposals,
545 parallel_safe=parallel_safe,
546 validation=validation,
547 issue_count=len(issues),
548 existing_dep_count=existing_dep_count,
549 )
552def format_report(report: DependencyReport) -> str:
553 """Format a dependency report as human-readable markdown.
555 Args:
556 report: The analysis report to format
558 Returns:
559 Markdown-formatted report string
560 """
561 lines: list[str] = []
562 lines.append("# Dependency Analysis Report")
563 lines.append("")
564 lines.append(f"- **Issues analyzed**: {report.issue_count}")
565 lines.append(f"- **Existing dependencies**: {report.existing_dep_count}")
566 lines.append(f"- **Proposed new dependencies**: {len(report.proposals)}")
567 lines.append(f"- **Parallel-safe pairs**: {len(report.parallel_safe)}")
568 lines.append(f"- **Validation issues**: {'Yes' if report.validation.has_issues else 'None'}")
569 lines.append("")
571 # Proposals section
572 if report.proposals:
573 lines.append("## Proposed Dependencies")
574 lines.append("")
575 lines.append(
576 "| # | Source (blocked) | Target (blocker) | Reason "
577 "| Conflict | Confidence | Rationale |"
578 )
579 lines.append(
580 "|---|-----------------|-----------------|--------|----------|------------|-----------|"
581 )
582 for i, p in enumerate(report.proposals, 1):
583 if p.conflict_score >= 0.7:
584 conflict_level = "HIGH"
585 elif p.conflict_score >= 0.4:
586 conflict_level = "MEDIUM"
587 else:
588 conflict_level = "LOW"
589 lines.append(
590 f"| {i} | {p.source_id} | {p.target_id} | "
591 f"{p.reason} | {conflict_level} | {p.confidence:.0%} | {p.rationale} |"
592 )
593 lines.append("")
595 # Parallel-safe section
596 if report.parallel_safe:
597 lines.append("## Parallel Execution Safe")
598 lines.append("")
599 lines.append("| Issue A | Issue B | Shared Files | Conflict Score | Reason |")
600 lines.append("|---------|---------|--------------|---------------|--------|")
601 for pair in report.parallel_safe:
602 files_str = ", ".join(pair.shared_files[:3])
603 if len(pair.shared_files) > 3:
604 files_str += " and more"
605 lines.append(
606 f"| {pair.issue_a} | {pair.issue_b} | "
607 f"{files_str} | {pair.conflict_score:.0%} | {pair.reason} |"
608 )
609 lines.append("")
611 # Validation section
612 v = report.validation
613 if v.has_issues:
614 lines.append("## Validation Issues")
615 lines.append("")
617 if v.broken_refs:
618 lines.append("### Broken References")
619 lines.append("")
620 for issue_id, ref_id in v.broken_refs:
621 lines.append(f"- {issue_id}: references nonexistent {ref_id}")
622 lines.append("")
624 if v.missing_backlinks:
625 lines.append("### Missing Backlinks")
626 lines.append("")
627 for issue_id, ref_id in v.missing_backlinks:
628 lines.append(
629 f"- {issue_id} is blocked by {ref_id}, "
630 f"but {ref_id} does not list {issue_id} in Blocks"
631 )
632 lines.append("")
634 if v.cycles:
635 lines.append("### Dependency Cycles")
636 lines.append("")
637 for cycle in v.cycles:
638 lines.append(f"- {' -> '.join(cycle)}")
639 lines.append("")
641 if v.stale_completed_refs:
642 lines.append("### Stale References (to completed issues)")
643 lines.append("")
644 for issue_id, ref_id in v.stale_completed_refs:
645 lines.append(f"- {issue_id}: blocked by {ref_id} (completed)")
646 lines.append("")
648 if not report.proposals and not report.parallel_safe and not v.has_issues:
649 lines.append("No dependency proposals or validation issues found.")
650 lines.append("")
652 return "\n".join(lines)
655def format_text_graph(
656 issues: list[IssueInfo],
657 proposals: list[DependencyProposal] | None = None,
658) -> str:
659 """Generate an ASCII dependency graph diagram.
661 Shows existing dependencies as solid arrows and proposed
662 dependencies as dashed arrows.
664 Args:
665 issues: List of parsed issue objects
666 proposals: Optional proposed dependencies to include
668 Returns:
669 Text graph string readable in the terminal
670 """
671 if not issues:
672 return "(no issues)"
674 issue_ids = {i.issue_id for i in issues}
675 sorted_issues = sorted(issues, key=lambda i: (i.priority_int, i.issue_id))
677 # Build adjacency: blocker -> list of blocked issues
678 blocks: dict[str, list[str]] = {}
679 for issue in sorted_issues:
680 for blocker_id in issue.blocked_by:
681 if blocker_id in issue_ids:
682 blocks.setdefault(blocker_id, []).append(issue.issue_id)
684 # Add proposed edges
685 proposed_edges: set[tuple[str, str]] = set()
686 if proposals:
687 for p in proposals:
688 if p.target_id in issue_ids and p.source_id in issue_ids:
689 blocks.setdefault(p.target_id, []).append(p.source_id)
690 proposed_edges.add((p.target_id, p.source_id))
692 # Build chains from roots (issues not blocked by anything in the set)
693 blocked_ids: set[str] = set()
694 for targets in blocks.values():
695 blocked_ids.update(targets)
696 roots = [i.issue_id for i in sorted_issues if i.issue_id not in blocked_ids]
698 visited: set[str] = set()
699 chains: list[str] = []
701 def build_chain(issue_id: str) -> str:
702 if issue_id in visited:
703 return issue_id
704 visited.add(issue_id)
705 targets = sorted(blocks.get(issue_id, []))
706 if not targets:
707 return issue_id
708 if len(targets) == 1:
709 arrow = "-.→" if (issue_id, targets[0]) in proposed_edges else "──→"
710 return f"{issue_id} {arrow} {build_chain(targets[0])}"
711 # Multiple branches: first inline, rest as separate chains
712 arrow = "-.→" if (issue_id, targets[0]) in proposed_edges else "──→"
713 result = f"{issue_id} {arrow} {build_chain(targets[0])}"
714 for other in targets[1:]:
715 if other not in visited:
716 arrow_other = "-.→" if (issue_id, other) in proposed_edges else "──→"
717 chains.append(f" {issue_id} {arrow_other} {build_chain(other)}")
718 return result
720 for root in roots:
721 if root not in visited:
722 chain = build_chain(root)
723 chains.append(f" {chain}")
725 # Isolated issues (not in any chain)
726 for issue in sorted_issues:
727 if issue.issue_id not in visited:
728 chains.append(f" {issue.issue_id}")
730 lines: list[str] = list(chains)
732 if any("──→" in c for c in chains) or any("-.→" in c for c in chains):
733 lines.append("")
734 legend_parts = []
735 if any("──→" in c for c in chains):
736 legend_parts.append("──→ blocks")
737 if any("-.→" in c for c in chains):
738 legend_parts.append("-.→ proposed")
739 lines.append(f"Legend: {', '.join(legend_parts)}")
741 return "\n".join(lines)
744def apply_proposals(
745 proposals: list[DependencyProposal],
746 issue_files: dict[str, Path],
747) -> list[str]:
748 """Write approved dependency proposals to issue files.
750 For each proposal, adds the target to the source's ``## Blocked By``
751 section and the source to the target's ``## Blocks`` section.
753 Args:
754 proposals: Approved proposals to apply
755 issue_files: Mapping from issue_id to file path
757 Returns:
758 List of modified file paths
759 """
760 modified: set[str] = set()
762 for proposal in proposals:
763 # Update source issue: add to ## Blocked By
764 source_path = issue_files.get(proposal.source_id)
765 if source_path and source_path.exists():
766 _add_to_section(source_path, "Blocked By", proposal.target_id)
767 modified.add(str(source_path))
769 # Update target issue: add to ## Blocks
770 target_path = issue_files.get(proposal.target_id)
771 if target_path and target_path.exists():
772 _add_to_section(target_path, "Blocks", proposal.source_id)
773 modified.add(str(target_path))
775 return sorted(modified)
778def _add_to_section(file_path: Path, section_name: str, issue_id: str) -> None:
779 """Add an issue ID to a markdown section in a file.
781 If the section exists, appends a new list item.
782 If the section doesn't exist, creates it before the
783 ``## Labels`` or ``## Status`` section, or at the end of the file.
785 Args:
786 file_path: Path to the issue file
787 section_name: Section name (e.g., "Blocked By" or "Blocks")
788 issue_id: Issue ID to add (e.g., "FEAT-001")
789 """
790 content = file_path.read_text(encoding="utf-8")
792 # Check if the ID is already in the section
793 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
794 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
796 if section_match:
797 # Section exists — check if ID already present
798 start = section_match.end()
799 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
800 if next_section:
801 section_content = content[start : start + next_section.start()]
802 else:
803 section_content = content[start:]
805 if issue_id in section_content:
806 return # Already present
808 # Find insertion point: end of section content (before next section or EOF)
809 insert_pos = (
810 start
811 + len(section_content.rstrip())
812 + (len(section_content) - len(section_content.rstrip()))
813 )
814 # Actually, insert at end of the last list item in the section
815 # Find the last non-blank line in the section
816 section_lines = section_content.rstrip().split("\n")
817 last_content_line_offset = 0
818 for line in reversed(section_lines):
819 if line.strip():
820 break
821 last_content_line_offset += len(line) + 1
823 insert_pos = start + len(section_content.rstrip())
824 new_entry = f"\n- {issue_id}"
825 content = content[:insert_pos] + new_entry + content[insert_pos:]
826 else:
827 # Section doesn't exist — create it
828 new_section = f"\n## {section_name}\n\n- {issue_id}\n"
830 # Try to insert before ## Labels or ## Status
831 for anchor in ("## Labels", "## Status"):
832 anchor_match = re.search(rf"^{re.escape(anchor)}\s*$", content, re.MULTILINE)
833 if anchor_match:
834 insert_pos = anchor_match.start()
835 content = content[:insert_pos] + new_section + "\n" + content[insert_pos:]
836 break
837 else:
838 # Append at end
839 content = content.rstrip() + "\n" + new_section
841 file_path.write_text(content, encoding="utf-8")
844def _remove_from_section(file_path: Path, section_name: str, issue_id: str) -> bool:
845 """Remove an issue ID from a markdown section in a file.
847 If the section becomes empty after removal, the entire section is removed.
849 Args:
850 file_path: Path to the issue file
851 section_name: Section name (e.g., "Blocked By" or "Blocks")
852 issue_id: Issue ID to remove (e.g., "FEAT-001")
854 Returns:
855 True if a change was made, False if the ID was not found.
856 """
857 content = file_path.read_text(encoding="utf-8")
859 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
860 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
862 if not section_match:
863 return False
865 start = section_match.end()
866 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
867 if next_section:
868 section_end = start + next_section.start()
869 else:
870 section_end = len(content)
872 section_content = content[start:section_end]
874 # Find the line containing this issue ID
875 line_pattern = rf"^[-*]\s+\*{{0,2}}{re.escape(issue_id)}\b[^\n]*\n?"
876 line_match = re.search(line_pattern, section_content, re.MULTILINE)
877 if not line_match:
878 return False
880 # Remove the line
881 new_section_content = (
882 section_content[: line_match.start()] + section_content[line_match.end() :]
883 )
885 # Check if the section is now empty (no list items remaining)
886 remaining_items = re.search(r"^[-*]\s+", new_section_content, re.MULTILINE)
887 if not remaining_items:
888 # Remove entire section (header + content)
889 # Include leading newline if present
890 remove_start = section_match.start()
891 if remove_start > 0 and content[remove_start - 1] == "\n":
892 remove_start -= 1
893 content = content[:remove_start] + content[section_end:]
894 else:
895 content = content[:start] + new_section_content + content[section_end:]
897 file_path.write_text(content, encoding="utf-8")
898 return True
901@dataclass
902class FixResult:
903 """Result of auto-fixing dependency validation issues.
905 Attributes:
906 changes: Human-readable descriptions of each fix applied
907 modified_files: Set of file paths that were modified
908 skipped_cycles: Number of cycles skipped (out of scope for auto-fix)
909 """
911 changes: list[str] = field(default_factory=list)
912 modified_files: set[str] = field(default_factory=set)
913 skipped_cycles: int = 0
916def fix_dependencies(
917 issues: list[IssueInfo],
918 completed_ids: set[str] | None = None,
919 all_known_ids: set[str] | None = None,
920 dry_run: bool = False,
921) -> FixResult:
922 """Auto-repair broken dependency references.
924 Fixes three types of validation issues:
925 - Broken refs: removes references to non-existent issues from Blocked By
926 - Stale completed refs: removes references to completed issues from Blocked By
927 - Missing backlinks: adds missing Blocks entries for bidirectional consistency
929 Cycles are explicitly out of scope and are skipped with a count.
931 Args:
932 issues: List of parsed issue objects
933 completed_ids: Set of completed issue IDs
934 all_known_ids: Set of all issue IDs that exist on disk
935 dry_run: If True, report what would change without modifying files
937 Returns:
938 FixResult with changes made and files modified
939 """
940 validation = validate_dependencies(issues, completed_ids, all_known_ids)
941 result = FixResult()
943 if not validation.has_issues:
944 return result
946 # Build issue path map
947 issue_path_map: dict[str, Path] = {issue.issue_id: issue.path for issue in issues}
949 # Fix broken refs: remove from Blocked By
950 for issue_id, ref_id in validation.broken_refs:
951 path = issue_path_map.get(issue_id)
952 if not path or not path.exists():
953 continue
954 desc = f"Removed broken ref {ref_id} from {issue_id}"
955 result.changes.append(desc)
956 if not dry_run:
957 if _remove_from_section(path, "Blocked By", ref_id):
958 result.modified_files.add(str(path))
960 # Fix stale completed refs: remove from Blocked By
961 for issue_id, ref_id in validation.stale_completed_refs:
962 path = issue_path_map.get(issue_id)
963 if not path or not path.exists():
964 continue
965 desc = f"Removed stale ref {ref_id} (completed) from {issue_id}"
966 result.changes.append(desc)
967 if not dry_run:
968 if _remove_from_section(path, "Blocked By", ref_id):
969 result.modified_files.add(str(path))
971 # Fix missing backlinks: add to Blocks
972 for issue_id, ref_id in validation.missing_backlinks:
973 target_path = issue_path_map.get(ref_id)
974 if not target_path or not target_path.exists():
975 continue
976 desc = f"Added backlink: {issue_id} to {ref_id}'s Blocks section"
977 result.changes.append(desc)
978 if not dry_run:
979 _add_to_section(target_path, "Blocks", issue_id)
980 result.modified_files.add(str(target_path))
982 # Report skipped cycles
983 result.skipped_cycles = len(validation.cycles)
985 return result
988def gather_all_issue_ids(issues_dir: Path) -> set[str]:
989 """Scan all issue directories for issue IDs (lightweight, filename-only).
991 Scans bugs/, features/, enhancements/, and completed/ subdirectories
992 for markdown files with issue ID patterns in their filenames.
994 Args:
995 issues_dir: Path to the issues base directory (e.g., .issues)
997 Returns:
998 Set of all issue IDs found across all categories and completed.
999 """
1000 ids: set[str] = set()
1001 for subdir in ["bugs", "features", "enhancements", "completed"]:
1002 d = issues_dir / subdir
1003 if not d.exists():
1004 continue
1005 for f in d.glob("*.md"):
1006 match = re.search(r"(BUG|FEAT|ENH)-(\d+)", f.name)
1007 if match:
1008 ids.add(f"{match.group(1)}-{match.group(2)}")
1009 return ids
1012def _load_issues(
1013 issues_dir: Path,
1014 only_ids: set[str] | None = None,
1015) -> tuple[list[IssueInfo], dict[str, str], set[str]]:
1016 """Load issues from directory for CLI use.
1018 Args:
1019 issues_dir: Path to the issues base directory (e.g., .issues)
1020 only_ids: If provided, only include issues with these IDs
1022 Returns:
1023 Tuple of (active issues, issue contents map, completed issue IDs)
1024 """
1025 from little_loops.config import BRConfig
1026 from little_loops.issue_parser import find_issues
1028 # Find project root by walking up from issues_dir
1029 project_root = issues_dir.resolve().parent
1030 if issues_dir.name != ".issues":
1031 # If issues_dir is already absolute, try to find config relative to it
1032 project_root = issues_dir.parent
1034 config = BRConfig(project_root)
1035 issues = find_issues(config, only_ids=only_ids)
1037 # Build contents map
1038 issue_contents: dict[str, str] = {}
1039 for info in issues:
1040 if info.path.exists():
1041 issue_contents[info.issue_id] = info.path.read_text(encoding="utf-8")
1043 # Gather completed issue IDs
1044 completed_dir = config.get_completed_dir()
1045 completed_ids: set[str] = set()
1046 if completed_dir.exists():
1047 import re as _re
1049 for f in completed_dir.glob("*.md"):
1050 match = _re.search(r"(BUG|FEAT|ENH)-(\d+)", f.name)
1051 if match:
1052 completed_ids.add(f"{match.group(1)}-{match.group(2)}")
1054 return issues, issue_contents, completed_ids
1057def main() -> int:
1058 """Entry point for ll-deps command.
1060 Analyze cross-issue dependencies and validate existing references.
1062 Returns:
1063 Exit code (0 = success, 1 = failure)
1064 """
1065 import argparse
1066 import json as _json
1067 import sys
1069 parser = argparse.ArgumentParser(
1070 prog="ll-deps",
1071 description="Cross-issue dependency discovery and validation",
1072 formatter_class=argparse.RawDescriptionHelpFormatter,
1073 epilog="""
1074Examples:
1075 %(prog)s analyze # Full analysis with markdown output
1076 %(prog)s analyze --format json # JSON output for programmatic use
1077 %(prog)s analyze --graph # Include ASCII dependency graph
1078 %(prog)s analyze --sprint my-sprint # Analyze only issues in a sprint
1079 %(prog)s validate # Validation only (broken refs, cycles)
1080 %(prog)s validate --sprint my-sprint # Validate only sprint issue deps
1081 %(prog)s fix # Auto-fix broken refs, stale refs, backlinks
1082 %(prog)s fix --dry-run # Preview fixes without modifying files
1083""",
1084 )
1086 parser.add_argument(
1087 "-d",
1088 "--issues-dir",
1089 type=Path,
1090 default=None,
1091 help="Path to issues directory (default: .issues)",
1092 )
1094 subparsers = parser.add_subparsers(dest="command", help="Available commands")
1096 # analyze subcommand
1097 analyze_parser = subparsers.add_parser(
1098 "analyze",
1099 help="Full dependency analysis (file overlaps + validation)",
1100 )
1101 analyze_parser.add_argument(
1102 "-f",
1103 "--format",
1104 type=str,
1105 choices=["text", "json"],
1106 default="text",
1107 help="Output format (default: text/markdown)",
1108 )
1109 analyze_parser.add_argument(
1110 "--graph",
1111 action="store_true",
1112 help="Include ASCII dependency graph in output",
1113 )
1114 analyze_parser.add_argument(
1115 "--sprint",
1116 type=str,
1117 default=None,
1118 help="Restrict analysis to issues in the named sprint",
1119 )
1121 # validate subcommand
1122 validate_parser = subparsers.add_parser(
1123 "validate",
1124 help="Validate existing dependency references only",
1125 )
1126 validate_parser.add_argument(
1127 "--sprint",
1128 type=str,
1129 default=None,
1130 help="Restrict validation to issues in the named sprint",
1131 )
1133 # fix subcommand
1134 fix_parser = subparsers.add_parser(
1135 "fix",
1136 help="Auto-fix broken refs, stale refs, and missing backlinks",
1137 )
1138 fix_parser.add_argument(
1139 "--dry-run",
1140 "-n",
1141 action="store_true",
1142 help="Show what would be fixed without making changes",
1143 )
1144 fix_parser.add_argument(
1145 "--sprint",
1146 type=str,
1147 default=None,
1148 help="Restrict fixes to issues in the named sprint",
1149 )
1151 args = parser.parse_args()
1153 if not args.command:
1154 parser.print_help()
1155 return 1
1157 issues_dir = args.issues_dir or Path.cwd() / ".issues"
1158 if not issues_dir.exists():
1159 print(f"Error: Issues directory not found: {issues_dir}", file=sys.stderr)
1160 return 1
1162 # Sprint-scoped filtering
1163 only_ids: set[str] | None = None
1164 if getattr(args, "sprint", None):
1165 from little_loops.config import BRConfig as _BRConfig
1166 from little_loops.sprint import Sprint
1168 project_root = issues_dir.resolve().parent
1169 if issues_dir.name != ".issues":
1170 project_root = issues_dir.parent
1171 _config = _BRConfig(project_root)
1172 sprints_dir = Path(_config.sprints.sprints_dir)
1173 if not sprints_dir.is_absolute():
1174 sprints_dir = project_root / sprints_dir
1176 sprint = Sprint.load(sprints_dir, args.sprint)
1177 if sprint is None:
1178 print(f"Error: Sprint not found: {args.sprint}", file=sys.stderr)
1179 return 1
1180 only_ids = set(sprint.issues)
1181 if not only_ids:
1182 print(f"Sprint '{args.sprint}' has no issues.")
1183 return 0
1185 try:
1186 issues, issue_contents, completed_ids = _load_issues(issues_dir, only_ids=only_ids)
1187 except Exception as e:
1188 print(f"Error loading issues: {e}", file=sys.stderr)
1189 return 1
1191 if not issues:
1192 print("No active issues found.")
1193 return 0
1195 # Gather all issue IDs on disk to avoid false "nonexistent" warnings
1196 # when sprint-scoped analysis references issues outside the sprint
1197 all_known_ids = gather_all_issue_ids(issues_dir)
1199 if args.command == "analyze":
1200 report = analyze_dependencies(issues, issue_contents, completed_ids, all_known_ids)
1202 if args.format == "json":
1203 data = {
1204 "issue_count": report.issue_count,
1205 "existing_dep_count": report.existing_dep_count,
1206 "proposals": [
1207 {
1208 "source_id": p.source_id,
1209 "target_id": p.target_id,
1210 "reason": p.reason,
1211 "confidence": p.confidence,
1212 "rationale": p.rationale,
1213 "overlapping_files": p.overlapping_files,
1214 "conflict_score": p.conflict_score,
1215 }
1216 for p in report.proposals
1217 ],
1218 "parallel_safe": [
1219 {
1220 "issue_a": ps.issue_a,
1221 "issue_b": ps.issue_b,
1222 "shared_files": ps.shared_files,
1223 "conflict_score": ps.conflict_score,
1224 "reason": ps.reason,
1225 }
1226 for ps in report.parallel_safe
1227 ],
1228 "validation": {
1229 "broken_refs": report.validation.broken_refs,
1230 "missing_backlinks": report.validation.missing_backlinks,
1231 "cycles": report.validation.cycles,
1232 "stale_completed_refs": report.validation.stale_completed_refs,
1233 "has_issues": report.validation.has_issues,
1234 },
1235 }
1236 print(_json.dumps(data, indent=2))
1237 else:
1238 print(format_report(report))
1239 if args.graph:
1240 print()
1241 print("## Dependency Graph")
1242 print()
1243 print(format_text_graph(issues, report.proposals))
1245 return 0
1247 if args.command == "validate":
1248 result = validate_dependencies(issues, completed_ids, all_known_ids)
1250 if not result.has_issues:
1251 print("No validation issues found.")
1252 return 0
1254 lines: list[str] = []
1255 lines.append("# Dependency Validation Report")
1256 lines.append("")
1258 if result.broken_refs:
1259 lines.append("## Broken References")
1260 lines.append("")
1261 for issue_id, ref_id in result.broken_refs:
1262 lines.append(f"- {issue_id}: references nonexistent {ref_id}")
1263 lines.append("")
1265 if result.missing_backlinks:
1266 lines.append("## Missing Backlinks")
1267 lines.append("")
1268 for issue_id, ref_id in result.missing_backlinks:
1269 lines.append(
1270 f"- {issue_id} is blocked by {ref_id}, "
1271 f"but {ref_id} does not list {issue_id} in Blocks"
1272 )
1273 lines.append("")
1275 if result.cycles:
1276 lines.append("## Dependency Cycles")
1277 lines.append("")
1278 for cycle in result.cycles:
1279 lines.append(f"- {' -> '.join(cycle)}")
1280 lines.append("")
1282 if result.stale_completed_refs:
1283 lines.append("## Stale References (to completed issues)")
1284 lines.append("")
1285 for issue_id, ref_id in result.stale_completed_refs:
1286 lines.append(f"- {issue_id}: blocked by {ref_id} (completed)")
1287 lines.append("")
1289 print("\n".join(lines))
1290 return 0
1292 if args.command == "fix":
1293 fix_result = fix_dependencies(issues, completed_ids, all_known_ids, dry_run=args.dry_run)
1295 if not fix_result.changes:
1296 print("No fixable issues found.")
1297 if fix_result.skipped_cycles:
1298 print(f"({fix_result.skipped_cycles} cycle(s) detected — resolve manually)")
1299 return 0
1301 prefix = "[DRY RUN] " if args.dry_run else ""
1302 print(f"# {prefix}Dependency Fix Report")
1303 print()
1304 for change in fix_result.changes:
1305 print(f" {prefix}{change}")
1306 print()
1307 print(f"{prefix}{len(fix_result.changes)} fix(es) applied.")
1309 if fix_result.modified_files:
1310 print()
1311 print("Modified files:")
1312 for fpath in sorted(fix_result.modified_files):
1313 print(f" {fpath}")
1315 if fix_result.skipped_cycles:
1316 print()
1317 print(f"({fix_result.skipped_cycles} cycle(s) detected — resolve manually)")
1319 return 0
1321 return 1