Coverage for little_loops / dependency_mapper / analysis.py: 0%

173 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Dependency analysis functions. 

2 

3Functions for computing conflict scores, finding file overlaps, 

4validating dependency references, and orchestrating full dependency analysis. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import re 

11from typing import TYPE_CHECKING 

12 

13from little_loops.dependency_graph import DependencyGraph 

14from little_loops.dependency_mapper.models import ( 

15 DependencyProposal, 

16 DependencyReport, 

17 ParallelSafePair, 

18 ValidationResult, 

19) 

20from little_loops.text_utils import extract_file_paths 

21 

22if TYPE_CHECKING: 

23 from little_loops.config import DependencyMappingConfig 

24 from little_loops.issue_parser import IssueInfo 

25 

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# A fenced code block: an opening triple backtick through the nearest closing
# triple backtick (non-greedy; ``[\s\S]`` lets the match span newlines).
_CODE_FENCE = re.compile(r"```[\s\S]*?```", re.MULTILINE)

# Semantic target extraction patterns
# Two or more capitalized word parts run together, e.g. ``UserCard``.
_PASCAL_CASE = re.compile(r"\b([A-Z][a-z]+(?:[A-Z][a-z]+)+)\b")
# A backtick-quoted call with empty parentheses, e.g. `` `save()` ``;
# group 1 is the bare function name.
_FUNCTION_REF = re.compile(r"`(\w+)\(\)`")
# A scope keyword, then colon/whitespace, then an optionally quoted
# identifier of at least three characters (group 1). Case-insensitive.
_COMPONENT_SCOPE = re.compile(
    r"(?:component|module|class|widget|section)[:\s]+[`\"']?([a-zA-Z0-9_./\-]{3,})[`\"']?",
    flags=re.IGNORECASE,
)

37 

# Canonical UI region name -> keywords/phrases that count as a mention of it.
# NOTE(review): "body" is triggered only by "droppable"; the literal word
# "body" is not itself a keyword — confirm this is intentional.
_SECTION_KEYWORDS: dict[str, frozenset[str]] = {
    "header": frozenset(("header", "navbar", "toolbar", "top bar")),
    "body": frozenset(("droppable",)),
    "footer": frozenset(("footer", "status bar", "action bar")),
    "sidebar": frozenset(("sidebar", "side panel", "drawer")),
    "card": frozenset(("card", "tile")),
    "modal": frozenset(("modal", "dialog", "popup", "overlay")),
    "form": frozenset(("form",)),
}

48 

# Modification type -> keywords/phrases whose presence (as a lowercase
# substring of the issue text) classifies an issue as that type.
_MODIFICATION_TYPES: dict[str, frozenset[str]] = {
    # Changes that reshape existing code.
    "structural": frozenset(
        (
            "extract",
            "split",
            "refactor",
            "restructure",
            "reorganize",
            "create new component",
            "break out",
            "separate",
            "decompose",
        )
    ),
    # Cross-cutting plumbing (events, state, routing, drag-and-drop).
    "infrastructure": frozenset(
        (
            "listener",
            "provider",
            "state management",
            "routing",
            "middleware",
            "dragging",
            "drag",
            "drop",
            "dnd",
        )
    ),
    # Small additive UI changes.
    "enhancement": frozenset(
        (
            "add button",
            "add field",
            "add column",
            "add stats",
            "add icon",
            "add toggle",
            "empty state",
            "placeholder",
            "tooltip",
            "badge",
        )
    ),
}

92 

93 

def _basename(path: str) -> str:
    """Return the last ``/``-separated component of *path*.

    A path containing no ``/`` is returned unchanged, and a path ending in
    ``/`` yields the empty string. Only forward slashes are treated as
    separators.
    """
    _head, _sep, tail = path.rpartition("/")
    return tail

97 

98 

def _extract_semantic_targets(content: str) -> set[str]:
    """Collect the component/function names an issue refers to.

    Fenced code blocks are removed first, then the remaining text is scanned
    for PascalCase identifiers, backtick-quoted ``name()`` references, and
    names introduced by a scope keyword (component/module/class/widget/
    section).

    Args:
        content: Issue file content

    Returns:
        Set of lowercased target names; empty when ``content`` is empty
    """
    if not content:
        return set()

    prose = _CODE_FENCE.sub("", content)
    extractors = (_PASCAL_CASE, _FUNCTION_REF, _COMPONENT_SCOPE)

    # Every pattern exposes the name of interest as capture group 1.
    return {
        found.group(1).lower()
        for pattern in extractors
        for found in pattern.finditer(prose)
    }

127 

128 

def _extract_section_mentions(content: str) -> set[str]:
    """Find which canonical UI sections an issue mentions.

    The lowercased content is checked against every keyword in
    ``_SECTION_KEYWORDS``. Single-word keywords must appear as whole words
    (regex word boundaries); multi-word phrases are matched as plain
    substrings. One matching keyword is enough to count the section.

    Args:
        content: Issue file content

    Returns:
        Set of canonical section names mentioned; empty when ``content``
        is empty
    """
    if not content:
        return set()

    lowered = content.lower()

    def _is_mentioned(keyword: str) -> bool:
        # Phrases: substring test. Single words: whole-word regex test.
        if " " in keyword:
            return keyword in lowered
        return re.search(rf"\b{re.escape(keyword)}\b", lowered) is not None

    return {
        canonical
        for canonical, keywords in _SECTION_KEYWORDS.items()
        if any(_is_mentioned(keyword) for keyword in keywords)
    }

160 

161 

def _classify_modification_type(content: str) -> str:
    """Label an issue as "structural", "infrastructure" or "enhancement".

    Types are tried in that fixed order and the first one with any keyword
    occurring as a substring of the lowercased content wins. Empty content,
    or content matching no keyword, is classified as "enhancement".

    Args:
        content: Issue file content

    Returns:
        Modification type classification string
    """
    if content:
        lowered = content.lower()
        # Order encodes precedence when keywords of several types appear.
        for candidate in ("structural", "infrastructure", "enhancement"):
            if any(keyword in lowered for keyword in _MODIFICATION_TYPES[candidate]):
                return candidate

    return "enhancement"

186 

187 

def compute_conflict_score(
    content_a: str,
    content_b: str,
    *,
    config: DependencyMappingConfig | None = None,
) -> float:
    """Score how likely two issues are to conflict semantically.

    The score is a weighted sum of three signals:
    - Semantic target overlap: Jaccard ratio of the extracted
      component/function names (0.0 when either issue has none)
    - Section overlap: 1.0 if the issues share any UI section, else 0.0
    - Modification type match: 1.0 if both classify to the same type

    Because classification falls back to "enhancement", two issues with no
    type keywords still count as a type match.

    Args:
        content_a: First issue's file content
        content_b: Second issue's file content
        config: Optional dependency mapping config supplying
            ``scoring_weights``. Defaults are 0.5 (semantic), 0.3 (section)
            and 0.2 (type) when not provided.

    Returns:
        Weighted score rounded to two decimals; 0.0 (parallel-safe) to 1.0
        (definite conflict) with the default weights
    """
    # Resolve scoring weights from config or defaults
    if config:
        weights = config.scoring_weights
        w_semantic, w_section, w_type = weights.semantic, weights.section, weights.type
    else:
        w_semantic, w_section, w_type = 0.5, 0.3, 0.2

    # Signal 1: semantic target overlap. Missing targets on either side is
    # treated as "unknown" and contributes nothing.
    targets_a = _extract_semantic_targets(content_a)
    targets_b = _extract_semantic_targets(content_b)
    if targets_a and targets_b:
        target_score = len(targets_a & targets_b) / len(targets_a | targets_b)
    else:
        target_score = 0.0

    # Signal 2: section overlap. Disjoint also covers the case where either
    # side mentions no section at all.
    sections_a = _extract_section_mentions(content_a)
    sections_b = _extract_section_mentions(content_b)
    section_score = 0.0 if sections_a.isdisjoint(sections_b) else 1.0

    # Signal 3: modification type match.
    same_type = _classify_modification_type(content_a) == _classify_modification_type(content_b)
    type_score = 1.0 if same_type else 0.0

    return round(target_score * w_semantic + section_score * w_section + type_score * w_type, 2)

241 

242 

def find_file_overlaps(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[DependencyProposal], list[ParallelSafePair]]:
    """Find issues that reference overlapping files and propose dependencies.

    For each pair of issues where both reference the same file(s), computes
    a semantic conflict score. High-conflict pairs get dependency proposals;
    low-conflict pairs are reported as parallel-safe.

    Pairs that already have a ``blocked_by`` relationship (in either
    direction) are skipped, as are pairs whose overlap is below both the
    minimum file count and the minimum ratio.

    For a proposal, the issue to complete first (``target_id``) is chosen by:
    1. Priority: the lower ``priority_int`` (higher priority) goes first.
    2. At equal priority, modification type: structural before
       infrastructure before enhancement.
    3. Otherwise issue ID order, with the confidence scaled down by
       ``config.confidence_modifier`` (0.5 by default).

    The rationale text names whichever rule decided the ordering.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        config: Optional dependency mapping config for custom thresholds.
            Falls back to hardcoded defaults when not provided.

    Returns:
        Tuple of (proposed dependencies sorted by confidence descending,
        parallel-safe pairs)
    """
    # Existing (blocked issue, blocker) edges, used for the skip check.
    existing_deps: set[tuple[str, str]] = {
        (issue.issue_id, blocker_id) for issue in issues for blocker_id in issue.blocked_by
    }

    # Resolve thresholds from config or defaults once; none of them vary
    # per pair.
    min_files = config.overlap_min_files if config else 2
    min_ratio = config.overlap_min_ratio if config else 0.25
    conflict_threshold = config.conflict_threshold if config else 0.4
    exclude_files = (
        frozenset(config.exclude_common_files)
        if config
        else frozenset(
            {
                "__init__.py",
                "pyproject.toml",
                "setup.py",
                "setup.cfg",
                "CHANGELOG.md",
                "README.md",
                "conftest.py",
            }
        )
    )

    # Extract file paths per issue, filtering common infrastructure files,
    # and index issues by ID so pairs can be resolved without rescanning.
    issue_paths: dict[str, set[str]] = {}
    issues_by_id: dict[str, IssueInfo] = {}
    for issue in issues:
        # First occurrence wins if an ID is duplicated.
        issues_by_id.setdefault(issue.issue_id, issue)
        paths = extract_file_paths(issue_contents.get(issue.issue_id, ""))
        if paths:
            filtered = {p for p in paths if _basename(p) not in exclude_files}
            if filtered:
                issue_paths[issue.issue_id] = filtered

    proposals: list[DependencyProposal] = []
    parallel_safe: list[ParallelSafePair] = []
    issue_ids = sorted(issue_paths)

    type_order = {"structural": 0, "infrastructure": 1, "enhancement": 2}

    for i, id_a in enumerate(issue_ids):
        paths_a = issue_paths[id_a]
        for id_b in issue_ids[i + 1 :]:
            paths_b = issue_paths[id_b]
            overlap = paths_a & paths_b
            if not overlap:
                continue

            # Apply minimum overlap guards (matching FileHints.overlaps_with).
            # Only non-empty path sets are stored, so the divisor is >= 1.
            ratio = len(overlap) / min(len(paths_a), len(paths_b))
            if len(overlap) < min_files and ratio < min_ratio:
                continue

            # Skip if dependency already exists (in either direction)
            if (id_a, id_b) in existing_deps or (id_b, id_a) in existing_deps:
                continue

            content_a = issue_contents.get(id_a, "")
            content_b = issue_contents.get(id_b, "")
            conflict = compute_conflict_score(content_a, content_b, config=config)
            overlap_list = sorted(overlap)

            # Low-conflict pairs are parallel-safe
            if conflict < conflict_threshold:
                sections_a = _extract_section_mentions(content_a)
                sections_b = _extract_section_mentions(content_b)
                # Only claim "different sections" when the two issues really
                # mention disjoint sections.
                if sections_a and sections_b and sections_a.isdisjoint(sections_b):
                    reason = (
                        f"Different sections ({', '.join(sorted(sections_a))}"
                        f" vs {', '.join(sorted(sections_b))})"
                    )
                else:
                    reason = "Low semantic conflict score"
                parallel_safe.append(
                    ParallelSafePair(
                        issue_a=id_a,
                        issue_b=id_b,
                        shared_files=overlap_list,
                        conflict_score=conflict,
                        reason=reason,
                    )
                )
                continue

            # Determine direction for high-conflict pairs
            issue_a = issues_by_id[id_a]
            issue_b = issues_by_id[id_b]
            confidence_modifier = 1.0

            if issue_a.priority_int != issue_b.priority_int:
                # Different priorities: higher priority (lower int) blocks lower
                if issue_a.priority_int < issue_b.priority_int:
                    target_id, source_id = id_a, id_b
                else:
                    target_id, source_id = id_b, id_a
                ordering_note = f"{target_id} has higher priority and should be completed first."
            else:
                # Same priority: use modification type ordering
                type_a = _classify_modification_type(content_a)
                type_b = _classify_modification_type(content_b)
                order_a = type_order.get(type_a, 2)
                order_b = type_order.get(type_b, 2)

                if order_a != order_b:
                    if order_a < order_b:
                        target_id, source_id = id_a, id_b
                        target_type, source_type = type_a, type_b
                    else:
                        target_id, source_id = id_b, id_a
                        target_type, source_type = type_b, type_a
                    ordering_note = (
                        f"Priorities are equal; {target_id} is a {target_type} change "
                        f"and should be completed before the {source_type} change."
                    )
                else:
                    # Fall back to ID ordering with reduced confidence.
                    # issue_ids is sorted and unique, so id_a always precedes id_b.
                    target_id, source_id = id_a, id_b
                    confidence_modifier = config.confidence_modifier if config else 0.5
                    ordering_note = (
                        f"Priority and modification type are equal; {target_id} "
                        "is ordered first by issue ID."
                    )

            # Confidence is the overlap ratio against the smaller path set,
            # scaled down when the direction was only an ID tie-break.
            confidence = ratio * confidence_modifier

            rationale = (
                f"{source_id} and {target_id} both reference "
                f"{', '.join(overlap_list[:3])}"
                f"{' and more' if len(overlap_list) > 3 else ''}. "
                f"{ordering_note}"
            )

            proposals.append(
                DependencyProposal(
                    source_id=source_id,
                    target_id=target_id,
                    reason="file_overlap",
                    confidence=round(confidence, 2),
                    rationale=rationale,
                    overlapping_files=overlap_list,
                    conflict_score=conflict,
                )
            )

    # Sort by confidence descending (stable, so ties keep discovery order)
    proposals.sort(key=lambda proposal: proposal.confidence, reverse=True)
    return proposals, parallel_safe

412 

413 

def validate_dependencies(
    issues: list[IssueInfo],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
) -> ValidationResult:
    """Validate existing dependency references for integrity.

    Checks, for every ``blocked_by`` entry of every issue:
    - Broken refs: the referenced ID is not an active issue, not completed,
      and not in ``all_known_ids``
    - Stale completed refs: the referenced ID is known and is completed
    - Missing backlinks: the referenced issue is active but does not list
      this issue in its ``blocks``

    Cycles (circular dependency chains) are detected separately via
    ``DependencyGraph``.

    Each recorded problem is an ``(issue_id, referenced_id)`` tuple.

    Args:
        issues: List of parsed issue objects
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk (across all
            categories and completed). When provided, references to issues
            in this set are not flagged as broken even if they are not in
            the working ``issues`` list.

    Returns:
        ValidationResult with all detected problems
    """
    completed = completed_ids or set()
    result = ValidationResult()

    active_ids = {issue.issue_id for issue in issues}
    all_known = active_ids | completed
    if all_known_ids:
        all_known = all_known | all_known_ids

    # issue_id -> IDs that issue declares it blocks, for the backlink check.
    blocks_map: dict[str, set[str]] = {issue.issue_id: set(issue.blocks) for issue in issues}

    for issue in issues:
        for ref_id in issue.blocked_by:
            if ref_id not in all_known:
                result.broken_refs.append((issue.issue_id, ref_id))
            elif ref_id in completed:
                result.stale_completed_refs.append((issue.issue_id, ref_id))

            # Backlink: if A.blocked_by contains B, then B.blocks should
            # contain A. Only checkable when B is an active issue.
            if ref_id in active_ids and issue.issue_id not in blocks_map[ref_id]:
                result.missing_backlinks.append((issue.issue_id, ref_id))

    # Cycle detection using DependencyGraph
    graph = DependencyGraph.from_issues(issues, completed, all_known_ids=all_known_ids)
    result.cycles = graph.detect_cycles()

    return result

472 

473 

def analyze_dependencies(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
    *,
    config: DependencyMappingConfig | None = None,
) -> DependencyReport:
    """Run discovery and validation and bundle the results into one report.

    Discovery (``find_file_overlaps``) runs first, then validation
    (``validate_dependencies``). The report also carries the number of
    issues and the total count of ``blocked_by`` entries across them.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk
        config: Optional dependency mapping config for custom thresholds;
            forwarded to discovery only.

    Returns:
        Comprehensive dependency report
    """
    discovered, safe_pairs = find_file_overlaps(issues, issue_contents, config=config)
    checked = validate_dependencies(issues, completed_ids, all_known_ids)

    return DependencyReport(
        proposals=discovered,
        parallel_safe=safe_pairs,
        validation=checked,
        issue_count=len(issues),
        existing_dep_count=sum(len(issue.blocked_by) for issue in issues),
    )