Coverage for little_loops / dependency_mapper / operations.py: 0%
124 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Dependency fix and mutation operations.
3Functions for applying dependency proposals to issue files and
4auto-repairing broken dependency references.
5"""
7from __future__ import annotations
9import re
10from pathlib import Path
11from typing import TYPE_CHECKING
13from little_loops.dependency_mapper.analysis import validate_dependencies
14from little_loops.dependency_mapper.models import DependencyProposal, FixResult
16if TYPE_CHECKING:
17 from little_loops.config import BRConfig
18 from little_loops.issue_parser import IssueInfo
21def apply_proposals(
22 proposals: list[DependencyProposal],
23 issue_files: dict[str, Path],
24) -> list[str]:
25 """Write approved dependency proposals to issue files.
27 For each proposal, adds the target to the source's ``## Blocked By``
28 section and the source to the target's ``## Blocks`` section.
30 Args:
31 proposals: Approved proposals to apply
32 issue_files: Mapping from issue_id to file path
34 Returns:
35 List of modified file paths
36 """
37 modified: set[str] = set()
39 for proposal in proposals:
40 # Update source issue: add to ## Blocked By
41 source_path = issue_files.get(proposal.source_id)
42 if source_path and source_path.exists():
43 _add_to_section(source_path, "Blocked By", proposal.target_id)
44 modified.add(str(source_path))
46 # Update target issue: add to ## Blocks
47 target_path = issue_files.get(proposal.target_id)
48 if target_path and target_path.exists():
49 _add_to_section(target_path, "Blocks", proposal.source_id)
50 modified.add(str(target_path))
52 return sorted(modified)
55def _add_to_section(file_path: Path, section_name: str, issue_id: str) -> None:
56 """Add an issue ID to a markdown section in a file.
58 If the section exists, appends a new list item.
59 If the section doesn't exist, creates it before the
60 ``## Labels`` or ``## Status`` section, or at the end of the file.
62 Args:
63 file_path: Path to the issue file
64 section_name: Section name (e.g., "Blocked By" or "Blocks")
65 issue_id: Issue ID to add (e.g., "FEAT-001")
66 """
67 content = file_path.read_text(encoding="utf-8")
69 # Check if the ID is already in the section
70 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
71 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
73 if section_match:
74 # Section exists — check if ID already present
75 start = section_match.end()
76 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
77 if next_section:
78 section_content = content[start : start + next_section.start()]
79 else:
80 section_content = content[start:]
82 if issue_id in section_content:
83 return # Already present
85 # Find insertion point: end of section content (before next section or EOF)
86 insert_pos = (
87 start
88 + len(section_content.rstrip())
89 + (len(section_content) - len(section_content.rstrip()))
90 )
91 # Actually, insert at end of the last list item in the section
92 # Find the last non-blank line in the section
93 section_lines = section_content.rstrip().split("\n")
94 last_content_line_offset = 0
95 for line in reversed(section_lines):
96 if line.strip():
97 break
98 last_content_line_offset += len(line) + 1
100 insert_pos = start + len(section_content.rstrip())
101 new_entry = f"\n- {issue_id}"
102 content = content[:insert_pos] + new_entry + content[insert_pos:]
103 else:
104 # Section doesn't exist — create it
105 new_section = f"\n## {section_name}\n\n- {issue_id}\n"
107 # Try to insert before ## Labels or ## Status
108 for anchor in ("## Labels", "## Status"):
109 anchor_match = re.search(rf"^{re.escape(anchor)}\s*$", content, re.MULTILINE)
110 if anchor_match:
111 insert_pos = anchor_match.start()
112 content = content[:insert_pos] + new_section + "\n" + content[insert_pos:]
113 break
114 else:
115 # Append at end
116 content = content.rstrip() + "\n" + new_section
118 file_path.write_text(content, encoding="utf-8")
121def _remove_from_section(file_path: Path, section_name: str, issue_id: str) -> bool:
122 """Remove an issue ID from a markdown section in a file.
124 If the section becomes empty after removal, the entire section is removed.
126 Args:
127 file_path: Path to the issue file
128 section_name: Section name (e.g., "Blocked By" or "Blocks")
129 issue_id: Issue ID to remove (e.g., "FEAT-001")
131 Returns:
132 True if a change was made, False if the ID was not found.
133 """
134 content = file_path.read_text(encoding="utf-8")
136 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
137 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
139 if not section_match:
140 return False
142 start = section_match.end()
143 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
144 if next_section:
145 section_end = start + next_section.start()
146 else:
147 section_end = len(content)
149 section_content = content[start:section_end]
151 # Find the line containing this issue ID
152 line_pattern = rf"^[-*]\s+\*{{0,2}}{re.escape(issue_id)}\b[^\n]*\n?"
153 line_match = re.search(line_pattern, section_content, re.MULTILINE)
154 if not line_match:
155 return False
157 # Remove the line
158 new_section_content = (
159 section_content[: line_match.start()] + section_content[line_match.end() :]
160 )
162 # Check if the section is now empty (no list items remaining)
163 remaining_items = re.search(r"^[-*]\s+", new_section_content, re.MULTILINE)
164 if not remaining_items:
165 # Remove entire section (header + content)
166 # Include leading newline if present
167 remove_start = section_match.start()
168 if remove_start > 0 and content[remove_start - 1] == "\n":
169 remove_start -= 1
170 content = content[:remove_start] + content[section_end:]
171 else:
172 content = content[:start] + new_section_content + content[section_end:]
174 file_path.write_text(content, encoding="utf-8")
175 return True
178def fix_dependencies(
179 issues: list[IssueInfo],
180 completed_ids: set[str] | None = None,
181 all_known_ids: set[str] | None = None,
182 dry_run: bool = False,
183) -> FixResult:
184 """Auto-repair broken dependency references.
186 Fixes three types of validation issues:
187 - Broken refs: removes references to non-existent issues from Blocked By
188 - Stale completed refs: removes references to completed issues from Blocked By
189 - Missing backlinks: adds missing Blocks entries for bidirectional consistency
191 Cycles are explicitly out of scope and are skipped with a count.
193 Args:
194 issues: List of parsed issue objects
195 completed_ids: Set of completed issue IDs
196 all_known_ids: Set of all issue IDs that exist on disk
197 dry_run: If True, report what would change without modifying files
199 Returns:
200 FixResult with changes made and files modified
201 """
202 validation = validate_dependencies(issues, completed_ids, all_known_ids)
203 result = FixResult()
205 if not validation.has_issues:
206 return result
208 # Build issue path map
209 issue_path_map: dict[str, Path] = {issue.issue_id: issue.path for issue in issues}
211 # Fix broken refs: remove from Blocked By
212 for issue_id, ref_id in validation.broken_refs:
213 path = issue_path_map.get(issue_id)
214 if not path or not path.exists():
215 continue
216 desc = f"Removed broken ref {ref_id} from {issue_id}"
217 result.changes.append(desc)
218 if not dry_run:
219 if _remove_from_section(path, "Blocked By", ref_id):
220 result.modified_files.add(str(path))
222 # Fix stale completed refs: remove from Blocked By
223 for issue_id, ref_id in validation.stale_completed_refs:
224 path = issue_path_map.get(issue_id)
225 if not path or not path.exists():
226 continue
227 desc = f"Removed stale ref {ref_id} (completed) from {issue_id}"
228 result.changes.append(desc)
229 if not dry_run:
230 if _remove_from_section(path, "Blocked By", ref_id):
231 result.modified_files.add(str(path))
233 # Fix missing backlinks: add to Blocks
234 for issue_id, ref_id in validation.missing_backlinks:
235 target_path = issue_path_map.get(ref_id)
236 if not target_path or not target_path.exists():
237 continue
238 desc = f"Added backlink: {issue_id} to {ref_id}'s Blocks section"
239 result.changes.append(desc)
240 if not dry_run:
241 _add_to_section(target_path, "Blocks", issue_id)
242 result.modified_files.add(str(target_path))
244 # Report skipped cycles
245 result.skipped_cycles = len(validation.cycles)
247 return result
250def gather_all_issue_ids(issues_dir: Path, config: BRConfig | None = None) -> set[str]:
251 """Scan all issue directories for issue IDs (lightweight, filename-only).
253 Scans active-category and completed subdirectories for markdown files
254 with issue ID patterns in their filenames.
256 Args:
257 issues_dir: Path to the issues base directory (e.g., .issues)
258 config: Optional project config. When supplied, active category names
259 and the completed-directory name are read from config so that
260 custom categories are included. When omitted, falls back to
261 ``["bugs", "features", "enhancements", "completed"]``.
263 Returns:
264 Set of all issue IDs found across all categories and completed.
265 """
266 if config is not None:
267 subdirs = config.issue_categories + [
268 config.get_completed_dir().name,
269 config.get_deferred_dir().name,
270 ]
271 else:
272 subdirs = ["bugs", "features", "enhancements", "completed", "deferred"]
274 ids: set[str] = set()
275 for subdir in subdirs:
276 d = issues_dir / subdir
277 if not d.exists():
278 continue
279 for f in d.glob("*.md"):
280 match = re.search(r"(BUG|FEAT|ENH)-(\d+)", f.name)
281 if match:
282 ids.add(f"{match.group(1)}-{match.group(2)}")
283 return ids