Coverage for little_loops / dependency_mapper / analysis.py: 0%
173 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Dependency analysis functions.
3Functions for computing conflict scores, finding file overlaps,
4validating dependency references, and orchestrating full dependency analysis.
5"""
7from __future__ import annotations
9import logging
10import re
11from typing import TYPE_CHECKING
13from little_loops.dependency_graph import DependencyGraph
14from little_loops.dependency_mapper.models import (
15 DependencyProposal,
16 DependencyReport,
17 ParallelSafePair,
18 ValidationResult,
19)
20from little_loops.text_utils import extract_file_paths
22if TYPE_CHECKING:
23 from little_loops.config import DependencyMappingConfig
24 from little_loops.issue_parser import IssueInfo
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Matches one fenced code block (triple backticks), non-greedily. ``[\s\S]``
# already spans newlines and the pattern has no ``^``/``$`` anchors, so no
# regex flags are required.
_CODE_FENCE = re.compile(r"```[\s\S]*?```")

# Semantic target extraction patterns
# PascalCase identifier built from at least two capitalised words (``FooBar``).
_PASCAL_CASE = re.compile(r"\b([A-Z][a-z]+(?:[A-Z][a-z]+)+)\b")
# Backtick-quoted call with empty parentheses; captures the function name.
_FUNCTION_REF = re.compile(r"`(\w+)\(\)`")
# A scope word (component/module/class/widget/section, any case) followed by
# ":" or whitespace and an optionally quoted name of three or more characters.
_COMPONENT_SCOPE = re.compile(
    r"(?:component|module|class|widget|section)[:\s]+[`\"']?([a-zA-Z0-9_./\-]{3,})[`\"']?",
    re.IGNORECASE,
)
# Canonical UI region name -> phrases that count as a mention of that region.
# Entries are listed as (canonical name, phrases) pairs; insertion order of the
# resulting dict follows the order of the pairs below.
_SECTION_KEYWORDS: dict[str, frozenset[str]] = {
    canonical: frozenset(phrases)
    for canonical, phrases in (
        ("header", ("header", "navbar", "toolbar", "top bar")),
        ("body", ("droppable",)),
        ("footer", ("footer", "status bar", "action bar")),
        ("sidebar", ("sidebar", "side panel", "drawer")),
        ("card", ("card", "tile")),
        ("modal", ("modal", "dialog", "popup", "overlay")),
        ("form", ("form",)),
    )
}
# Modification type -> phrases whose presence in an issue selects that type.
# Kept as three named phrase groups so each category can be read on its own.
_STRUCTURAL_PHRASES = (
    "extract",
    "split",
    "refactor",
    "restructure",
    "reorganize",
    "create new component",
    "break out",
    "separate",
    "decompose",
)
_INFRASTRUCTURE_PHRASES = (
    "listener",
    "provider",
    "state management",
    "routing",
    "middleware",
    "dragging",
    "drag",
    "drop",
    "dnd",
)
_ENHANCEMENT_PHRASES = (
    "add button",
    "add field",
    "add column",
    "add stats",
    "add icon",
    "add toggle",
    "empty state",
    "placeholder",
    "tooltip",
    "badge",
)
_MODIFICATION_TYPES: dict[str, frozenset[str]] = {
    "structural": frozenset(_STRUCTURAL_PHRASES),
    "infrastructure": frozenset(_INFRASTRUCTURE_PHRASES),
    "enhancement": frozenset(_ENHANCEMENT_PHRASES),
}
def _basename(path: str) -> str:
    """Return the final ``/``-separated component of *path*.

    A path without any ``/`` is returned unchanged, and a path ending in
    ``/`` yields an empty string.

    Args:
        path: File path using forward slashes as separators

    Returns:
        Everything after the last ``/``, or the whole string if there is none
    """
    # rsplit yields a one-element list when no "/" is present, so the last
    # element is correct without a separate membership check.
    return path.rsplit("/", 1)[-1]
def _extract_semantic_targets(content: str) -> set[str]:
    """Collect lower-cased component and function names mentioned in an issue.

    Fenced code blocks are removed first; the remaining text is then scanned
    with the PascalCase, function-reference and component-scope patterns.

    Args:
        content: Issue file content

    Returns:
        Set of lower-cased names captured by any of the three patterns;
        empty when *content* is empty
    """
    if not content:
        return set()

    # Ignore anything inside triple-backtick fences.
    prose = _CODE_FENCE.sub("", content)

    return {
        hit.group(1).lower()
        for pattern in (_PASCAL_CASE, _FUNCTION_REF, _COMPONENT_SCOPE)
        for hit in pattern.finditer(prose)
    }
def _extract_section_mentions(content: str) -> set[str]:
    """Find which canonical UI sections an issue mentions.

    Each canonical name in ``_SECTION_KEYWORDS`` is reported when at least one
    of its phrases occurs in the lower-cased content. Single-word phrases must
    match on word boundaries; phrases containing a space match as plain
    substrings.

    Args:
        content: Issue file content

    Returns:
        Set of canonical section names mentioned; empty when *content* is empty
    """
    if not content:
        return set()

    lowered = content.lower()

    def _is_mentioned(phrase: str) -> bool:
        # Multi-word phrases: substring test. Single words: whole-word regex.
        if " " in phrase:
            return phrase in lowered
        return re.search(rf"\b{re.escape(phrase)}\b", lowered) is not None

    return {
        canonical
        for canonical, phrases in _SECTION_KEYWORDS.items()
        if any(_is_mentioned(phrase) for phrase in phrases)
    }
def _classify_modification_type(content: str) -> str:
    """Label an issue as "structural", "infrastructure" or "enhancement".

    Categories are tried in that order and the first one with any phrase
    occurring as a substring of the lower-cased content wins. Empty content,
    or content matching no phrase, is classified as "enhancement".

    Args:
        content: Issue file content

    Returns:
        Modification type classification string
    """
    if content:
        haystack = content.lower()
        for candidate in ("structural", "infrastructure", "enhancement"):
            if any(phrase in haystack for phrase in _MODIFICATION_TYPES[candidate]):
                return candidate

    # Nothing matched (or nothing to inspect): use the default category.
    return "enhancement"
def compute_conflict_score(
    content_a: str,
    content_b: str,
    *,
    config: DependencyMappingConfig | None = None,
) -> float:
    """Score how likely two issues are to conflict, from their text alone.

    The score is a weighted sum of three signals:
    - Semantic target overlap: shared targets divided by all targets of both
      issues (0.0 when either issue has none)
    - Section overlap: 1.0 when both issues mention a common UI section
    - Modification type match: 1.0 when both issues classify the same way

    Args:
        content_a: First issue's file content
        content_b: Second issue's file content
        config: Optional dependency mapping config supplying
            ``scoring_weights``. Without it the weights are 0.5 (semantic),
            0.3 (section) and 0.2 (type).

    Returns:
        Weighted score rounded to two decimals; with the default weights it
        ranges from 0.0 (parallel-safe) to 1.0 (definite conflict)
    """
    targets_a = _extract_semantic_targets(content_a)
    targets_b = _extract_semantic_targets(content_b)
    sections_a = _extract_section_mentions(content_a)
    sections_b = _extract_section_mentions(content_b)
    same_type = _classify_modification_type(content_a) == _classify_modification_type(content_b)

    # Weights come from config when one is supplied, otherwise the defaults.
    if config:
        w_semantic = config.scoring_weights.semantic
        w_section = config.scoring_weights.section
        w_type = config.scoring_weights.type
    else:
        w_semantic, w_section, w_type = 0.5, 0.3, 0.2

    # Signal 1: shared targets over all targets. A non-empty intersection
    # implies both sets are non-empty; otherwise the score is 0.0 either way.
    shared_targets = targets_a & targets_b
    if shared_targets:
        target_score = len(shared_targets) / len(targets_a | targets_b)
    else:
        target_score = 0.0

    # Signal 2: any section in common.
    section_score = 1.0 if sections_a & sections_b else 0.0

    # Signal 3: identical modification type.
    type_score = 1.0 if same_type else 0.0

    return round(target_score * w_semantic + section_score * w_section + type_score * w_type, 2)
def find_file_overlaps(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[DependencyProposal], list[ParallelSafePair]]:
    """Find issues that reference overlapping files and propose dependencies.

    File paths are extracted from each issue's content (ignoring common
    infrastructure files). For every pair of issues sharing at least one path,
    a semantic conflict score is computed. Pairs scoring below the conflict
    threshold are reported as parallel-safe; the rest get a dependency
    proposal whose direction is chosen by, in order: priority (lower
    ``priority_int`` goes first), modification type (structural, then
    infrastructure, then enhancement), and finally issue ID with the
    confidence scaled down.

    Pairs are skipped when the overlap is below both the minimum file count
    and the minimum ratio, or when a ``blocked_by`` link already exists
    between them in either direction.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        config: Optional dependency mapping config for custom thresholds.
            Falls back to hardcoded defaults when not provided.

    Returns:
        Tuple of (proposed dependencies sorted by confidence descending,
        parallel-safe pairs)
    """
    # Existing (blocked issue, blocker) edges, used for the skip check.
    existing_deps: set[tuple[str, str]] = {
        (issue.issue_id, blocker_id) for issue in issues for blocker_id in issue.blocked_by
    }

    # Resolve every threshold once; none of them changes between pairs.
    min_files = config.overlap_min_files if config else 2
    min_ratio = config.overlap_min_ratio if config else 0.25
    conflict_threshold = config.conflict_threshold if config else 0.4
    tie_break_modifier = config.confidence_modifier if config else 0.5
    exclude_files = (
        frozenset(config.exclude_common_files)
        if config
        else frozenset(
            {
                "__init__.py",
                "pyproject.toml",
                "setup.py",
                "setup.cfg",
                "CHANGELOG.md",
                "README.md",
                "conftest.py",
            }
        )
    )

    # Extract file paths per issue, filtering common infrastructure files.
    # Issues left with no paths take no part in the pairwise comparison.
    issue_paths: dict[str, set[str]] = {}
    for issue in issues:
        paths = extract_file_paths(issue_contents.get(issue.issue_id, ""))
        if paths:
            filtered = {p for p in paths if _basename(p) not in exclude_files}
            if filtered:
                issue_paths[issue.issue_id] = filtered

    # Index issues by ID once instead of scanning the list for every pair.
    # setdefault keeps the first issue seen for a given ID.
    issues_by_id: dict[str, IssueInfo] = {}
    for issue in issues:
        issues_by_id.setdefault(issue.issue_id, issue)

    proposals: list[DependencyProposal] = []
    parallel_safe: list[ParallelSafePair] = []
    issue_ids = sorted(issue_paths)

    _type_order = {"structural": 0, "infrastructure": 1, "enhancement": 2}

    for i, id_a in enumerate(issue_ids):
        for id_b in issue_ids[i + 1 :]:
            overlap = issue_paths[id_a] & issue_paths[id_b]
            if not overlap:
                continue

            # Apply minimum overlap guards (matching FileHints.overlaps_with):
            # skip only when the overlap fails both the count and ratio tests.
            smaller_set = min(len(issue_paths[id_a]), len(issue_paths[id_b]))
            ratio = len(overlap) / smaller_set if smaller_set > 0 else 0.0
            if len(overlap) < min_files and ratio < min_ratio:
                continue

            # Skip if dependency already exists (in either direction)
            if (id_a, id_b) in existing_deps or (id_b, id_a) in existing_deps:
                continue

            content_a = issue_contents.get(id_a, "")
            content_b = issue_contents.get(id_b, "")
            conflict = compute_conflict_score(content_a, content_b, config=config)

            overlap_list = sorted(overlap)

            # Low-conflict pairs are parallel-safe
            if conflict < conflict_threshold:
                sections_a = _extract_section_mentions(content_a)
                sections_b = _extract_section_mentions(content_b)
                # Only claim "different sections" when the two sets really are
                # disjoint; a low score is possible even with a shared section.
                if sections_a and sections_b and not (sections_a & sections_b):
                    reason = (
                        f"Different sections ({', '.join(sorted(sections_a))}"
                        f" vs {', '.join(sorted(sections_b))})"
                    )
                else:
                    reason = "Low semantic conflict score"
                parallel_safe.append(
                    ParallelSafePair(
                        issue_a=id_a,
                        issue_b=id_b,
                        shared_files=overlap_list,
                        conflict_score=conflict,
                        reason=reason,
                    )
                )
                continue

            # Determine direction for high-conflict pairs
            issue_a = issues_by_id[id_a]
            issue_b = issues_by_id[id_b]

            confidence_modifier = 1.0

            if issue_a.priority_int != issue_b.priority_int:
                # Different priorities: higher priority (lower int) blocks lower
                a_goes_first = issue_a.priority_int < issue_b.priority_int
                ordering_basis = "priority"
            else:
                # Same priority: use modification type ordering
                type_a = _classify_modification_type(content_a)
                type_b = _classify_modification_type(content_b)
                order_a = _type_order.get(type_a, 2)
                order_b = _type_order.get(type_b, 2)

                if order_a != order_b:
                    a_goes_first = order_a < order_b
                    ordering_basis = "type"
                else:
                    # Fall back to ID ordering with reduced confidence.
                    # issue_ids is sorted and unique, so id_a < id_b here.
                    a_goes_first = True
                    ordering_basis = "id"
                    confidence_modifier = tie_break_modifier

            if a_goes_first:
                target_id, source_id = id_a, id_b
                target_type_source = content_a
            else:
                target_id, source_id = id_b, id_a
                target_type_source = content_b

            # Explain the direction using the rule that actually decided it.
            if ordering_basis == "priority":
                ordering_note = f"{target_id} has higher priority and should be completed first."
            elif ordering_basis == "type":
                target_type = _classify_modification_type(target_type_source)
                ordering_note = (
                    f"Priorities are equal; {target_id} is a {target_type} change "
                    f"and should be completed first."
                )
            else:
                ordering_note = (
                    f"Priorities and modification types are equal; {target_id} is "
                    f"ordered first by issue ID (reduced confidence)."
                )

            # Confidence is the overlap ratio against the smaller path set,
            # scaled down when only the ID tiebreak chose the direction.
            confidence = ratio * confidence_modifier

            rationale = (
                f"{source_id} and {target_id} both reference "
                f"{', '.join(overlap_list[:3])}"
                f"{' and more' if len(overlap_list) > 3 else ''}. "
                f"{ordering_note}"
            )

            proposals.append(
                DependencyProposal(
                    source_id=source_id,
                    target_id=target_id,
                    reason="file_overlap",
                    confidence=round(confidence, 2),
                    rationale=rationale,
                    overlapping_files=overlap_list,
                    conflict_score=conflict,
                )
            )

    # Sort by confidence descending (stable, so ties keep discovery order)
    proposals.sort(key=lambda p: p.confidence, reverse=True)
    return proposals, parallel_safe
def validate_dependencies(
    issues: list[IssueInfo],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
) -> ValidationResult:
    """Validate existing dependency references for integrity.

    Checks:
    - Broken refs: blocked_by entries referencing issues that are neither in
      ``issues``, nor completed, nor in ``all_known_ids``
    - Stale completed refs: blocked_by entries referencing completed issues
    - Missing backlinks: A lists B in blocked_by, B is in ``issues``, but B
      does not list A in blocks
    - Cycles: circular dependency chains, as reported by ``DependencyGraph``

    Args:
        issues: List of parsed issue objects
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk (across all
            categories and completed). When provided, references to issues
            in this set are not flagged as broken even if they are not in
            the working ``issues`` list.

    Returns:
        ValidationResult with all detected problems
    """
    completed = completed_ids or set()
    result = ValidationResult()

    active_ids = {issue.issue_id for issue in issues}
    all_known = active_ids | completed
    if all_known_ids:
        all_known |= all_known_ids

    # Issue ID -> set of IDs that issue declares it blocks (for backlink checks)
    blocks_map: dict[str, set[str]] = {issue.issue_id: set(issue.blocks) for issue in issues}

    for issue in issues:
        for ref_id in issue.blocked_by:
            # A reference is either broken (unknown ID) or, if known, possibly
            # stale because its target has already been completed.
            if ref_id not in all_known:
                result.broken_refs.append((issue.issue_id, ref_id))
            elif ref_id in completed:
                result.stale_completed_refs.append((issue.issue_id, ref_id))

            # Check backlinks: if A.blocked_by contains B, then B.blocks should
            # contain A. Only checkable when B is one of the working issues.
            if ref_id in active_ids and issue.issue_id not in blocks_map.get(ref_id, set()):
                result.missing_backlinks.append((issue.issue_id, ref_id))

    # Cycle detection using DependencyGraph
    graph = DependencyGraph.from_issues(issues, completed, all_known_ids=all_known_ids)
    result.cycles = graph.detect_cycles()

    return result
def analyze_dependencies(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
    *,
    config: DependencyMappingConfig | None = None,
) -> DependencyReport:
    """Run discovery and validation together and bundle the results.

    Calls ``find_file_overlaps`` for new proposals and parallel-safe pairs,
    then ``validate_dependencies`` for the existing references, and reports
    both alongside the issue count and the total number of blocked_by entries.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk
        config: Optional dependency mapping config for custom thresholds.

    Returns:
        Comprehensive dependency report
    """
    overlap_result = find_file_overlaps(issues, issue_contents, config=config)
    found_proposals, safe_pairs = overlap_result
    integrity = validate_dependencies(issues, completed_ids, all_known_ids)

    # Total number of blocked_by entries across all working issues.
    blocked_by_total = 0
    for issue in issues:
        blocked_by_total += len(issue.blocked_by)

    return DependencyReport(
        proposals=found_proposals,
        parallel_safe=safe_pairs,
        validation=integrity,
        issue_count=len(issues),
        existing_dep_count=blocked_by_total,
    )