Coverage for little_loops / dependency_mapper / operations.py: 0%

124 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Dependency fix and mutation operations. 

2 

3Functions for applying dependency proposals to issue files and 

4auto-repairing broken dependency references. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from pathlib import Path 

11from typing import TYPE_CHECKING 

12 

13from little_loops.dependency_mapper.analysis import validate_dependencies 

14from little_loops.dependency_mapper.models import DependencyProposal, FixResult 

15 

16if TYPE_CHECKING: 

17 from little_loops.config import BRConfig 

18 from little_loops.issue_parser import IssueInfo 

19 

20 

21def apply_proposals( 

22 proposals: list[DependencyProposal], 

23 issue_files: dict[str, Path], 

24) -> list[str]: 

25 """Write approved dependency proposals to issue files. 

26 

27 For each proposal, adds the target to the source's ``## Blocked By`` 

28 section and the source to the target's ``## Blocks`` section. 

29 

30 Args: 

31 proposals: Approved proposals to apply 

32 issue_files: Mapping from issue_id to file path 

33 

34 Returns: 

35 List of modified file paths 

36 """ 

37 modified: set[str] = set() 

38 

39 for proposal in proposals: 

40 # Update source issue: add to ## Blocked By 

41 source_path = issue_files.get(proposal.source_id) 

42 if source_path and source_path.exists(): 

43 _add_to_section(source_path, "Blocked By", proposal.target_id) 

44 modified.add(str(source_path)) 

45 

46 # Update target issue: add to ## Blocks 

47 target_path = issue_files.get(proposal.target_id) 

48 if target_path and target_path.exists(): 

49 _add_to_section(target_path, "Blocks", proposal.source_id) 

50 modified.add(str(target_path)) 

51 

52 return sorted(modified) 

53 

54 

55def _add_to_section(file_path: Path, section_name: str, issue_id: str) -> None: 

56 """Add an issue ID to a markdown section in a file. 

57 

58 If the section exists, appends a new list item. 

59 If the section doesn't exist, creates it before the 

60 ``## Labels`` or ``## Status`` section, or at the end of the file. 

61 

62 Args: 

63 file_path: Path to the issue file 

64 section_name: Section name (e.g., "Blocked By" or "Blocks") 

65 issue_id: Issue ID to add (e.g., "FEAT-001") 

66 """ 

67 content = file_path.read_text(encoding="utf-8") 

68 

69 # Check if the ID is already in the section 

70 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$" 

71 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE) 

72 

73 if section_match: 

74 # Section exists — check if ID already present 

75 start = section_match.end() 

76 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE) 

77 if next_section: 

78 section_content = content[start : start + next_section.start()] 

79 else: 

80 section_content = content[start:] 

81 

82 if issue_id in section_content: 

83 return # Already present 

84 

85 # Find insertion point: end of section content (before next section or EOF) 

86 insert_pos = ( 

87 start 

88 + len(section_content.rstrip()) 

89 + (len(section_content) - len(section_content.rstrip())) 

90 ) 

91 # Actually, insert at end of the last list item in the section 

92 # Find the last non-blank line in the section 

93 section_lines = section_content.rstrip().split("\n") 

94 last_content_line_offset = 0 

95 for line in reversed(section_lines): 

96 if line.strip(): 

97 break 

98 last_content_line_offset += len(line) + 1 

99 

100 insert_pos = start + len(section_content.rstrip()) 

101 new_entry = f"\n- {issue_id}" 

102 content = content[:insert_pos] + new_entry + content[insert_pos:] 

103 else: 

104 # Section doesn't exist — create it 

105 new_section = f"\n## {section_name}\n\n- {issue_id}\n" 

106 

107 # Try to insert before ## Labels or ## Status 

108 for anchor in ("## Labels", "## Status"): 

109 anchor_match = re.search(rf"^{re.escape(anchor)}\s*$", content, re.MULTILINE) 

110 if anchor_match: 

111 insert_pos = anchor_match.start() 

112 content = content[:insert_pos] + new_section + "\n" + content[insert_pos:] 

113 break 

114 else: 

115 # Append at end 

116 content = content.rstrip() + "\n" + new_section 

117 

118 file_path.write_text(content, encoding="utf-8") 

119 

120 

121def _remove_from_section(file_path: Path, section_name: str, issue_id: str) -> bool: 

122 """Remove an issue ID from a markdown section in a file. 

123 

124 If the section becomes empty after removal, the entire section is removed. 

125 

126 Args: 

127 file_path: Path to the issue file 

128 section_name: Section name (e.g., "Blocked By" or "Blocks") 

129 issue_id: Issue ID to remove (e.g., "FEAT-001") 

130 

131 Returns: 

132 True if a change was made, False if the ID was not found. 

133 """ 

134 content = file_path.read_text(encoding="utf-8") 

135 

136 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$" 

137 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE) 

138 

139 if not section_match: 

140 return False 

141 

142 start = section_match.end() 

143 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE) 

144 if next_section: 

145 section_end = start + next_section.start() 

146 else: 

147 section_end = len(content) 

148 

149 section_content = content[start:section_end] 

150 

151 # Find the line containing this issue ID 

152 line_pattern = rf"^[-*]\s+\*{{0,2}}{re.escape(issue_id)}\b[^\n]*\n?" 

153 line_match = re.search(line_pattern, section_content, re.MULTILINE) 

154 if not line_match: 

155 return False 

156 

157 # Remove the line 

158 new_section_content = ( 

159 section_content[: line_match.start()] + section_content[line_match.end() :] 

160 ) 

161 

162 # Check if the section is now empty (no list items remaining) 

163 remaining_items = re.search(r"^[-*]\s+", new_section_content, re.MULTILINE) 

164 if not remaining_items: 

165 # Remove entire section (header + content) 

166 # Include leading newline if present 

167 remove_start = section_match.start() 

168 if remove_start > 0 and content[remove_start - 1] == "\n": 

169 remove_start -= 1 

170 content = content[:remove_start] + content[section_end:] 

171 else: 

172 content = content[:start] + new_section_content + content[section_end:] 

173 

174 file_path.write_text(content, encoding="utf-8") 

175 return True 

176 

177 

178def fix_dependencies( 

179 issues: list[IssueInfo], 

180 completed_ids: set[str] | None = None, 

181 all_known_ids: set[str] | None = None, 

182 dry_run: bool = False, 

183) -> FixResult: 

184 """Auto-repair broken dependency references. 

185 

186 Fixes three types of validation issues: 

187 - Broken refs: removes references to non-existent issues from Blocked By 

188 - Stale completed refs: removes references to completed issues from Blocked By 

189 - Missing backlinks: adds missing Blocks entries for bidirectional consistency 

190 

191 Cycles are explicitly out of scope and are skipped with a count. 

192 

193 Args: 

194 issues: List of parsed issue objects 

195 completed_ids: Set of completed issue IDs 

196 all_known_ids: Set of all issue IDs that exist on disk 

197 dry_run: If True, report what would change without modifying files 

198 

199 Returns: 

200 FixResult with changes made and files modified 

201 """ 

202 validation = validate_dependencies(issues, completed_ids, all_known_ids) 

203 result = FixResult() 

204 

205 if not validation.has_issues: 

206 return result 

207 

208 # Build issue path map 

209 issue_path_map: dict[str, Path] = {issue.issue_id: issue.path for issue in issues} 

210 

211 # Fix broken refs: remove from Blocked By 

212 for issue_id, ref_id in validation.broken_refs: 

213 path = issue_path_map.get(issue_id) 

214 if not path or not path.exists(): 

215 continue 

216 desc = f"Removed broken ref {ref_id} from {issue_id}" 

217 result.changes.append(desc) 

218 if not dry_run: 

219 if _remove_from_section(path, "Blocked By", ref_id): 

220 result.modified_files.add(str(path)) 

221 

222 # Fix stale completed refs: remove from Blocked By 

223 for issue_id, ref_id in validation.stale_completed_refs: 

224 path = issue_path_map.get(issue_id) 

225 if not path or not path.exists(): 

226 continue 

227 desc = f"Removed stale ref {ref_id} (completed) from {issue_id}" 

228 result.changes.append(desc) 

229 if not dry_run: 

230 if _remove_from_section(path, "Blocked By", ref_id): 

231 result.modified_files.add(str(path)) 

232 

233 # Fix missing backlinks: add to Blocks 

234 for issue_id, ref_id in validation.missing_backlinks: 

235 target_path = issue_path_map.get(ref_id) 

236 if not target_path or not target_path.exists(): 

237 continue 

238 desc = f"Added backlink: {issue_id} to {ref_id}'s Blocks section" 

239 result.changes.append(desc) 

240 if not dry_run: 

241 _add_to_section(target_path, "Blocks", issue_id) 

242 result.modified_files.add(str(target_path)) 

243 

244 # Report skipped cycles 

245 result.skipped_cycles = len(validation.cycles) 

246 

247 return result 

248 

249 

250def gather_all_issue_ids(issues_dir: Path, config: BRConfig | None = None) -> set[str]: 

251 """Scan all issue directories for issue IDs (lightweight, filename-only). 

252 

253 Scans active-category and completed subdirectories for markdown files 

254 with issue ID patterns in their filenames. 

255 

256 Args: 

257 issues_dir: Path to the issues base directory (e.g., .issues) 

258 config: Optional project config. When supplied, active category names 

259 and the completed-directory name are read from config so that 

260 custom categories are included. When omitted, falls back to 

261 ``["bugs", "features", "enhancements", "completed"]``. 

262 

263 Returns: 

264 Set of all issue IDs found across all categories and completed. 

265 """ 

266 if config is not None: 

267 subdirs = config.issue_categories + [ 

268 config.get_completed_dir().name, 

269 config.get_deferred_dir().name, 

270 ] 

271 else: 

272 subdirs = ["bugs", "features", "enhancements", "completed", "deferred"] 

273 

274 ids: set[str] = set() 

275 for subdir in subdirs: 

276 d = issues_dir / subdir 

277 if not d.exists(): 

278 continue 

279 for f in d.glob("*.md"): 

280 match = re.search(r"(BUG|FEAT|ENH)-(\d+)", f.name) 

281 if match: 

282 ids.add(f"{match.group(1)}-{match.group(2)}") 

283 return ids