Coverage for little_loops / dependency_graph.py: 18%
168 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
"""Dependency graph for issue management.

Constructs a directed acyclic graph (DAG) from issue dependencies,
providing topological sorting and cycle detection.
"""

from __future__ import annotations

import logging
from collections import deque
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

# Project types are needed for annotations only; importing them under
# TYPE_CHECKING keeps them out of the runtime import graph.
if TYPE_CHECKING:
    from little_loops.config import DependencyMappingConfig
    from little_loops.issue_parser import IssueInfo

logger = logging.getLogger(__name__)
@dataclass
class WaveContentionNote:
    """Annotation for a wave that was split due to file overlap.

    One note is attached to each sub-wave produced when an execution wave
    is split because some of its issues touch the same files.

    Attributes:
        contended_paths: Paths on which issues of the original wave overlap.
        sub_wave_index: Zero-based position of this sub-wave within the split.
        total_sub_waves: Number of sub-waves the original wave was split into.
        parent_wave_index: Zero-based index of the original (unsplit) wave
            in the wave list that was refined. Defaults to 0.
    """

    contended_paths: list[str]
    sub_wave_index: int
    total_sub_waves: int
    parent_wave_index: int = 0
@dataclass
class DependencyGraph:
    """Directed acyclic graph of issue dependencies.

    Builds a graph from issue dependencies where edges represent
    "blocked by" relationships. Provides methods for:
    - Topological sorting (dependency order)
    - Cycle detection
    - Ready issue queries (blockers resolved)
    - Blocking issue queries

    Attributes:
        issues: Mapping from issue_id to IssueInfo
        blocked_by: Mapping from issue_id to set of blocking issue_ids
        blocks: Mapping from issue_id to set of blocked issue_ids
    """

    issues: dict[str, IssueInfo] = field(default_factory=dict)
    blocked_by: dict[str, set[str]] = field(default_factory=dict)
    blocks: dict[str, set[str]] = field(default_factory=dict)

    @classmethod
    def from_issues(
        cls,
        issues: list[IssueInfo],
        completed_ids: set[str] | None = None,
        all_known_ids: set[str] | None = None,
    ) -> DependencyGraph:
        """Build graph from list of issues.

        Constructs a dependency graph where:
        - Each issue is a node
        - blocked_by relationships create edges from blockers to blocked issues
        - Completed issues are treated as satisfied (not blocking)
        - Missing issues are logged as warnings but don't block

        Only each issue's ``blocked_by`` list is read to create edges; the
        reverse ``blocks`` mapping of the graph is derived from it.

        Args:
            issues: List of IssueInfo objects to use as nodes
            completed_ids: Set of completed issue IDs (treated as resolved)
            all_known_ids: Set of all issue IDs that exist on disk. When
                provided, references to issues in this set are silently
                skipped (not warned) even if they are not in the graph.

        Returns:
            Constructed DependencyGraph

        Example:
            >>> issues = [issue_a, issue_b, issue_c]
            >>> completed = {"FEAT-001"}  # Already done
            >>> graph = DependencyGraph.from_issues(issues, completed)
        """
        completed = completed_ids or set()
        graph = cls()

        # Index all issues by ID
        for issue in issues:
            graph.issues[issue.issue_id] = issue
            graph.blocked_by[issue.issue_id] = set()
            graph.blocks[issue.issue_id] = set()

        # Build dependency edges
        all_issue_ids = set(graph.issues.keys())
        for issue in issues:
            for blocker_id in issue.blocked_by:
                # Skip completed blockers (already satisfied)
                if blocker_id in completed:
                    continue
                # Skip blockers not in the graph
                if blocker_id not in all_issue_ids:
                    # Only warn if the issue truly doesn't exist on disk
                    if all_known_ids is None or blocker_id not in all_known_ids:
                        logger.warning(
                            f"Issue {issue.issue_id} blocked by unknown issue {blocker_id}"
                        )
                    continue
                # Add bidirectional edge
                graph.blocked_by[issue.issue_id].add(blocker_id)
                graph.blocks[blocker_id].add(issue.issue_id)

        return graph

    def get_ready_issues(self, completed: set[str] | None = None) -> list[IssueInfo]:
        """Return issues whose blockers are all completed.

        An issue is "ready" if:
        - It is not already completed
        - All its blockers are either completed or not in the graph

        Args:
            completed: Set of completed issue IDs

        Returns:
            List of IssueInfo for issues with no active blockers, sorted by
            ascending ``priority_int`` then ``issue_id``.
            NOTE(review): presumably a lower ``priority_int`` means a higher
            priority ("highest first") - confirm against IssueInfo.
        """
        completed = completed or set()
        ready = []
        for issue_id, issue in self.issues.items():
            if issue_id in completed:
                continue
            blockers = self.get_blocking_issues(issue_id, completed)
            if not blockers:
                ready.append(issue)
        # Sort by priority then issue_id for consistent ordering
        ready.sort(key=lambda x: (x.priority_int, x.issue_id))
        return ready

    def get_execution_waves(self, completed: set[str] | None = None) -> list[list[IssueInfo]]:
        """Return issues grouped into parallel execution waves.

        Wave 1: All issues with no blockers (or blockers already completed)
        Wave 2: Issues whose blockers are all in wave 1
        Wave N: Issues whose blockers are all in waves 1..N-1

        This is similar to topological_sort but groups issues by "level"
        rather than returning a flat list.

        Args:
            completed: Set of already-completed issue IDs

        Returns:
            List of waves, each wave is a list of issues that can run in parallel.
            Empty list if graph is empty or all issues are completed.

        Raises:
            ValueError: If graph contains cycles (not a DAG)

        Example:
            If A blocks B and C, and B and C block D:
            - Wave 1: [A]
            - Wave 2: [B, C]
            - Wave 3: [D]
        """
        completed = completed or set()
        waves: list[list[IssueInfo]] = []
        # Copy so the caller's set is never mutated while waves accumulate.
        processed: set[str] = set(completed)

        while True:
            # Get issues ready to run (all blockers in processed set)
            wave = self.get_ready_issues(completed=processed)
            if not wave:
                break
            waves.append(wave)
            # Mark this wave as processed for next iteration
            for issue in wave:
                processed.add(issue.issue_id)

        # Check for cycles - if we have unprocessed issues, there's a cycle
        remaining = set(self.issues.keys()) - processed
        if remaining:
            cycles = self.detect_cycles()
            cycle_str = ", ".join(" -> ".join(cycle) for cycle in cycles)
            raise ValueError(f"Dependency graph contains cycles: {cycle_str}")

        return waves

    def is_blocked(self, issue_id: str, completed: set[str] | None = None) -> bool:
        """Check if an issue is still blocked.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            True if issue has unresolved blockers, False otherwise
        """
        return bool(self.get_blocking_issues(issue_id, completed))

    def get_blocking_issues(self, issue_id: str, completed: set[str] | None = None) -> set[str]:
        """Return incomplete issues blocking this one.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            New set of issue IDs still blocking this issue (empty if the
            issue is unknown to the graph)
        """
        completed = completed or set()
        blockers = self.blocked_by.get(issue_id, set())
        # Set difference builds a new set, so internal state is not exposed.
        return blockers - completed

    def get_blocked_by_issue(self, issue_id: str) -> set[str]:
        """Return issues that this issue blocks.

        Args:
            issue_id: Issue ID to check

        Returns:
            Copy of the set of issue IDs that are blocked by this issue
        """
        return self.blocks.get(issue_id, set()).copy()

    def topological_sort(self) -> list[IssueInfo]:
        """Return issues in dependency order (Kahn's algorithm).

        Issues with no dependencies come first, followed by issues
        whose dependencies have been satisfied. Issues that become ready
        together are ordered by priority_int then issue_id.

        Returns:
            List of IssueInfo in topological order

        Raises:
            ValueError: If graph contains cycles (not a DAG)

        Example:
            If A blocks B, and B blocks C, returns [A, B, C]
        """
        # Calculate in-degree for each node (number of blockers)
        in_degree: dict[str, int] = {
            issue_id: len(blockers) for issue_id, blockers in self.blocked_by.items()
        }

        # Start with nodes that have no blockers, sorted by priority
        zero_degree = [
            self.issues[issue_id] for issue_id, degree in in_degree.items() if degree == 0
        ]
        zero_degree.sort(key=lambda x: (x.priority_int, x.issue_id))
        queue: deque[str] = deque(issue.issue_id for issue in zero_degree)

        result: list[IssueInfo] = []
        while queue:
            issue_id = queue.popleft()
            result.append(self.issues[issue_id])

            # Reduce in-degree for nodes this one blocks
            # Collect newly ready nodes, then sort before adding to queue
            newly_ready: list[IssueInfo] = []
            for blocked_id in self.blocks.get(issue_id, set()):
                in_degree[blocked_id] -= 1
                if in_degree[blocked_id] == 0:
                    newly_ready.append(self.issues[blocked_id])

            # Sort newly ready by priority for consistent ordering
            newly_ready.sort(key=lambda x: (x.priority_int, x.issue_id))
            for issue in newly_ready:
                queue.append(issue.issue_id)

        # Check for cycles - if we didn't process all nodes, there's a cycle
        if len(result) != len(self.issues):
            cycles = self.detect_cycles()
            cycle_str = ", ".join(" -> ".join(cycle) for cycle in cycles)
            raise ValueError(f"Dependency graph contains cycles: {cycle_str}")

        return result

    def detect_cycles(self) -> list[list[str]]:
        """Detect and return any dependency cycles.

        Uses DFS with coloring to find back edges indicating cycles.
        A cycle exists when we encounter a node that is currently being
        visited (GRAY state) in the DFS traversal.

        The traversal keeps its own stack instead of recursing, so a long
        chain of blockers cannot exceed the interpreter recursion limit.
        Blockers are visited in sorted order so the reported cycles do not
        depend on set iteration order.

        Returns:
            List of cycles, each cycle is a list of issue IDs forming
            a path from the cycle start back to itself.
            Empty list if no cycles exist.

        Example:
            If A is blocked by B, B by C and C by A (circular),
            returns [["A", "B", "C", "A"]]
        """
        WHITE, GRAY, BLACK = 0, 1, 2
        color: dict[str, int] = dict.fromkeys(self.issues, WHITE)
        cycles: list[list[str]] = []

        for root in self.issues:
            if color[root] != WHITE:
                continue

            # path[i] is the node whose pending blockers are in stack[i].
            color[root] = GRAY
            path: list[str] = [root]
            stack = [iter(sorted(self.blocked_by.get(root, ())))]

            while stack:
                # Traverse blockers (edges point from blocked to blocker).
                # The iterator remembers its position, so after returning
                # from a child the scan resumes with the next blocker.
                for neighbor in stack[-1]:
                    state = color.get(neighbor)
                    if state is None:
                        # Blocker is not a node of this graph
                        continue
                    if state == GRAY:
                        # Found cycle - extract from path
                        cycle_start = path.index(neighbor)
                        cycles.append(path[cycle_start:] + [neighbor])
                    elif state == WHITE:
                        # Descend into the unvisited blocker
                        color[neighbor] = GRAY
                        path.append(neighbor)
                        stack.append(iter(sorted(self.blocked_by.get(neighbor, ()))))
                        break
                else:
                    # All blockers of the current node handled: finish it
                    color[path.pop()] = BLACK
                    stack.pop()

        return cycles

    def has_cycles(self) -> bool:
        """Check if the graph contains any cycles.

        Returns:
            True if cycles exist, False otherwise
        """
        return len(self.detect_cycles()) > 0

    def __len__(self) -> int:
        """Return number of issues in the graph."""
        return len(self.issues)

    def __contains__(self, issue_id: str) -> bool:
        """Check if an issue is in the graph."""
        return issue_id in self.issues
def refine_waves_for_contention(
    waves: list[list[IssueInfo]],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[list[IssueInfo]], list[WaveContentionNote | None]]:
    """Refine execution waves by splitting on file overlap.

    For each wave with multiple issues, extracts file hints from issue
    content and checks for pairwise overlaps. Overlapping issues are
    split into sub-waves using greedy graph coloring so no two issues
    that were found to overlap share a sub-wave.

    Args:
        waves: Execution waves from get_execution_waves()
        config: Optional dependency-mapping configuration, forwarded
            unchanged to the file-hint overlap checks.

    Returns:
        Tuple of (refined_waves, contention_notes).
        refined_waves: Wave list with contention-free sub-waves.
        contention_notes: Parallel list (same length as refined_waves).
            None for waves that weren't split, WaveContentionNote for sub-waves.
    """
    refined: list[list[IssueInfo]] = []
    annotations: list[WaveContentionNote | None] = []

    for orig_idx, wave in enumerate(waves):
        # A wave with zero or one issue has no pair that could contend.
        split = _split_wave_on_contention(wave, orig_idx, config) if len(wave) > 1 else None
        if split is None:
            refined.append(wave)
            annotations.append(None)
            continue
        for sub_wave, note in split:
            refined.append(sub_wave)
            annotations.append(note)

    return refined, annotations


def _split_wave_on_contention(
    wave: list[IssueInfo],
    orig_idx: int,
    config: DependencyMappingConfig | None,
) -> list[tuple[list[IssueInfo], WaveContentionNote]] | None:
    """Split one multi-issue wave into sub-waves whose issues do not overlap.

    Returns None when no pair of issues in the wave overlaps; otherwise a
    list of (sub_wave, note) pairs in sub-wave order.
    """
    # Imported here rather than at module level, as in the original
    # function; this way it only runs when a wave really needs checking.
    from little_loops.parallel.file_hints import FileHints, extract_file_hints

    # Extract file hints for each issue in the wave
    hints: dict[str, FileHints] = {}
    for issue in wave:
        content = issue.path.read_text() if issue.path.exists() else ""
        hints[issue.issue_id] = extract_file_hints(content, issue.issue_id)

    # Every unordered pair of issues, built once and reused below
    pairs = [(a, b) for i, a in enumerate(wave) for b in wave[i + 1 :]]

    # Build conflict adjacency: issue_id -> set of conflicting issue_ids
    conflicts: dict[str, set[str]] = {issue.issue_id: set() for issue in wave}
    for a, b in pairs:
        if hints[a.issue_id].overlaps_with(hints[b.issue_id], config=config):
            conflicts[a.issue_id].add(b.issue_id)
            conflicts[b.issue_id].add(a.issue_id)

    # If no conflicts, the caller keeps the wave as-is
    if not any(conflicts.values()):
        return None

    # Collect all contended paths for display
    contended: set[str] = set()
    for a, b in pairs:
        contended.update(
            hints[a.issue_id].get_overlapping_paths(hints[b.issue_id], config=config)
        )
    contended_paths = sorted(contended)

    # Greedy graph coloring - assign each issue the lowest color
    # not used by any conflicting neighbor. Issues are visited in the
    # order they appear in the wave.
    color: dict[str, int] = {}
    for issue in wave:
        used_colors = {color[c] for c in conflicts[issue.issue_id] if c in color}
        c = 0
        while c in used_colors:
            c += 1
        color[issue.issue_id] = c

    # Group issues by color into sub-waves, preserving wave order
    total_sub_waves = max(color.values()) + 1
    result: list[tuple[list[IssueInfo], WaveContentionNote]] = []
    for c in range(total_sub_waves):
        sub_wave = [issue for issue in wave if color[issue.issue_id] == c]
        if sub_wave:
            result.append(
                (
                    sub_wave,
                    WaveContentionNote(
                        contended_paths=contended_paths,
                        sub_wave_index=c,
                        total_sub_waves=total_sub_waves,
                        parent_wave_index=orig_idx,
                    ),
                )
            )

    logger.info(f" Wave split into {total_sub_waves} sub-waves due to file overlap")
    return result