Coverage for little_loops / issue_history / debt.py: 0%
187 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Issue history technical debt analysis: cross-cutting concerns, agent effectiveness, complexity."""
3from __future__ import annotations
5from datetime import date, timedelta
6from pathlib import Path
7from typing import Any
9from little_loops.issue_history._utils import get_issue_content
10from little_loops.issue_history.models import (
11 AgentEffectivenessAnalysis,
12 AgentOutcome,
13 CompletedIssue,
14 ComplexityProxy,
15 ComplexityProxyAnalysis,
16 CrossCuttingAnalysis,
17 CrossCuttingSmell,
18 HotspotAnalysis,
19 TechnicalDebtMetrics,
20)
21from little_loops.issue_history.parsing import (
22 _detect_processing_agent,
23 _extract_paths_from_issue,
24 _parse_resolution_action,
25)
# Keyword table for smell detection: concern name -> terms that signal it.
# Dictionary order is significant because per-concern results are
# accumulated in this order.
_CROSS_CUTTING_KEYWORDS: dict[str, list[str]] = {
    "logging": [
        "log",
        "logger",
        "logging",
        "debug",
        "trace",
        "print",
    ],
    "error-handling": [
        "error",
        "exception",
        "try",
        "catch",
        "raise",
        "except",
        "fail",
    ],
    "validation": [
        "valid",
        "validate",
        "check",
        "assert",
        "verify",
        "sanitize",
    ],
    "auth": [
        "auth",
        "permission",
        "role",
        "access",
        "token",
        "credential",
        "login",
    ],
    "caching": [
        "cache",
        "memo",
        "memoize",
        "store",
        "ttl",
        "expire",
        "cached",
    ],
}
# Remediation pattern recommended for each cross-cutting concern type.
# Keys are concern names; values are the pattern name reported to the user.
_CONCERN_PATTERNS: dict[str, str] = {
    # Concerns suggested as a "decorator".
    "logging": "decorator",
    # Concerns suggested as "middleware".
    "error-handling": "middleware",
    "validation": "decorator",
    "auth": "middleware",
    "caching": "decorator",
}
def detect_cross_cutting_smells(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
    contents: dict[Path, str] | None = None,
) -> CrossCuttingAnalysis:
    """Find cross-cutting concerns that are spread over many directories.

    An issue contributes to a concern when it references files in three or
    more distinct directories and its text contains at least one of that
    concern's keywords (case-insensitive substring match).

    Args:
        issues: List of completed issues
        hotspots: Hotspot analysis results; its directory hotspots seed the
            set of known directories used as the scatter-score denominator
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        CrossCuttingAnalysis with smells ordered by scatter score (highest
        first), the most scattered concern, and up to ten consolidation
        suggestions
    """
    if not issues:
        return CrossCuttingAnalysis()

    # Per-concern accumulators, created in keyword-table order so the final
    # (stable) sort keeps that order among equal scatter scores.
    dirs_by_concern: dict[str, set[str]] = {name: set() for name in _CROSS_CUTTING_KEYWORDS}
    ids_by_concern: dict[str, list[str]] = {name: [] for name in _CROSS_CUTTING_KEYWORDS}

    # Every directory seen anywhere: hotspot directories plus issue directories.
    known_dirs: set[str] = {h.path for h in hotspots.directory_hotspots or ()}

    for issue in issues:
        text = get_issue_content(issue, contents)
        if text is None:
            continue

        # Only paths with a separator have a meaningful parent directory.
        touched = {
            str(Path(p).parent)
            for p in _extract_paths_from_issue(text)
            if "/" in p or "\\" in p
        }
        known_dirs |= touched

        # Issues touching fewer than three directories are not "scattered".
        if len(touched) < 3:
            continue

        lowered = text.lower()
        for name, keywords in _CROSS_CUTTING_KEYWORDS.items():
            if not any(kw in lowered for kw in keywords):
                continue
            dirs_by_concern[name] |= touched
            seen_ids = ids_by_concern[name]
            if issue.issue_id not in seen_ids:
                seen_ids.append(issue.issue_id)

    # Guard against an empty directory set so the division below is safe.
    denominator = len(known_dirs) or 1

    smells: list[CrossCuttingSmell] = [
        CrossCuttingSmell(
            concern_type=name,
            affected_directories=sorted(dirs_by_concern[name]),
            issue_count=len(ids),
            issue_ids=ids,
            scatter_score=len(dirs_by_concern[name]) / denominator,
            suggested_pattern=_CONCERN_PATTERNS.get(name, "aspect"),
        )
        for name, ids in ids_by_concern.items()
        if ids  # concerns with no matching issue are omitted
    ]

    # Highest scatter first; the sort is stable, so ties keep table order.
    smells.sort(key=lambda s: s.scatter_score, reverse=True)

    most_scattered = smells[0].concern_type if smells else ""

    # A concern reaching 30% of known directories is worth centralizing.
    consolidation_opportunities = [
        f"Centralize {smell.concern_type} ({smell.issue_count} issues would benefit)"
        for smell in smells
        if smell.scatter_score >= 0.3
    ]

    return CrossCuttingAnalysis(
        smells=smells,
        most_scattered_concern=most_scattered,
        consolidation_opportunities=consolidation_opportunities[:10],
    )
def analyze_agent_effectiveness(
    issues: list[CompletedIssue],
    contents: dict[Path, str] | None = None,
) -> AgentEffectivenessAnalysis:
    """Analyze agent effectiveness across issue types.

    Groups issues by (processing agent, issue type) and tallies each issue's
    resolution as a success ("completed"), a rejection ("rejected",
    "invalid" or "duplicate") or a failure (anything else). Issues whose
    content cannot be loaded are skipped.

    Args:
        issues: List of completed issues
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        AgentEffectivenessAnalysis containing:
        - outcomes sorted by (agent name, issue type)
        - the best agent per issue type, chosen by success rate among
          agents with at least 3 samples for that type
        - problematic combinations (at least 5 samples and a success rate
          below 50%), ordered worst first
    """
    if not issues:
        return AgentEffectivenessAnalysis()

    # Track outcomes by (agent, issue_type)
    outcomes_map: dict[tuple[str, str], AgentOutcome] = {}

    for issue in issues:
        content = get_issue_content(issue, contents)
        if content is None:
            continue

        # Detect agent (discovered_by may contain source info in some cases)
        agent = _detect_processing_agent(content, issue.discovered_by)
        resolution = _parse_resolution_action(content)

        key = (agent, issue.issue_type)
        outcome = outcomes_map.get(key)
        if outcome is None:
            outcome = AgentOutcome(agent_name=agent, issue_type=issue.issue_type)
            outcomes_map[key] = outcome

        # Categorize outcome
        if resolution == "completed":
            outcome.success_count += 1
        elif resolution in ("rejected", "invalid", "duplicate"):
            outcome.rejection_count += 1
        else:  # deferred or other
            outcome.failure_count += 1

    outcomes = list(outcomes_map.values())

    # Group outcomes by issue type to compare agents on the same kind of work.
    type_agents: dict[str, list[AgentOutcome]] = {}
    for outcome in outcomes:
        type_agents.setdefault(outcome.issue_type, []).append(outcome)

    # Determine best agent per issue type, requiring a minimum sample size.
    best_agent_by_type: dict[str, str] = {}
    for issue_type, agent_outcomes in type_agents.items():
        significant_outcomes = [o for o in agent_outcomes if o.total_count >= 3]
        if significant_outcomes:
            best = max(significant_outcomes, key=lambda o: o.success_rate)
            best_agent_by_type[issue_type] = best.agent_name

    # Identify problematic combinations (success rate < 50% with >= 5 samples).
    # Sort on the real success rate *before* formatting: ordering by the
    # rounded percentage parsed back out of the message loses precision and
    # couples the sort to the message format.
    struggling = sorted(
        (o for o in outcomes if o.total_count >= 5 and o.success_rate < 0.5),
        key=lambda o: o.success_rate,
    )
    problematic_combinations: list[tuple[str, str, str]] = [
        (
            o.agent_name,
            o.issue_type,
            f"{o.success_rate * 100:.0f}% success ({o.success_count}/{o.total_count})",
        )
        for o in struggling
    ]

    return AgentEffectivenessAnalysis(
        outcomes=sorted(outcomes, key=lambda o: (o.agent_name, o.issue_type)),
        best_agent_by_type=best_agent_by_type,
        problematic_combinations=problematic_combinations,
    )
def analyze_complexity_proxy(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
    contents: dict[Path, str] | None = None,
) -> ComplexityProxyAnalysis:
    """Use issue duration as proxy for code complexity.

    Areas that consistently take longer to resolve suggest higher complexity,
    insufficient documentation, or accumulated technical debt.

    The baseline is the median resolution time (in days) over all issues
    that have both a discovered and a completed date; a baseline of zero is
    replaced by 1.0 so ratios stay defined. A file needs at least 2 issues
    and a directory at least 3 *distinct* issues to be reported. The
    complexity score maps "average / baseline" linearly onto 0..1, where a
    ratio of 1 or less scores 0.0 and a ratio of 5 or more scores 1.0.

    Args:
        issues: List of completed issues with dates
        hotspots: Pre-computed hotspot analysis (accepted for interface
            compatibility; not read by this function)
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        ComplexityProxyAnalysis with the top 10 files and directories by
        complexity score, the baseline, and up to 10 outlier files whose
        average resolution time exceeds twice the baseline
    """
    from statistics import median

    # Calculate durations for all issues with both dates
    issue_durations: dict[str, float] = {}  # issue_id -> days
    for issue in issues:
        if issue.discovered_date and issue.completed_date:
            days = float((issue.completed_date - issue.discovered_date).days)
            if days >= 0:  # Sanity check: ignore completion before discovery
                issue_durations[issue.issue_id] = days

    if not issue_durations:
        return ComplexityProxyAnalysis()

    # Baseline is the median duration; fall back to 1.0 to avoid dividing by zero.
    baseline_days = median(issue_durations.values()) or 1.0

    def _build_proxy(path: str, samples: list[tuple[str, float]]) -> ComplexityProxy:
        """Summarize (issue_id, days) samples for one file or directory."""
        durations = [days for _, days in samples]
        avg = sum(durations) / len(durations)
        ratio = avg / baseline_days
        # (ratio - 1) / 4 makes "5x slower than baseline" score 1.0.
        score = max(0.0, min(1.0, (ratio - 1) / 4))
        return ComplexityProxy(
            path=path,
            avg_resolution_days=avg,
            # True median (mean of the two middle values for even counts),
            # matching how the baseline itself is computed.
            median_resolution_days=median(durations),
            issue_count=len(samples),
            slowest_issue=max(samples, key=lambda s: s[1]),
            complexity_score=score,
            comparison_to_baseline=(
                f"{ratio:.1f}x baseline" if ratio >= 1.5 else "near baseline"
            ),
        )

    # Aggregate durations by file, reading paths from each dated issue's content.
    file_durations: dict[str, list[tuple[str, float]]] = {}  # path -> [(issue_id, days)]
    for issue in issues:
        if issue.issue_id not in issue_durations:
            continue
        content = get_issue_content(issue, contents)
        if content is None:
            continue
        for file_path in _extract_paths_from_issue(content):
            file_durations.setdefault(file_path, []).append(
                (issue.issue_id, issue_durations[issue.issue_id])
            )

    # Aggregate durations by directory ("./" for paths without a separator).
    dir_durations: dict[str, list[tuple[str, float]]] = {}
    for path, entries in file_durations.items():
        parent, sep, _name = path.rpartition("/")
        dir_path = f"{parent}/" if sep else "./"
        dir_durations.setdefault(dir_path, []).extend(entries)

    # Build file complexity proxies (need at least 2 data points per file).
    file_complexity: list[ComplexityProxy] = [
        _build_proxy(path, entries)
        for path, entries in file_durations.items()
        if len(entries) >= 2
    ]

    # Build directory complexity proxies.
    directory_complexity: list[ComplexityProxy] = []
    for dir_path, entries in dir_durations.items():
        # Deduplicate by issue_id first: one issue touching several files in
        # the same directory is still a single data point.
        per_issue: dict[str, float] = {}
        for issue_id, days in entries:
            per_issue[issue_id] = max(days, per_issue.get(issue_id, days))

        # Need at least 3 distinct issues for directory-level statistics.
        if len(per_issue) < 3:
            continue

        directory_complexity.append(_build_proxy(dir_path, list(per_issue.items())))

    # Sort by complexity score descending (stable, so ties keep insertion order).
    file_complexity.sort(key=lambda c: c.complexity_score, reverse=True)
    directory_complexity.sort(key=lambda c: c.complexity_score, reverse=True)

    # Identify outliers (>2x baseline)
    complexity_outliers = [
        c.path for c in file_complexity if c.avg_resolution_days > baseline_days * 2
    ]

    return ComplexityProxyAnalysis(
        file_complexity=file_complexity[:10],
        directory_complexity=directory_complexity[:10],
        baseline_days=baseline_days,
        complexity_outliers=complexity_outliers[:10],
    )
def _calculate_debt_metrics(
    completed_issues: list[CompletedIssue],
    active_issues: list[tuple[Path, str, str, date | None]],
) -> TechnicalDebtMetrics:
    """Calculate technical debt health metrics.

    Computes backlog size, the number of open P0/P1 issues, how many open
    issues are at least 30 and at least 60 days old, the weekly backlog
    growth rate over the last 28 days, and the ratio of completed BUG
    issues to completed FEAT issues.

    Args:
        completed_issues: List of completed issues
        active_issues: List of active issue tuples, unpacked here as
            (path, issue type, priority, discovered date)

    Returns:
        TechnicalDebtMetrics with calculated values
    """
    today = date.today()
    four_weeks_ago = today - timedelta(days=28)
    metrics = TechnicalDebtMetrics()

    # Backlog size
    metrics.backlog_size = len(active_issues)

    # Count aging and high priority
    for _path, _issue_type, priority, discovered_date in active_issues:
        if priority in ("P0", "P1"):
            metrics.high_priority_open += 1

        if discovered_date:
            age = (today - discovered_date).days
            # The buckets overlap: a 60+ day issue is also counted as 30+.
            if age >= 30:
                metrics.aging_30_plus += 1
            if age >= 60:
                metrics.aging_60_plus += 1

    # Calculate backlog growth rate (issues per week) from the last 4 weeks
    # of creations vs completions.
    created_recently = sum(1 for *_rest, d in active_issues if d and d >= four_weeks_ago)
    completed_recently = 0
    for issue in completed_issues:
        if issue.completed_date and issue.completed_date >= four_weeks_ago:
            completed_recently += 1
            # An issue opened *and* closed inside the window is a creation
            # too; counting only its completion would make the backlog look
            # like it shrank when its net effect is zero.
            if issue.discovered_date and issue.discovered_date >= four_weeks_ago:
                created_recently += 1

    # Net change per week
    if completed_recently > 0 or created_recently > 0:
        metrics.backlog_growth_rate = (created_recently - completed_recently) / 4.0

    # Debt paydown ratio (bug fixes vs features)
    bug_count = sum(1 for i in completed_issues if i.issue_type == "BUG")
    feat_count = sum(1 for i in completed_issues if i.issue_type == "FEAT")

    if feat_count > 0:
        metrics.debt_paydown_ratio = bug_count / feat_count
    elif bug_count > 0:
        metrics.debt_paydown_ratio = float(bug_count)  # All maintenance

    return metrics