Coverage for little_loops / issue_history.py: 16%
1825 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-13 16:40 -0600
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-13 16:40 -0600
1"""Issue history analysis and summary statistics.
3Provides analysis of completed issues including:
4- Type distribution (BUG, ENH, FEAT)
5- Priority distribution (P0-P5)
6- Discovery source breakdown
7- Completion velocity metrics
8- Trend analysis over time periods
9- Subsystem health tracking
10- Technical debt metrics
11"""
13from __future__ import annotations
15import json
16import re
17from collections import defaultdict
18from dataclasses import dataclass, field
19from datetime import date, timedelta
20from pathlib import Path
21from typing import Any, Literal
23from little_loops.frontmatter import parse_frontmatter
# Public API of this module: the names exported by a star-import.
__all__ = [
    # Core dataclasses
    "CompletedIssue",
    "HistorySummary",
    # Advanced analysis dataclasses
    "PeriodMetrics",
    "SubsystemHealth",
    "Hotspot",
    "HotspotAnalysis",
    # Coupling dataclasses are defined in this module and used by
    # HistoryAnalysis.coupling_analysis, so they are exported as well.
    "CouplingPair",
    "CouplingAnalysis",
    "RegressionCluster",
    "RegressionAnalysis",
    "TestGap",
    "TestGapAnalysis",
    "RejectionMetrics",
    "RejectionAnalysis",
    "ManualPattern",
    "ManualPatternAnalysis",
    "ConfigGap",
    "ConfigGapsAnalysis",
    "AgentOutcome",
    "AgentEffectivenessAnalysis",
    "TechnicalDebtMetrics",
    "ComplexityProxy",
    "ComplexityProxyAnalysis",
    "CrossCuttingSmell",
    "CrossCuttingAnalysis",
    "HistoryAnalysis",
    # Parsing and scanning
    "parse_completed_issue",
    "scan_completed_issues",
    "scan_active_issues",
    # Summary functions
    "calculate_summary",
    "calculate_analysis",
    "analyze_hotspots",
    "analyze_regression_clustering",
    "analyze_test_gaps",
    "analyze_rejection_rates",
    "detect_manual_patterns",
    "detect_config_gaps",
    "analyze_agent_effectiveness",
    "analyze_complexity_proxy",
    "detect_cross_cutting_smells",
    # Formatting functions
    "format_summary_text",
    "format_summary_json",
    "format_analysis_text",
    "format_analysis_json",
    "format_analysis_markdown",
    "format_analysis_yaml",
]
@dataclass
class CompletedIssue:
    """Metadata record describing one completed issue file."""

    path: Path
    issue_type: str  # BUG, ENH, FEAT
    priority: str  # P0-P5
    issue_id: str  # e.g., BUG-001
    discovered_by: str | None = None
    discovered_date: date | None = None
    completed_date: date | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable mapping of this record.

        The path is rendered with ``str()`` and both dates as ISO-format
        strings; a date that is not set is emitted as ``None``.
        """
        discovered = self.discovered_date.isoformat() if self.discovered_date else None
        completed = self.completed_date.isoformat() if self.completed_date else None
        return dict(
            path=str(self.path),
            issue_type=self.issue_type,
            priority=self.priority,
            issue_id=self.issue_id,
            discovered_by=self.discovered_by,
            discovered_date=discovered,
            completed_date=completed,
        )
@dataclass
class HistorySummary:
    """Summary statistics for completed issues.

    ``date_range_days`` and ``velocity`` are derived from the stored fields
    and are included in the output of ``to_dict``.
    """

    total_count: int
    type_counts: dict[str, int] = field(default_factory=dict)
    priority_counts: dict[str, int] = field(default_factory=dict)
    discovery_counts: dict[str, int] = field(default_factory=dict)
    earliest_date: date | None = None
    latest_date: date | None = None

    @property
    def date_range_days(self) -> int | None:
        """Days between earliest and latest completion, counting both ends.

        Returns:
            The inclusive day count, or None when either date is missing.
        """
        if self.earliest_date and self.latest_date:
            return (self.latest_date - self.earliest_date).days + 1
        return None

    @property
    def velocity(self) -> float | None:
        """Issues completed per day over the date range.

        Returns:
            ``total_count / date_range_days``, or None when the range is
            unknown or not positive.
        """
        span = self.date_range_days
        if span and span > 0:
            return self.total_count / span
        return None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Returns:
            Mapping with the stored counts, ISO-formatted dates (None when
            unset), the derived day range, and velocity rounded to 2 places.
        """
        # Read the property once; compare against None so that a genuine
        # velocity of 0.0 is reported as 0.0 rather than as "missing".
        velocity = self.velocity
        return {
            "total_count": self.total_count,
            "type_counts": self.type_counts,
            "priority_counts": self.priority_counts,
            "discovery_counts": self.discovery_counts,
            "earliest_date": (self.earliest_date.isoformat() if self.earliest_date else None),
            "latest_date": self.latest_date.isoformat() if self.latest_date else None,
            "date_range_days": self.date_range_days,
            "velocity": round(velocity, 2) if velocity is not None else None,
        }
142# =============================================================================
143# Advanced Analysis Dataclasses (FEAT-110)
144# =============================================================================
@dataclass
class PeriodMetrics:
    """Metrics for a specific time period."""

    period_start: date
    period_end: date
    period_label: str  # e.g., "Q1 2025", "Jan 2025", "Week 3"
    total_completed: int = 0
    type_counts: dict[str, int] = field(default_factory=dict)
    priority_counts: dict[str, int] = field(default_factory=dict)
    avg_completion_days: float | None = None

    @property
    def bug_ratio(self) -> float | None:
        """Fraction of completed issues in this period whose type is BUG.

        Returns:
            ``type_counts["BUG"] / total_completed`` (a missing BUG entry
            counts as 0), or None when nothing was completed.
        """
        if self.total_completed == 0:
            return None
        bug_count = self.type_counts.get("BUG", 0)
        return bug_count / self.total_completed

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization.

        Returns:
            Mapping with ISO-formatted period bounds, the stored counts,
            ``bug_ratio`` rounded to 3 places and ``avg_completion_days``
            rounded to 1 place; either derived value is None when unknown.
        """
        ratio = self.bug_ratio
        avg_days = self.avg_completion_days
        return {
            "period_start": self.period_start.isoformat(),
            "period_end": self.period_end.isoformat(),
            "period_label": self.period_label,
            "total_completed": self.total_completed,
            "type_counts": self.type_counts,
            "priority_counts": self.priority_counts,
            "bug_ratio": round(ratio, 3) if ratio is not None else None,
            # Compare against None: an average of 0.0 days is a real
            # measurement and must not be reported as missing.
            "avg_completion_days": round(avg_days, 1) if avg_days is not None else None,
        }
@dataclass
class SubsystemHealth:
    """Issue-count health snapshot for one subsystem (a directory)."""

    subsystem: str  # Directory path
    total_issues: int = 0
    recent_issues: int = 0  # Issues in last 30 days
    issue_ids: list[str] = field(default_factory=list)
    trend: str = "stable"  # "improving", "stable", "degrading"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; only the first 5 issue IDs are kept."""
        leading_ids = self.issue_ids[:5]
        return dict(
            subsystem=self.subsystem,
            total_issues=self.total_issues,
            recent_issues=self.recent_issues,
            issue_ids=leading_ids,
            trend=self.trend,
        )
@dataclass
class Hotspot:
    """Record for a file or directory referenced by multiple issues."""

    path: str
    issue_count: int = 0
    issue_ids: list[str] = field(default_factory=list)
    issue_types: dict[str, int] = field(default_factory=dict)  # e.g. {"BUG": 5, "ENH": 3}
    bug_ratio: float = 0.0  # bugs / total issues
    churn_indicator: str = "low"  # "high", "medium", "low"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        At most the first 10 issue IDs are included and ``bug_ratio`` is
        rounded to 3 decimal places.
        """
        payload: dict[str, Any] = {"path": self.path}
        payload["issue_count"] = self.issue_count
        payload["issue_ids"] = self.issue_ids[:10]
        payload["issue_types"] = self.issue_types
        payload["bug_ratio"] = round(self.bug_ratio, 3)
        payload["churn_indicator"] = self.churn_indicator
        return payload
@dataclass
class HotspotAnalysis:
    """Grouped hotspot results: per file, per directory, and bug magnets."""

    file_hotspots: list[Hotspot] = field(default_factory=list)
    directory_hotspots: list[Hotspot] = field(default_factory=list)
    bug_magnets: list[Hotspot] = field(default_factory=list)  # >60% bug ratio

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with every hotspot converted via ``to_dict``."""
        files = [spot.to_dict() for spot in self.file_hotspots]
        directories = [spot.to_dict() for spot in self.directory_hotspots]
        magnets = [spot.to_dict() for spot in self.bug_magnets]
        return dict(
            file_hotspots=files,
            directory_hotspots=directories,
            bug_magnets=magnets,
        )
@dataclass
class CouplingPair:
    """Two files that show up together in the same issues."""

    file_a: str
    file_b: str
    co_occurrence_count: int = 0
    coupling_strength: float = 0.0  # 0-1, Jaccard similarity
    issue_ids: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        ``coupling_strength`` is rounded to 3 places and at most the first
        10 issue IDs are included.
        """
        strength = round(self.coupling_strength, 3)
        shared_ids = self.issue_ids[:10]
        return dict(
            file_a=self.file_a,
            file_b=self.file_b,
            co_occurrence_count=self.co_occurrence_count,
            coupling_strength=strength,
            issue_ids=shared_ids,
        )
@dataclass
class CouplingAnalysis:
    """Results describing files that tend to change together."""

    pairs: list[CouplingPair] = field(default_factory=list)
    clusters: list[list[str]] = field(default_factory=list)  # Groups of coupled files
    hotspots: list[str] = field(default_factory=list)  # Files coupled with 3+ others

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        All pairs are converted via ``to_dict``; clusters and hotspots are
        each limited to their first 10 entries.
        """
        serialized_pairs = [pair.to_dict() for pair in self.pairs]
        payload: dict[str, Any] = {"pairs": serialized_pairs}
        payload["clusters"] = self.clusters[:10]
        payload["hotspots"] = self.hotspots[:10]
        return payload
@dataclass
class RegressionCluster:
    """Group of bugs in which fixes were followed by new bugs."""

    primary_file: str  # Main file in the regression chain
    regression_count: int = 0  # Number of regression pairs
    fix_bug_pairs: list[tuple[str, str]] = field(default_factory=list)  # (fixed_id, caused_id)
    related_files: list[str] = field(default_factory=list)  # All files in chain
    time_pattern: str = "immediate"  # "immediate" (<3d), "delayed" (3-7d), "chronic" (recurring)
    severity: str = "medium"  # "critical", "high", "medium"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; pair and file lists are cut to 10 entries."""
        pairs_head = self.fix_bug_pairs[:10]
        files_head = self.related_files[:10]
        return dict(
            primary_file=self.primary_file,
            regression_count=self.regression_count,
            fix_bug_pairs=pairs_head,
            related_files=files_head,
            time_pattern=self.time_pattern,
            severity=self.severity,
        )
@dataclass
class RegressionAnalysis:
    """Results describing regression patterns among bug fixes."""

    clusters: list[RegressionCluster] = field(default_factory=list)
    total_regression_chains: int = 0
    most_fragile_files: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; only the first 5 fragile files are kept."""
        serialized = [cluster.to_dict() for cluster in self.clusters]
        return dict(
            clusters=serialized,
            total_regression_chains=self.total_regression_chains,
            most_fragile_files=self.most_fragile_files[:5],
        )
@dataclass
class TestGap:
    """Source file that has bugs recorded against it, with its test-file status."""

    source_file: str
    bug_count: int = 0
    bug_ids: list[str] = field(default_factory=list)
    has_test_file: bool = False
    test_file_path: str | None = None
    gap_score: float = 0.0  # bug_count * multiplier, higher = worse
    priority: str = "low"  # "critical", "high", "medium", "low"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        At most the first 10 bug IDs are included and ``gap_score`` is
        rounded to 2 decimal places.
        """
        payload: dict[str, Any] = {}
        payload["source_file"] = self.source_file
        payload["bug_count"] = self.bug_count
        payload["bug_ids"] = self.bug_ids[:10]
        payload["has_test_file"] = self.has_test_file
        payload["test_file_path"] = self.test_file_path
        payload["gap_score"] = round(self.gap_score, 2)
        payload["priority"] = self.priority
        return payload
@dataclass
class TestGapAnalysis:
    """Results relating test-coverage gaps to bug occurrences."""

    gaps: list[TestGap] = field(default_factory=list)
    untested_bug_magnets: list[str] = field(default_factory=list)
    files_with_tests_avg_bugs: float = 0.0
    files_without_tests_avg_bugs: float = 0.0
    priority_test_targets: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        Bug magnets are limited to 5 entries, test targets to 10, and both
        averages are rounded to 2 decimal places.
        """
        with_tests = round(self.files_with_tests_avg_bugs, 2)
        without_tests = round(self.files_without_tests_avg_bugs, 2)
        return dict(
            gaps=[gap.to_dict() for gap in self.gaps],
            untested_bug_magnets=self.untested_bug_magnets[:5],
            files_with_tests_avg_bugs=with_tests,
            files_without_tests_avg_bugs=without_tests,
            priority_test_targets=self.priority_test_targets[:10],
        )
@dataclass
class RejectionMetrics:
    """Counters for how closed issues were resolved, by closure category."""

    total_closed: int = 0
    rejected_count: int = 0
    invalid_count: int = 0
    duplicate_count: int = 0
    deferred_count: int = 0
    completed_count: int = 0

    @property
    def rejection_rate(self) -> float:
        """Share of closed issues that were rejected; 0.0 when nothing is closed."""
        return self.rejected_count / self.total_closed if self.total_closed != 0 else 0.0

    @property
    def invalid_rate(self) -> float:
        """Share of closed issues that were invalid; 0.0 when nothing is closed."""
        return self.invalid_count / self.total_closed if self.total_closed != 0 else 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping of all counters plus both rates (3 decimals)."""
        payload: dict[str, Any] = {
            "total_closed": self.total_closed,
            "rejected_count": self.rejected_count,
            "invalid_count": self.invalid_count,
            "duplicate_count": self.duplicate_count,
            "deferred_count": self.deferred_count,
            "completed_count": self.completed_count,
        }
        payload["rejection_rate"] = round(self.rejection_rate, 3)
        payload["invalid_rate"] = round(self.invalid_rate, 3)
        return payload
@dataclass
class RejectionAnalysis:
    """Rejection metrics overall and broken down by type and by month."""

    overall: RejectionMetrics = field(default_factory=RejectionMetrics)
    by_type: dict[str, RejectionMetrics] = field(default_factory=dict)
    by_month: dict[str, RejectionMetrics] = field(default_factory=dict)
    common_reasons: list[tuple[str, int]] = field(default_factory=list)
    trend: str = "stable"  # "improving", "stable", "degrading"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        ``by_month`` is emitted in sorted key order and ``common_reasons``
        is limited to its first 10 entries.
        """
        per_type = {}
        for type_name, metrics in self.by_type.items():
            per_type[type_name] = metrics.to_dict()

        per_month = {}
        for month_key in sorted(self.by_month):
            per_month[month_key] = self.by_month[month_key].to_dict()

        return dict(
            overall=self.overall.to_dict(),
            by_type=per_type,
            by_month=per_month,
            common_reasons=self.common_reasons[:10],
            trend=self.trend,
        )
@dataclass
class ManualPattern:
    """Manual activity that recurs across issues."""

    pattern_type: str  # "test", "lint", "build", "git", "verification"
    pattern_description: str
    occurrence_count: int = 0
    affected_issues: list[str] = field(default_factory=list)  # issue IDs
    example_commands: list[str] = field(default_factory=list)  # sample commands found
    suggested_automation: str = ""  # hook, skill, or agent suggestion
    automation_complexity: str = "simple"  # "trivial", "simple", "moderate"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        Affected issues are limited to 10 entries and example commands to 5.
        """
        issues_head = self.affected_issues[:10]
        commands_head = self.example_commands[:5]
        return dict(
            pattern_type=self.pattern_type,
            pattern_description=self.pattern_description,
            occurrence_count=self.occurrence_count,
            affected_issues=issues_head,
            example_commands=commands_head,
            suggested_automation=self.suggested_automation,
            automation_complexity=self.automation_complexity,
        )
@dataclass
class ManualPatternAnalysis:
    """Collected manual patterns together with automation totals."""

    patterns: list[ManualPattern] = field(default_factory=list)
    total_manual_interventions: int = 0
    automatable_count: int = 0
    automation_suggestions: list[str] = field(default_factory=list)

    @property
    def automatable_percentage(self) -> float:
        """``automatable_count`` as a percentage of all interventions; 0.0 when there are none."""
        total = self.total_manual_interventions
        return 0.0 if total == 0 else self.automatable_count / total * 100

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        The percentage is rounded to 1 decimal place and suggestions are
        limited to their first 10 entries.
        """
        serialized = [pattern.to_dict() for pattern in self.patterns]
        percentage = round(self.automatable_percentage, 1)
        return dict(
            patterns=serialized,
            total_manual_interventions=self.total_manual_interventions,
            automatable_count=self.automatable_count,
            automatable_percentage=percentage,
            automation_suggestions=self.automation_suggestions[:10],
        )
@dataclass
class ConfigGap:
    """Missing piece of configuration that could cover recurring manual work."""

    gap_type: str  # "hook", "skill", "agent"
    description: str
    evidence: list[str] = field(default_factory=list)  # issue IDs showing the pattern
    suggested_config: str = ""  # example configuration
    priority: str = "medium"  # "high", "medium", "low"
    pattern_type: str = ""  # links back to ManualPattern.pattern_type

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; evidence is limited to its first 10 entries."""
        payload: dict[str, Any] = {}
        payload["gap_type"] = self.gap_type
        payload["description"] = self.description
        payload["evidence"] = self.evidence[:10]
        payload["suggested_config"] = self.suggested_config
        payload["priority"] = self.priority
        payload["pattern_type"] = self.pattern_type
        return payload
@dataclass
class ConfigGapsAnalysis:
    """Detected configuration gaps alongside the currently configured items."""

    gaps: list[ConfigGap] = field(default_factory=list)
    current_hooks: list[str] = field(default_factory=list)
    current_skills: list[str] = field(default_factory=list)
    current_agents: list[str] = field(default_factory=list)
    coverage_score: float = 0.0  # 0-1, how well config covers common needs

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; ``coverage_score`` is rounded to 2 places."""
        serialized_gaps = [gap.to_dict() for gap in self.gaps]
        score = round(self.coverage_score, 2)
        return dict(
            gaps=serialized_gaps,
            current_hooks=self.current_hooks,
            current_skills=self.current_skills,
            current_agents=self.current_agents,
            coverage_score=score,
        )
@dataclass
class AgentOutcome:
    """Outcome counters for one agent handling one issue type."""

    agent_name: str
    issue_type: str
    success_count: int = 0
    failure_count: int = 0
    rejection_count: int = 0

    @property
    def total_count(self) -> int:
        """Sum of the success, failure and rejection counters."""
        return sum((self.success_count, self.failure_count, self.rejection_count))

    @property
    def success_rate(self) -> float:
        """Successes divided by the total; 0.0 when the total is zero."""
        handled = self.total_count
        return 0.0 if handled == 0 else self.success_count / handled

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping including the derived total and rate (3 decimals)."""
        return dict(
            agent_name=self.agent_name,
            issue_type=self.issue_type,
            success_count=self.success_count,
            failure_count=self.failure_count,
            rejection_count=self.rejection_count,
            total_count=self.total_count,
            success_rate=round(self.success_rate, 3),
        )
@dataclass
class AgentEffectivenessAnalysis:
    """Per-agent outcomes plus the derived best and problematic combinations."""

    outcomes: list[AgentOutcome] = field(default_factory=list)
    best_agent_by_type: dict[str, str] = field(default_factory=dict)
    problematic_combinations: list[tuple[str, str, str]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; problematic combinations are cut to 10 entries."""
        payload: dict[str, Any] = {}
        payload["outcomes"] = [outcome.to_dict() for outcome in self.outcomes]
        payload["best_agent_by_type"] = self.best_agent_by_type
        payload["problematic_combinations"] = self.problematic_combinations[:10]
        return payload
@dataclass
class TechnicalDebtMetrics:
    """Indicators describing the state of the open-issue backlog."""

    backlog_size: int = 0  # Total open issues
    backlog_growth_rate: float = 0.0  # Net issues/week
    aging_30_plus: int = 0  # Issues > 30 days old
    aging_60_plus: int = 0  # Issues > 60 days old
    high_priority_open: int = 0  # P0-P1 open
    debt_paydown_ratio: float = 0.0  # maintenance vs features

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; the two float metrics are rounded to 2 places."""
        growth = round(self.backlog_growth_rate, 2)
        paydown = round(self.debt_paydown_ratio, 2)
        return dict(
            backlog_size=self.backlog_size,
            backlog_growth_rate=growth,
            aging_30_plus=self.aging_30_plus,
            aging_60_plus=self.aging_60_plus,
            high_priority_open=self.high_priority_open,
            debt_paydown_ratio=paydown,
        )
@dataclass
class ComplexityProxy:
    """Resolution-duration figures for one file or directory, used as a complexity proxy."""

    path: str
    avg_resolution_days: float
    median_resolution_days: float
    issue_count: int
    slowest_issue: tuple[str, float]  # (issue_id, days)
    complexity_score: float  # normalized 0-1
    comparison_to_baseline: str  # "2.1x baseline", etc.

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        Day values are rounded to 1 decimal place, the score to 3, and
        ``slowest_issue`` is expanded into an ``issue_id``/``days`` mapping.
        """
        slowest = dict(
            issue_id=self.slowest_issue[0],
            days=round(self.slowest_issue[1], 1),
        )
        payload: dict[str, Any] = {"path": self.path}
        payload["avg_resolution_days"] = round(self.avg_resolution_days, 1)
        payload["median_resolution_days"] = round(self.median_resolution_days, 1)
        payload["issue_count"] = self.issue_count
        payload["slowest_issue"] = slowest
        payload["complexity_score"] = round(self.complexity_score, 3)
        payload["comparison_to_baseline"] = self.comparison_to_baseline
        return payload
@dataclass
class ComplexityProxyAnalysis:
    """Duration-based complexity results for files and directories."""

    file_complexity: list[ComplexityProxy] = field(default_factory=list)
    directory_complexity: list[ComplexityProxy] = field(default_factory=list)
    baseline_days: float = 0.0  # median across all issues
    complexity_outliers: list[str] = field(default_factory=list)  # files >2x baseline

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        Each list is limited to its first 10 entries and ``baseline_days``
        is rounded to 1 decimal place.
        """
        top_files = self.file_complexity[:10]
        top_dirs = self.directory_complexity[:10]
        return dict(
            file_complexity=[entry.to_dict() for entry in top_files],
            directory_complexity=[entry.to_dict() for entry in top_dirs],
            baseline_days=round(self.baseline_days, 1),
            complexity_outliers=self.complexity_outliers[:10],
        )
@dataclass
class CrossCuttingSmell:
    """One cross-cutting concern found spread over several directories."""

    concern_type: str  # "logging", "error-handling", "validation", "auth", "caching"
    affected_directories: list[str] = field(default_factory=list)
    issue_count: int = 0
    issue_ids: list[str] = field(default_factory=list)
    scatter_score: float = 0.0  # higher = more scattered (0-1)
    suggested_pattern: str = ""  # "middleware", "decorator", "aspect"

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping.

        Directories and issue IDs are limited to 10 entries each and
        ``scatter_score`` is rounded to 2 decimal places.
        """
        dirs_head = self.affected_directories[:10]
        ids_head = self.issue_ids[:10]
        return dict(
            concern_type=self.concern_type,
            affected_directories=dirs_head,
            issue_count=self.issue_count,
            issue_ids=ids_head,
            scatter_score=round(self.scatter_score, 2),
            suggested_pattern=self.suggested_pattern,
        )
@dataclass
class CrossCuttingAnalysis:
    """Detected cross-cutting smells and the consolidation ideas derived from them."""

    smells: list[CrossCuttingSmell] = field(default_factory=list)
    most_scattered_concern: str = ""
    consolidation_opportunities: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; opportunities are limited to 10 entries."""
        payload: dict[str, Any] = {}
        payload["smells"] = [smell.to_dict() for smell in self.smells]
        payload["most_scattered_concern"] = self.most_scattered_concern
        payload["consolidation_opportunities"] = self.consolidation_opportunities[:10]
        return payload
@dataclass
class HistoryAnalysis:
    """Full history report: core summary plus every optional analysis section.

    Optional sections default to ``None`` and are serialized as ``None``
    when they have not been populated.
    """

    generated_date: date
    total_completed: int
    total_active: int
    date_range_start: date | None
    date_range_end: date | None

    # Core summary statistics
    summary: HistorySummary

    # Trends over time
    period_metrics: list[PeriodMetrics] = field(default_factory=list)
    velocity_trend: str = "stable"  # "increasing", "stable", "decreasing"
    bug_ratio_trend: str = "stable"

    # Per-subsystem health
    subsystem_health: list[SubsystemHealth] = field(default_factory=list)

    # Hotspots
    hotspot_analysis: HotspotAnalysis | None = None

    # File coupling
    coupling_analysis: CouplingAnalysis | None = None

    # Regression clustering
    regression_analysis: RegressionAnalysis | None = None

    # Test gaps
    test_gap_analysis: TestGapAnalysis | None = None

    # Rejections
    rejection_analysis: RejectionAnalysis | None = None

    # Manual patterns
    manual_pattern_analysis: ManualPatternAnalysis | None = None

    # Agent effectiveness
    agent_effectiveness_analysis: AgentEffectivenessAnalysis | None = None

    # Complexity proxy
    complexity_proxy_analysis: ComplexityProxyAnalysis | None = None

    # Configuration gaps
    config_gaps_analysis: ConfigGapsAnalysis | None = None

    # Cross-cutting concerns
    cross_cutting_analysis: CrossCuttingAnalysis | None = None

    # Technical debt
    debt_metrics: TechnicalDebtMetrics | None = None

    # Comparative analysis (optional)
    comparison_period: str | None = None  # e.g., "30d"
    previous_period: PeriodMetrics | None = None
    current_period: PeriodMetrics | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping of the whole report.

        Dates become ISO-format strings, nested sections are converted with
        their own ``to_dict``, and any unset optional value is ``None``.
        """

        def _iso(day):
            # ISO string for a set date, None otherwise.
            return day.isoformat() if day else None

        def _section(part):
            # Nested mapping for a populated section, None otherwise.
            return part.to_dict() if part else None

        return {
            "generated_date": self.generated_date.isoformat(),
            "total_completed": self.total_completed,
            "total_active": self.total_active,
            "date_range_start": _iso(self.date_range_start),
            "date_range_end": _iso(self.date_range_end),
            "summary": self.summary.to_dict(),
            "period_metrics": [period.to_dict() for period in self.period_metrics],
            "velocity_trend": self.velocity_trend,
            "bug_ratio_trend": self.bug_ratio_trend,
            "subsystem_health": [health.to_dict() for health in self.subsystem_health],
            "hotspot_analysis": _section(self.hotspot_analysis),
            "coupling_analysis": _section(self.coupling_analysis),
            "regression_analysis": _section(self.regression_analysis),
            "test_gap_analysis": _section(self.test_gap_analysis),
            "rejection_analysis": _section(self.rejection_analysis),
            "manual_pattern_analysis": _section(self.manual_pattern_analysis),
            "agent_effectiveness_analysis": _section(self.agent_effectiveness_analysis),
            "complexity_proxy_analysis": _section(self.complexity_proxy_analysis),
            "config_gaps_analysis": _section(self.config_gaps_analysis),
            "cross_cutting_analysis": _section(self.cross_cutting_analysis),
            "debt_metrics": _section(self.debt_metrics),
            "comparison_period": self.comparison_period,
            "previous_period": _section(self.previous_period),
            "current_period": _section(self.current_period),
        }
799# =============================================================================
800# Parsing Functions
801# =============================================================================
def parse_completed_issue(file_path: Path) -> CompletedIssue:
    """Build a CompletedIssue from an issue file's name and contents.

    Priority, type and ID come from the filename; discovery metadata and the
    completion date are delegated to the module's ``_parse_*`` helpers.

    Args:
        file_path: Path to the issue markdown file

    Returns:
        CompletedIssue with parsed metadata. Priority defaults to "P5" and
        type/ID to "UNKNOWN" when the filename does not carry them.
    """
    name = file_path.name
    text = file_path.read_text(encoding="utf-8")

    # Filename convention: P[0-5]-[TYPE]-[NNN]-description.md
    priority_hit = re.match(r"^(P\d)", name)
    priority = priority_hit.group(1) if priority_hit else "P5"

    kind_hit = re.search(r"(BUG|ENH|FEAT)-(\d+)", name)
    if kind_hit:
        issue_type = kind_hit.group(1)
        issue_id = f"{kind_hit.group(1)}-{kind_hit.group(2)}"
    else:
        issue_type = "UNKNOWN"
        issue_id = "UNKNOWN"

    return CompletedIssue(
        path=file_path,
        issue_type=issue_type,
        priority=priority,
        issue_id=issue_id,
        # Discovery metadata is read from the frontmatter.
        discovered_by=_parse_discovered_by(text),
        discovered_date=_parse_discovered_date(text),
        # Completion date: Resolution section first, file mtime as fallback.
        completed_date=_parse_completion_date(text, file_path),
    )
def _parse_discovered_by(content: str) -> str | None:
    """Read the ``discovered_by`` entry from the file's frontmatter.

    Args:
        content: File content

    Returns:
        The ``discovered_by`` value when it is a string, otherwise None
    """
    candidate = parse_frontmatter(content).get("discovered_by")
    if not isinstance(candidate, str):
        return None
    return candidate
def _parse_completion_date(content: str, file_path: Path) -> date | None:
    """Determine when an issue was completed.

    Args:
        content: File content
        file_path: Path whose modification time is used as a fallback

    Returns:
        The date from a ``**Completed**: YYYY-MM-DD`` line when present and
        valid; otherwise the file's mtime as a date; None if the file cannot
        be stat'ed.
    """
    stamped = re.search(r"\*\*Completed\*\*:\s*(\d{4}-\d{2}-\d{2})", content)
    if stamped is not None:
        try:
            return date.fromisoformat(stamped.group(1))
        except ValueError:
            # Right shape but not a real calendar date: use the mtime fallback.
            pass

    try:
        return date.fromtimestamp(file_path.stat().st_mtime)
    except OSError:
        return None
def _parse_resolution_action(content: str) -> str:
    """Extract resolution action category from issue content.

    Only issues whose ``**Status**`` line contains "closed" are classified
    further, using the first ``**Reason**`` line:

    - "duplicate": reason mentions "duplicate"
    - "invalid": reason mentions "invalid"
    - "deferred": reason mentions "deferred"
    - "rejected": any other closed issue (including explicit "rejected" or
      "out of scope" reasons, and closed issues without a reason)

    Everything else is "completed", whether or not an ``**Action**`` field
    is present.

    Args:
        content: Issue file content

    Returns:
        Resolution category string
    """
    status_match = re.search(r"\*\*Status\*\*:\s*(.+?)(?:\n|$)", content)
    if status_match is None or "closed" not in status_match.group(1).strip().lower():
        # An **Action** field and a missing Resolution section both mean a
        # normal completion, so no further lookup is needed here.
        return "completed"

    reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", content)
    if reason_match:
        reason = reason_match.group(1).strip().lower()
        # Order matters: the first keyword found wins.
        for category in ("duplicate", "invalid", "deferred"):
            if category in reason:
                return category

    # Closed with any other reason, or with none at all, counts as rejected.
    return "rejected"
def _detect_processing_agent(content: str, discovered_source: str | None = None) -> str:
    """Work out which processing agent handled an issue.

    Candidate texts are examined in this order, and the first one naming an
    agent decides the result:
    1. the ``discovered_source`` argument
    2. the ``**Log Type**:`` field in the content
    3. the ``**Tool**:`` field in the content

    Within one candidate, 'll-parallel' is checked before 'll-auto'. The
    comparison is case-insensitive.

    Args:
        content: Issue file content
        discovered_source: Optional discovered_source frontmatter value

    Returns:
        Agent name: 'll-auto', 'll-parallel', or 'manual' when no candidate
        names an agent
    """

    def _candidates():
        # Yield texts lazily so later fields are only searched when needed.
        if discovered_source:
            yield discovered_source
        for field_pattern in (
            r"\*\*Log Type\*\*:\s*(.+?)(?:\n|$)",
            r"\*\*Tool\*\*:\s*(.+?)(?:\n|$)",
        ):
            hit = re.search(field_pattern, content)
            if hit:
                yield hit.group(1).strip()

    for text in _candidates():
        lowered = text.lower()
        for agent in ("ll-parallel", "ll-auto"):
            if agent in lowered:
                return agent

    return "manual"
def scan_completed_issues(completed_dir: Path) -> list[CompletedIssue]:
    """Parse every Markdown issue file in the completed directory.

    Files are visited in sorted path order. A file that raises while being
    parsed is left out of the result rather than aborting the scan.

    Args:
        completed_dir: Path to .issues/completed/

    Returns:
        List of parsed CompletedIssue objects (empty if the directory
        does not exist)
    """
    if not completed_dir.exists():
        return []

    parsed: list[CompletedIssue] = []
    for md_file in sorted(completed_dir.glob("*.md")):
        try:
            parsed.append(parse_completed_issue(md_file))
        except Exception:
            # Best effort: unparseable files are skipped.
            continue

    return parsed
def calculate_summary(issues: list[CompletedIssue]) -> HistorySummary:
    """Aggregate headline statistics over a list of issues.

    Tallies issues per type, per priority and per discovery source (issues
    with no ``discovered_by`` are counted under "unknown"), and records the
    earliest and latest completion dates among issues that have one.

    Args:
        issues: List of CompletedIssue objects

    Returns:
        HistorySummary with calculated statistics
    """
    by_type: dict[str, int] = defaultdict(int)
    by_priority: dict[str, int] = defaultdict(int)
    by_source: dict[str, int] = defaultdict(int)

    for item in issues:
        by_type[item.issue_type] += 1
        by_priority[item.priority] += 1
        by_source[item.discovered_by or "unknown"] += 1

    completion_dates: list[date] = [
        item.completed_date for item in issues if item.completed_date
    ]

    return HistorySummary(
        total_count=len(issues),
        # Types and priorities are ordered by key for consistent output.
        type_counts=dict(sorted(by_type.items())),
        priority_counts=dict(sorted(by_priority.items())),
        # Sources are ordered by count descending, then by name.
        discovery_counts=dict(sorted(by_source.items(), key=lambda pair: (-pair[1], pair[0]))),
        earliest_date=min(completion_dates) if completion_dates else None,
        latest_date=max(completion_dates) if completion_dates else None,
    )
def format_summary_text(summary: HistorySummary) -> str:
    """Render a summary as a plain-text report.

    The report has a header, an optional date-range line (with velocity when
    available), and one section each for type, priority and discovery source.
    Percentages are integer-floored shares of the total count.

    Args:
        summary: HistorySummary to format

    Returns:
        Formatted text string (lines joined with newlines)
    """
    out: list[str] = [
        "Issue History Summary",
        "=" * 21,
        f"Total Completed: {summary.total_count}",
    ]

    if summary.earliest_date and summary.latest_date:
        days = summary.date_range_days or 0
        out.append(f"Date Range: {summary.earliest_date} to {summary.latest_date} ({days} days)")
        if summary.velocity:
            out.append(f"Velocity: {summary.velocity:.1f} issues/day")

    # Guard against division by zero when there are no issues at all.
    total = summary.total_count or 1

    out.extend(("", "By Type:"))
    for issue_type, count in summary.type_counts.items():
        pct = count * 100 // total
        out.append(f" {issue_type:5}: {count:3} ({pct:2}%)")

    out.extend(("", "By Priority:"))
    for priority, count in summary.priority_counts.items():
        pct = count * 100 // total
        out.append(f" {priority}: {count:3} ({pct:2}%)")

    out.extend(("", "By Discovery Source:"))
    for source, count in summary.discovery_counts.items():
        pct = count * 100 // total
        out.append(f" {source:15}: {count:3} ({pct:2}%)")

    return "\n".join(out)
def format_summary_json(summary: HistorySummary) -> str:
    """Serialize a summary to pretty-printed JSON.

    Args:
        summary: HistorySummary to format

    Returns:
        JSON string of ``summary.to_dict()``, indented by two spaces
    """
    payload = summary.to_dict()
    return json.dumps(payload, indent=2)
1105# =============================================================================
1106# Advanced Analysis Functions (FEAT-110)
1107# =============================================================================
def _parse_discovered_date(content: str) -> date | None:
    """Read the ``discovered_date`` frontmatter field as a date.

    Args:
        content: File content

    Returns:
        The parsed date, or None when the field is absent, is not a string,
        or is not a valid ISO-format date
    """
    raw = parse_frontmatter(content).get("discovered_date")
    if isinstance(raw, str):
        try:
            return date.fromisoformat(raw)
        except ValueError:
            # Present but not ISO-formatted: treated the same as missing.
            return None
    return None
1129def _extract_subsystem(content: str) -> str | None:
1130 """Extract primary subsystem/directory from issue content.
1132 Args:
1133 content: Issue file content
1135 Returns:
1136 Directory path (e.g., "scripts/little_loops/") or None
1137 """
1138 # Look for file paths in Location or common patterns
1139 patterns = [
1140 r"\*\*File\*\*:\s*`?([^`\n]+/)[^/`\n]+`?", # **File**: path/to/file.py
1141 r"`([a-zA-Z_][\w/.-]+/)[^/`]+\.py`", # `path/to/file.py`
1142 ]
1144 for pattern in patterns:
1145 match = re.search(pattern, content)
1146 if match:
1147 return match.group(1)
1149 return None
1152def _extract_paths_from_issue(content: str) -> list[str]:
1153 """Extract all file paths from issue content.
1155 Args:
1156 content: Issue file content
1158 Returns:
1159 List of file paths found in content
1160 """
1161 patterns = [
1162 r"\*\*File\*\*:\s*`?([^`\n:]+)`?", # **File**: path/to/file.py
1163 r"`([a-zA-Z_][\w/.-]+\.[a-z]{2,4})`", # `path/to/file.py`
1164 r"(?:^|\s)([a-zA-Z_][\w/.-]+\.[a-z]{2,4})(?::\d+)?(?:\s|$|:|\))", # path.py:123
1165 ]
1167 paths: set[str] = set()
1168 for pattern in patterns:
1169 for match in re.finditer(pattern, content, re.MULTILINE):
1170 path = match.group(1).strip()
1171 # Must look like a file path
1172 if "/" in path or path.endswith((".py", ".md", ".js", ".ts", ".json", ".yaml", ".yml")):
1173 # Normalize: remove line numbers (path.py:123 -> path.py)
1174 if ":" in path and path.split(":")[-1].isdigit():
1175 path = ":".join(path.split(":")[:-1])
1176 paths.add(path)
1178 return sorted(paths)
1181def _find_test_file(source_path: str) -> str | None:
1182 """Find corresponding test file for a source file.
1184 Checks common test file naming patterns:
1185 - tests/test_<name>.py
1186 - tests/<path>/test_<name>.py
1187 - <path>/test_<name>.py
1188 - <path>/<name>_test.py
1189 - <path>/tests/test_<name>.py
1191 Args:
1192 source_path: Path to source file (e.g., "src/core/processor.py")
1194 Returns:
1195 Path to test file if found, None otherwise
1196 """
1197 if not source_path.endswith(".py"):
1198 return None # Only check Python files for now
1200 path = Path(source_path)
1201 stem = path.stem # filename without extension
1202 parent = str(path.parent) if path.parent != Path(".") else ""
1204 # Generate candidate test file paths
1205 candidates: list[str] = [
1206 f"tests/test_{stem}.py",
1207 f"{parent}/test_{stem}.py" if parent else f"test_{stem}.py",
1208 f"{parent}/{stem}_test.py" if parent else f"{stem}_test.py",
1209 f"{parent}/tests/test_{stem}.py" if parent else f"tests/test_{stem}.py",
1210 ]
1212 # Add path-aware test locations
1213 if parent:
1214 candidates.append(f"tests/{parent}/test_{stem}.py")
1216 # Project-specific pattern for little-loops
1217 # e.g., scripts/little_loops/foo.py -> scripts/tests/test_foo.py
1218 if source_path.startswith("scripts/little_loops/"):
1219 candidates.append(f"scripts/tests/test_{stem}.py")
1221 for candidate in candidates:
1222 if Path(candidate).exists():
1223 return candidate
1225 return None
1228def _calculate_period_label(start: date, period_type: str) -> str:
1229 """Generate human-readable period label.
1231 Args:
1232 start: Period start date
1233 period_type: "weekly", "monthly", "quarterly"
1235 Returns:
1236 Label like "Q1 2025", "Jan 2025", "Week 3 2025"
1237 """
1238 if period_type == "quarterly":
1239 quarter = (start.month - 1) // 3 + 1
1240 return f"Q{quarter} {start.year}"
1241 elif period_type == "monthly":
1242 return start.strftime("%b %Y")
1243 else: # weekly
1244 week_num = start.isocalendar()[1]
1245 return f"Week {week_num} {start.year}"
1248def _group_by_period(
1249 issues: list[CompletedIssue],
1250 period_type: Literal["weekly", "monthly", "quarterly"] = "monthly",
1251) -> list[PeriodMetrics]:
1252 """Group issues by time period and calculate metrics.
1254 Args:
1255 issues: List of completed issues with dates
1256 period_type: Grouping period
1258 Returns:
1259 List of PeriodMetrics sorted by date ascending
1260 """
1261 # Filter issues with dates
1262 dated_issues = [i for i in issues if i.completed_date]
1263 if not dated_issues:
1264 return []
1266 # Sort by date
1267 dated_issues.sort(key=lambda i: i.completed_date) # type: ignore
1269 # Determine period boundaries
1270 periods: dict[str, list[CompletedIssue]] = defaultdict(list)
1272 for issue in dated_issues:
1273 completed = issue.completed_date
1274 assert completed is not None
1276 if period_type == "quarterly":
1277 quarter = (completed.month - 1) // 3
1278 period_start = date(completed.year, quarter * 3 + 1, 1)
1279 elif period_type == "monthly":
1280 period_start = date(completed.year, completed.month, 1)
1281 else: # weekly
1282 # Start of week (Monday)
1283 period_start = completed - timedelta(days=completed.weekday())
1285 key = period_start.isoformat()
1286 periods[key].append(issue)
1288 # Calculate metrics for each period
1289 result: list[PeriodMetrics] = []
1290 for period_key in sorted(periods.keys()):
1291 period_issues = periods[period_key]
1292 period_start = date.fromisoformat(period_key)
1294 # Calculate period end
1295 if period_type == "quarterly":
1296 month = period_start.month + 3
1297 year = period_start.year
1298 if month > 12:
1299 month = 1
1300 year += 1
1301 period_end = date(year, month, 1) - timedelta(days=1)
1302 elif period_type == "monthly":
1303 month = period_start.month + 1
1304 year = period_start.year
1305 if month > 12:
1306 month = 1
1307 year += 1
1308 period_end = date(year, month, 1) - timedelta(days=1)
1309 else: # weekly
1310 period_end = period_start + timedelta(days=6)
1312 # Count types and priorities
1313 type_counts: dict[str, int] = {}
1314 priority_counts: dict[str, int] = {}
1316 for issue in period_issues:
1317 type_counts[issue.issue_type] = type_counts.get(issue.issue_type, 0) + 1
1318 priority_counts[issue.priority] = priority_counts.get(issue.priority, 0) + 1
1320 result.append(
1321 PeriodMetrics(
1322 period_start=period_start,
1323 period_end=period_end,
1324 period_label=_calculate_period_label(period_start, period_type),
1325 total_completed=len(period_issues),
1326 type_counts=dict(sorted(type_counts.items())),
1327 priority_counts=dict(sorted(priority_counts.items())),
1328 )
1329 )
1331 return result
1334def _calculate_trend(values: list[float]) -> str:
1335 """Determine trend from a series of values.
1337 Args:
1338 values: Time-ordered series of values
1340 Returns:
1341 "increasing", "decreasing", or "stable"
1342 """
1343 if len(values) < 3:
1344 return "stable"
1346 # Simple linear regression slope
1347 n = len(values)
1348 sum_x = sum(range(n))
1349 sum_y = sum(values)
1350 sum_xy = sum(i * v for i, v in enumerate(values))
1351 sum_x2 = sum(i * i for i in range(n))
1353 denominator = n * sum_x2 - sum_x * sum_x
1354 if denominator == 0:
1355 return "stable"
1357 slope = (n * sum_xy - sum_x * sum_y) / denominator
1359 # Normalize slope by average value
1360 avg = sum_y / n if n > 0 else 1
1361 if avg == 0:
1362 avg = 1
1363 normalized_slope = slope / avg
1365 if normalized_slope > 0.05:
1366 return "increasing"
1367 elif normalized_slope < -0.05:
1368 return "decreasing"
1369 return "stable"
1372def _analyze_subsystems(
1373 issues: list[CompletedIssue],
1374 recent_days: int = 30,
1375) -> list[SubsystemHealth]:
1376 """Analyze health by subsystem/directory.
1378 Args:
1379 issues: List of completed issues
1380 recent_days: Days to consider "recent"
1382 Returns:
1383 List of SubsystemHealth sorted by total issues descending
1384 """
1385 subsystems: dict[str, SubsystemHealth] = {}
1386 cutoff = date.today() - timedelta(days=recent_days)
1388 for issue in issues:
1389 try:
1390 content = issue.path.read_text(encoding="utf-8")
1391 except Exception:
1392 continue
1394 subsystem = _extract_subsystem(content)
1395 if not subsystem:
1396 continue
1398 if subsystem not in subsystems:
1399 subsystems[subsystem] = SubsystemHealth(subsystem=subsystem)
1401 health = subsystems[subsystem]
1402 health.total_issues += 1
1403 health.issue_ids.append(issue.issue_id)
1405 if issue.completed_date and issue.completed_date >= cutoff:
1406 health.recent_issues += 1
1408 # Calculate trends based on recent vs historical ratio
1409 for health in subsystems.values():
1410 if health.total_issues >= 5:
1411 recent_ratio = health.recent_issues / health.total_issues
1412 if recent_ratio > 0.5:
1413 health.trend = "degrading"
1414 elif recent_ratio < 0.2:
1415 health.trend = "improving"
1417 # Sort by total issues descending
1418 result = sorted(subsystems.values(), key=lambda s: -s.total_issues)
1419 return result[:10] # Top 10
def analyze_hotspots(issues: list[CompletedIssue]) -> HotspotAnalysis:
    """Find files and directories that recur across issues.

    Every issue file is re-read from ``issue.path`` and scanned for file
    paths; unreadable files are skipped. File hotspots count one hit per
    (issue, path). Directory hotspots count each issue at most once per
    directory, even when the issue mentions several files in it. Paths
    without a ``/`` are attributed to the directory "./".

    Churn is "high" at five or more issues, "medium" at three or more, and
    "low" otherwise. Bug magnets are file hotspots with at least three
    issues of which more than 60% are BUGs.

    Args:
        issues: List of completed issues

    Returns:
        HotspotAnalysis with the top 10 file hotspots, top 10 directory
        hotspots and top 5 bug magnets
    """

    def _empty_stats() -> dict[str, Any]:
        return {"count": 0, "ids": [], "types": {}}

    def _tally(stats: dict[str, Any], issue_id: str, issue_type: str) -> None:
        # Record one issue against a file or directory entry.
        stats["count"] += 1
        stats["ids"].append(issue_id)
        stats["types"][issue_type] = stats["types"].get(issue_type, 0) + 1

    def _to_hotspots(table: dict[str, dict[str, Any]]) -> list[Hotspot]:
        # Turn raw tallies into Hotspot objects, busiest first.
        converted: list[Hotspot] = []
        for location, stats in table.items():
            total = stats["count"]
            bug_ratio = stats["types"].get("BUG", 0) / total if total > 0 else 0.0

            if total >= 5:
                churn = "high"
            elif total >= 3:
                churn = "medium"
            else:
                churn = "low"

            converted.append(
                Hotspot(
                    path=location,
                    issue_count=total,
                    issue_ids=stats["ids"],
                    issue_types=stats["types"],
                    bug_ratio=bug_ratio,
                    churn_indicator=churn,
                )
            )
        converted.sort(key=lambda h: -h.issue_count)
        return converted

    per_file: dict[str, dict[str, Any]] = {}  # path -> {count, ids, types}
    per_dir: dict[str, dict[str, Any]] = {}  # dir -> {count, ids, types}

    for entry in issues:
        try:
            text = entry.path.read_text(encoding="utf-8")
        except Exception:
            continue

        for file_path in _extract_paths_from_issue(text):
            if file_path not in per_file:
                per_file[file_path] = _empty_stats()
            _tally(per_file[file_path], entry.issue_id, entry.issue_type)

            # Directory is everything up to and including the last slash.
            head, slash, _ = file_path.rpartition("/")
            dir_path = head + "/" if slash else "./"

            if dir_path not in per_dir:
                per_dir[dir_path] = _empty_stats()
            if entry.issue_id not in per_dir[dir_path]["ids"]:
                _tally(per_dir[dir_path], entry.issue_id, entry.issue_type)

    file_hotspots = _to_hotspots(per_file)
    dir_hotspots = _to_hotspots(per_dir)

    # Identify bug magnets (>60% bug ratio, at least 3 issues)
    bug_magnets = [h for h in file_hotspots if h.bug_ratio > 0.6 and h.issue_count >= 3]
    bug_magnets.sort(key=lambda h: (-h.bug_ratio, -h.issue_count))

    return HotspotAnalysis(
        file_hotspots=file_hotspots[:10],  # Top 10
        directory_hotspots=dir_hotspots[:10],  # Top 10
        bug_magnets=bug_magnets[:5],  # Top 5
    )
def analyze_coupling(issues: list[CompletedIssue]) -> CouplingAnalysis:
    """Find files that tend to be mentioned together in the same issues.

    Coupling strength between two files is the Jaccard similarity of the
    sets of issue IDs that mention them. A pair is reported only when the
    files share at least 2 issues and the strength is >= 0.3. Issue files
    are re-read from ``issue.path``; unreadable ones are skipped.

    Args:
        issues: List of completed issues

    Returns:
        CouplingAnalysis with the top 20 pairs, top 10 clusters, and up to
        10 hotspot files (files appearing in 3 or more reported pairs)
    """
    # file -> set of issue IDs that mention it
    issues_by_file: dict[str, set[str]] = {}
    for entry in issues:
        try:
            text = entry.path.read_text(encoding="utf-8")
        except Exception:
            continue
        for file_path in _extract_paths_from_issue(text):
            issues_by_file.setdefault(file_path, set()).add(entry.issue_id)

    # Compare every unordered pair of files exactly once.
    ordered_files = list(issues_by_file)
    pairs: list[CouplingPair] = []
    for position, left in enumerate(ordered_files):
        left_ids = issues_by_file[left]
        for right in ordered_files[position + 1 :]:
            right_ids = issues_by_file[right]
            shared = left_ids & right_ids
            if len(shared) < 2:  # Require at least 2 co-occurrences
                continue

            combined = left_ids | right_ids
            # Jaccard similarity
            strength = len(shared) / len(combined) if combined else 0.0
            if strength >= 0.3:  # Only include significant coupling
                pairs.append(
                    CouplingPair(
                        file_a=left,
                        file_b=right,
                        co_occurrence_count=len(shared),
                        coupling_strength=strength,
                        issue_ids=sorted(shared),
                    )
                )

    # Strongest coupling first; ties broken by co-occurrence count.
    pairs.sort(key=lambda p: (-p.coupling_strength, -p.co_occurrence_count))

    clusters = _build_coupling_clusters(pairs)

    # A hotspot is a file that takes part in 3 or more reported pairs.
    pair_membership: dict[str, int] = {}
    for pair in pairs:
        for member in (pair.file_a, pair.file_b):
            pair_membership[member] = pair_membership.get(member, 0) + 1

    hotspots = sorted(
        (name for name, degree in pair_membership.items() if degree >= 3),
        key=lambda name: -pair_membership[name],
    )

    return CouplingAnalysis(
        pairs=pairs[:20],  # Top 20 pairs
        clusters=clusters[:10],  # Top 10 clusters
        hotspots=hotspots[:10],  # Top 10 hotspots
    )
1610def _build_coupling_clusters(pairs: list[CouplingPair]) -> list[list[str]]:
1611 """Build clusters of coupled files using connected components.
1613 Args:
1614 pairs: List of coupling pairs
1616 Returns:
1617 List of file clusters (each cluster is a list of file paths)
1618 """
1619 # Build adjacency for high-coupling pairs (strength >= 0.5)
1620 adjacency: dict[str, set[str]] = {}
1621 for pair in pairs:
1622 if pair.coupling_strength >= 0.5:
1623 if pair.file_a not in adjacency:
1624 adjacency[pair.file_a] = set()
1625 if pair.file_b not in adjacency:
1626 adjacency[pair.file_b] = set()
1627 adjacency[pair.file_a].add(pair.file_b)
1628 adjacency[pair.file_b].add(pair.file_a)
1630 # Find connected components
1631 visited: set[str] = set()
1632 clusters: list[list[str]] = []
1634 for start in adjacency:
1635 if start in visited:
1636 continue
1637 # BFS to find component
1638 cluster: list[str] = []
1639 queue = [start]
1640 while queue:
1641 node = queue.pop(0)
1642 if node in visited:
1643 continue
1644 visited.add(node)
1645 cluster.append(node)
1646 for neighbor in adjacency.get(node, set()):
1647 if neighbor not in visited:
1648 queue.append(neighbor)
1650 if len(cluster) >= 2: # Only include clusters with 2+ files
1651 cluster.sort()
1652 clusters.append(cluster)
1654 # Sort clusters by size descending
1655 clusters.sort(key=lambda c: -len(c))
1656 return clusters
def analyze_regression_clustering(
    issues: list[CompletedIssue],
) -> RegressionAnalysis:
    """Detect files where bug fixes are quickly followed by further bugs.

    Two dated BUG issues form a regression pair when:
        1. Temporal proximity: the later bug was completed no more than
           7 days after the earlier one
        2. File overlap: both bugs mention at least one common file

    Pairs are then grouped per shared file. A file's time pattern is
    "immediate" when the average gap is under 3 days, otherwise "chronic"
    with 3 or more pairs, otherwise "delayed". Severity is "critical" at 4+
    pairs, "high" at 2+, else "medium".

    Args:
        issues: List of completed issues

    Returns:
        RegressionAnalysis with the top 10 clusters, the total number of
        regression pairs, and the 5 most fragile files
    """
    # Dated bugs only, oldest completion first.
    dated_bugs = [i for i in issues if i.issue_type == "BUG" and i.completed_date]
    dated_bugs.sort(key=lambda i: i.completed_date)  # type: ignore

    if len(dated_bugs) < 2:
        return RegressionAnalysis()

    # issue_id -> set of files mentioned; empty when the file is unreadable.
    files_by_bug: dict[str, set[str]] = {}
    for bug in dated_bugs:
        try:
            text = bug.path.read_text(encoding="utf-8")
            files_by_bug[bug.issue_id] = set(_extract_paths_from_issue(text))
        except Exception:
            files_by_bug[bug.issue_id] = set()

    # Find regression pairs (temporal proximity + file overlap)
    regression_pairs: list[tuple[CompletedIssue, CompletedIssue, set[str]]] = []
    for position, earlier in enumerate(dated_bugs[:-1]):
        earlier_files = files_by_bug.get(earlier.issue_id, set())
        if not earlier_files:
            continue

        for later in dated_bugs[position + 1 :]:
            gap_days = (later.completed_date - earlier.completed_date).days  # type: ignore
            if gap_days > 7:
                break  # Bugs are sorted, so every later one is further away

            later_files = files_by_bug.get(later.issue_id, set())
            if not later_files:
                continue

            shared = earlier_files & later_files
            if shared:
                regression_pairs.append((earlier, later, shared))

    if not regression_pairs:
        return RegressionAnalysis()

    # file -> [(id_a, id_b, days)] for every pair that shares the file
    pairs_by_file: dict[str, list[tuple[str, str, int]]] = defaultdict(list)
    for earlier, later, shared in regression_pairs:
        gap_days = (later.completed_date - earlier.completed_date).days  # type: ignore
        for shared_file in shared:
            pairs_by_file[shared_file].append((earlier.issue_id, later.issue_id, gap_days))

    clusters: list[RegressionCluster] = []
    for primary, links in pairs_by_file.items():
        link_count = len(links)

        # Determine time pattern
        mean_gap = sum(gap for _, _, gap in links) / link_count
        if mean_gap < 3:
            time_pattern = "immediate"
        elif link_count >= 3:
            time_pattern = "chronic"
        else:
            time_pattern = "delayed"

        # Determine severity
        if link_count >= 4:
            severity = "critical"
        elif link_count >= 2:
            severity = "high"
        else:
            severity = "medium"

        # Related files: everything else touched by the bugs of any pair
        # that overlaps on this file.
        related: set[str] = set()
        for earlier, later, _ in regression_pairs:
            earlier_files = files_by_bug.get(earlier.issue_id, set())
            later_files = files_by_bug.get(later.issue_id, set())
            if primary in (earlier_files & later_files):
                related |= earlier_files
                related |= later_files
        related.discard(primary)

        clusters.append(
            RegressionCluster(
                primary_file=primary,
                regression_count=link_count,
                fix_bug_pairs=[(first_id, second_id) for first_id, second_id, _ in links],
                related_files=sorted(related),
                time_pattern=time_pattern,
                severity=severity,
            )
        )

    # Most regressions first; file name breaks ties deterministically.
    clusters.sort(key=lambda c: (-c.regression_count, c.primary_file))

    return RegressionAnalysis(
        clusters=clusters[:10],  # Top 10
        total_regression_chains=len(regression_pairs),
        most_fragile_files=[c.primary_file for c in clusters[:5]],
    )
def analyze_test_gaps(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> TestGapAnalysis:
    """Relate bug-prone files to the presence of a test file.

    Only file hotspots with at least one BUG issue are considered. For each,
    a test file is looked up via ``_find_test_file``. The gap score is the
    bug count, multiplied by 10 when no test file was found. Priority is:
        - untested: "critical" at 5+ bugs, "high" at 3+, otherwise "medium"
        - tested: "medium" at 4+ bugs, otherwise "low"

    Args:
        issues: List of completed issues (unused, for API consistency)
        hotspots: Pre-computed hotspot analysis

    Returns:
        TestGapAnalysis with the top 15 gaps, untested bug magnets, average
        bug counts for tested vs. untested files, and up to 10 priority
        test targets
    """
    # source file -> (bug count, BUG issue IDs), taken from the file hotspots
    buggy_files: dict[str, tuple[int, list[str]]] = {}
    for spot in hotspots.file_hotspots:
        bug_total = spot.issue_types.get("BUG", 0)
        if bug_total > 0:
            bug_only_ids = [iid for iid in spot.issue_ids if iid.startswith("BUG-")]
            buggy_files[spot.path] = (bug_total, bug_only_ids)

    if not buggy_files:
        return TestGapAnalysis()

    gaps: list[TestGap] = []
    tested_bug_counts: list[int] = []
    untested_bug_counts: list[int] = []

    for source_file, (bug_total, bug_only_ids) in buggy_files.items():
        test_path = _find_test_file(source_file)
        is_tested = test_path is not None

        if is_tested:
            tested_bug_counts.append(bug_total)
            score = bug_total * 1.0
            level = "medium" if bug_total >= 4 else "low"
        else:
            untested_bug_counts.append(bug_total)
            score = bug_total * 10.0  # Amplify untested files
            if bug_total >= 5:
                level = "critical"
            elif bug_total >= 3:
                level = "high"
            else:
                level = "medium"

        gaps.append(
            TestGap(
                source_file=source_file,
                bug_count=bug_total,
                bug_ids=bug_only_ids,
                has_test_file=is_tested,
                test_file_path=test_path,
                gap_score=score,
                priority=level,
            )
        )

    # Most urgent first: highest gap score, then highest bug count.
    gaps.sort(key=lambda g: (-g.gap_score, -g.bug_count))

    def _mean(counts: list[int]) -> float:
        return sum(counts) / len(counts) if counts else 0.0

    # Bug magnets from the hotspot analysis that have no test file.
    untested_magnets = [
        magnet.path for magnet in hotspots.bug_magnets if _find_test_file(magnet.path) is None
    ]

    # Untested files, in gap order (computed before the top-15 cut).
    priority_targets = [gap.source_file for gap in gaps if not gap.has_test_file]

    return TestGapAnalysis(
        gaps=gaps[:15],  # Top 15
        untested_bug_magnets=untested_magnets,
        files_with_tests_avg_bugs=_mean(tested_bug_counts),
        files_without_tests_avg_bugs=_mean(untested_bug_counts),
        priority_test_targets=priority_targets[:10],
    )
def analyze_rejection_rates(issues: list[CompletedIssue]) -> RejectionAnalysis:
    """Analyze how issues were closed: completed, rejected, invalid, etc.

    Each issue file is re-read from ``issue.path`` (unreadable files are
    skipped) and classified by ``_parse_resolution_action``. The same tally
    is kept overall, per issue type, and per completion month ("YYYY-MM",
    only for issues with a completion date). For non-completed closures
    the ``**Reason**:`` text is also counted.

    The trend compares the combined rejection + invalid rate of the last of
    the three most recent months against the first of them: below 80% is
    "improving", above 120% is "degrading", otherwise "stable". With fewer
    than three months of data the trend is "stable".

    Args:
        issues: List of completed issues

    Returns:
        RejectionAnalysis with overall and grouped metrics, plus the ten
        most common closure reasons
    """
    if not issues:
        return RejectionAnalysis()

    # Closure category -> counter attribute on RejectionMetrics.
    counter_for = {
        "completed": "completed_count",
        "rejected": "rejected_count",
        "invalid": "invalid_count",
        "duplicate": "duplicate_count",
        "deferred": "deferred_count",
    }

    def _record(metrics: RejectionMetrics, closure: str) -> None:
        # Count one closed issue and bump its category counter, if known.
        metrics.total_closed += 1
        attr = counter_for.get(closure)
        if attr is not None:
            setattr(metrics, attr, getattr(metrics, attr) + 1)

    overall = RejectionMetrics()
    by_type: dict[str, RejectionMetrics] = {}
    by_month: dict[str, RejectionMetrics] = {}
    reason_counts: dict[str, int] = {}

    for entry in issues:
        try:
            text = entry.path.read_text(encoding="utf-8")
        except Exception:
            continue

        category = _parse_resolution_action(text)

        _record(overall, category)

        # By type
        if entry.issue_type not in by_type:
            by_type[entry.issue_type] = RejectionMetrics()
        _record(by_type[entry.issue_type], category)

        # By month
        if entry.completed_date:
            month_key = entry.completed_date.strftime("%Y-%m")
            if month_key not in by_month:
                by_month[month_key] = RejectionMetrics()
            _record(by_month[month_key], category)

        # Extract reason for any non-completed closure
        if category in ("rejected", "invalid", "duplicate", "deferred"):
            reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", text)
            if reason_match:
                reason_text = reason_match.group(1).strip()
                reason_counts[reason_text] = reason_counts.get(reason_text, 0) + 1

    # Calculate trend from the three most recent months, when available.
    trend = "stable"
    month_keys = sorted(by_month)
    if len(month_keys) >= 3:
        window = [
            by_month[key].rejection_rate + by_month[key].invalid_rate
            for key in month_keys[-3:]
        ]
        oldest_rate, newest_rate = window[0], window[-1]
        if newest_rate < oldest_rate * 0.8:
            trend = "improving"
        elif newest_rate > oldest_rate * 1.2:
            trend = "degrading"

    # Sort reasons by count
    common_reasons = sorted(reason_counts.items(), key=lambda pair: -pair[1])[:10]

    return RejectionAnalysis(
        overall=overall,
        by_type=by_type,
        by_month=by_month,
        common_reasons=common_reasons,
        trend=trend,
    )
# Pattern definitions for manual activity detection.
#
# Maps a pattern type (e.g. "test", "lint") to a config dict with:
#   "patterns":    regexes; detect_manual_patterns runs each one over issue
#                  content with re.findall(..., re.IGNORECASE)
#   "description": what the detected manual activity is
#   "suggestion":  proposed automation for it
#   "complexity":  label for the automation effort ("trivial", "simple",
#                  "moderate")
#
# NOTE(review): the regexes have no word boundaries, so short alternatives
# such as "make", "tox", "black" or "tsc" also match inside longer words
# (e.g. "makes", "blacklist") — confirm whether that is intended.
_MANUAL_PATTERNS: dict[str, dict[str, Any]] = {
    "test": {
        "patterns": [
            r"(?:pytest|python -m pytest|npm test|yarn test|jest|cargo test|go test)",
            r"(?:python -m unittest|nosetests|tox)",
        ],
        "description": "Test execution after code changes",
        "suggestion": "Add post-edit hook for automatic test runs",
        "complexity": "trivial",
    },
    "lint": {
        "patterns": [
            r"(?:ruff check|ruff format|black|isort|flake8|pylint)",
            r"(?:eslint|prettier|tslint)",
        ],
        "description": "Lint/format fixes after implementation",
        "suggestion": "Add pre-commit hook for auto-formatting",
        "complexity": "simple",
    },
    "type_check": {
        "patterns": [
            r"(?:mypy|pyright|python -m mypy)",
            r"(?:tsc|npx tsc)",
        ],
        "description": "Type checking during development",
        "suggestion": "Add mypy to pre-commit or post-edit hook",
        "complexity": "simple",
    },
    "build": {
        "patterns": [
            r"(?:npm run build|yarn build|make|cargo build|go build)",
            r"(?:python -m build|pip install -e)",
        ],
        "description": "Build steps during implementation",
        "suggestion": "Add build verification to test suite or CI",
        "complexity": "moderate",
    },
    "git": {
        "patterns": [
            r"git (?:add|commit|push|pull|checkout|branch)",
        ],
        "description": "Git operations during issue resolution",
        "suggestion": "Use /ll:commit skill for standardized commits",
        "complexity": "trivial",
    },
}
# Cross-cutting concern keywords for smell detection.
#
# Maps a concern type to the keywords associated with it. The keys here are
# the same concern types used as keys in _CONCERN_PATTERNS.
# NOTE(review): how these keywords are matched (substring vs. whole word) is
# decided by the consumer, which is not visible here — several entries are
# prefixes of common words ("log" / "login", "valid" / "invalid"); verify.
_CROSS_CUTTING_KEYWORDS: dict[str, list[str]] = {
    "logging": ["log", "logger", "logging", "debug", "trace", "print"],
    "error-handling": ["error", "exception", "try", "catch", "raise", "except", "fail"],
    "validation": ["valid", "validate", "check", "assert", "verify", "sanitize"],
    "auth": ["auth", "permission", "role", "access", "token", "credential", "login"],
    "caching": ["cache", "memo", "memoize", "store", "ttl", "expire", "cached"],
}
# Suggested patterns for each cross-cutting concern type.
#
# Keys mirror _CROSS_CUTTING_KEYWORDS; the value becomes
# CrossCuttingSmell.suggested_pattern. detect_cross_cutting_smells() falls back
# to "aspect" for a concern type that has no entry here.
_CONCERN_PATTERNS: dict[str, str] = {
    "logging": "decorator",
    "error-handling": "middleware",
    "validation": "decorator",
    "auth": "middleware",
    "caching": "decorator",
}
def detect_manual_patterns(issues: list[CompletedIssue]) -> ManualPatternAnalysis:
    """Find recurring manual commands in issue files that could be automated.

    Each issue file is read and searched, case-insensitively, with every regex
    listed in ``_MANUAL_PATTERNS``. Issues whose file cannot be read are
    skipped silently.

    Args:
        issues: List of completed issues

    Returns:
        ManualPatternAnalysis holding one ManualPattern per pattern type that
        matched at least once (most frequent first), the total match count,
        and up to 10 automation suggestions taken from patterns seen 2+ times
    """
    if not issues:
        return ManualPatternAnalysis()

    # Per pattern type: match total, issue IDs seen, distinct matched texts
    tallies: dict[str, dict[str, Any]] = {
        kind: {"count": 0, "issues": [], "commands": [], "config": spec}
        for kind, spec in _MANUAL_PATTERNS.items()
    }

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: an unreadable issue contributes nothing
            continue

        for kind, spec in _MANUAL_PATTERNS.items():
            tally = tallies[kind]
            for regex in spec["patterns"]:
                hits = re.findall(regex, text, re.IGNORECASE)
                if not hits:
                    continue
                tally["count"] += len(hits)
                if issue.issue_id not in tally["issues"]:
                    tally["issues"].append(issue.issue_id)
                # Keep each distinct matched command once, in first-seen order
                for hit in hits:
                    if hit not in tally["commands"]:
                        tally["commands"].append(hit)

    # Only pattern types that actually occurred are reported
    patterns: list[ManualPattern] = [
        ManualPattern(
            pattern_type=kind,
            pattern_description=tally["config"]["description"],
            occurrence_count=tally["count"],
            affected_issues=tally["issues"],
            example_commands=tally["commands"][:5],
            suggested_automation=tally["config"]["suggestion"],
            automation_complexity=tally["config"]["complexity"],
        )
        for kind, tally in tallies.items()
        if tally["count"] > 0
    ]

    # Every detected occurrence is counted as both an intervention and automatable
    total_interventions = sum(tally["count"] for tally in tallies.values())
    automatable = total_interventions

    # Most frequent pattern first
    patterns.sort(key=lambda p: -p.occurrence_count)

    suggestions = [p.suggested_automation for p in patterns if p.occurrence_count >= 2]

    return ManualPatternAnalysis(
        patterns=patterns,
        total_manual_interventions=total_interventions,
        automatable_count=automatable,
        automation_suggestions=suggestions[:10],
    )
def detect_cross_cutting_smells(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> CrossCuttingAnalysis:
    """Detect cross-cutting concerns that are scattered over many directories.

    An issue is considered only when the paths extracted from its file span
    three or more directories. Such an issue is attributed to every concern in
    ``_CROSS_CUTTING_KEYWORDS`` that has at least one keyword occurring as a
    substring of the lower-cased issue text.

    Args:
        issues: List of completed issues
        hotspots: Hotspot analysis results; its directory hotspots seed the
            set of known directories used as the scatter-score denominator

    Returns:
        CrossCuttingAnalysis with one smell per detected concern (highest
        scatter score first), the most scattered concern, and up to 10
        consolidation suggestions for concerns scoring 0.3 or higher
    """
    if not issues:
        return CrossCuttingAnalysis()

    # Per concern: directories touched and the issues that triggered it
    touched_dirs: dict[str, set[str]] = {concern: set() for concern in _CROSS_CUTTING_KEYWORDS}
    matched_ids: dict[str, list[str]] = {concern: [] for concern in _CROSS_CUTTING_KEYWORDS}

    # Every directory known to the analysis (hotspots + paths found in issues)
    # NOTE(review): hotspot paths and str(Path(p).parent) may be formatted
    # differently (e.g. trailing slash) - confirm they de-duplicate as intended
    known_dirs: set[str] = {h.path for h in hotspots.directory_hotspots or ()}

    for issue in issues:
        try:
            raw = issue.path.read_text(encoding="utf-8")
            lowered = raw.lower()
        except Exception:
            continue

        issue_dirs = {
            str(Path(p).parent)
            for p in _extract_paths_from_issue(raw)
            if "/" in p or "\\" in p
        }
        known_dirs |= issue_dirs

        # Fewer than 3 directories is not treated as scattered
        if len(issue_dirs) >= 3:
            for concern, keywords in _CROSS_CUTTING_KEYWORDS.items():
                if not any(kw in lowered for kw in keywords):
                    continue
                touched_dirs[concern].update(issue_dirs)
                if issue.issue_id not in matched_ids[concern]:
                    matched_ids[concern].append(issue.issue_id)

    # Never divide by zero when no directory is known at all
    denominator = len(known_dirs) or 1

    smells: list[CrossCuttingSmell] = []
    for concern, ids in matched_ids.items():
        if not ids:
            continue
        dirs = sorted(touched_dirs[concern])
        smells.append(
            CrossCuttingSmell(
                concern_type=concern,
                affected_directories=dirs,
                issue_count=len(ids),
                issue_ids=ids,
                scatter_score=len(dirs) / denominator,
                suggested_pattern=_CONCERN_PATTERNS.get(concern, "aspect"),
            )
        )

    # Highest scatter score first
    smells.sort(key=lambda s: -s.scatter_score)
    most_scattered = smells[0].concern_type if smells else ""

    # 0.3 is the threshold for suggesting consolidation
    consolidation_opportunities = [
        f"Centralize {s.concern_type} ({s.issue_count} issues would benefit)"
        for s in smells
        if s.scatter_score >= 0.3
    ]

    return CrossCuttingAnalysis(
        smells=smells,
        most_scattered_concern=most_scattered,
        consolidation_opportunities=consolidation_opportunities[:10],
    )
# Mapping from manual pattern types to configuration solutions.
#
# Keyed by the pattern types of _MANUAL_PATTERNS. Each entry provides:
#   "hook_event"       - hook event name that detect_config_gaps() looks for
#                        among the configured hooks
#   "description"      - text reported as ConfigGap.description
#   "suggested_config" - snippet reported as ConfigGap.suggested_config
# There is no "git" entry, so detect_config_gaps() skips git patterns.
_PATTERN_TO_CONFIG: dict[str, dict[str, Any]] = {
    "test": {
        "hook_event": "PostToolUse",
        "description": "Automatic test execution after code changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "pytest tests/ -x -q",
      "timeout": 30000
    }]
  }]""",
    },
    # NOTE(review): the description promises formatting, but the suggested
    # command passes --check - confirm whether it should rewrite files.
    "lint": {
        "hook_event": "PreToolUse",
        "description": "Automatic formatting before file writes",
        "suggested_config": """hooks/hooks.json:
  "PreToolUse": [{
    "matcher": "Write|Edit",
    "hooks": [{
      "type": "command",
      "command": "ruff format --check .",
      "timeout": 10000
    }]
  }]""",
    },
    # NOTE(review): confirm that "--fast" is a flag mypy accepts before
    # recommending this command to users.
    "type_check": {
        "hook_event": "PostToolUse",
        "description": "Type checking after code modifications",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "mypy --fast .",
      "timeout": 30000
    }]
  }]""",
    },
    "build": {
        "hook_event": "PostToolUse",
        "description": "Build verification after changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "npm run build",
      "timeout": 60000
    }]
  }]""",
    },
}
def detect_config_gaps(
    manual_pattern_analysis: ManualPatternAnalysis,
    project_root: Path | None = None,
) -> ConfigGapsAnalysis:
    """Detect configuration gaps based on manual pattern analysis.

    Discovers the hooks, agents and skills present under ``project_root`` and
    reports a gap for every detected manual pattern whose mapped hook event
    (see ``_PATTERN_TO_CONFIG``) is not configured. Patterns without a mapping
    are ignored and do not affect the coverage score.

    Args:
        manual_pattern_analysis: Results from detect_manual_patterns()
        project_root: Project root directory (defaults to cwd)

    Returns:
        ConfigGapsAnalysis with identified gaps (high priority first), the
        discovered hooks/skills/agents, and a coverage score: the share of
        recognized patterns whose hook event is configured (1.0 when no
        pattern is recognized)
    """
    if project_root is None:
        project_root = Path.cwd()

    # Hook events configured in hooks/hooks.json (keys of the "hooks" mapping)
    current_hooks: list[str] = []
    hooks_file = project_root / "hooks" / "hooks.json"
    if hooks_file.exists():
        try:
            with open(hooks_file, encoding="utf-8") as f:
                hooks_data = json.load(f)
        except (OSError, ValueError):
            # Unreadable, mis-encoded or malformed file: treat as no hooks
            hooks_data = None
        hooks_section = hooks_data.get("hooks") if isinstance(hooks_data, dict) else None
        if isinstance(hooks_section, dict):
            current_hooks = list(hooks_section)

    # Directory listing order is filesystem-dependent; sort so that reports
    # are reproducible across machines and runs
    current_agents: list[str] = []
    agents_dir = project_root / "agents"
    if agents_dir.is_dir():
        current_agents = sorted(agent_file.stem for agent_file in agents_dir.glob("*.md"))

    # A skill is a sub-directory of skills/ that contains a SKILL.md file
    current_skills: list[str] = []
    skills_dir = project_root / "skills"
    if skills_dir.is_dir():
        current_skills = sorted(
            skill_dir.name
            for skill_dir in skills_dir.iterdir()
            if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists()
        )

    # Identify gaps from manual patterns
    gaps: list[ConfigGap] = []
    covered_patterns = 0
    recognized_patterns = 0

    for pattern in manual_pattern_analysis.patterns:
        config_mapping = _PATTERN_TO_CONFIG.get(pattern.pattern_type)
        if not config_mapping:
            continue

        recognized_patterns += 1

        # A configured hook event counts as covering the pattern
        if config_mapping["hook_event"] in current_hooks:
            covered_patterns += 1
            continue

        # Priority scales with how often the manual activity was seen
        if pattern.occurrence_count >= 10:
            priority = "high"
        elif pattern.occurrence_count >= 5:
            priority = "medium"
        else:
            priority = "low"

        gaps.append(
            ConfigGap(
                gap_type="hook",
                description=config_mapping["description"],
                evidence=pattern.affected_issues,
                suggested_config=config_mapping["suggested_config"],
                priority=priority,
                pattern_type=pattern.pattern_type,
            )
        )

    # Coverage is measured over recognized patterns only
    coverage_score = covered_patterns / recognized_patterns if recognized_patterns > 0 else 1.0

    # Stable sort: high first, original order preserved within a priority
    priority_order = {"high": 0, "medium": 1, "low": 2}
    gaps.sort(key=lambda g: priority_order.get(g.priority, 3))

    return ConfigGapsAnalysis(
        gaps=gaps,
        current_hooks=current_hooks,
        current_skills=current_skills,
        current_agents=current_agents,
        coverage_score=coverage_score,
    )
def analyze_agent_effectiveness(issues: list[CompletedIssue]) -> AgentEffectivenessAnalysis:
    """Analyze agent effectiveness across issue types.

    Groups issues by processing agent and issue type and tallies each
    resolution: "completed" counts as a success; "rejected", "invalid" and
    "duplicate" count as rejections; anything else counts as a failure.
    Issues whose file cannot be read are skipped.

    Args:
        issues: List of completed issues

    Returns:
        AgentEffectivenessAnalysis with the per-(agent, type) outcomes sorted
        by agent then type, the best agent per issue type (among agents with
        at least 3 issues of that type), and the problematic combinations
        (success rate below 50% with at least 5 issues), worst first
    """
    if not issues:
        return AgentEffectivenessAnalysis()

    # Track outcomes by (agent, issue_type)
    outcomes_map: dict[tuple[str, str], AgentOutcome] = {}

    for issue in issues:
        try:
            content = issue.path.read_text(encoding="utf-8")
        except Exception:
            continue

        # Detect agent (discovered_by may contain source info in some cases)
        agent = _detect_processing_agent(content, issue.discovered_by)
        resolution = _parse_resolution_action(content)

        key = (agent, issue.issue_type)
        outcome = outcomes_map.get(key)
        if outcome is None:
            outcome = outcomes_map[key] = AgentOutcome(
                agent_name=agent,
                issue_type=issue.issue_type,
            )

        if resolution == "completed":
            outcome.success_count += 1
        elif resolution in ("rejected", "invalid", "duplicate"):
            outcome.rejection_count += 1
        else:  # deferred or other
            outcome.failure_count += 1

    outcomes = list(outcomes_map.values())

    # Group outcomes per issue type to pick the best agent for each
    type_agents: dict[str, list[AgentOutcome]] = {}
    for outcome in outcomes:
        type_agents.setdefault(outcome.issue_type, []).append(outcome)

    best_agent_by_type: dict[str, str] = {}
    for issue_type, agent_outcomes in type_agents.items():
        # Require minimum sample size
        significant_outcomes = [o for o in agent_outcomes if o.total_count >= 3]
        if significant_outcomes:
            best = max(significant_outcomes, key=lambda o: o.success_rate)
            best_agent_by_type[issue_type] = best.agent_name

    # Problematic combinations: success rate < 50% with >= 5 samples.
    # Sort on the real success rate, not on the rounded percentage in the
    # display string, so near-equal rates are still ordered worst first.
    flagged = sorted(
        (o for o in outcomes if o.total_count >= 5 and o.success_rate < 0.5),
        key=lambda o: o.success_rate,
    )
    problematic_combinations: list[tuple[str, str, str]] = [
        (
            o.agent_name,
            o.issue_type,
            f"{o.success_rate * 100:.0f}% success ({o.success_count}/{o.total_count})",
        )
        for o in flagged
    ]

    return AgentEffectivenessAnalysis(
        outcomes=sorted(outcomes, key=lambda o: (o.agent_name, o.issue_type)),
        best_agent_by_type=best_agent_by_type,
        problematic_combinations=problematic_combinations,
    )
def scan_active_issues(issues_dir: Path) -> list[tuple[Path, str, str, date | None]]:
    """Collect the active issue files found under the issue category folders.

    Looks at the ``bugs``, ``features`` and ``enhancements`` sub-directories
    (missing ones are skipped) and inspects every ``*.md`` file in them.
    Priority comes from a leading ``P<digit>`` in the file name (default
    "P5"), the type from the first ``BUG``/``ENH``/``FEAT`` in the file name
    (default "UNKNOWN"). The discovered date is parsed from the file content
    and is None when reading or parsing fails.

    Args:
        issues_dir: Path to .issues/ directory

    Returns:
        List of (path, issue_type, priority, discovered_date) tuples
    """
    found: list[tuple[Path, str, str, date | None]] = []

    for category in ("bugs", "features", "enhancements"):
        folder = issues_dir / category
        if not folder.exists():
            continue

        for md_file in folder.glob("*.md"):
            name = md_file.name

            priority_hit = re.match(r"^(P\d)", name)
            priority = priority_hit.group(1) if priority_hit else "P5"

            type_hit = re.search(r"(BUG|ENH|FEAT)", name)
            issue_type = type_hit.group(1) if type_hit else "UNKNOWN"

            # Best effort: any failure leaves the date unknown
            try:
                discovered = _parse_discovered_date(md_file.read_text(encoding="utf-8"))
            except Exception:
                discovered = None

            found.append((md_file, issue_type, priority, discovered))

    return found
def analyze_complexity_proxy(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> ComplexityProxyAnalysis:
    """Use issue duration as proxy for code complexity.

    Areas that consistently take longer to resolve suggest higher complexity,
    insufficient documentation, or accumulated technical debt.

    The baseline is the median resolution time (in days) over all issues that
    have both a discovered and a completed date; a baseline of 0 is replaced
    by 1.0. Files need at least 2 issues and directories at least 3 distinct
    issues to be reported.

    Args:
        issues: List of completed issues with dates
        hotspots: Pre-computed hotspot analysis (not read by this function;
            kept so the signature stays stable for callers)

    Returns:
        ComplexityProxyAnalysis with the 10 most complex files and
        directories, the baseline, and up to 10 files whose average
        resolution time exceeds twice the baseline
    """
    # Resolution time in whole days for every issue that has both dates
    issue_durations: dict[str, float] = {}
    for issue in issues:
        if issue.discovered_date and issue.completed_date:
            days = float((issue.completed_date - issue.discovered_date).days)
            if days >= 0:  # Ignore inconsistent dates (completed before discovered)
                issue_durations[issue.issue_id] = days

    if not issue_durations:
        return ComplexityProxyAnalysis()

    baseline_days = _proxy_median(list(issue_durations.values()))
    if baseline_days == 0:
        baseline_days = 1.0  # Avoid division by zero

    # Map issues to their affected files by reading issue content
    issue_to_files: dict[str, list[str]] = {}
    for issue in issues:
        if issue.issue_id not in issue_durations:
            continue
        try:
            content = issue.path.read_text(encoding="utf-8")
            paths = _extract_paths_from_issue(content)
            if paths:
                issue_to_files[issue.issue_id] = paths
        except Exception:
            continue

    # Aggregate durations by file: path -> [(issue_id, days), ...]
    file_durations: dict[str, list[tuple[str, float]]] = {}
    for issue_id, files in issue_to_files.items():
        days = issue_durations[issue_id]
        for f in files:
            file_durations.setdefault(f, []).append((issue_id, days))

    # Aggregate durations by directory, one entry per issue. An issue that
    # touches several files of a directory must count as one data point.
    dir_issue_days: dict[str, dict[str, float]] = {}
    for path, entries in file_durations.items():
        dir_path = ("/".join(path.split("/")[:-1]) + "/") if "/" in path else "./"
        per_issue = dir_issue_days.setdefault(dir_path, {})
        for issue_id, days in entries:
            if issue_id not in per_issue or days > per_issue[issue_id]:
                per_issue[issue_id] = days

    # Files need at least 2 data points
    file_complexity: list[ComplexityProxy] = [
        _make_complexity_proxy(path, entries, baseline_days)
        for path, entries in file_durations.items()
        if len(entries) >= 2
    ]

    # Directories need at least 3 distinct issues
    directory_complexity: list[ComplexityProxy] = [
        _make_complexity_proxy(dir_path, list(per_issue.items()), baseline_days)
        for dir_path, per_issue in dir_issue_days.items()
        if len(per_issue) >= 3
    ]

    # Sort by complexity score descending
    file_complexity.sort(key=lambda c: -c.complexity_score)
    directory_complexity.sort(key=lambda c: -c.complexity_score)

    # Identify outliers (>2x baseline)
    complexity_outliers = [
        c.path for c in file_complexity if c.avg_resolution_days > baseline_days * 2
    ]

    return ComplexityProxyAnalysis(
        file_complexity=file_complexity[:10],
        directory_complexity=directory_complexity[:10],
        baseline_days=baseline_days,
        complexity_outliers=complexity_outliers[:10],
    )


def _proxy_median(values: list[float]) -> float:
    """Return the median of a non-empty list, averaging the two middle values."""
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2


def _make_complexity_proxy(
    path: str,
    entries: list[tuple[str, float]],
    baseline_days: float,
) -> ComplexityProxy:
    """Build the ComplexityProxy for one path from its (issue_id, days) entries."""
    durations = [days for _, days in entries]
    avg = sum(durations) / len(durations)

    # Score is 0 at or below the baseline and reaches 1.0 at 5x the baseline
    ratio = avg / baseline_days
    complexity_score = max(0.0, min(1.0, (ratio - 1) / 4))

    return ComplexityProxy(
        path=path,
        avg_resolution_days=avg,
        median_resolution_days=_proxy_median(durations),
        issue_count=len(entries),
        slowest_issue=max(entries, key=lambda entry: entry[1]),
        complexity_score=complexity_score,
        comparison_to_baseline=f"{ratio:.1f}x baseline" if ratio >= 1.5 else "near baseline",
    )
def _calculate_debt_metrics(
    completed_issues: list[CompletedIssue],
    active_issues: list[tuple[Path, str, str, date | None]],
) -> TechnicalDebtMetrics:
    """Calculate technical debt health metrics.

    Args:
        completed_issues: List of completed issues
        active_issues: List of active issue tuples, each
            (path, issue_type, priority, discovered_date)

    Returns:
        TechnicalDebtMetrics with backlog size, open P0/P1 count, aging
        counts (30+ and 60+ days; the 30+ count includes the 60+ issues),
        backlog growth per week over the last 4 weeks, and the ratio of
        completed bugs to completed features
    """
    today = date.today()
    metrics = TechnicalDebtMetrics()

    metrics.backlog_size = len(active_issues)

    # Count aging and high priority
    for _path, _issue_type, priority, discovered_date in active_issues:
        if priority in ("P0", "P1"):
            metrics.high_priority_open += 1

        if discovered_date:
            age = (today - discovered_date).days
            if age >= 30:
                metrics.aging_30_plus += 1
            if age >= 60:
                metrics.aging_60_plus += 1

    # Backlog growth rate (issues per week) over the last 4 weeks
    four_weeks_ago = today - timedelta(days=28)

    completed_recently = sum(
        1 for i in completed_issues if i.completed_date and i.completed_date >= four_weeks_ago
    )

    # Creations are the still-open issues discovered in the window plus the
    # issues that were both discovered and completed inside it. Without the
    # latter, an issue opened and closed within the window would count as -1
    # instead of cancelling out.
    created_recently = sum(1 for _, _, _, d in active_issues if d and d >= four_weeks_ago)
    created_recently += sum(
        1
        for i in completed_issues
        if i.discovered_date
        and i.discovered_date >= four_weeks_ago
        and i.completed_date
        and i.completed_date >= four_weeks_ago
    )

    # Net change per week
    if completed_recently > 0 or created_recently > 0:
        metrics.backlog_growth_rate = (created_recently - completed_recently) / 4.0

    # Debt paydown ratio (bug fixes vs features)
    bug_count = sum(1 for i in completed_issues if i.issue_type == "BUG")
    feat_count = sum(1 for i in completed_issues if i.issue_type == "FEAT")

    if feat_count > 0:
        metrics.debt_paydown_ratio = bug_count / feat_count
    elif bug_count > 0:
        metrics.debt_paydown_ratio = float(bug_count)  # All maintenance

    return metrics
def calculate_analysis(
    completed_issues: list[CompletedIssue],
    issues_dir: Path | None = None,
    period_type: Literal["weekly", "monthly", "quarterly"] = "monthly",
    compare_days: int | None = None,
    project_root: Path | None = None,
) -> HistoryAnalysis:
    """Run every history analysis and bundle the results.

    Args:
        completed_issues: List of completed issues
        issues_dir: Path to .issues/ for active issue scanning; when omitted
            no active issues are considered
        period_type: Grouping period for trend analysis
        compare_days: Window length in days for the comparative analysis
            (e.g. 30 compares the last 30 days with the 30 days before)
        project_root: Project root for config gap analysis (defaults to cwd)

    Returns:
        HistoryAnalysis with all metrics; the comparison fields are filled
        only when compare_days is given, and each period only when it
        contains at least one completed issue
    """
    today = date.today()

    summary = calculate_summary(completed_issues)
    active_issues: list[tuple[Path, str, str, date | None]] = (
        scan_active_issues(issues_dir) if issues_dir else []
    )
    period_metrics = _group_by_period(completed_issues, period_type)

    # Trends need at least three periods; otherwise report "stable"
    if len(period_metrics) >= 3:
        velocity_trend = _calculate_trend([float(p.total_completed) for p in period_metrics])
        # A period without a bug ratio counts as 0.0
        bug_ratio_trend = _calculate_trend([p.bug_ratio or 0.0 for p in period_metrics])
    else:
        velocity_trend = bug_ratio_trend = "stable"

    subsystem_health = _analyze_subsystems(completed_issues)
    hotspot_analysis = analyze_hotspots(completed_issues)
    coupling_analysis = analyze_coupling(completed_issues)
    regression_analysis = analyze_regression_clustering(completed_issues)
    test_gap_analysis = analyze_test_gaps(completed_issues, hotspot_analysis)
    rejection_analysis = analyze_rejection_rates(completed_issues)
    manual_pattern_analysis = detect_manual_patterns(completed_issues)
    agent_effectiveness_analysis = analyze_agent_effectiveness(completed_issues)
    complexity_proxy_analysis = analyze_complexity_proxy(completed_issues, hotspot_analysis)
    # Depends on manual_pattern_analysis
    config_gaps_analysis = detect_config_gaps(manual_pattern_analysis, project_root)
    # Depends on hotspot_analysis
    cross_cutting_analysis = detect_cross_cutting_smells(completed_issues, hotspot_analysis)
    debt_metrics = _calculate_debt_metrics(completed_issues, active_issues)

    analysis = HistoryAnalysis(
        generated_date=today,
        total_completed=len(completed_issues),
        total_active=len(active_issues),
        date_range_start=summary.earliest_date,
        date_range_end=summary.latest_date,
        summary=summary,
        period_metrics=period_metrics,
        velocity_trend=velocity_trend,
        bug_ratio_trend=bug_ratio_trend,
        subsystem_health=subsystem_health,
        hotspot_analysis=hotspot_analysis,
        coupling_analysis=coupling_analysis,
        regression_analysis=regression_analysis,
        test_gap_analysis=test_gap_analysis,
        rejection_analysis=rejection_analysis,
        manual_pattern_analysis=manual_pattern_analysis,
        agent_effectiveness_analysis=agent_effectiveness_analysis,
        complexity_proxy_analysis=complexity_proxy_analysis,
        config_gaps_analysis=config_gaps_analysis,
        cross_cutting_analysis=cross_cutting_analysis,
        debt_metrics=debt_metrics,
    )

    if not compare_days:
        return analysis

    def _count_types(members: list[CompletedIssue]) -> dict[str, int]:
        """Count the given issues per issue type."""
        tally: dict[str, int] = {}
        for member in members:
            tally[member.issue_type] = tally.get(member.issue_type, 0) + 1
        return tally

    # Two adjacent windows of compare_days each: [prev_cutoff, cutoff) and [cutoff, today]
    analysis.comparison_period = f"{compare_days}d"
    cutoff = today - timedelta(days=compare_days)
    prev_cutoff = cutoff - timedelta(days=compare_days)

    current_issues: list[CompletedIssue] = []
    previous_issues: list[CompletedIssue] = []
    for item in completed_issues:
        finished = item.completed_date
        if not finished:
            continue
        if finished >= cutoff:
            current_issues.append(item)
        elif prev_cutoff <= finished:
            previous_issues.append(item)

    if current_issues:
        analysis.current_period = PeriodMetrics(
            period_start=cutoff,
            period_end=today,
            period_label=f"Last {compare_days} days",
            total_completed=len(current_issues),
            type_counts=_count_types(current_issues),
        )

    if previous_issues:
        analysis.previous_period = PeriodMetrics(
            period_start=prev_cutoff,
            period_end=cutoff - timedelta(days=1),
            period_label=f"Previous {compare_days} days",
            total_completed=len(previous_issues),
            type_counts=_count_types(previous_issues),
        )

    return analysis
2872# =============================================================================
2873# Analysis Formatting Functions (FEAT-110)
2874# =============================================================================
def format_analysis_json(analysis: HistoryAnalysis) -> str:
    """Serialize an analysis to a JSON document.

    Args:
        analysis: HistoryAnalysis to format

    Returns:
        JSON string of ``analysis.to_dict()``, indented by two spaces
    """
    payload = analysis.to_dict()
    return json.dumps(payload, indent=2)
def format_analysis_yaml(analysis: HistoryAnalysis) -> str:
    """Serialize an analysis to YAML, or to JSON when PyYAML is unavailable.

    Args:
        analysis: HistoryAnalysis to format

    Returns:
        YAML string in block style with keys in their original order; the
        JSON rendering is returned instead when ``yaml`` cannot be imported
    """
    try:
        import yaml

        payload = analysis.to_dict()
        rendered = yaml.dump(payload, default_flow_style=False, sort_keys=False)
    except ImportError:
        # yaml is optional: fall back to the JSON formatter
        rendered = format_analysis_json(analysis)
    return rendered
2907def format_analysis_text(analysis: HistoryAnalysis) -> str:
2908 """Format analysis as human-readable text.
2910 Args:
2911 analysis: HistoryAnalysis to format
2913 Returns:
2914 Formatted text string
2915 """
2916 lines: list[str] = []
2918 lines.append("Issue History Analysis")
2919 lines.append("=" * 22)
2920 lines.append(f"Generated: {analysis.generated_date}")
2921 lines.append(f"Completed: {analysis.total_completed} | Active: {analysis.total_active}")
2923 if analysis.date_range_start and analysis.date_range_end:
2924 lines.append(f"Date Range: {analysis.date_range_start} to {analysis.date_range_end}")
2926 # Summary
2927 lines.append("")
2928 lines.append("Summary")
2929 lines.append("-" * 7)
2930 summary = analysis.summary
2931 if summary.velocity:
2932 lines.append(f"Velocity: {summary.velocity:.2f} issues/day")
2933 lines.append(f"Velocity Trend: {analysis.velocity_trend}")
2934 lines.append(f"Bug Ratio Trend: {analysis.bug_ratio_trend}")
2936 # Type distribution
2937 lines.append("")
2938 lines.append("By Type:")
2939 total = analysis.total_completed or 1
2940 for issue_type, count in summary.type_counts.items():
2941 pct = count * 100 // total
2942 lines.append(f" {issue_type:5}: {count:3} ({pct:2}%)")
2944 # Period metrics
2945 if analysis.period_metrics:
2946 lines.append("")
2947 lines.append("Period Metrics")
2948 lines.append("-" * 14)
2949 for period in analysis.period_metrics[-6:]: # Last 6 periods
2950 bug_pct = f"{period.bug_ratio * 100:.0f}%" if period.bug_ratio else "N/A"
2951 lines.append(
2952 f" {period.period_label:12}: {period.total_completed:3} completed, {bug_pct} bugs"
2953 )
2955 # Subsystem health
2956 if analysis.subsystem_health:
2957 lines.append("")
2958 lines.append("Subsystem Health")
2959 lines.append("-" * 16)
2960 for sub in analysis.subsystem_health[:5]:
2961 trend_symbol = {"improving": "↓", "degrading": "↑", "stable": "→"}.get(sub.trend, "?")
2962 lines.append(
2963 f" {sub.subsystem:30}: {sub.total_issues:3} total, "
2964 f"{sub.recent_issues:2} recent {trend_symbol}"
2965 )
2967 # Hotspot analysis
2968 if analysis.hotspot_analysis:
2969 hotspots = analysis.hotspot_analysis
2971 if hotspots.file_hotspots:
2972 lines.append("")
2973 lines.append("File Hotspots")
2974 lines.append("-" * 13)
2975 for h in hotspots.file_hotspots[:5]:
2976 types_str = ", ".join(f"{k}:{v}" for k, v in sorted(h.issue_types.items()))
2977 churn_flag = " [HIGH CHURN]" if h.churn_indicator == "high" else ""
2978 lines.append(f" {h.path:40}: {h.issue_count:2} issues ({types_str}){churn_flag}")
2980 if hotspots.bug_magnets:
2981 lines.append("")
2982 lines.append("Bug Magnets (>60% bugs)")
2983 lines.append("-" * 23)
2984 for h in hotspots.bug_magnets:
2985 lines.append(
2986 f" {h.path}: {h.bug_ratio * 100:.0f}% bugs "
2987 f"({h.issue_types.get('BUG', 0)}/{h.issue_count})"
2988 )
2990 # Coupling analysis
2991 if analysis.coupling_analysis:
2992 coupling = analysis.coupling_analysis
2994 if coupling.pairs:
2995 lines.append("")
2996 lines.append("Coupling Detection")
2997 lines.append("-" * 18)
2999 lines.append("Highly Coupled File Pairs:")
3000 for i, p in enumerate(coupling.pairs[:5], 1):
3001 strength_label = (
3002 "HIGH"
3003 if p.coupling_strength >= 0.7
3004 else "MEDIUM"
3005 if p.coupling_strength >= 0.5
3006 else "LOW"
3007 )
3008 lines.append(f" {i}. {p.file_a} <-> {p.file_b}")
3009 lines.append(
3010 f" Co-occurrences: {p.co_occurrence_count}, "
3011 f"Strength: {p.coupling_strength:.2f} [{strength_label}]"
3012 )
3014 if coupling.clusters:
3015 lines.append("")
3016 lines.append("Coupling Clusters:")
3017 for i, cluster in enumerate(coupling.clusters[:3], 1):
3018 files_str = ", ".join(cluster[:4])
3019 if len(cluster) > 4:
3020 files_str += f" (+{len(cluster) - 4} more)"
3021 lines.append(f" {i}. [{files_str}]")
3023 if coupling.hotspots:
3024 lines.append("")
3025 lines.append("Coupling Hotspots (coupled with 3+ files):")
3026 for f in coupling.hotspots[:5]:
3027 lines.append(f" - {f}")
3029 # Regression clustering analysis
3030 if analysis.regression_analysis:
3031 regression = analysis.regression_analysis
3033 if regression.clusters:
3034 lines.append("")
3035 lines.append("Regression Clustering")
3036 lines.append("-" * 20)
3037 lines.append(f"Total regression chains detected: {regression.total_regression_chains}")
3038 lines.append("")
3039 lines.append("Fragile Code Clusters:")
3040 for i, c in enumerate(regression.clusters[:5], 1):
3041 severity_flag = (
3042 f" [{c.severity.upper()}]" if c.severity in ("critical", "high") else ""
3043 )
3044 lines.append(f" {i}. {c.primary_file}{severity_flag}")
3045 lines.append(f" Regression count: {c.regression_count}")
3046 lines.append(f" Pattern: {c.time_pattern}")
3047 if c.fix_bug_pairs:
3048 chain = " -> ".join(f"{a} fix -> {b}" for a, b in c.fix_bug_pairs[:3])
3049 if len(c.fix_bug_pairs) > 3:
3050 chain += " ..."
3051 lines.append(f" Chain: {chain}")
3053 # Test gap analysis
3054 if analysis.test_gap_analysis:
3055 tga = analysis.test_gap_analysis
3057 if tga.gaps:
3058 lines.append("")
3059 lines.append("Test Gap Correlation")
3060 lines.append("-" * 20)
3062 # Show correlation stats
3063 lines.append(f" Files with tests: avg {tga.files_with_tests_avg_bugs:.1f} bugs")
3064 lines.append(f" Files without tests: avg {tga.files_without_tests_avg_bugs:.1f} bugs")
3065 lines.append("")
3067 # Show critical gaps
3068 critical_gaps = [g for g in tga.gaps if g.priority in ("critical", "high")]
3069 if critical_gaps:
3070 lines.append("Critical Test Gaps:")
3071 for g in critical_gaps[:5]:
3072 test_status = "NO TEST" if not g.has_test_file else g.test_file_path
3073 lines.append(f" {g.source_file} [{g.priority.upper()}]")
3074 bug_ids_str = ", ".join(g.bug_ids[:3])
3075 lines.append(f" Bugs: {g.bug_count} ({bug_ids_str})")
3076 lines.append(f" Test: {test_status}")
3078 if tga.priority_test_targets:
3079 lines.append("")
3080 lines.append("Priority Test Targets:")
3081 for i, target in enumerate(tga.priority_test_targets[:5], 1):
3082 lines.append(f" {i}. {target}")
3084 # Rejection analysis
3085 if analysis.rejection_analysis:
3086 rej = analysis.rejection_analysis
3087 overall = rej.overall
3089 if overall.total_closed > 0:
3090 lines.append("")
3091 lines.append("Rejection Analysis")
3092 lines.append("-" * 18)
3093 lines.append(
3094 f" Overall rejection rate: {overall.rejection_rate * 100:.1f}% "
3095 f"({overall.rejected_count}/{overall.total_closed})"
3096 )
3097 lines.append(
3098 f" Invalid rate: {overall.invalid_rate * 100:.1f}% "
3099 f"({overall.invalid_count}/{overall.total_closed})"
3100 )
3101 if overall.duplicate_count > 0:
3102 lines.append(f" Duplicates: {overall.duplicate_count}")
3103 if overall.deferred_count > 0:
3104 lines.append(f" Deferred: {overall.deferred_count}")
3106 # By type
3107 if rej.by_type:
3108 lines.append("")
3109 lines.append(" By Type:")
3110 for issue_type in sorted(rej.by_type.keys()):
3111 metrics = rej.by_type[issue_type]
3112 rate = metrics.rejection_rate + metrics.invalid_rate
3113 lines.append(f" {issue_type:5}: {rate * 100:.1f}% non-completion")
3115 # Trend
3116 if rej.by_month:
3117 sorted_months = sorted(rej.by_month.keys())[-6:]
3118 if len(sorted_months) >= 2:
3119 lines.append("")
3120 lines.append(" Trend (last 6 months):")
3121 trend_parts = []
3122 for month in sorted_months:
3123 m = rej.by_month[month]
3124 rate = (m.rejection_rate + m.invalid_rate) * 100
3125 trend_parts.append(f"{month[-2:]}: {rate:.0f}%")
3126 lines.append(f" {', '.join(trend_parts)}")
3127 trend_symbol = {"improving": "↓", "degrading": "↑", "stable": "→"}.get(
3128 rej.trend, "→"
3129 )
3130 lines.append(f" Direction: {rej.trend} {trend_symbol}")
3132 # Common reasons
3133 if rej.common_reasons:
3134 lines.append("")
3135 lines.append(" Common Rejection Reasons:")
3136 for reason, count in rej.common_reasons[:5]:
3137 lines.append(f' - "{reason}" ({count})')
3139 # Manual pattern analysis
3140 if analysis.manual_pattern_analysis:
3141 mpa = analysis.manual_pattern_analysis
3143 if mpa.patterns:
3144 lines.append("")
3145 lines.append("Manual Pattern Analysis")
3146 lines.append("-" * 23)
3147 lines.append(f" Total manual interventions: {mpa.total_manual_interventions}")
3148 lines.append(
3149 f" Potentially automatable: {mpa.automatable_percentage:.0f}% "
3150 f"({mpa.automatable_count}/{mpa.total_manual_interventions})"
3151 )
3152 lines.append("")
3153 lines.append(" Recurring Patterns:")
3155 for i, pattern in enumerate(mpa.patterns[:5], 1):
3156 lines.append("")
3157 lines.append(
3158 f" {i}. {pattern.pattern_description} ({pattern.occurrence_count} occurrences)"
3159 )
3160 issues_str = ", ".join(pattern.affected_issues[:3])
3161 if len(pattern.affected_issues) > 3:
3162 issues_str += ", ..."
3163 lines.append(f" Issues: {issues_str}")
3164 lines.append(f" Suggestion: {pattern.suggested_automation}")
3165 lines.append(f" Complexity: {pattern.automation_complexity}")
3167 # Configuration gaps analysis
3168 if analysis.config_gaps_analysis:
3169 cga = analysis.config_gaps_analysis
3171 lines.append("")
3172 lines.append("Configuration Gaps Analysis")
3173 lines.append("-" * 27)
3174 lines.append(f" Coverage score: {cga.coverage_score * 100:.0f}%")
3175 lines.append(f" Current hooks: {', '.join(cga.current_hooks) or 'none'}")
3176 lines.append(f" Current skills: {len(cga.current_skills)}")
3177 lines.append(f" Current agents: {len(cga.current_agents)}")
3179 if cga.gaps:
3180 lines.append("")
3181 lines.append(" Identified Gaps:")
3183 for i, gap in enumerate(cga.gaps[:5], 1):
3184 lines.append("")
3185 lines.append(f" {i}. Missing: {gap.gap_type} for {gap.description}")
3186 lines.append(f" Priority: {gap.priority}")
3187 issues_str = ", ".join(gap.evidence[:3])
3188 if len(gap.evidence) > 3:
3189 issues_str += ", ..."
3190 lines.append(f" Evidence: {issues_str}")
3191 if gap.suggested_config:
3192 lines.append(" Suggested config:")
3193 for config_line in gap.suggested_config.split("\n")[:4]:
3194 lines.append(f" {config_line}")
3196 # Agent effectiveness analysis
3197 if analysis.agent_effectiveness_analysis:
3198 aea = analysis.agent_effectiveness_analysis
3200 if aea.outcomes:
3201 lines.append("")
3202 lines.append("Agent Effectiveness Analysis")
3203 lines.append("-" * 28)
3205 # Group by agent
3206 by_agent: dict[str, list[AgentOutcome]] = {}
3207 for outcome in aea.outcomes:
3208 if outcome.agent_name not in by_agent:
3209 by_agent[outcome.agent_name] = []
3210 by_agent[outcome.agent_name].append(outcome)
3212 for agent in sorted(by_agent.keys()):
3213 lines.append(f" {agent}:")
3214 for outcome in sorted(by_agent[agent], key=lambda o: o.issue_type):
3215 rate_pct = outcome.success_rate * 100
3216 flag = " [!]" if outcome.total_count >= 5 and rate_pct < 50 else ""
3217 lines.append(
3218 f" {outcome.issue_type:5}: {rate_pct:5.1f}% success "
3219 f"({outcome.success_count}/{outcome.total_count}){flag}"
3220 )
3222 # Recommendations
3223 if aea.best_agent_by_type or aea.problematic_combinations:
3224 lines.append("")
3225 lines.append(" Recommendations:")
3226 for issue_type, best_agent in sorted(aea.best_agent_by_type.items()):
3227 lines.append(f" - {issue_type}: best handled by {best_agent}")
3228 for agent, issue_type, reason in aea.problematic_combinations[:3]:
3229 lines.append(f" - {agent} underperforms for {issue_type} ({reason})")
3231 # Complexity proxy analysis
3232 if analysis.complexity_proxy_analysis:
3233 cpa = analysis.complexity_proxy_analysis
3235 lines.append("")
3236 lines.append("Complexity Proxy Analysis")
3237 lines.append("-" * 25)
3238 lines.append(f" Baseline resolution time: {cpa.baseline_days:.1f} days (median)")
3240 if cpa.file_complexity:
3241 lines.append("")
3242 lines.append(" High Complexity Files (by resolution time):")
3243 for i, cp in enumerate(cpa.file_complexity[:5], 1):
3244 score_label = (
3245 "HIGH"
3246 if cp.complexity_score >= 0.7
3247 else "MEDIUM"
3248 if cp.complexity_score >= 0.4
3249 else "LOW"
3250 )
3251 lines.append(f" {i}. {cp.path}")
3252 lines.append(
3253 f" Avg: {cp.avg_resolution_days:.1f} days ({cp.comparison_to_baseline})"
3254 )
3255 lines.append(
3256 f" Median: {cp.median_resolution_days:.1f} days, Issues: {cp.issue_count}"
3257 )
3258 lines.append(
3259 f" Slowest: {cp.slowest_issue[0]} ({cp.slowest_issue[1]:.1f} days)"
3260 )
3261 lines.append(f" Complexity score: {cp.complexity_score:.2f} [{score_label}]")
3263 if cpa.directory_complexity:
3264 lines.append("")
3265 lines.append(" High Complexity Directories:")
3266 for cp in cpa.directory_complexity[:5]:
3267 lines.append(
3268 f" {cp.path}: avg {cp.avg_resolution_days:.1f} days ({cp.comparison_to_baseline})"
3269 )
3271 if cpa.complexity_outliers:
3272 lines.append("")
3273 lines.append(" Complexity Outliers (>2x baseline):")
3274 for path in cpa.complexity_outliers[:5]:
3275 lines.append(f" - {path}")
3277 # Cross-cutting concern analysis
3278 if analysis.cross_cutting_analysis:
3279 cca = analysis.cross_cutting_analysis
3281 if cca.smells:
3282 lines.append("")
3283 lines.append("Cross-Cutting Concern Analysis")
3284 lines.append("-" * 30)
3286 for i, smell in enumerate(cca.smells[:5], 1):
3287 scatter_label = (
3288 "HIGH"
3289 if smell.scatter_score >= 0.6
3290 else "MEDIUM"
3291 if smell.scatter_score >= 0.3
3292 else "LOW"
3293 )
3294 lines.append("")
3295 lines.append(f" {i}. {smell.concern_type.title()} [{scatter_label} SCATTER]")
3296 dirs_str = ", ".join(smell.affected_directories[:3])
3297 if len(smell.affected_directories) > 3:
3298 dirs_str += ", ..."
3299 lines.append(f" Directories: {dirs_str}")
3300 issues_str = ", ".join(smell.issue_ids[:3])
3301 if len(smell.issue_ids) > 3:
3302 issues_str += ", ..."
3303 lines.append(f" Issues: {issues_str} ({smell.issue_count} total)")
3304 lines.append(f" Scatter score: {smell.scatter_score:.2f}")
3305 lines.append(f" Suggested pattern: {smell.suggested_pattern}")
3307 if cca.consolidation_opportunities:
3308 lines.append("")
3309 lines.append(" Consolidation Opportunities:")
3310 for opp in cca.consolidation_opportunities[:5]:
3311 lines.append(f" - {opp}")
3313 # Technical debt
3314 if analysis.debt_metrics:
3315 lines.append("")
3316 lines.append("Technical Debt")
3317 lines.append("-" * 14)
3318 debt = analysis.debt_metrics
3319 lines.append(f" Backlog Size: {debt.backlog_size}")
3320 lines.append(f" Growth Rate: {debt.backlog_growth_rate:+.1f} issues/week")
3321 lines.append(f" High Priority Open (P0-P1): {debt.high_priority_open}")
3322 lines.append(f" Aging >30 days: {debt.aging_30_plus}")
3324 # Comparison
3325 if analysis.comparison_period and analysis.current_period and analysis.previous_period:
3326 lines.append("")
3327 lines.append(f"Comparison ({analysis.comparison_period})")
3328 lines.append("-" * 20)
3329 curr = analysis.current_period
3330 prev = analysis.previous_period
3332 if prev.total_completed > 0:
3333 change = (curr.total_completed - prev.total_completed) / prev.total_completed * 100
3334 lines.append(
3335 f" Completed: {prev.total_completed} -> {curr.total_completed} ({change:+.0f}%)"
3336 )
3337 else:
3338 lines.append(f" Completed: {prev.total_completed} -> {curr.total_completed}")
3340 return "\n".join(lines)
def _md_cell(value: object) -> str:
    """Return *value* as text that cannot break out of a Markdown table cell."""
    # A raw "|" ends the cell and a newline ends the row, so neutralise both.
    return str(value).replace("|", "\\|").replace("\n", " ")


def _md_type_breakdown(issue_types: dict[str, int]) -> str:
    """Render per-type issue counts as ``"BUG:2, ENH:1"``, sorted by type."""
    return ", ".join(f"{k}:{v}" for k, v in sorted(issue_types.items()))


def _md_header(analysis: HistoryAnalysis) -> list[str]:
    """Render the report title, generation metadata and optional date range."""
    metadata = (
        f"**Generated**: {analysis.generated_date} | "
        f"**Total Completed**: {analysis.total_completed} | "
        f"**Active Issues**: {analysis.total_active}"
    )
    lines = ["# Issue History Analysis Report", "", metadata]
    if analysis.date_range_start and analysis.date_range_end:
        lines.append(f"**Date Range**: {analysis.date_range_start} to {analysis.date_range_end}")
    return lines


def _md_executive_summary(analysis: HistoryAnalysis) -> list[str]:
    """Render the velocity / bug ratio / backlog growth summary table."""
    lines = [
        "",
        "## Executive Summary",
        "",
        "| Metric | Value | Trend |",
        "|--------|-------|-------|",
    ]

    raw_velocity = analysis.summary.velocity
    velocity = f"{raw_velocity:.2f}/day" if raw_velocity else "N/A"
    velocity_symbol = {"increasing": "↑", "decreasing": "↓", "stable": "→"}.get(
        analysis.velocity_trend, ""
    )
    lines.append(f"| Velocity | {velocity} | {velocity_symbol} {analysis.velocity_trend} |")

    bug_count = analysis.summary.type_counts.get("BUG", 0)
    # "or 1" avoids dividing by zero when nothing has been completed yet.
    total = analysis.total_completed or 1
    bug_pct = bug_count * 100 // total
    bug_symbol = {"increasing": "↑ ⚠️", "decreasing": "↓ ✓", "stable": "→"}.get(
        analysis.bug_ratio_trend, ""
    )
    lines.append(f"| Bug Ratio | {bug_pct}% | {bug_symbol} |")

    if analysis.debt_metrics:
        growth = analysis.debt_metrics.backlog_growth_rate
        growth_status = "↓ ✓" if growth < 0 else ("→" if growth == 0 else "↑ ⚠️")
        lines.append(f"| Backlog Growth | {growth:+.1f}/week | {growth_status} |")
    return lines


def _md_type_distribution(analysis: HistoryAnalysis) -> list[str]:
    """Render the completed-issue count and percentage per issue type."""
    lines = [
        "",
        "## Type Distribution",
        "",
        "| Type | Count | Percentage |",
        "|------|-------|------------|",
    ]
    total = analysis.total_completed or 1  # avoid division by zero
    for issue_type, count in analysis.summary.type_counts.items():
        pct = count * 100 // total
        lines.append(f"| {issue_type} | {count} | {pct}% |")
    return lines


def _md_period_trends(analysis: HistoryAnalysis) -> list[str]:
    """Render completion count and bug percentage for the last eight periods."""
    if not analysis.period_metrics:
        return []
    lines = [
        "",
        "## Period Trends",
        "",
        "| Period | Completed | Bug % |",
        "|--------|-----------|-------|",
    ]
    for period in analysis.period_metrics[-8:]:  # Last 8
        ratio = period.bug_ratio
        # Only a missing ratio is "N/A"; a ratio of exactly 0 is a real 0%.
        bug_pct_str = f"{ratio * 100:.0f}%" if ratio is not None else "N/A"
        lines.append(f"| {period.period_label} | {period.total_completed} | {bug_pct_str} |")
    return lines


def _md_subsystem_health(analysis: HistoryAnalysis) -> list[str]:
    """Render total/recent issue counts and trend per subsystem."""
    if not analysis.subsystem_health:
        return []
    lines = [
        "",
        "## Subsystem Health",
        "",
        "| Subsystem | Total | Recent (30d) | Trend |",
        "|-----------|-------|--------------|-------|",
    ]
    trend_symbols = {"improving": "↓ ✓", "degrading": "↑ ⚠️", "stable": "→"}
    for sub in analysis.subsystem_health:
        trend_symbol = trend_symbols.get(sub.trend, "")
        lines.append(
            f"| `{sub.subsystem}` | {sub.total_issues} | {sub.recent_issues} | {trend_symbol} |"
        )
    return lines


def _md_hotspots(analysis: HistoryAnalysis) -> list[str]:
    """Render the file hotspot, directory hotspot and bug magnet tables."""
    hotspots = analysis.hotspot_analysis
    if not hotspots:
        return []
    lines: list[str] = []

    if hotspots.file_hotspots:
        lines += [
            "",
            "## File Hotspots",
            "",
            "| File | Issues | Types | Churn |",
            "|------|--------|-------|-------|",
        ]
        for h in hotspots.file_hotspots:
            types_str = _md_type_breakdown(h.issue_types)
            churn_badge = (
                "🔥"
                if h.churn_indicator == "high"
                else ("⚡" if h.churn_indicator == "medium" else "")
            )
            lines.append(f"| `{h.path}` | {h.issue_count} | {types_str} | {churn_badge} |")

    if hotspots.directory_hotspots:
        lines += [
            "",
            "## Directory Hotspots",
            "",
            "| Directory | Issues | Types |",
            "|-----------|--------|-------|",
        ]
        for h in hotspots.directory_hotspots[:5]:
            types_str = _md_type_breakdown(h.issue_types)
            lines.append(f"| `{h.path}` | {h.issue_count} | {types_str} |")

    if hotspots.bug_magnets:
        lines += [
            "",
            "## Bug Magnets",
            "",
            "Files with >60% bug ratio that may need refactoring attention:",
            "",
            "| File | Bug Ratio | Bugs/Total |",
            "|------|-----------|------------|",
        ]
        for h in hotspots.bug_magnets:
            lines.append(
                f"| `{h.path}` | {h.bug_ratio * 100:.0f}% | "
                f"{h.issue_types.get('BUG', 0)}/{h.issue_count} |"
            )
    return lines


def _md_coupling(analysis: HistoryAnalysis) -> list[str]:
    """Render coupled file pairs plus their clusters and hotspots."""
    coupling = analysis.coupling_analysis
    if not coupling or not coupling.pairs:
        return []
    lines = [
        "",
        "## Coupling Detection",
        "",
        "Files that frequently change together across issues:",
        "",
        "| File A | File B | Co-occurrences | Strength |",
        "|--------|--------|----------------|----------|",
    ]
    for p in coupling.pairs[:10]:
        strength_badge = (
            "🔴" if p.coupling_strength >= 0.7 else ("🟠" if p.coupling_strength >= 0.5 else "🟡")
        )
        lines.append(
            f"| `{p.file_a}` | `{p.file_b}` | {p.co_occurrence_count} | "
            f"{p.coupling_strength:.2f} {strength_badge} |"
        )

    if coupling.clusters:
        lines += [
            "",
            "### Coupling Clusters",
            "",
            "Groups of tightly coupled files (consider consolidating):",
            "",
        ]
        for i, cluster in enumerate(coupling.clusters[:5], 1):
            files_str = ", ".join(f"`{f}`" for f in cluster[:5])
            if len(cluster) > 5:
                files_str += f" (+{len(cluster) - 5} more)"
            lines.append(f"{i}. {files_str}")

    if coupling.hotspots:
        lines += [
            "",
            "### Coupling Hotspots",
            "",
            "Files coupled with 3+ other files (potential abstraction candidates):",
            "",
        ]
        lines.extend(f"- `{f}`" for f in coupling.hotspots[:5])
    return lines


def _md_regressions(analysis: HistoryAnalysis) -> list[str]:
    """Render regression clusters and the list of most fragile files."""
    regression = analysis.regression_analysis
    if not regression or not regression.clusters:
        return []
    lines = [
        "",
        "## Regression Clustering",
        "",
        f"**Total regression chains detected**: {regression.total_regression_chains}",
        "",
        "Files where fixes frequently lead to new bugs:",
        "",
        "| File | Regressions | Pattern | Severity |",
        "|------|-------------|---------|----------|",
    ]
    for c in regression.clusters:
        severity_badge = (
            "🔴" if c.severity == "critical" else ("🟠" if c.severity == "high" else "🟡")
        )
        lines.append(
            f"| `{c.primary_file}` | {c.regression_count} | "
            f"{c.time_pattern} | {severity_badge} |"
        )

    if regression.most_fragile_files:
        lines += [
            "",
            "### Most Fragile Files",
            "",
            "Files requiring architectural attention:",
            "",
        ]
        lines.extend(f"- `{f}`" for f in regression.most_fragile_files)
    return lines


def _md_test_gaps(analysis: HistoryAnalysis) -> list[str]:
    """Render bug/test correlation, critical test gaps and priority targets."""
    tga = analysis.test_gap_analysis
    if not tga or not tga.gaps:
        return []
    lines = [
        "",
        "## Test Gap Correlation",
        "",
        "Correlating bug occurrences with test coverage gaps:",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Files with tests | avg {tga.files_with_tests_avg_bugs:.1f} bugs |",
        f"| Files without tests | avg {tga.files_without_tests_avg_bugs:.1f} bugs |",
        "",
    ]

    # Critical gaps table
    critical_gaps = [g for g in tga.gaps if g.priority in ("critical", "high")]
    if critical_gaps:
        lines += [
            "### Critical Test Gaps",
            "",
            "Files with high bug counts but missing tests:",
            "",
            "| File | Bugs | Priority | Test Status | Action |",
            "|------|------|----------|-------------|--------|",
        ]
        for g in critical_gaps[:10]:
            priority_badge = "🔴" if g.priority == "critical" else "🟠"
            test_status = f"`{g.test_file_path}`" if g.has_test_file else "NONE"
            action = "Review coverage" if g.has_test_file else "Create test file"
            lines.append(
                f"| `{g.source_file}` | {g.bug_count} | {priority_badge} | "
                f"{test_status} | {action} |"
            )

    if tga.priority_test_targets:
        lines += [
            "",
            "### Priority Test Targets",
            "",
            "Files recommended for new test creation (ordered by bug count):",
            "",
        ]
        lines.extend(f"- `{target}`" for target in tga.priority_test_targets[:10])
    return lines


def _md_rejections(analysis: HistoryAnalysis) -> list[str]:
    """Render overall, per-type and monthly rejection rates plus common reasons."""
    rej = analysis.rejection_analysis
    if not rej:
        return []
    overall = rej.overall
    if not overall.total_closed > 0:
        return []
    lines = [
        "",
        "## Rejection Analysis",
        "",
        f"**Overall rejection rate**: {overall.rejection_rate * 100:.1f}% "
        f"({overall.rejected_count}/{overall.total_closed})",
        f"**Invalid rate**: {overall.invalid_rate * 100:.1f}% "
        f"({overall.invalid_count}/{overall.total_closed})",
        "",
    ]

    # By type table
    if rej.by_type:
        lines += [
            "### By Issue Type",
            "",
            "| Type | Rejected | Invalid | Total | Rate |",
            "|------|----------|---------|-------|------|",
        ]
        for issue_type in sorted(rej.by_type.keys()):
            m = rej.by_type[issue_type]
            rate = (m.rejection_rate + m.invalid_rate) * 100
            lines.append(
                f"| {issue_type} | {m.rejected_count} | {m.invalid_count} | "
                f"{m.total_closed} | {rate:.1f}% |"
            )
        lines.append("")

    # Trend (a single month is not a trend, hence the minimum of two)
    if rej.by_month and len(rej.by_month) >= 2:
        lines += ["### Trend", ""]
        trend_parts = []
        for month in sorted(rej.by_month.keys())[-6:]:
            m = rej.by_month[month]
            rate = (m.rejection_rate + m.invalid_rate) * 100
            trend_parts.append(f"{month}: {rate:.0f}%")
        lines.append(" → ".join(trend_parts))
        lines.append(f"*Trend: {rej.trend}*")
        lines.append("")

    # Common reasons
    if rej.common_reasons:
        lines += ["### Common Rejection Reasons", ""]
        lines.extend(f'- "{reason}" ({count})' for reason, count in rej.common_reasons[:5])
    return lines


def _md_manual_patterns(analysis: HistoryAnalysis) -> list[str]:
    """Render recurring manual patterns and the derived automation suggestions."""
    mpa = analysis.manual_pattern_analysis
    if not mpa or not mpa.patterns:
        return []
    lines = [
        "",
        "## Manual Pattern Analysis",
        "",
        f"**Total manual interventions detected**: {mpa.total_manual_interventions}",
        f"**Potentially automatable**: {mpa.automatable_percentage:.0f}% "
        f"({mpa.automatable_count}/{mpa.total_manual_interventions})",
        "",
        "### Recurring Patterns",
        "",
        "| Pattern | Occurrences | Affected Issues | Suggestion | Complexity |",
        "|---------|-------------|-----------------|------------|------------|",
    ]
    for pattern in mpa.patterns[:10]:
        issues_str = ", ".join(pattern.affected_issues[:3])
        if len(pattern.affected_issues) > 3:
            issues_str += "..."
        # Description and suggestion are free text, so escape them for the table.
        lines.append(
            f"| {_md_cell(pattern.pattern_description)} | {pattern.occurrence_count} | "
            f"{issues_str} | {_md_cell(pattern.suggested_automation)} | "
            f"{pattern.automation_complexity} |"
        )

    if mpa.automation_suggestions:
        lines += [
            "",
            "### Automation Suggestions",
            "",
            "Based on detected patterns, consider implementing:",
            "",
        ]
        lines.extend(f"- {suggestion}" for suggestion in mpa.automation_suggestions[:5])
    return lines


def _md_config_gaps(analysis: HistoryAnalysis) -> list[str]:
    """Render configuration coverage, identified gaps and suggested configs."""
    cga = analysis.config_gaps_analysis
    if not cga:
        return []
    lines = [
        "",
        "## Configuration Gaps Analysis",
        "",
        f"**Coverage score**: {cga.coverage_score * 100:.0f}%",
        "",
        "### Current Configuration",
        "",
        f"- **Hooks**: {', '.join(cga.current_hooks) or 'none'}",
        f"- **Skills**: {len(cga.current_skills)}",
        f"- **Agents**: {len(cga.current_agents)}",
    ]
    if not cga.gaps:
        return lines

    lines += [
        "",
        "### Identified Gaps",
        "",
        "| Priority | Type | Description | Evidence |",
        "|----------|------|-------------|----------|",
    ]
    for gap in cga.gaps[:10]:
        issues_str = ", ".join(gap.evidence[:3])
        if len(gap.evidence) > 3:
            issues_str += "..."
        # The description is free text, so escape it for the table.
        lines.append(
            f"| {gap.priority} | {gap.gap_type} | {_md_cell(gap.description)} | {issues_str} |"
        )

    lines += ["", "### Suggested Configurations", ""]
    for i, gap in enumerate(cga.gaps[:5], 1):
        if gap.suggested_config:
            lines += [
                f"**{i}. {gap.description}**",
                "",
                "```json",
                gap.suggested_config,
                "```",
                "",
            ]
    return lines


def _md_agent_effectiveness(analysis: HistoryAnalysis) -> list[str]:
    """Render per-agent success rates and agent recommendations."""
    aea = analysis.agent_effectiveness_analysis
    if not aea or not aea.outcomes:
        return []
    lines = [
        "",
        "## Agent Effectiveness Analysis",
        "",
        "| Agent | Type | Success Rate | Completed | Rejected | Failed |",
        "|-------|------|--------------|-----------|----------|--------|",
    ]
    for outcome in sorted(aea.outcomes, key=lambda o: (o.agent_name, o.issue_type)):
        rate_pct = outcome.success_rate * 100
        # Only flag a low success rate once there are at least 5 samples.
        flag = " ⚠️" if outcome.total_count >= 5 and rate_pct < 50 else ""
        lines.append(
            f"| {outcome.agent_name} | {outcome.issue_type} | "
            f"{rate_pct:.1f}%{flag} | {outcome.success_count} | "
            f"{outcome.rejection_count} | {outcome.failure_count} |"
        )

    # Recommendations
    if aea.best_agent_by_type or aea.problematic_combinations:
        lines += ["", "### Recommendations", ""]
        for issue_type, best_agent in sorted(aea.best_agent_by_type.items()):
            lines.append(f"- **{issue_type}**: Best handled by `{best_agent}`")
        for agent, issue_type, reason in aea.problematic_combinations[:3]:
            lines.append(f"- **{agent}** underperforms for {issue_type} ({reason})")
    return lines


def _md_technical_debt(analysis: HistoryAnalysis) -> list[str]:
    """Render backlog size, growth, high-priority and aging assessments."""
    debt = analysis.debt_metrics
    if not debt:
        return []
    lines = [
        "",
        "## Technical Debt Health",
        "",
        "| Metric | Value | Assessment |",
        "|--------|-------|------------|",
    ]

    backlog_status = (
        "✓ Low" if debt.backlog_size < 20 else ("⚠️ High" if debt.backlog_size > 50 else "Moderate")
    )
    lines.append(f"| Backlog Size | {debt.backlog_size} | {backlog_status} |")

    growth_status = (
        "✓ Shrinking"
        if debt.backlog_growth_rate < 0
        else ("⚠️ Growing" if debt.backlog_growth_rate > 2 else "Stable")
    )
    lines.append(f"| Growth Rate | {debt.backlog_growth_rate:+.1f}/week | {growth_status} |")

    hp_status = "✓ Good" if debt.high_priority_open < 3 else "⚠️ Attention needed"
    lines.append(f"| High Priority Open | {debt.high_priority_open} | {hp_status} |")

    aging_status = (
        "✓ Healthy"
        if debt.aging_30_plus < 5
        else ("⚠️ Review needed" if debt.aging_30_plus > 10 else "Moderate")
    )
    lines.append(f"| Aging >30 days | {debt.aging_30_plus} | {aging_status} |")
    return lines


def _md_comparison(analysis: HistoryAnalysis) -> list[str]:
    """Render the previous-vs-current period comparison table."""
    if not (analysis.comparison_period and analysis.current_period and analysis.previous_period):
        return []
    curr = analysis.current_period
    prev = analysis.previous_period
    lines = [
        "",
        f"## Comparative Analysis (Last {analysis.comparison_period})",
        "",
        "| Metric | Previous | Current | Change |",
        "|--------|----------|---------|--------|",
    ]

    # A percentage change is undefined when the previous period was empty.
    if prev.total_completed > 0:
        change = (curr.total_completed - prev.total_completed) / prev.total_completed * 100
        change_str = f"{change:+.0f}%"
    else:
        change_str = "N/A"
    lines.append(
        f"| Completed | {prev.total_completed} | {curr.total_completed} | {change_str} |"
    )

    prev_bugs = prev.type_counts.get("BUG", 0)
    curr_bugs = curr.type_counts.get("BUG", 0)
    if prev_bugs > 0:
        bug_change = (curr_bugs - prev_bugs) / prev_bugs * 100
        bug_change_str = f"{bug_change:+.0f}%"
        if bug_change < 0:
            bug_change_str += " ✓"
    else:
        bug_change_str = "N/A"
    lines.append(f"| Bugs Fixed | {prev_bugs} | {curr_bugs} | {bug_change_str} |")
    return lines


def format_analysis_markdown(analysis: HistoryAnalysis) -> str:
    """Format analysis as Markdown report.

    The report is assembled from independent sections, in this order: header,
    executive summary, type distribution, period trends, subsystem health,
    hotspots, coupling, regression clustering, test gaps, rejection analysis,
    manual patterns, configuration gaps, agent effectiveness, technical debt
    and period comparison. A section whose data is missing or empty is
    omitted; the header, executive summary and type distribution are always
    present.

    NOTE(review): ``complexity_proxy_analysis`` and ``cross_cutting_analysis``
    are not rendered here, although the plain-text formatter in this module
    prints both - confirm whether that omission is intentional.

    Args:
        analysis: HistoryAnalysis to format

    Returns:
        Markdown string
    """
    section_renderers = (
        _md_header,
        _md_executive_summary,
        _md_type_distribution,
        _md_period_trends,
        _md_subsystem_health,
        _md_hotspots,
        _md_coupling,
        _md_regressions,
        _md_test_gaps,
        _md_rejections,
        _md_manual_patterns,
        _md_config_gaps,
        _md_agent_effectiveness,
        _md_technical_debt,
        _md_comparison,
    )
    lines: list[str] = []
    for render in section_renderers:
        lines.extend(render(analysis))
    return "\n".join(lines)