Coverage for little_loops / dependency_graph.py: 18%

168 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Dependency graph for issue management. 

2 

3Constructs a directed acyclic graph (DAG) from issue dependencies, 

4providing topological sorting and cycle detection. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections import deque 

11from dataclasses import dataclass, field 

12from typing import TYPE_CHECKING 

13 

14if TYPE_CHECKING: 

15 from little_loops.config import DependencyMappingConfig 

16 from little_loops.issue_parser import IssueInfo 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21@dataclass 

22class WaveContentionNote: 

23 """Annotation for a wave that was split due to file overlap.""" 

24 

25 contended_paths: list[str] 

26 sub_wave_index: int 

27 total_sub_waves: int 

28 parent_wave_index: int = 0 

29 

30 

31@dataclass 

32class DependencyGraph: 

33 """Directed acyclic graph of issue dependencies. 

34 

35 Builds a graph from issue dependencies where edges represent 

36 "blocked by" relationships. Provides methods for: 

37 - Topological sorting (dependency order) 

38 - Cycle detection 

39 - Ready issue queries (blockers resolved) 

40 - Blocking issue queries 

41 

42 Attributes: 

43 issues: Mapping from issue_id to IssueInfo 

44 blocked_by: Mapping from issue_id to set of blocking issue_ids 

45 blocks: Mapping from issue_id to set of blocked issue_ids 

46 """ 

47 

48 issues: dict[str, IssueInfo] = field(default_factory=dict) 

49 blocked_by: dict[str, set[str]] = field(default_factory=dict) 

50 blocks: dict[str, set[str]] = field(default_factory=dict) 

51 

52 @classmethod 

53 def from_issues( 

54 cls, 

55 issues: list[IssueInfo], 

56 completed_ids: set[str] | None = None, 

57 all_known_ids: set[str] | None = None, 

58 ) -> DependencyGraph: 

59 """Build graph from list of issues. 

60 

61 Constructs a dependency graph where: 

62 - Each issue is a node 

63 - blocked_by relationships create edges from blockers to blocked issues 

64 - Completed issues are treated as satisfied (not blocking) 

65 - Missing issues are logged as warnings but don't block 

66 

67 Args: 

68 issues: List of IssueInfo objects with blocked_by/blocks fields 

69 completed_ids: Set of completed issue IDs (treated as resolved) 

70 all_known_ids: Set of all issue IDs that exist on disk. When 

71 provided, references to issues in this set are silently 

72 skipped (not warned) even if they are not in the graph. 

73 

74 Returns: 

75 Constructed DependencyGraph 

76 

77 Example: 

78 >>> issues = [issue_a, issue_b, issue_c] 

79 >>> completed = {"FEAT-001"} # Already done 

80 >>> graph = DependencyGraph.from_issues(issues, completed) 

81 """ 

82 completed = completed_ids or set() 

83 graph = cls() 

84 

85 # Index all issues by ID 

86 for issue in issues: 

87 graph.issues[issue.issue_id] = issue 

88 graph.blocked_by[issue.issue_id] = set() 

89 graph.blocks[issue.issue_id] = set() 

90 

91 # Build dependency edges 

92 all_issue_ids = set(graph.issues.keys()) 

93 for issue in issues: 

94 for blocker_id in issue.blocked_by: 

95 # Skip completed blockers (already satisfied) 

96 if blocker_id in completed: 

97 continue 

98 # Skip blockers not in the graph 

99 if blocker_id not in all_issue_ids: 

100 # Only warn if the issue truly doesn't exist on disk 

101 if all_known_ids is None or blocker_id not in all_known_ids: 

102 logger.warning( 

103 f"Issue {issue.issue_id} blocked by unknown issue {blocker_id}" 

104 ) 

105 continue 

106 # Add bidirectional edge 

107 graph.blocked_by[issue.issue_id].add(blocker_id) 

108 graph.blocks[blocker_id].add(issue.issue_id) 

109 

110 return graph 

111 

112 def get_ready_issues(self, completed: set[str] | None = None) -> list[IssueInfo]: 

113 """Return issues whose blockers are all completed. 

114 

115 An issue is "ready" if: 

116 - It is not already completed 

117 - All its blockers are either completed or not in the graph 

118 

119 Args: 

120 completed: Set of completed issue IDs 

121 

122 Returns: 

123 List of IssueInfo for issues with no active blockers, 

124 sorted by priority (highest first) then issue_id 

125 """ 

126 completed = completed or set() 

127 ready = [] 

128 for issue_id, issue in self.issues.items(): 

129 if issue_id in completed: 

130 continue 

131 blockers = self.get_blocking_issues(issue_id, completed) 

132 if not blockers: 

133 ready.append(issue) 

134 # Sort by priority then issue_id for consistent ordering 

135 ready.sort(key=lambda x: (x.priority_int, x.issue_id)) 

136 return ready 

137 

138 def get_execution_waves(self, completed: set[str] | None = None) -> list[list[IssueInfo]]: 

139 """Return issues grouped into parallel execution waves. 

140 

141 Wave 1: All issues with no blockers (or blockers already completed) 

142 Wave 2: Issues whose blockers are all in wave 1 

143 Wave N: Issues whose blockers are all in waves 1..N-1 

144 

145 This is similar to topological_sort but groups issues by "level" 

146 rather than returning a flat list. 

147 

148 Args: 

149 completed: Set of already-completed issue IDs 

150 

151 Returns: 

152 List of waves, each wave is a list of issues that can run in parallel. 

153 Empty list if graph is empty or all issues are completed. 

154 

155 Raises: 

156 ValueError: If graph contains cycles (not a DAG) 

157 

158 Example: 

159 If A blocks B and C, and B and C block D: 

160 - Wave 1: [A] 

161 - Wave 2: [B, C] 

162 - Wave 3: [D] 

163 """ 

164 completed = completed or set() 

165 waves: list[list[IssueInfo]] = [] 

166 processed: set[str] = set(completed) 

167 

168 while True: 

169 # Get issues ready to run (all blockers in processed set) 

170 wave = self.get_ready_issues(completed=processed) 

171 if not wave: 

172 break 

173 waves.append(wave) 

174 # Mark this wave as processed for next iteration 

175 for issue in wave: 

176 processed.add(issue.issue_id) 

177 

178 # Check for cycles - if we have unprocessed issues, there's a cycle 

179 remaining = set(self.issues.keys()) - processed 

180 if remaining: 

181 cycles = self.detect_cycles() 

182 cycle_str = ", ".join(" -> ".join(cycle) for cycle in cycles) 

183 raise ValueError(f"Dependency graph contains cycles: {cycle_str}") 

184 

185 return waves 

186 

187 def is_blocked(self, issue_id: str, completed: set[str] | None = None) -> bool: 

188 """Check if an issue is still blocked. 

189 

190 Args: 

191 issue_id: Issue ID to check 

192 completed: Set of completed issue IDs 

193 

194 Returns: 

195 True if issue has unresolved blockers, False otherwise 

196 """ 

197 return bool(self.get_blocking_issues(issue_id, completed)) 

198 

199 def get_blocking_issues(self, issue_id: str, completed: set[str] | None = None) -> set[str]: 

200 """Return incomplete issues blocking this one. 

201 

202 Args: 

203 issue_id: Issue ID to check 

204 completed: Set of completed issue IDs 

205 

206 Returns: 

207 Set of issue IDs still blocking this issue 

208 """ 

209 completed = completed or set() 

210 blockers = self.blocked_by.get(issue_id, set()) 

211 return blockers - completed 

212 

213 def get_blocked_by_issue(self, issue_id: str) -> set[str]: 

214 """Return issues that this issue blocks. 

215 

216 Args: 

217 issue_id: Issue ID to check 

218 

219 Returns: 

220 Set of issue IDs that are blocked by this issue 

221 """ 

222 return self.blocks.get(issue_id, set()).copy() 

223 

224 def topological_sort(self) -> list[IssueInfo]: 

225 """Return issues in dependency order (Kahn's algorithm). 

226 

227 Issues with no dependencies come first, followed by issues 

228 whose dependencies have been satisfied. Within each "level", 

229 issues are sorted by priority then issue_id. 

230 

231 Returns: 

232 List of IssueInfo in topological order 

233 

234 Raises: 

235 ValueError: If graph contains cycles (not a DAG) 

236 

237 Example: 

238 If A blocks B, and B blocks C, returns [A, B, C] 

239 """ 

240 # Calculate in-degree for each node (number of blockers) 

241 in_degree: dict[str, int] = { 

242 issue_id: len(blockers) for issue_id, blockers in self.blocked_by.items() 

243 } 

244 

245 # Start with nodes that have no blockers, sorted by priority 

246 zero_degree = [ 

247 self.issues[issue_id] for issue_id, degree in in_degree.items() if degree == 0 

248 ] 

249 zero_degree.sort(key=lambda x: (x.priority_int, x.issue_id)) 

250 queue: deque[str] = deque(issue.issue_id for issue in zero_degree) 

251 

252 result: list[IssueInfo] = [] 

253 while queue: 

254 issue_id = queue.popleft() 

255 result.append(self.issues[issue_id]) 

256 

257 # Reduce in-degree for nodes this one blocks 

258 # Collect newly ready nodes, then sort before adding to queue 

259 newly_ready: list[IssueInfo] = [] 

260 for blocked_id in self.blocks.get(issue_id, set()): 

261 in_degree[blocked_id] -= 1 

262 if in_degree[blocked_id] == 0: 

263 newly_ready.append(self.issues[blocked_id]) 

264 

265 # Sort newly ready by priority for consistent ordering 

266 newly_ready.sort(key=lambda x: (x.priority_int, x.issue_id)) 

267 for issue in newly_ready: 

268 queue.append(issue.issue_id) 

269 

270 # Check for cycles - if we didn't process all nodes, there's a cycle 

271 if len(result) != len(self.issues): 

272 cycles = self.detect_cycles() 

273 cycle_str = ", ".join(" -> ".join(cycle) for cycle in cycles) 

274 raise ValueError(f"Dependency graph contains cycles: {cycle_str}") 

275 

276 return result 

277 

278 def detect_cycles(self) -> list[list[str]]: 

279 """Detect and return any dependency cycles. 

280 

281 Uses DFS with coloring to find back edges indicating cycles. 

282 A cycle exists when we encounter a node that is currently being 

283 visited (GRAY state) in the DFS traversal. 

284 

285 Returns: 

286 List of cycles, each cycle is a list of issue IDs forming 

287 a path from the cycle start back to itself. 

288 Empty list if no cycles exist. 

289 

290 Example: 

291 If A -> B -> C -> A (circular), returns [["A", "B", "C", "A"]] 

292 """ 

293 WHITE, GRAY, BLACK = 0, 1, 2 

294 color: dict[str, int] = dict.fromkeys(self.issues, WHITE) 

295 cycles: list[list[str]] = [] 

296 path: list[str] = [] 

297 

298 def dfs(node: str) -> None: 

299 color[node] = GRAY 

300 path.append(node) 

301 

302 # Traverse blockers (edges point from blocked to blocker) 

303 for neighbor in self.blocked_by.get(node, set()): 

304 if neighbor not in color: 

305 continue 

306 if color[neighbor] == GRAY: 

307 # Found cycle - extract from path 

308 cycle_start = path.index(neighbor) 

309 cycle = path[cycle_start:] + [neighbor] 

310 cycles.append(cycle) 

311 elif color[neighbor] == WHITE: 

312 dfs(neighbor) 

313 

314 path.pop() 

315 color[node] = BLACK 

316 

317 for issue_id in self.issues: 

318 if color[issue_id] == WHITE: 

319 dfs(issue_id) 

320 

321 return cycles 

322 

323 def has_cycles(self) -> bool: 

324 """Check if the graph contains any cycles. 

325 

326 Returns: 

327 True if cycles exist, False otherwise 

328 """ 

329 return len(self.detect_cycles()) > 0 

330 

331 def __len__(self) -> int: 

332 """Return number of issues in the graph.""" 

333 return len(self.issues) 

334 

335 def __contains__(self, issue_id: str) -> bool: 

336 """Check if an issue is in the graph.""" 

337 return issue_id in self.issues 

338 

339 

340def refine_waves_for_contention( 

341 waves: list[list[IssueInfo]], 

342 *, 

343 config: DependencyMappingConfig | None = None, 

344) -> tuple[list[list[IssueInfo]], list[WaveContentionNote | None]]: 

345 """Refine execution waves by splitting on file overlap. 

346 

347 For each wave with multiple issues, extracts file hints from issue 

348 content and checks for pairwise overlaps. Overlapping issues are 

349 split into sub-waves using greedy graph coloring so no two issues 

350 in the same sub-wave modify the same files. 

351 

352 Args: 

353 waves: Execution waves from get_execution_waves() 

354 

355 Returns: 

356 Tuple of (refined_waves, contention_notes). 

357 refined_waves: Wave list with contention-free sub-waves. 

358 contention_notes: Parallel list (same length as refined_waves). 

359 None for waves that weren't split, WaveContentionNote for sub-waves. 

360 """ 

361 from little_loops.parallel.file_hints import FileHints, extract_file_hints 

362 

363 refined: list[list[IssueInfo]] = [] 

364 annotations: list[WaveContentionNote | None] = [] 

365 

366 for orig_idx, wave in enumerate(waves): 

367 if len(wave) <= 1: 

368 refined.append(wave) 

369 annotations.append(None) 

370 continue 

371 

372 # Extract file hints for each issue in the wave 

373 hints: dict[str, FileHints] = {} 

374 for issue in wave: 

375 content = issue.path.read_text() if issue.path.exists() else "" 

376 hints[issue.issue_id] = extract_file_hints(content, issue.issue_id) 

377 

378 # Build conflict adjacency: issue_id -> set of conflicting issue_ids 

379 conflicts: dict[str, set[str]] = {issue.issue_id: set() for issue in wave} 

380 for i, a in enumerate(wave): 

381 for b in wave[i + 1 :]: 

382 if hints[a.issue_id].overlaps_with(hints[b.issue_id], config=config): 

383 conflicts[a.issue_id].add(b.issue_id) 

384 conflicts[b.issue_id].add(a.issue_id) 

385 

386 # If no conflicts, keep wave as-is 

387 if not any(conflicts.values()): 

388 refined.append(wave) 

389 annotations.append(None) 

390 continue 

391 

392 # Collect all contended paths for display 

393 contended: set[str] = set() 

394 for i, a in enumerate(wave): 

395 for b in wave[i + 1 :]: 

396 contended.update( 

397 hints[a.issue_id].get_overlapping_paths(hints[b.issue_id], config=config) 

398 ) 

399 contended_paths = sorted(contended) 

400 

401 # Greedy graph coloring — assign each issue the lowest color 

402 # not used by any conflicting neighbor 

403 color: dict[str, int] = {} 

404 for issue in wave: # iterate in priority order (preserved from get_ready_issues) 

405 used_colors = {color[c] for c in conflicts[issue.issue_id] if c in color} 

406 c = 0 

407 while c in used_colors: 

408 c += 1 

409 color[issue.issue_id] = c 

410 

411 # Group issues by color into sub-waves, preserving priority order 

412 max_color = max(color.values()) 

413 total_sub_waves = max_color + 1 

414 for c in range(total_sub_waves): 

415 sub_wave = [issue for issue in wave if color[issue.issue_id] == c] 

416 if sub_wave: 

417 refined.append(sub_wave) 

418 annotations.append( 

419 WaveContentionNote( 

420 contended_paths=contended_paths, 

421 sub_wave_index=c, 

422 total_sub_waves=total_sub_waves, 

423 parent_wave_index=orig_idx, 

424 ) 

425 ) 

426 

427 logger.info(f" Wave split into {total_sub_waves} sub-waves due to file overlap") 

428 

429 return refined, annotations