Coverage for little_loops / issue_history / debt.py: 0%

187 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Issue history technical debt analysis: cross-cutting concerns, agent effectiveness, complexity.""" 

2 

3from __future__ import annotations 

4 

5from datetime import date, timedelta 

6from pathlib import Path 

7from typing import Any 

8 

9from little_loops.issue_history._utils import get_issue_content 

10from little_loops.issue_history.models import ( 

11 AgentEffectivenessAnalysis, 

12 AgentOutcome, 

13 CompletedIssue, 

14 ComplexityProxy, 

15 ComplexityProxyAnalysis, 

16 CrossCuttingAnalysis, 

17 CrossCuttingSmell, 

18 HotspotAnalysis, 

19 TechnicalDebtMetrics, 

20) 

21from little_loops.issue_history.parsing import ( 

22 _detect_processing_agent, 

23 _extract_paths_from_issue, 

24 _parse_resolution_action, 

25) 

26 

# Keyword fragments, grouped by cross-cutting concern, used for smell detection.
# Smell detection tests each fragment with a substring check against the
# lower-cased issue text.
_CROSS_CUTTING_KEYWORDS: dict[str, list[str]] = {
    "logging": [
        "log",
        "logger",
        "logging",
        "debug",
        "trace",
        "print",
    ],
    "error-handling": [
        "error",
        "exception",
        "try",
        "catch",
        "raise",
        "except",
        "fail",
    ],
    "validation": [
        "valid",
        "validate",
        "check",
        "assert",
        "verify",
        "sanitize",
    ],
    "auth": [
        "auth",
        "permission",
        "role",
        "access",
        "token",
        "credential",
        "login",
    ],
    "caching": [
        "cache",
        "memo",
        "memoize",
        "store",
        "ttl",
        "expire",
        "cached",
    ],
}

# Consolidation pattern suggested for each concern type above.
_CONCERN_PATTERNS: dict[str, str] = {
    "logging": "decorator",
    "error-handling": "middleware",
    "validation": "decorator",
    "auth": "middleware",
    "caching": "decorator",
}

44 

45 

def detect_cross_cutting_smells(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
    contents: dict[Path, str] | None = None,
) -> CrossCuttingAnalysis:
    """Find concerns (logging, auth, ...) whose issues sprawl over many directories.

    An issue only counts towards a concern when it references files in at
    least three distinct directories and its text contains one of that
    concern's keywords (substring match on the lower-cased content).

    Args:
        issues: List of completed issues
        hotspots: Hotspot analysis results; its directory hotspots seed the
            set of known directories used as the scatter-score denominator
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        CrossCuttingAnalysis with smells ordered by scatter score (highest
        first), the most scattered concern, and up to 10 consolidation hints
    """
    if not issues:
        return CrossCuttingAnalysis()

    # Per-concern accumulators, keyed in the declaration order of the keyword table.
    touched_dirs: dict[str, set[str]] = {name: set() for name in _CROSS_CUTTING_KEYWORDS}
    matched_ids: dict[str, list[str]] = {name: [] for name in _CROSS_CUTTING_KEYWORDS}

    # Every directory we know about: hotspot directories plus any directory
    # referenced by an issue (including issues that touch fewer than three).
    known_dirs: set[str] = {spot.path for spot in hotspots.directory_hotspots or ()}

    for issue in issues:
        text = get_issue_content(issue, contents)
        if text is None:
            continue

        # Bare filenames (no separator) carry no directory information.
        dirs_in_issue = {
            str(Path(ref).parent)
            for ref in _extract_paths_from_issue(text)
            if "/" in ref or "\\" in ref
        }
        known_dirs |= dirs_in_issue

        if len(dirs_in_issue) < 3:
            continue

        lowered = text.lower()
        for name, keywords in _CROSS_CUTTING_KEYWORDS.items():
            if not any(word in lowered for word in keywords):
                continue
            touched_dirs[name] |= dirs_in_issue
            if issue.issue_id not in matched_ids[name]:
                matched_ids[name].append(issue.issue_id)

    # A matched concern always contributes directories to known_dirs, so the
    # fallback of 1 only matters when nothing matched at all.
    denominator = len(known_dirs) or 1

    smells: list[CrossCuttingSmell] = [
        CrossCuttingSmell(
            concern_type=name,
            affected_directories=sorted(touched_dirs[name]),
            issue_count=len(ids),
            issue_ids=ids,
            scatter_score=len(touched_dirs[name]) / denominator,
            suggested_pattern=_CONCERN_PATTERNS.get(name, "aspect"),
        )
        for name, ids in matched_ids.items()
        if ids
    ]

    # Highest scatter first; ties keep keyword-table order (stable sort).
    smells.sort(key=lambda s: s.scatter_score, reverse=True)

    top_concern = smells[0].concern_type if smells else ""

    # Only concerns spread over >= 30% of known directories are worth centralizing.
    suggestions = [
        f"Centralize {s.concern_type} ({s.issue_count} issues would benefit)"
        for s in smells
        if s.scatter_score >= 0.3
    ]

    return CrossCuttingAnalysis(
        smells=smells,
        most_scattered_concern=top_concern,
        consolidation_opportunities=suggestions[:10],
    )

142 

143 

def analyze_agent_effectiveness(
    issues: list[CompletedIssue],
    contents: dict[Path, str] | None = None,
) -> AgentEffectivenessAnalysis:
    """Analyze agent effectiveness across issue types.

    Groups issues by (processing agent, issue type) and tallies each
    issue's resolution as a success ("completed"), a rejection
    ("rejected", "invalid", "duplicate"), or a failure (anything else).
    Issues whose content cannot be loaded are skipped.

    Args:
        issues: List of completed issues
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        AgentEffectivenessAnalysis with:
          - outcomes sorted by (agent name, issue type)
          - the best agent per issue type, considering only combinations
            with at least 3 issues
          - problematic combinations (at least 5 issues and under 50%
            success) as (agent, issue type, reason), worst first
    """
    if not issues:
        return AgentEffectivenessAnalysis()

    # Track outcomes by (agent, issue_type)
    outcomes_map: dict[tuple[str, str], AgentOutcome] = {}

    for issue in issues:
        content = get_issue_content(issue, contents)
        if content is None:
            continue

        # Detect agent (discovered_by may contain source info in some cases)
        agent = _detect_processing_agent(content, issue.discovered_by)
        resolution = _parse_resolution_action(content)

        key = (agent, issue.issue_type)
        outcome = outcomes_map.get(key)
        if outcome is None:
            outcome = AgentOutcome(agent_name=agent, issue_type=issue.issue_type)
            outcomes_map[key] = outcome

        if resolution == "completed":
            outcome.success_count += 1
        elif resolution in ("rejected", "invalid", "duplicate"):
            outcome.rejection_count += 1
        else:  # deferred or other
            outcome.failure_count += 1

    outcomes = list(outcomes_map.values())

    # Group outcomes by issue type to pick the best agent for each type.
    by_type: dict[str, list[AgentOutcome]] = {}
    for outcome in outcomes:
        by_type.setdefault(outcome.issue_type, []).append(outcome)

    best_agent_by_type: dict[str, str] = {}
    for issue_type, candidates in by_type.items():
        # Require a minimum sample size so one lucky issue cannot win.
        significant = [o for o in candidates if o.total_count >= 3]
        if significant:
            best_agent_by_type[issue_type] = max(
                significant, key=lambda o: o.success_rate
            ).agent_name

    # Problematic combinations: success rate < 50% with >= 5 samples.
    # Sort on the real success rate (worst first) *before* formatting, rather
    # than parsing the rounded percentage back out of the reason string.
    struggling = sorted(
        (o for o in outcomes if o.total_count >= 5 and o.success_rate < 0.5),
        key=lambda o: o.success_rate,
    )
    problematic_combinations: list[tuple[str, str, str]] = [
        (
            o.agent_name,
            o.issue_type,
            f"{o.success_rate * 100:.0f}% success ({o.success_count}/{o.total_count})",
        )
        for o in struggling
    ]

    return AgentEffectivenessAnalysis(
        outcomes=sorted(outcomes, key=lambda o: (o.agent_name, o.issue_type)),
        best_agent_by_type=best_agent_by_type,
        problematic_combinations=problematic_combinations,
    )

232 

233 

def _build_complexity_proxy(
    path: str,
    entries: list[tuple[str, float]],
    baseline_days: float,
) -> ComplexityProxy:
    """Summarize one path's (issue_id, days) samples relative to the baseline.

    Args:
        path: File or directory path the samples belong to
        entries: Non-empty list of (issue_id, resolution days) pairs
        baseline_days: Non-zero overall median resolution time in days

    Returns:
        ComplexityProxy with average/median duration, the slowest issue,
        and a 0-1 complexity score
    """
    from statistics import median

    durations = [days for _, days in entries]
    avg = sum(durations) / len(durations)

    # Normalize to 0-1: at or below baseline -> 0.0, 5x baseline or slower -> 1.0
    ratio = avg / baseline_days
    complexity_score = max(0.0, min(1.0, (ratio - 1) / 4))

    return ComplexityProxy(
        path=path,
        avg_resolution_days=avg,
        # True median (mean of the two middle values for even counts), the
        # same definition used for the overall baseline.
        median_resolution_days=median(durations),
        issue_count=len(entries),
        slowest_issue=max(entries, key=lambda entry: entry[1]),
        complexity_score=complexity_score,
        comparison_to_baseline=(
            f"{ratio:.1f}x baseline" if ratio >= 1.5 else "near baseline"
        ),
    )


def analyze_complexity_proxy(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
    contents: dict[Path, str] | None = None,
) -> ComplexityProxyAnalysis:
    """Use issue duration as proxy for code complexity.

    Areas that consistently take longer to resolve suggest higher complexity,
    insufficient documentation, or accumulated technical debt.

    Args:
        issues: List of completed issues with dates; issues missing either
            date, or completed before they were discovered, are ignored
        hotspots: Pre-computed hotspot analysis (accepted for interface
            compatibility; not read by this function)
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        ComplexityProxyAnalysis with the top 10 files and directories by
        complexity score, the baseline (overall median duration in days,
        forced to 1.0 when the median is 0), and up to 10 files whose
        average duration exceeds twice the baseline
    """
    from statistics import median

    # Resolution time in whole days for every issue that has both dates.
    issue_durations: dict[str, float] = {}  # issue_id -> days
    for issue in issues:
        if issue.discovered_date and issue.completed_date:
            days = float((issue.completed_date - issue.discovered_date).days)
            if days >= 0:  # Sanity check
                issue_durations[issue.issue_id] = days

    if not issue_durations:
        return ComplexityProxyAnalysis()

    # Baseline is the median duration across all dated issues.
    baseline_days = median(issue_durations.values())
    if baseline_days == 0:
        baseline_days = 1.0  # Avoid division by zero

    # Aggregate durations by file: path -> [(issue_id, days), ...]
    file_durations: dict[str, list[tuple[str, float]]] = {}
    for issue in issues:
        if issue.issue_id not in issue_durations:
            continue
        content = get_issue_content(issue, contents)
        if content is None:
            continue
        days = issue_durations[issue.issue_id]
        for file_path in _extract_paths_from_issue(content):
            file_durations.setdefault(file_path, []).append((issue.issue_id, days))

    # Aggregate durations by directory ("./" for paths without a separator).
    dir_durations: dict[str, list[tuple[str, float]]] = {}
    for file_path, entries in file_durations.items():
        parent, sep, _ = file_path.rpartition("/")
        dir_path = parent + "/" if sep else "./"
        dir_durations.setdefault(dir_path, []).extend(entries)

    # Files need at least 2 data points to be reported.
    file_complexity: list[ComplexityProxy] = [
        _build_complexity_proxy(file_path, entries, baseline_days)
        for file_path, entries in file_durations.items()
        if len(entries) >= 2
    ]

    directory_complexity: list[ComplexityProxy] = []
    for dir_path, entries in dir_durations.items():
        # NOTE(review): this threshold counts (issue, file) pairs, so a single
        # issue touching 3 files in a directory passes it; confirm whether it
        # should apply to unique issues instead.
        if len(entries) < 3:  # Need at least 3 data points for directories
            continue

        # Deduplicate by issue_id so an issue touching several files in the
        # same directory is only counted once in directory-level stats.
        unique_entries: dict[str, float] = {}
        for issue_id, days in entries:
            if issue_id not in unique_entries or days > unique_entries[issue_id]:
                unique_entries[issue_id] = days

        directory_complexity.append(
            _build_complexity_proxy(dir_path, list(unique_entries.items()), baseline_days)
        )

    # Highest complexity first (stable for ties).
    file_complexity.sort(key=lambda c: c.complexity_score, reverse=True)
    directory_complexity.sort(key=lambda c: c.complexity_score, reverse=True)

    # Outliers: files averaging more than 2x the baseline.
    complexity_outliers = [
        c.path for c in file_complexity if c.avg_resolution_days > baseline_days * 2
    ]

    return ComplexityProxyAnalysis(
        file_complexity=file_complexity[:10],
        directory_complexity=directory_complexity[:10],
        baseline_days=baseline_days,
        complexity_outliers=complexity_outliers[:10],
    )

386 

387 

def _calculate_debt_metrics(
    completed_issues: list[CompletedIssue],
    active_issues: list[tuple[Path, str, str, date | None]],
    today: date | None = None,
) -> TechnicalDebtMetrics:
    """Calculate technical debt health metrics.

    Args:
        completed_issues: List of completed issues
        active_issues: List of active issue tuples, unpacked as
            (path, issue_type, priority, discovered_date)
        today: Reference date for ageing and the 4-week window; defaults
            to ``date.today()``. Pass a fixed date for reproducible results.

    Returns:
        TechnicalDebtMetrics with backlog size, high-priority (P0/P1) and
        ageing (30+/60+ day) counts, weekly backlog growth rate over the
        last 4 weeks, and the BUG-to-FEAT completion ratio
    """
    if today is None:
        today = date.today()
    metrics = TechnicalDebtMetrics()

    # Backlog size
    metrics.backlog_size = len(active_issues)

    # Count aging and high priority
    for _path, _issue_type, priority, discovered_date in active_issues:
        if priority in ("P0", "P1"):
            metrics.high_priority_open += 1

        if discovered_date:
            age = (today - discovered_date).days
            if age >= 30:
                metrics.aging_30_plus += 1
            if age >= 60:
                metrics.aging_60_plus += 1

    # Calculate backlog growth rate (issues per week) from the last 4 weeks
    # of completions vs creations.
    four_weeks_ago = today - timedelta(days=28)

    completed_recently = sum(
        1 for i in completed_issues if i.completed_date and i.completed_date >= four_weeks_ago
    )

    # Creations must include issues that were opened in the window and have
    # already been completed; otherwise each of them is subtracted as a
    # completion without ever being added, understating backlog growth.
    created_still_open = sum(1 for _, _, _, d in active_issues if d and d >= four_weeks_ago)
    created_and_closed = sum(
        1 for i in completed_issues if i.discovered_date and i.discovered_date >= four_weeks_ago
    )
    created_recently = created_still_open + created_and_closed

    # Net change per week
    if completed_recently > 0 or created_recently > 0:
        metrics.backlog_growth_rate = (created_recently - completed_recently) / 4.0

    # Debt paydown ratio (bug fixes vs features)
    bug_count = sum(1 for i in completed_issues if i.issue_type == "BUG")
    feat_count = sum(1 for i in completed_issues if i.issue_type == "FEAT")

    if feat_count > 0:
        metrics.debt_paydown_ratio = bug_count / feat_count
    elif bug_count > 0:
        metrics.debt_paydown_ratio = float(bug_count)  # All maintenance

    return metrics