Coverage for little_loops / issue_history.py: 16%

1825 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-02-13 16:40 -0600

1"""Issue history analysis and summary statistics. 

2 

3Provides analysis of completed issues including: 

4- Type distribution (BUG, ENH, FEAT) 

5- Priority distribution (P0-P5) 

6- Discovery source breakdown 

7- Completion velocity metrics 

8- Trend analysis over time periods 

9- Subsystem health tracking 

10- Technical debt metrics 

11""" 

12 

13from __future__ import annotations 

14 

15import json 

16import re 

17from collections import defaultdict 

18from dataclasses import dataclass, field 

19from datetime import date, timedelta 

20from pathlib import Path 

21from typing import Any, Literal 

22 

23from little_loops.frontmatter import parse_frontmatter 

24 

# Public API of this module. Names are grouped by role; keep this list in sync
# with the definitions below when adding or removing public objects.
__all__ = [
    # Core dataclasses
    "CompletedIssue",
    "HistorySummary",
    # Advanced analysis dataclasses
    "PeriodMetrics",
    "SubsystemHealth",
    "Hotspot",
    "HotspotAnalysis",
    # CouplingPair/CouplingAnalysis are public dataclasses (CouplingAnalysis is
    # the type of HistoryAnalysis.coupling_analysis), so they are exported too.
    "CouplingPair",
    "CouplingAnalysis",
    "RegressionCluster",
    "RegressionAnalysis",
    "TestGap",
    "TestGapAnalysis",
    "RejectionMetrics",
    "RejectionAnalysis",
    "ManualPattern",
    "ManualPatternAnalysis",
    "ConfigGap",
    "ConfigGapsAnalysis",
    "AgentOutcome",
    "AgentEffectivenessAnalysis",
    "TechnicalDebtMetrics",
    "ComplexityProxy",
    "ComplexityProxyAnalysis",
    "CrossCuttingSmell",
    "CrossCuttingAnalysis",
    "HistoryAnalysis",
    # Parsing and scanning
    "parse_completed_issue",
    "scan_completed_issues",
    "scan_active_issues",
    # Summary functions
    "calculate_summary",
    "calculate_analysis",
    "analyze_hotspots",
    "analyze_regression_clustering",
    "analyze_test_gaps",
    "analyze_rejection_rates",
    "detect_manual_patterns",
    "detect_config_gaps",
    "analyze_agent_effectiveness",
    "analyze_complexity_proxy",
    "detect_cross_cutting_smells",
    # Formatting functions
    "format_summary_text",
    "format_summary_json",
    "format_analysis_text",
    "format_analysis_json",
    "format_analysis_markdown",
    "format_analysis_yaml",
]

76 

77 

@dataclass
class CompletedIssue:
    """Metadata describing one completed issue file.

    The source/date fields are optional and default to ``None``.
    """

    path: Path
    issue_type: str  # BUG, ENH, FEAT
    priority: str  # P0-P5
    issue_id: str  # e.g., BUG-001
    discovered_by: str | None = None
    discovered_date: date | None = None
    completed_date: date | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dict.

        The path is rendered with ``str()`` and each date as an ISO-8601
        string (or ``None`` when the date is unset).
        """
        found_on = self.discovered_date
        done_on = self.completed_date
        return {
            "path": str(self.path),
            "issue_type": self.issue_type,
            "priority": self.priority,
            "issue_id": self.issue_id,
            "discovered_by": self.discovered_by,
            "discovered_date": found_on.isoformat() if found_on else None,
            "completed_date": done_on.isoformat() if done_on else None,
        }

101 

102 

@dataclass
class HistorySummary:
    """Aggregate statistics over a set of completed issues."""

    total_count: int
    type_counts: dict[str, int] = field(default_factory=dict)
    priority_counts: dict[str, int] = field(default_factory=dict)
    discovery_counts: dict[str, int] = field(default_factory=dict)
    earliest_date: date | None = None
    latest_date: date | None = None

    @property
    def date_range_days(self) -> int | None:
        """Inclusive day count from ``earliest_date`` to ``latest_date``.

        Returns:
            The number of days (both endpoints counted), or ``None`` when
            either date is unset.
        """
        if self.earliest_date is None or self.latest_date is None:
            return None
        return (self.latest_date - self.earliest_date).days + 1

    @property
    def velocity(self) -> float | None:
        """Issues per day over the date range.

        Returns:
            ``total_count / date_range_days``, or ``None`` when the range is
            unknown or not positive.
        """
        span = self.date_range_days
        if span is None or span <= 0:
            return None
        return self.total_count / span

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary suitable for JSON serialization.

        Dates are rendered as ISO-8601 strings and velocity is rounded to two
        decimals. A velocity of ``0.0`` is reported as ``0.0``; only a missing
        velocity is reported as ``None``.
        """
        # Read the property once; compare against None so that a real 0.0 is
        # not mistaken for "no value".
        rate = self.velocity
        return {
            "total_count": self.total_count,
            "type_counts": self.type_counts,
            "priority_counts": self.priority_counts,
            "discovery_counts": self.discovery_counts,
            "earliest_date": (self.earliest_date.isoformat() if self.earliest_date else None),
            "latest_date": self.latest_date.isoformat() if self.latest_date else None,
            "date_range_days": self.date_range_days,
            "velocity": round(rate, 2) if rate is not None else None,
        }

140 

141 

142# ============================================================================= 

143# Advanced Analysis Dataclasses (FEAT-110) 

144# ============================================================================= 

145 

146 

@dataclass
class PeriodMetrics:
    """Metrics for a specific time period."""

    period_start: date
    period_end: date
    period_label: str  # e.g., "Q1 2025", "Jan 2025", "Week 3"
    total_completed: int = 0
    type_counts: dict[str, int] = field(default_factory=dict)
    priority_counts: dict[str, int] = field(default_factory=dict)
    avg_completion_days: float | None = None

    @property
    def bug_ratio(self) -> float | None:
        """Fraction of completed issues that are bugs.

        Returns:
            ``type_counts["BUG"] / total_completed`` (a missing "BUG" key
            counts as zero), or ``None`` when nothing was completed.
        """
        if self.total_completed == 0:
            return None
        return self.type_counts.get("BUG", 0) / self.total_completed

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for serialization.

        Dates become ISO-8601 strings, ``bug_ratio`` is rounded to three
        decimals and ``avg_completion_days`` to one. Both numeric values are
        reported as ``None`` only when they are actually missing, so a real
        ``0.0`` stays ``0.0``.
        """
        ratio = self.bug_ratio
        avg_days = self.avg_completion_days
        return {
            "period_start": self.period_start.isoformat(),
            "period_end": self.period_end.isoformat(),
            "period_label": self.period_label,
            "total_completed": self.total_completed,
            "type_counts": self.type_counts,
            "priority_counts": self.priority_counts,
            "bug_ratio": round(ratio, 3) if ratio is not None else None,
            # Compare against None: an average of 0.0 days is a valid value.
            "avg_completion_days": round(avg_days, 1) if avg_days is not None else None,
        }

181 

182 

@dataclass
class SubsystemHealth:
    """Issue-volume health snapshot for one subsystem (a directory)."""

    subsystem: str  # Directory path
    total_issues: int = 0
    recent_issues: int = 0  # Issues in last 30 days
    issue_ids: list[str] = field(default_factory=list)
    trend: str = "stable"  # "improving", "stable", "degrading"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, keeping only the first five issue IDs."""
        leading_ids = self.issue_ids[:5]
        return dict(
            subsystem=self.subsystem,
            total_issues=self.total_issues,
            recent_issues=self.recent_issues,
            issue_ids=leading_ids,
            trend=self.trend,
        )

202 

203 

@dataclass
class Hotspot:
    """A file or directory referenced by multiple issues."""

    path: str
    issue_count: int = 0
    issue_ids: list[str] = field(default_factory=list)
    issue_types: dict[str, int] = field(default_factory=dict)  # {"BUG": 5, "ENH": 3}
    bug_ratio: float = 0.0  # bugs / total issues
    churn_indicator: str = "low"  # "high", "medium", "low"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        Only the first ten issue IDs are kept and ``bug_ratio`` is rounded to
        three decimals.
        """
        payload: dict[str, Any] = {"path": self.path}
        payload["issue_count"] = self.issue_count
        payload["issue_ids"] = self.issue_ids[:10]
        payload["issue_types"] = self.issue_types
        payload["bug_ratio"] = round(self.bug_ratio, 3)
        payload["churn_indicator"] = self.churn_indicator
        return payload

225 

226 

@dataclass
class HotspotAnalysis:
    """Collections of hotspots found across issues, grouped by kind."""

    file_hotspots: list[Hotspot] = field(default_factory=list)
    directory_hotspots: list[Hotspot] = field(default_factory=list)
    bug_magnets: list[Hotspot] = field(default_factory=list)  # >60% bug ratio

    def to_dict(self) -> dict[str, Any]:
        """Serialize each hotspot list by calling ``to_dict`` on every entry."""
        groups = (
            ("file_hotspots", self.file_hotspots),
            ("directory_hotspots", self.directory_hotspots),
            ("bug_magnets", self.bug_magnets),
        )
        return {label: [spot.to_dict() for spot in spots] for label, spots in groups}

242 

243 

@dataclass
class CouplingPair:
    """Two files that show up together in the same issues."""

    file_a: str
    file_b: str
    co_occurrence_count: int = 0
    coupling_strength: float = 0.0  # 0-1, Jaccard similarity
    issue_ids: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        ``coupling_strength`` is rounded to three decimals and only the first
        ten issue IDs are included.
        """
        strength = round(self.coupling_strength, 3)
        shared_ids = self.issue_ids[:10]
        return dict(
            file_a=self.file_a,
            file_b=self.file_b,
            co_occurrence_count=self.co_occurrence_count,
            coupling_strength=strength,
            issue_ids=shared_ids,
        )

263 

264 

@dataclass
class CouplingAnalysis:
    """Result of analysing which files tend to change together."""

    pairs: list[CouplingPair] = field(default_factory=list)
    clusters: list[list[str]] = field(default_factory=list)  # Groups of coupled files
    hotspots: list[str] = field(default_factory=list)  # Files coupled with 3+ others

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        Every pair is serialized; clusters and hotspots are each limited to
        their first ten entries.
        """
        serialized_pairs = [pair.to_dict() for pair in self.pairs]
        return {
            "pairs": serialized_pairs,
            "clusters": self.clusters[:10],
            "hotspots": self.hotspots[:10],
        }

280 

281 

@dataclass
class RegressionCluster:
    """A group of bugs in which fixes were followed by new bugs."""

    primary_file: str  # Main file in the regression chain
    regression_count: int = 0  # Number of regression pairs
    fix_bug_pairs: list[tuple[str, str]] = field(default_factory=list)  # (fixed_id, caused_id)
    related_files: list[str] = field(default_factory=list)  # All files in chain
    time_pattern: str = "immediate"  # "immediate" (<3d), "delayed" (3-7d), "chronic" (recurring)
    severity: str = "medium"  # "critical", "high", "medium"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, capping both list fields at ten entries."""
        limit = 10
        payload: dict[str, Any] = {}
        payload["primary_file"] = self.primary_file
        payload["regression_count"] = self.regression_count
        payload["fix_bug_pairs"] = self.fix_bug_pairs[:limit]
        payload["related_files"] = self.related_files[:limit]
        payload["time_pattern"] = self.time_pattern
        payload["severity"] = self.severity
        return payload

303 

304 

@dataclass
class RegressionAnalysis:
    """Summary of regression patterns found among bug fixes."""

    clusters: list[RegressionCluster] = field(default_factory=list)
    total_regression_chains: int = 0
    most_fragile_files: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        All clusters are serialized; ``most_fragile_files`` is limited to its
        first five entries.
        """
        cluster_dicts = [cluster.to_dict() for cluster in self.clusters]
        fragile = self.most_fragile_files[:5]
        return {
            "clusters": cluster_dicts,
            "total_regression_chains": self.total_regression_chains,
            "most_fragile_files": fragile,
        }

320 

321 

@dataclass
class TestGap:
    """A source file with bugs but missing or weak test coverage."""

    # The class name starts with "Test", which matches pytest's default class
    # collection pattern. Mark it as not-a-test so that test modules importing
    # it do not trigger a PytestCollectionWarning. This attribute is
    # unannotated, so it is a plain class attribute and not a dataclass field.
    __test__ = False

    source_file: str
    bug_count: int = 0
    bug_ids: list[str] = field(default_factory=list)
    has_test_file: bool = False
    test_file_path: str | None = None
    gap_score: float = 0.0  # bug_count * multiplier, higher = worse
    priority: str = "low"  # "critical", "high", "medium", "low"

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for serialization.

        Only the first ten bug IDs are included and ``gap_score`` is rounded
        to two decimals.
        """
        return {
            "source_file": self.source_file,
            "bug_count": self.bug_count,
            "bug_ids": self.bug_ids[:10],  # Top 10
            "has_test_file": self.has_test_file,
            "test_file_path": self.test_file_path,
            "gap_score": round(self.gap_score, 2),
            "priority": self.priority,
        }

345 

346 

@dataclass
class TestGapAnalysis:
    """Analysis of test coverage gaps correlated with bug occurrences."""

    # The class name starts with "Test", which matches pytest's default class
    # collection pattern. Mark it as not-a-test so that test modules importing
    # it do not trigger a PytestCollectionWarning. This attribute is
    # unannotated, so it is a plain class attribute and not a dataclass field.
    __test__ = False

    gaps: list[TestGap] = field(default_factory=list)
    untested_bug_magnets: list[str] = field(default_factory=list)
    files_with_tests_avg_bugs: float = 0.0
    files_without_tests_avg_bugs: float = 0.0
    priority_test_targets: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for serialization.

        All gaps are serialized. ``untested_bug_magnets`` is limited to five
        entries and ``priority_test_targets`` to ten; both averages are
        rounded to two decimals.
        """
        return {
            "gaps": [g.to_dict() for g in self.gaps],
            "untested_bug_magnets": self.untested_bug_magnets[:5],
            "files_with_tests_avg_bugs": round(self.files_with_tests_avg_bugs, 2),
            "files_without_tests_avg_bugs": round(self.files_without_tests_avg_bugs, 2),
            "priority_test_targets": self.priority_test_targets[:10],
        }

366 

367 

@dataclass
class RejectionMetrics:
    """Counters for how closed issues were resolved, plus derived rates."""

    total_closed: int = 0
    rejected_count: int = 0
    invalid_count: int = 0
    duplicate_count: int = 0
    deferred_count: int = 0
    completed_count: int = 0

    @property
    def rejection_rate(self) -> float:
        """Rejected issues as a fraction of all closed issues (0.0 if none closed)."""
        return self.rejected_count / self.total_closed if self.total_closed != 0 else 0.0

    @property
    def invalid_rate(self) -> float:
        """Invalid issues as a fraction of all closed issues (0.0 if none closed)."""
        return self.invalid_count / self.total_closed if self.total_closed != 0 else 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the counters and both rates (rounded to three decimals)."""
        payload: dict[str, Any] = {
            name: getattr(self, name)
            for name in (
                "total_closed",
                "rejected_count",
                "invalid_count",
                "duplicate_count",
                "deferred_count",
                "completed_count",
            )
        }
        payload["rejection_rate"] = round(self.rejection_rate, 3)
        payload["invalid_rate"] = round(self.invalid_rate, 3)
        return payload

405 

406 

@dataclass
class RejectionAnalysis:
    """Rejection/invalid-closure metrics overall and broken down by type and month."""

    overall: RejectionMetrics = field(default_factory=RejectionMetrics)
    by_type: dict[str, RejectionMetrics] = field(default_factory=dict)
    by_month: dict[str, RejectionMetrics] = field(default_factory=dict)
    common_reasons: list[tuple[str, int]] = field(default_factory=list)
    trend: str = "stable"  # "improving", "stable", "degrading"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        ``by_month`` entries are emitted in sorted key order and
        ``common_reasons`` is limited to its first ten entries.
        """
        per_type = {kind: metrics.to_dict() for kind, metrics in self.by_type.items()}
        per_month = {month: self.by_month[month].to_dict() for month in sorted(self.by_month)}
        return {
            "overall": self.overall.to_dict(),
            "by_type": per_type,
            "by_month": per_month,
            "common_reasons": self.common_reasons[:10],
            "trend": self.trend,
        }

426 

427 

@dataclass
class ManualPattern:
    """A manual activity that recurs across several issues."""

    pattern_type: str  # "test", "lint", "build", "git", "verification"
    pattern_description: str
    occurrence_count: int = 0
    affected_issues: list[str] = field(default_factory=list)  # issue IDs
    example_commands: list[str] = field(default_factory=list)  # sample commands found
    suggested_automation: str = ""  # hook, skill, or agent suggestion
    automation_complexity: str = "simple"  # "trivial", "simple", "moderate"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        ``affected_issues`` is limited to ten entries and ``example_commands``
        to five.
        """
        issues_sample = self.affected_issues[:10]
        commands_sample = self.example_commands[:5]
        return dict(
            pattern_type=self.pattern_type,
            pattern_description=self.pattern_description,
            occurrence_count=self.occurrence_count,
            affected_issues=issues_sample,
            example_commands=commands_sample,
            suggested_automation=self.suggested_automation,
            automation_complexity=self.automation_complexity,
        )

451 

452 

@dataclass
class ManualPatternAnalysis:
    """Detected manual patterns together with automation totals and suggestions."""

    patterns: list[ManualPattern] = field(default_factory=list)
    total_manual_interventions: int = 0
    automatable_count: int = 0
    automation_suggestions: list[str] = field(default_factory=list)

    @property
    def automatable_percentage(self) -> float:
        """``automatable_count`` as a percentage of ``total_manual_interventions``.

        Returns 0.0 when there were no manual interventions.
        """
        total = self.total_manual_interventions
        if total == 0:
            return 0.0
        return self.automatable_count / total * 100

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        The percentage is rounded to one decimal and ``automation_suggestions``
        is limited to its first ten entries.
        """
        pattern_dicts = [pattern.to_dict() for pattern in self.patterns]
        return {
            "patterns": pattern_dicts,
            "total_manual_interventions": self.total_manual_interventions,
            "automatable_count": self.automatable_count,
            "automatable_percentage": round(self.automatable_percentage, 1),
            "automation_suggestions": self.automation_suggestions[:10],
        }

478 

479 

@dataclass
class ConfigGap:
    """A missing piece of configuration that could cover recurring manual work."""

    gap_type: str  # "hook", "skill", "agent"
    description: str
    evidence: list[str] = field(default_factory=list)  # issue IDs showing the pattern
    suggested_config: str = ""  # example configuration
    priority: str = "medium"  # "high", "medium", "low"
    pattern_type: str = ""  # links back to ManualPattern.pattern_type

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, keeping at most ten evidence entries."""
        payload: dict[str, Any] = {}
        payload["gap_type"] = self.gap_type
        payload["description"] = self.description
        payload["evidence"] = self.evidence[:10]
        payload["suggested_config"] = self.suggested_config
        payload["priority"] = self.priority
        payload["pattern_type"] = self.pattern_type
        return payload

501 

502 

@dataclass
class ConfigGapsAnalysis:
    """Configuration gaps alongside the currently configured hooks, skills and agents."""

    gaps: list[ConfigGap] = field(default_factory=list)
    current_hooks: list[str] = field(default_factory=list)
    current_skills: list[str] = field(default_factory=list)
    current_agents: list[str] = field(default_factory=list)
    coverage_score: float = 0.0  # 0-1, how well config covers common needs

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; ``coverage_score`` is rounded to two decimals."""
        gap_dicts = [gap.to_dict() for gap in self.gaps]
        score = round(self.coverage_score, 2)
        return dict(
            gaps=gap_dicts,
            current_hooks=self.current_hooks,
            current_skills=self.current_skills,
            current_agents=self.current_agents,
            coverage_score=score,
        )

522 

523 

@dataclass
class AgentOutcome:
    """Outcome counters for one agent working on one issue type."""

    agent_name: str
    issue_type: str
    success_count: int = 0
    failure_count: int = 0
    rejection_count: int = 0

    @property
    def total_count(self) -> int:
        """Sum of the success, failure and rejection counters."""
        return sum((self.success_count, self.failure_count, self.rejection_count))

    @property
    def success_rate(self) -> float:
        """Successes as a fraction of ``total_count`` (0.0 when the total is zero)."""
        handled = self.total_count
        return self.success_count / handled if handled != 0 else 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialize counters plus the derived total and rate (three decimals)."""
        return dict(
            agent_name=self.agent_name,
            issue_type=self.issue_type,
            success_count=self.success_count,
            failure_count=self.failure_count,
            rejection_count=self.rejection_count,
            total_count=self.total_count,
            success_rate=round(self.success_rate, 3),
        )

557 

558 

@dataclass
class AgentEffectivenessAnalysis:
    """Per-agent outcomes plus the derived best agents and problem combinations."""

    outcomes: list[AgentOutcome] = field(default_factory=list)
    best_agent_by_type: dict[str, str] = field(default_factory=dict)
    problematic_combinations: list[tuple[str, str, str]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        All outcomes are serialized; ``problematic_combinations`` is limited
        to its first ten entries.
        """
        outcome_dicts = [outcome.to_dict() for outcome in self.outcomes]
        return {
            "outcomes": outcome_dicts,
            "best_agent_by_type": self.best_agent_by_type,
            "problematic_combinations": self.problematic_combinations[:10],
        }

574 

575 

@dataclass
class TechnicalDebtMetrics:
    """Indicators describing the state of the issue backlog."""

    backlog_size: int = 0  # Total open issues
    backlog_growth_rate: float = 0.0  # Net issues/week
    aging_30_plus: int = 0  # Issues > 30 days old
    aging_60_plus: int = 0  # Issues > 60 days old
    high_priority_open: int = 0  # P0-P1 open
    debt_paydown_ratio: float = 0.0  # maintenance vs features

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; the two float fields are rounded to two decimals."""
        growth = round(self.backlog_growth_rate, 2)
        paydown = round(self.debt_paydown_ratio, 2)
        return dict(
            backlog_size=self.backlog_size,
            backlog_growth_rate=growth,
            aging_30_plus=self.aging_30_plus,
            aging_60_plus=self.aging_60_plus,
            high_priority_open=self.high_priority_open,
            debt_paydown_ratio=paydown,
        )

597 

598 

@dataclass
class ComplexityProxy:
    """Resolution-time statistics for a file or directory, used as a complexity proxy."""

    path: str
    avg_resolution_days: float
    median_resolution_days: float
    issue_count: int
    slowest_issue: tuple[str, float]  # (issue_id, days)
    complexity_score: float  # normalized 0-1
    comparison_to_baseline: str  # "2.1x baseline", etc.

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        Day values are rounded to one decimal and the score to three. The
        ``slowest_issue`` tuple is expanded into an ``issue_id``/``days`` dict.
        """
        slowest_id, slowest_days = self.slowest_issue[0], self.slowest_issue[1]
        slowest = {"issue_id": slowest_id, "days": round(slowest_days, 1)}
        return {
            "path": self.path,
            "avg_resolution_days": round(self.avg_resolution_days, 1),
            "median_resolution_days": round(self.median_resolution_days, 1),
            "issue_count": self.issue_count,
            "slowest_issue": slowest,
            "complexity_score": round(self.complexity_score, 3),
            "comparison_to_baseline": self.comparison_to_baseline,
        }

625 

626 

@dataclass
class ComplexityProxyAnalysis:
    """Complexity proxies per file and directory, relative to a baseline duration."""

    file_complexity: list[ComplexityProxy] = field(default_factory=list)
    directory_complexity: list[ComplexityProxy] = field(default_factory=list)
    baseline_days: float = 0.0  # median across all issues
    complexity_outliers: list[str] = field(default_factory=list)  # files >2x baseline

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        Each list is limited to its first ten entries and ``baseline_days`` is
        rounded to one decimal.
        """
        top_files = [proxy.to_dict() for proxy in self.file_complexity[:10]]
        top_dirs = [proxy.to_dict() for proxy in self.directory_complexity[:10]]
        return dict(
            file_complexity=top_files,
            directory_complexity=top_dirs,
            baseline_days=round(self.baseline_days, 1),
            complexity_outliers=self.complexity_outliers[:10],
        )

644 

645 

@dataclass
class CrossCuttingSmell:
    """One cross-cutting concern found spread over several directories."""

    concern_type: str  # "logging", "error-handling", "validation", "auth", "caching"
    affected_directories: list[str] = field(default_factory=list)
    issue_count: int = 0
    issue_ids: list[str] = field(default_factory=list)
    scatter_score: float = 0.0  # higher = more scattered (0-1)
    suggested_pattern: str = ""  # "middleware", "decorator", "aspect"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        Both list fields are limited to ten entries and ``scatter_score`` is
        rounded to two decimals.
        """
        payload: dict[str, Any] = {"concern_type": self.concern_type}
        payload["affected_directories"] = self.affected_directories[:10]
        payload["issue_count"] = self.issue_count
        payload["issue_ids"] = self.issue_ids[:10]
        payload["scatter_score"] = round(self.scatter_score, 2)
        payload["suggested_pattern"] = self.suggested_pattern
        return payload

667 

668 

@dataclass
class CrossCuttingAnalysis:
    """Detected cross-cutting smells plus consolidation suggestions."""

    smells: list[CrossCuttingSmell] = field(default_factory=list)
    most_scattered_concern: str = ""
    consolidation_opportunities: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict.

        All smells are serialized; ``consolidation_opportunities`` is limited
        to its first ten entries.
        """
        smell_dicts = [smell.to_dict() for smell in self.smells]
        opportunities = self.consolidation_opportunities[:10]
        return {
            "smells": smell_dicts,
            "most_scattered_concern": self.most_scattered_concern,
            "consolidation_opportunities": opportunities,
        }

684 

685 

@dataclass
class HistoryAnalysis:
    """Top-level report bundling the summary with every optional analysis section."""

    generated_date: date
    total_completed: int
    total_active: int
    date_range_start: date | None
    date_range_end: date | None

    # Core summary (from existing HistorySummary)
    summary: HistorySummary

    # Trend analysis
    period_metrics: list[PeriodMetrics] = field(default_factory=list)
    velocity_trend: str = "stable"  # "increasing", "stable", "decreasing"
    bug_ratio_trend: str = "stable"

    # Subsystem health
    subsystem_health: list[SubsystemHealth] = field(default_factory=list)

    # Optional analysis sections; each stays None when it was not computed.
    hotspot_analysis: HotspotAnalysis | None = None
    coupling_analysis: CouplingAnalysis | None = None
    regression_analysis: RegressionAnalysis | None = None
    test_gap_analysis: TestGapAnalysis | None = None
    rejection_analysis: RejectionAnalysis | None = None
    manual_pattern_analysis: ManualPatternAnalysis | None = None
    agent_effectiveness_analysis: AgentEffectivenessAnalysis | None = None
    complexity_proxy_analysis: ComplexityProxyAnalysis | None = None
    config_gaps_analysis: ConfigGapsAnalysis | None = None
    cross_cutting_analysis: CrossCuttingAnalysis | None = None

    # Technical debt
    debt_metrics: TechnicalDebtMetrics | None = None

    # Comparative analysis (optional)
    comparison_period: str | None = None  # e.g., "30d"
    previous_period: PeriodMetrics | None = None
    current_period: PeriodMetrics | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the whole report to a plain dict.

        Dates become ISO-8601 strings, list sections are serialized entry by
        entry, and every optional section is either its own ``to_dict()``
        output or ``None`` when unset.
        """

        def _iso(day: date | None) -> str | None:
            # ISO string for a date, None when the date is unset.
            return day.isoformat() if day else None

        def _section(part: Any) -> dict[str, Any] | None:
            # Serialized optional section, None when the section is unset.
            return part.to_dict() if part else None

        return {
            "generated_date": self.generated_date.isoformat(),
            "total_completed": self.total_completed,
            "total_active": self.total_active,
            "date_range_start": _iso(self.date_range_start),
            "date_range_end": _iso(self.date_range_end),
            "summary": self.summary.to_dict(),
            "period_metrics": [period.to_dict() for period in self.period_metrics],
            "velocity_trend": self.velocity_trend,
            "bug_ratio_trend": self.bug_ratio_trend,
            "subsystem_health": [health.to_dict() for health in self.subsystem_health],
            "hotspot_analysis": _section(self.hotspot_analysis),
            "coupling_analysis": _section(self.coupling_analysis),
            "regression_analysis": _section(self.regression_analysis),
            "test_gap_analysis": _section(self.test_gap_analysis),
            "rejection_analysis": _section(self.rejection_analysis),
            "manual_pattern_analysis": _section(self.manual_pattern_analysis),
            "agent_effectiveness_analysis": _section(self.agent_effectiveness_analysis),
            "complexity_proxy_analysis": _section(self.complexity_proxy_analysis),
            "config_gaps_analysis": _section(self.config_gaps_analysis),
            "cross_cutting_analysis": _section(self.cross_cutting_analysis),
            "debt_metrics": _section(self.debt_metrics),
            "comparison_period": self.comparison_period,
            "previous_period": _section(self.previous_period),
            "current_period": _section(self.current_period),
        }

797 

798 

799# ============================================================================= 

800# Parsing Functions 

801# ============================================================================= 

802 

803 

def parse_completed_issue(file_path: Path) -> CompletedIssue:
    """Build a ``CompletedIssue`` from an issue file.

    Priority, type and ID come from the file name, which is expected to look
    like ``P[0-5]-[TYPE]-[NNN]-description.md``. Parts that cannot be matched
    fall back to ``"P5"`` and ``"UNKNOWN"``. The remaining fields are
    extracted from the file content by the module's private parse helpers.

    Args:
        file_path: Path to the issue markdown file (read as UTF-8).

    Returns:
        CompletedIssue with the parsed metadata.
    """
    name = file_path.name
    text = file_path.read_text(encoding="utf-8")

    # A leading "P<digit>" gives the priority.
    prio_hit = re.match(r"^(P\d)", name)
    priority = prio_hit.group(1) if prio_hit else "P5"

    # "<TYPE>-<digits>" anywhere in the name gives both the type and the ID.
    kind_hit = re.search(r"(BUG|ENH|FEAT)-(\d+)", name)
    if kind_hit:
        issue_type = kind_hit.group(1)
        issue_id = f"{kind_hit.group(1)}-{kind_hit.group(2)}"
    else:
        issue_type = "UNKNOWN"
        issue_id = "UNKNOWN"

    return CompletedIssue(
        path=file_path,
        issue_type=issue_type,
        priority=priority,
        issue_id=issue_id,
        discovered_by=_parse_discovered_by(text),
        discovered_date=_parse_discovered_date(text),
        completed_date=_parse_completion_date(text, file_path),
    )

848 

849 

def _parse_discovered_by(content: str) -> str | None:
    """Read the ``discovered_by`` entry from the content's frontmatter.

    Args:
        content: File content

    Returns:
        The ``discovered_by`` value when it is a string, otherwise None.
    """
    raw = parse_frontmatter(content).get("discovered_by")
    if not isinstance(raw, str):
        return None
    return raw

862 

863 

def _parse_completion_date(content: str, file_path: Path) -> date | None:
    """Determine when an issue was completed.

    The first ``**Completed**: YYYY-MM-DD`` marker in the content wins. If no
    marker is found, or its date is not a valid calendar date, the file's
    modification time is used instead.

    Args:
        content: File content
        file_path: Path whose mtime is the fallback

    Returns:
        Completion date, or None when the file cannot be stat'ed.
    """
    stamp = re.search(r"\*\*Completed\*\*:\s*(\d{4}-\d{2}-\d{2})", content)
    if stamp is not None:
        try:
            return date.fromisoformat(stamp.group(1))
        except ValueError:
            # Well-formed digits but not a real date; fall through to mtime.
            pass

    try:
        return date.fromtimestamp(file_path.stat().st_mtime)
    except OSError:
        return None

888 

889 

def _parse_resolution_action(content: str) -> str:
    """Extract resolution action category from issue content.

    The category is decided by the first ``**Status**:`` line and, when that
    status contains "closed", by the first ``**Reason**:`` line:

    - "duplicate": reason mentions "duplicate"
    - "invalid": reason mentions "invalid"
    - "deferred": reason mentions "deferred"
    - "rejected": reason mentions "rejected" or "out of scope", or the issue
      is closed with no reason or an unrecognized reason
    - "completed": everything else (no status line, or a status that is not
      "closed")

    Matching is case-insensitive and reason keywords are checked in the order
    listed above.

    Args:
        content: Issue file content

    Returns:
        Resolution category string
    """
    status_match = re.search(r"\*\*Status\*\*:\s*(.+?)(?:\n|$)", content)
    if status_match is None or "closed" not in status_match.group(1).strip().lower():
        # Not closed. The previous implementation also searched for an
        # **Action** field here, but returned "completed" whether or not it
        # was present, so that lookup was dead code and has been removed.
        return "completed"

    reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", content)
    if reason_match is not None:
        reason = reason_match.group(1).strip().lower()
        # Ordered: the first keyword found in the reason decides the category.
        keyword_categories = (
            ("duplicate", "duplicate"),
            ("invalid", "invalid"),
            ("deferred", "deferred"),
            ("rejected", "rejected"),
            ("out of scope", "rejected"),
        )
        for keyword, category in keyword_categories:
            if keyword in reason:
                return category

    # Closed without a recognizable reason.
    return "rejected"

933 

934 

935def _detect_processing_agent(content: str, discovered_source: str | None = None) -> str: 

936 """Detect which processing agent handled an issue. 

937 

938 Detection strategy (in priority order): 

939 1. Check discovered_source field for 'll-parallel' or 'll-auto' 

940 2. Check content for '**Log Type**:' field 

941 3. Check content for '**Tool**:' field 

942 4. Default to 'manual' 

943 

944 Args: 

945 content: Issue file content 

946 discovered_source: Optional discovered_source frontmatter value 

947 

948 Returns: 

949 Agent name: 'll-auto', 'll-parallel', or 'manual' 

950 """ 

951 # Check discovered_source first 

952 if discovered_source: 

953 source_lower = discovered_source.lower() 

954 if "ll-parallel" in source_lower: 

955 return "ll-parallel" 

956 if "ll-auto" in source_lower: 

957 return "ll-auto" 

958 

959 # Check Log Type field 

960 log_type_match = re.search(r"\*\*Log Type\*\*:\s*(.+?)(?:\n|$)", content) 

961 if log_type_match: 

962 log_type = log_type_match.group(1).strip().lower() 

963 if "ll-parallel" in log_type: 

964 return "ll-parallel" 

965 if "ll-auto" in log_type: 

966 return "ll-auto" 

967 

968 # Check Tool field 

969 tool_match = re.search(r"\*\*Tool\*\*:\s*(.+?)(?:\n|$)", content) 

970 if tool_match: 

971 tool = tool_match.group(1).strip().lower() 

972 if "ll-parallel" in tool: 

973 return "ll-parallel" 

974 if "ll-auto" in tool: 

975 return "ll-auto" 

976 

977 # Default to manual 

978 return "manual" 

979 

980 

def scan_completed_issues(completed_dir: Path) -> list[CompletedIssue]:
    """Parse every ``*.md`` file found directly inside the completed directory.

    Files are visited in sorted path order. A file for which
    ``parse_completed_issue`` raises is left out of the result rather than
    aborting the scan.

    Args:
        completed_dir: Path to .issues/completed/

    Returns:
        List of parsed CompletedIssue objects (empty when the directory
        does not exist)
    """
    parsed: list[CompletedIssue] = []

    if completed_dir.exists():
        for markdown_file in sorted(completed_dir.glob("*.md")):
            try:
                parsed.append(parse_completed_issue(markdown_file))
            except Exception:
                # Best-effort scan: unparseable files are skipped.
                continue

    return parsed

1004 

1005 

def calculate_summary(issues: list[CompletedIssue]) -> HistorySummary:
    """Aggregate completed issues into headline statistics.

    Args:
        issues: List of CompletedIssue objects

    Returns:
        HistorySummary holding the total, per-type / per-priority /
        per-discovery-source counts, and the earliest and latest
        completion dates (None when no issue has a date)
    """
    by_type: dict[str, int] = defaultdict(int)
    by_priority: dict[str, int] = defaultdict(int)
    by_source: dict[str, int] = defaultdict(int)
    completion_dates: list[date] = []

    for entry in issues:
        by_type[entry.issue_type] += 1
        by_priority[entry.priority] += 1
        # Issues without a recorded source are pooled under "unknown".
        by_source[entry.discovered_by or "unknown"] += 1
        if entry.completed_date:
            completion_dates.append(entry.completed_date)

    def _most_common_first(pair: tuple[str, int]) -> tuple[int, str]:
        """Order sources by count descending, then name ascending."""
        return (-pair[1], pair[0])

    # Types and priorities are ordered by key so output is deterministic.
    return HistorySummary(
        total_count=len(issues),
        type_counts=dict(sorted(by_type.items())),
        priority_counts=dict(sorted(by_priority.items())),
        discovery_counts=dict(sorted(by_source.items(), key=_most_common_first)),
        earliest_date=min(completion_dates) if completion_dates else None,
        latest_date=max(completion_dates) if completion_dates else None,
    )

1048 

1049 

def format_summary_text(summary: HistorySummary) -> str:
    """Render a summary as a plain-text report.

    The report has a header with the total, an optional date-range line
    (plus velocity when it is non-zero), and then one section each for
    type, priority and discovery source with counts and whole-number
    percentages.

    Args:
        summary: HistorySummary to format

    Returns:
        Formatted text string
    """
    report: list[str] = [
        "Issue History Summary",
        "=" * 21,
        f"Total Completed: {summary.total_count}",
    ]

    if summary.earliest_date and summary.latest_date:
        span_days = summary.date_range_days or 0
        report.append(
            f"Date Range: {summary.earliest_date} to {summary.latest_date} ({span_days} days)"
        )
        if summary.velocity:
            report.append(f"Velocity: {summary.velocity:.1f} issues/day")

    # Guard the percentage maths against an empty summary.
    denominator = summary.total_count or 1

    def _share(count: int) -> int:
        """Percentage of the total, rounded down."""
        return count * 100 // denominator

    report += ["", "By Type:"]
    report.extend(
        f" {issue_type:5}: {count:3} ({_share(count):2}%)"
        for issue_type, count in summary.type_counts.items()
    )

    report += ["", "By Priority:"]
    report.extend(
        f" {priority}: {count:3} ({_share(count):2}%)"
        for priority, count in summary.priority_counts.items()
    )

    report += ["", "By Discovery Source:"]
    report.extend(
        f" {source:15}: {count:3} ({_share(count):2}%)"
        for source, count in summary.discovery_counts.items()
    )

    return "\n".join(report)

1091 

1092 

def format_summary_json(summary: HistorySummary) -> str:
    """Serialise a summary to JSON with two-space indentation.

    Args:
        summary: HistorySummary to format

    Returns:
        JSON string built from ``summary.to_dict()``
    """
    payload = summary.to_dict()
    return json.dumps(payload, indent=2)

1103 

1104 

1105# ============================================================================= 

1106# Advanced Analysis Functions (FEAT-110) 

1107# ============================================================================= 

1108 

1109 

def _parse_discovered_date(content: str) -> date | None:
    """Read the ``discovered_date`` entry from an issue's frontmatter.

    Args:
        content: File content

    Returns:
        The parsed date when the entry is an ISO-format string, otherwise
        None (missing key, non-string value, or unparseable text)
    """
    raw_value = parse_frontmatter(content).get("discovered_date")
    if isinstance(raw_value, str):
        try:
            return date.fromisoformat(raw_value)
        except ValueError:
            # Present but not a valid ISO date; treated the same as absent.
            pass
    return None

1127 

1128 

1129def _extract_subsystem(content: str) -> str | None: 

1130 """Extract primary subsystem/directory from issue content. 

1131 

1132 Args: 

1133 content: Issue file content 

1134 

1135 Returns: 

1136 Directory path (e.g., "scripts/little_loops/") or None 

1137 """ 

1138 # Look for file paths in Location or common patterns 

1139 patterns = [ 

1140 r"\*\*File\*\*:\s*`?([^`\n]+/)[^/`\n]+`?", # **File**: path/to/file.py 

1141 r"`([a-zA-Z_][\w/.-]+/)[^/`]+\.py`", # `path/to/file.py` 

1142 ] 

1143 

1144 for pattern in patterns: 

1145 match = re.search(pattern, content) 

1146 if match: 

1147 return match.group(1) 

1148 

1149 return None 

1150 

1151 

1152def _extract_paths_from_issue(content: str) -> list[str]: 

1153 """Extract all file paths from issue content. 

1154 

1155 Args: 

1156 content: Issue file content 

1157 

1158 Returns: 

1159 List of file paths found in content 

1160 """ 

1161 patterns = [ 

1162 r"\*\*File\*\*:\s*`?([^`\n:]+)`?", # **File**: path/to/file.py 

1163 r"`([a-zA-Z_][\w/.-]+\.[a-z]{2,4})`", # `path/to/file.py` 

1164 r"(?:^|\s)([a-zA-Z_][\w/.-]+\.[a-z]{2,4})(?::\d+)?(?:\s|$|:|\))", # path.py:123 

1165 ] 

1166 

1167 paths: set[str] = set() 

1168 for pattern in patterns: 

1169 for match in re.finditer(pattern, content, re.MULTILINE): 

1170 path = match.group(1).strip() 

1171 # Must look like a file path 

1172 if "/" in path or path.endswith((".py", ".md", ".js", ".ts", ".json", ".yaml", ".yml")): 

1173 # Normalize: remove line numbers (path.py:123 -> path.py) 

1174 if ":" in path and path.split(":")[-1].isdigit(): 

1175 path = ":".join(path.split(":")[:-1]) 

1176 paths.add(path) 

1177 

1178 return sorted(paths) 

1179 

1180 

1181def _find_test_file(source_path: str) -> str | None: 

1182 """Find corresponding test file for a source file. 

1183 

1184 Checks common test file naming patterns: 

1185 - tests/test_<name>.py 

1186 - tests/<path>/test_<name>.py 

1187 - <path>/test_<name>.py 

1188 - <path>/<name>_test.py 

1189 - <path>/tests/test_<name>.py 

1190 

1191 Args: 

1192 source_path: Path to source file (e.g., "src/core/processor.py") 

1193 

1194 Returns: 

1195 Path to test file if found, None otherwise 

1196 """ 

1197 if not source_path.endswith(".py"): 

1198 return None # Only check Python files for now 

1199 

1200 path = Path(source_path) 

1201 stem = path.stem # filename without extension 

1202 parent = str(path.parent) if path.parent != Path(".") else "" 

1203 

1204 # Generate candidate test file paths 

1205 candidates: list[str] = [ 

1206 f"tests/test_{stem}.py", 

1207 f"{parent}/test_{stem}.py" if parent else f"test_{stem}.py", 

1208 f"{parent}/{stem}_test.py" if parent else f"{stem}_test.py", 

1209 f"{parent}/tests/test_{stem}.py" if parent else f"tests/test_{stem}.py", 

1210 ] 

1211 

1212 # Add path-aware test locations 

1213 if parent: 

1214 candidates.append(f"tests/{parent}/test_{stem}.py") 

1215 

1216 # Project-specific pattern for little-loops 

1217 # e.g., scripts/little_loops/foo.py -> scripts/tests/test_foo.py 

1218 if source_path.startswith("scripts/little_loops/"): 

1219 candidates.append(f"scripts/tests/test_{stem}.py") 

1220 

1221 for candidate in candidates: 

1222 if Path(candidate).exists(): 

1223 return candidate 

1224 

1225 return None 

1226 

1227 

1228def _calculate_period_label(start: date, period_type: str) -> str: 

1229 """Generate human-readable period label. 

1230 

1231 Args: 

1232 start: Period start date 

1233 period_type: "weekly", "monthly", "quarterly" 

1234 

1235 Returns: 

1236 Label like "Q1 2025", "Jan 2025", "Week 3 2025" 

1237 """ 

1238 if period_type == "quarterly": 

1239 quarter = (start.month - 1) // 3 + 1 

1240 return f"Q{quarter} {start.year}" 

1241 elif period_type == "monthly": 

1242 return start.strftime("%b %Y") 

1243 else: # weekly 

1244 week_num = start.isocalendar()[1] 

1245 return f"Week {week_num} {start.year}" 

1246 

1247 

1248def _group_by_period( 

1249 issues: list[CompletedIssue], 

1250 period_type: Literal["weekly", "monthly", "quarterly"] = "monthly", 

1251) -> list[PeriodMetrics]: 

1252 """Group issues by time period and calculate metrics. 

1253 

1254 Args: 

1255 issues: List of completed issues with dates 

1256 period_type: Grouping period 

1257 

1258 Returns: 

1259 List of PeriodMetrics sorted by date ascending 

1260 """ 

1261 # Filter issues with dates 

1262 dated_issues = [i for i in issues if i.completed_date] 

1263 if not dated_issues: 

1264 return [] 

1265 

1266 # Sort by date 

1267 dated_issues.sort(key=lambda i: i.completed_date) # type: ignore 

1268 

1269 # Determine period boundaries 

1270 periods: dict[str, list[CompletedIssue]] = defaultdict(list) 

1271 

1272 for issue in dated_issues: 

1273 completed = issue.completed_date 

1274 assert completed is not None 

1275 

1276 if period_type == "quarterly": 

1277 quarter = (completed.month - 1) // 3 

1278 period_start = date(completed.year, quarter * 3 + 1, 1) 

1279 elif period_type == "monthly": 

1280 period_start = date(completed.year, completed.month, 1) 

1281 else: # weekly 

1282 # Start of week (Monday) 

1283 period_start = completed - timedelta(days=completed.weekday()) 

1284 

1285 key = period_start.isoformat() 

1286 periods[key].append(issue) 

1287 

1288 # Calculate metrics for each period 

1289 result: list[PeriodMetrics] = [] 

1290 for period_key in sorted(periods.keys()): 

1291 period_issues = periods[period_key] 

1292 period_start = date.fromisoformat(period_key) 

1293 

1294 # Calculate period end 

1295 if period_type == "quarterly": 

1296 month = period_start.month + 3 

1297 year = period_start.year 

1298 if month > 12: 

1299 month = 1 

1300 year += 1 

1301 period_end = date(year, month, 1) - timedelta(days=1) 

1302 elif period_type == "monthly": 

1303 month = period_start.month + 1 

1304 year = period_start.year 

1305 if month > 12: 

1306 month = 1 

1307 year += 1 

1308 period_end = date(year, month, 1) - timedelta(days=1) 

1309 else: # weekly 

1310 period_end = period_start + timedelta(days=6) 

1311 

1312 # Count types and priorities 

1313 type_counts: dict[str, int] = {} 

1314 priority_counts: dict[str, int] = {} 

1315 

1316 for issue in period_issues: 

1317 type_counts[issue.issue_type] = type_counts.get(issue.issue_type, 0) + 1 

1318 priority_counts[issue.priority] = priority_counts.get(issue.priority, 0) + 1 

1319 

1320 result.append( 

1321 PeriodMetrics( 

1322 period_start=period_start, 

1323 period_end=period_end, 

1324 period_label=_calculate_period_label(period_start, period_type), 

1325 total_completed=len(period_issues), 

1326 type_counts=dict(sorted(type_counts.items())), 

1327 priority_counts=dict(sorted(priority_counts.items())), 

1328 ) 

1329 ) 

1330 

1331 return result 

1332 

1333 

1334def _calculate_trend(values: list[float]) -> str: 

1335 """Determine trend from a series of values. 

1336 

1337 Args: 

1338 values: Time-ordered series of values 

1339 

1340 Returns: 

1341 "increasing", "decreasing", or "stable" 

1342 """ 

1343 if len(values) < 3: 

1344 return "stable" 

1345 

1346 # Simple linear regression slope 

1347 n = len(values) 

1348 sum_x = sum(range(n)) 

1349 sum_y = sum(values) 

1350 sum_xy = sum(i * v for i, v in enumerate(values)) 

1351 sum_x2 = sum(i * i for i in range(n)) 

1352 

1353 denominator = n * sum_x2 - sum_x * sum_x 

1354 if denominator == 0: 

1355 return "stable" 

1356 

1357 slope = (n * sum_xy - sum_x * sum_y) / denominator 

1358 

1359 # Normalize slope by average value 

1360 avg = sum_y / n if n > 0 else 1 

1361 if avg == 0: 

1362 avg = 1 

1363 normalized_slope = slope / avg 

1364 

1365 if normalized_slope > 0.05: 

1366 return "increasing" 

1367 elif normalized_slope < -0.05: 

1368 return "decreasing" 

1369 return "stable" 

1370 

1371 

1372def _analyze_subsystems( 

1373 issues: list[CompletedIssue], 

1374 recent_days: int = 30, 

1375) -> list[SubsystemHealth]: 

1376 """Analyze health by subsystem/directory. 

1377 

1378 Args: 

1379 issues: List of completed issues 

1380 recent_days: Days to consider "recent" 

1381 

1382 Returns: 

1383 List of SubsystemHealth sorted by total issues descending 

1384 """ 

1385 subsystems: dict[str, SubsystemHealth] = {} 

1386 cutoff = date.today() - timedelta(days=recent_days) 

1387 

1388 for issue in issues: 

1389 try: 

1390 content = issue.path.read_text(encoding="utf-8") 

1391 except Exception: 

1392 continue 

1393 

1394 subsystem = _extract_subsystem(content) 

1395 if not subsystem: 

1396 continue 

1397 

1398 if subsystem not in subsystems: 

1399 subsystems[subsystem] = SubsystemHealth(subsystem=subsystem) 

1400 

1401 health = subsystems[subsystem] 

1402 health.total_issues += 1 

1403 health.issue_ids.append(issue.issue_id) 

1404 

1405 if issue.completed_date and issue.completed_date >= cutoff: 

1406 health.recent_issues += 1 

1407 

1408 # Calculate trends based on recent vs historical ratio 

1409 for health in subsystems.values(): 

1410 if health.total_issues >= 5: 

1411 recent_ratio = health.recent_issues / health.total_issues 

1412 if recent_ratio > 0.5: 

1413 health.trend = "degrading" 

1414 elif recent_ratio < 0.2: 

1415 health.trend = "improving" 

1416 

1417 # Sort by total issues descending 

1418 result = sorted(subsystems.values(), key=lambda s: -s.total_issues) 

1419 return result[:10] # Top 10 

1420 

1421 

def analyze_hotspots(issues: list[CompletedIssue]) -> HotspotAnalysis:
    """Find the files and directories that issues keep coming back to.

    Every readable issue file is scanned for paths. Each path counts once
    per issue toward its file tally; its parent directory ("./" for a bare
    filename) counts once per issue no matter how many of its files the
    issue mentions.

    Args:
        issues: List of completed issues

    Returns:
        HotspotAnalysis with the top 10 file hotspots, top 10 directory
        hotspots, and top 5 bug magnets (files with >60% bugs and at
        least 3 issues)
    """

    def _tally(table: dict[str, dict[str, Any]], key: str, issue: Any, *, once_per_issue: bool) -> None:
        """Record ``issue`` against ``key`` in a {count, ids, types} table."""
        entry = table.setdefault(key, {"count": 0, "ids": [], "types": {}})
        if once_per_issue and issue.issue_id in entry["ids"]:
            return
        entry["count"] += 1
        entry["ids"].append(issue.issue_id)
        entry["types"][issue.issue_type] = entry["types"].get(issue.issue_type, 0) + 1

    def _ranked_hotspots(table: dict[str, dict[str, Any]]) -> list[Hotspot]:
        """Convert a tally table into Hotspot objects, busiest first."""
        spots: list[Hotspot] = []
        for location, stats in table.items():
            seen = stats["count"]
            bugs = stats["types"].get("BUG", 0)

            if seen >= 5:
                churn = "high"
            elif seen >= 3:
                churn = "medium"
            else:
                churn = "low"

            spots.append(
                Hotspot(
                    path=location,
                    issue_count=seen,
                    issue_ids=stats["ids"],
                    issue_types=stats["types"],
                    bug_ratio=bugs / seen if seen > 0 else 0.0,
                    churn_indicator=churn,
                )
            )
        spots.sort(key=lambda spot: -spot.issue_count)
        return spots

    file_table: dict[str, dict[str, Any]] = {}  # path -> {count, ids, types}
    dir_table: dict[str, dict[str, Any]] = {}  # dir -> {count, ids, types}

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
        except Exception:
            continue

        for path in _extract_paths_from_issue(text):
            _tally(file_table, path, issue, once_per_issue=False)

            directory = path.rpartition("/")[0] + "/" if "/" in path else "./"
            _tally(dir_table, directory, issue, once_per_issue=True)

    file_hotspots = _ranked_hotspots(file_table)
    dir_hotspots = _ranked_hotspots(dir_table)

    # Bug magnets: mostly-bug files with enough issues to be meaningful.
    bug_magnets = [
        spot for spot in file_hotspots if spot.bug_ratio > 0.6 and spot.issue_count >= 3
    ]
    bug_magnets.sort(key=lambda spot: (-spot.bug_ratio, -spot.issue_count))

    return HotspotAnalysis(
        file_hotspots=file_hotspots[:10],  # Top 10
        directory_hotspots=dir_hotspots[:10],  # Top 10
        bug_magnets=bug_magnets[:5],  # Top 5
    )

1531 

1532 

def analyze_coupling(issues: list[CompletedIssue]) -> CouplingAnalysis:
    """Find files that tend to be mentioned by the same issues.

    Coupling strength between two files is the Jaccard similarity of the
    sets of issues that mention them. A pair is reported only when the
    files share at least 2 issues and the strength is >= 0.3.

    Args:
        issues: List of completed issues

    Returns:
        CouplingAnalysis with the top 20 pairs, top 10 clusters, and top
        10 hotspots (files that appear in 3 or more reported pairs)
    """
    issues_by_file: dict[str, set[str]] = defaultdict(set)

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
        except Exception:
            continue

        for path in _extract_paths_from_issue(text):
            issues_by_file[path].add(issue.issue_id)

    tracked_files = list(issues_by_file)
    pairs: list[CouplingPair] = []

    # Compare every unordered pair of files exactly once.
    for position, file_a in enumerate(tracked_files):
        ids_a = issues_by_file[file_a]
        for file_b in tracked_files[position + 1 :]:
            ids_b = issues_by_file[file_b]

            shared = ids_a & ids_b
            if len(shared) < 2:  # Require at least 2 co-occurrences
                continue

            combined = ids_a | ids_b
            strength = len(shared) / len(combined) if combined else 0.0  # Jaccard
            if strength < 0.3:  # Below the significance threshold
                continue

            pairs.append(
                CouplingPair(
                    file_a=file_a,
                    file_b=file_b,
                    co_occurrence_count=len(shared),
                    coupling_strength=strength,
                    issue_ids=sorted(shared),
                )
            )

    pairs.sort(key=lambda pair: (-pair.coupling_strength, -pair.co_occurrence_count))

    clusters = _build_coupling_clusters(pairs)

    # Count how many reported pairs each file participates in.
    partner_counts: dict[str, int] = defaultdict(int)
    for pair in pairs:
        partner_counts[pair.file_a] += 1
        partner_counts[pair.file_b] += 1

    hotspots = [name for name, partners in partner_counts.items() if partners >= 3]
    hotspots.sort(key=lambda name: -partner_counts[name])

    return CouplingAnalysis(
        pairs=pairs[:20],  # Top 20 pairs
        clusters=clusters[:10],  # Top 10 clusters
        hotspots=hotspots[:10],  # Top 10 hotspots
    )

1608 

1609 

1610def _build_coupling_clusters(pairs: list[CouplingPair]) -> list[list[str]]: 

1611 """Build clusters of coupled files using connected components. 

1612 

1613 Args: 

1614 pairs: List of coupling pairs 

1615 

1616 Returns: 

1617 List of file clusters (each cluster is a list of file paths) 

1618 """ 

1619 # Build adjacency for high-coupling pairs (strength >= 0.5) 

1620 adjacency: dict[str, set[str]] = {} 

1621 for pair in pairs: 

1622 if pair.coupling_strength >= 0.5: 

1623 if pair.file_a not in adjacency: 

1624 adjacency[pair.file_a] = set() 

1625 if pair.file_b not in adjacency: 

1626 adjacency[pair.file_b] = set() 

1627 adjacency[pair.file_a].add(pair.file_b) 

1628 adjacency[pair.file_b].add(pair.file_a) 

1629 

1630 # Find connected components 

1631 visited: set[str] = set() 

1632 clusters: list[list[str]] = [] 

1633 

1634 for start in adjacency: 

1635 if start in visited: 

1636 continue 

1637 # BFS to find component 

1638 cluster: list[str] = [] 

1639 queue = [start] 

1640 while queue: 

1641 node = queue.pop(0) 

1642 if node in visited: 

1643 continue 

1644 visited.add(node) 

1645 cluster.append(node) 

1646 for neighbor in adjacency.get(node, set()): 

1647 if neighbor not in visited: 

1648 queue.append(neighbor) 

1649 

1650 if len(cluster) >= 2: # Only include clusters with 2+ files 

1651 cluster.sort() 

1652 clusters.append(cluster) 

1653 

1654 # Sort clusters by size descending 

1655 clusters.sort(key=lambda c: -len(c)) 

1656 return clusters 

1657 

1658 

def analyze_regression_clustering(
    issues: list[CompletedIssue],
) -> RegressionAnalysis:
    """Spot files where one bug fix is soon followed by another bug.

    Two dated bugs are treated as a regression pair when:
    1. Temporal proximity: the later one was completed within 7 days of
       the earlier one
    2. File overlap: both mention at least one common file

    Pairs are then grouped per shared file into clusters.

    Args:
        issues: List of completed issues

    Returns:
        RegressionAnalysis with the top 10 clusters, the total number of
        regression pairs, and the 5 most fragile files
    """
    dated_bugs = sorted(
        (item for item in issues if item.issue_type == "BUG" and item.completed_date),
        key=lambda item: item.completed_date,  # type: ignore
    )
    if len(dated_bugs) < 2:
        return RegressionAnalysis()

    # issue_id -> files mentioned; unreadable issues get an empty set.
    files_by_bug: dict[str, set[str]] = {}
    for bug in dated_bugs:
        try:
            text = bug.path.read_text(encoding="utf-8")
            files_by_bug[bug.issue_id] = set(_extract_paths_from_issue(text))
        except Exception:
            files_by_bug[bug.issue_id] = set()

    linked: list[tuple[CompletedIssue, CompletedIssue, set[str]]] = []
    for index, earlier in enumerate(dated_bugs[:-1]):
        earlier_files = files_by_bug.get(earlier.issue_id, set())
        if not earlier_files:
            continue

        for later in dated_bugs[index + 1 :]:
            gap_days = (later.completed_date - earlier.completed_date).days  # type: ignore
            if gap_days > 7:
                break  # Sorted by date, so every further bug is even later

            shared = earlier_files & files_by_bug.get(later.issue_id, set())
            if shared:
                linked.append((earlier, later, shared))

    if not linked:
        return RegressionAnalysis()

    # file -> [(earlier_id, later_id, days_between)]
    chains_by_file: dict[str, list[tuple[str, str, int]]] = defaultdict(list)
    for earlier, later, shared in linked:
        gap_days = (later.completed_date - earlier.completed_date).days  # type: ignore
        for touched in shared:
            chains_by_file[touched].append((earlier.issue_id, later.issue_id, gap_days))

    clusters: list[RegressionCluster] = []
    for touched, chains in chains_by_file.items():
        chain_count = len(chains)
        mean_gap = sum(gap for _, _, gap in chains) / chain_count

        if mean_gap < 3:
            time_pattern = "immediate"
        elif chain_count >= 3:
            time_pattern = "chronic"
        else:
            time_pattern = "delayed"

        if chain_count >= 4:
            severity = "critical"
        elif chain_count >= 2:
            severity = "high"
        else:
            severity = "medium"

        # Every other file mentioned by a bug in a pair that shares this file.
        companions: set[str] = set()
        for earlier, later, shared in linked:
            if touched in shared:
                companions |= files_by_bug.get(earlier.issue_id, set())
                companions |= files_by_bug.get(later.issue_id, set())
        companions.discard(touched)

        clusters.append(
            RegressionCluster(
                primary_file=touched,
                regression_count=chain_count,
                fix_bug_pairs=[(first_id, second_id) for first_id, second_id, _ in chains],
                related_files=sorted(companions),
                time_pattern=time_pattern,
                severity=severity,
            )
        )

    clusters.sort(key=lambda cluster: (-cluster.regression_count, cluster.primary_file))

    return RegressionAnalysis(
        clusters=clusters[:10],  # Top 10
        total_regression_chains=len(linked),
        most_fragile_files=[cluster.primary_file for cluster in clusters[:5]],
    )

1780 

1781 

def analyze_test_gaps(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> TestGapAnalysis:
    """Relate bug counts per file to whether a test file exists for it.

    Only files from ``hotspots.file_hotspots`` with at least one BUG are
    considered. Files without a test file get a tenfold gap score so they
    rank above tested files.

    Args:
        issues: List of completed issues (unused, for API consistency)
        hotspots: Pre-computed hotspot analysis

    Returns:
        TestGapAnalysis with the top 15 gaps, untested bug magnets,
        average bug counts for tested and untested files, and the top 10
        untested files to target
    """
    # path -> bug count and BUG issue IDs
    bug_stats: dict[str, dict[str, Any]] = {}
    for spot in hotspots.file_hotspots:
        bugs_seen = spot.issue_types.get("BUG", 0)
        if bugs_seen <= 0:
            continue
        bug_stats[spot.path] = {
            "bug_count": bugs_seen,
            "bug_ids": [issue_id for issue_id in spot.issue_ids if issue_id.startswith("BUG-")],
        }

    if not bug_stats:
        return TestGapAnalysis()

    gaps: list[TestGap] = []
    tested_bug_counts: list[int] = []
    untested_bug_counts: list[int] = []

    for source_file, stats in bug_stats.items():
        bug_count = stats["bug_count"]
        test_file = _find_test_file(source_file)
        covered = test_file is not None

        if covered:
            tested_bug_counts.append(bug_count)
            gap_score = bug_count * 1.0
            priority = "medium" if bug_count >= 4 else "low"
        else:
            untested_bug_counts.append(bug_count)
            gap_score = bug_count * 10.0  # Amplify untested files
            if bug_count >= 5:
                priority = "critical"
            elif bug_count >= 3:
                priority = "high"
            else:
                priority = "medium"

        gaps.append(
            TestGap(
                source_file=source_file,
                bug_count=bug_count,
                bug_ids=stats["bug_ids"],
                has_test_file=covered,
                test_file_path=test_file,
                gap_score=gap_score,
                priority=priority,
            )
        )

    # Most urgent first.
    gaps.sort(key=lambda gap: (-gap.gap_score, -gap.bug_count))

    def _mean(counts: list[int]) -> float:
        """Average of the counts, or 0.0 when there are none."""
        return sum(counts) / len(counts) if counts else 0.0

    untested_magnets = [
        magnet.path for magnet in hotspots.bug_magnets if _find_test_file(magnet.path) is None
    ]
    untested_targets = [gap.source_file for gap in gaps if not gap.has_test_file]

    return TestGapAnalysis(
        gaps=gaps[:15],  # Top 15
        untested_bug_magnets=untested_magnets,
        files_with_tests_avg_bugs=_mean(tested_bug_counts),
        files_without_tests_avg_bugs=_mean(untested_bug_counts),
        priority_test_targets=untested_targets[:10],
    )

1876 

1877 

def analyze_rejection_rates(issues: list[CompletedIssue]) -> RejectionAnalysis:
    """Tally how closed issues were resolved, overall and by type and month.

    Every issue file is read and classified with ``_parse_resolution_action``.
    The resulting category feeds three sets of counters: one overall, one per
    issue type, and one per completion month (``YYYY-MM``). Issues whose file
    cannot be read are left out of every tally.

    Args:
        issues: List of completed issues

    Returns:
        RejectionAnalysis with overall and grouped metrics, the ten most
        frequent ``**Reason**:`` lines found in non-completed issues, and a
        trend label derived from the three most recent months
    """
    if not issues:
        return RejectionAnalysis()

    # Resolution category -> name of the RejectionMetrics counter it bumps.
    # A category outside this table only counts towards total_closed.
    counter_by_category = {
        "completed": "completed_count",
        "rejected": "rejected_count",
        "invalid": "invalid_count",
        "duplicate": "duplicate_count",
        "deferred": "deferred_count",
    }

    def record(metrics: RejectionMetrics, category: str) -> None:
        """Register one closed issue of the given category on *metrics*."""
        metrics.total_closed += 1
        counter = counter_by_category.get(category)
        if counter is not None:
            setattr(metrics, counter, getattr(metrics, counter) + 1)

    totals = RejectionMetrics()
    per_type: dict[str, RejectionMetrics] = {}
    per_month: dict[str, RejectionMetrics] = {}
    reason_tally: dict[str, int] = defaultdict(int)

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: an unreadable issue file is simply not counted.
            continue

        category = _parse_resolution_action(text)
        record(totals, category)

        # Per issue type
        type_bucket = per_type.get(issue.issue_type)
        if type_bucket is None:
            type_bucket = per_type[issue.issue_type] = RejectionMetrics()
        record(type_bucket, category)

        # Per completion month (only for issues that carry a completion date)
        if issue.completed_date:
            month = issue.completed_date.strftime("%Y-%m")
            month_bucket = per_month.get(month)
            if month_bucket is None:
                month_bucket = per_month[month] = RejectionMetrics()
            record(month_bucket, category)

        # Collect the stated reason for anything that was not completed
        if category in ("rejected", "invalid", "duplicate", "deferred"):
            found = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", text)
            if found:
                reason_tally[found.group(1).strip()] += 1

    # Trend: compare the combined rejection + invalid rate of the newest month
    # with the month two periods earlier; a change beyond 20% flips the label.
    trend = "stable"
    recent_months = sorted(per_month)[-3:]
    if len(recent_months) >= 3:
        rates = [
            per_month[month].rejection_rate + per_month[month].invalid_rate
            for month in recent_months
        ]
        oldest, newest = rates[0], rates[-1]
        if newest < oldest * 0.8:
            trend = "improving"
        elif newest > oldest * 1.2:
            trend = "degrading"

    # Most frequent reasons first; the sort is stable, so ties keep the order
    # in which the reasons were first seen.
    common_reasons = sorted(reason_tally.items(), key=lambda pair: pair[1], reverse=True)[:10]

    return RejectionAnalysis(
        overall=totals,
        by_type=per_type,
        by_month=per_month,
        common_reasons=common_reasons,
        trend=trend,
    )

1982 

1983 

# Pattern definitions for manual activity detection.
#
# Maps a pattern type to the settings detect_manual_patterns() uses for it:
#   "patterns":    regexes searched (with re.IGNORECASE) in each issue file
#   "description": copied to ManualPattern.pattern_description
#   "suggestion":  copied to ManualPattern.suggested_automation
#   "complexity":  copied to ManualPattern.automation_complexity
#
# NOTE: the regexes carry no word boundaries, so short alternatives such as
# "make", "tox" or "tsc" also match inside longer words.
_MANUAL_PATTERNS: dict[str, dict[str, Any]] = {
    "test": {
        "patterns": [
            r"(?:pytest|python -m pytest|npm test|yarn test|jest|cargo test|go test)",
            r"(?:python -m unittest|nosetests|tox)",
        ],
        "description": "Test execution after code changes",
        "suggestion": "Add post-edit hook for automatic test runs",
        "complexity": "trivial",
    },
    "lint": {
        "patterns": [
            r"(?:ruff check|ruff format|black|isort|flake8|pylint)",
            r"(?:eslint|prettier|tslint)",
        ],
        "description": "Lint/format fixes after implementation",
        "suggestion": "Add pre-commit hook for auto-formatting",
        "complexity": "simple",
    },
    "type_check": {
        "patterns": [
            r"(?:mypy|pyright|python -m mypy)",
            r"(?:tsc|npx tsc)",
        ],
        "description": "Type checking during development",
        "suggestion": "Add mypy to pre-commit or post-edit hook",
        "complexity": "simple",
    },
    "build": {
        "patterns": [
            r"(?:npm run build|yarn build|make|cargo build|go build)",
            r"(?:python -m build|pip install -e)",
        ],
        "description": "Build steps during implementation",
        "suggestion": "Add build verification to test suite or CI",
        "complexity": "moderate",
    },
    "git": {
        "patterns": [
            r"git (?:add|commit|push|pull|checkout|branch)",
        ],
        "description": "Git operations during issue resolution",
        "suggestion": "Use /ll:commit skill for standardized commits",
        "complexity": "trivial",
    },
}

2031 

# Cross-cutting concern keywords for smell detection.
#
# Maps a concern type to the keywords that mark an issue as touching it.
# detect_cross_cutting_smells() tests each keyword as a plain substring of the
# lower-cased issue text, so short keywords also match inside longer words
# (e.g. "log" inside "login").
_CROSS_CUTTING_KEYWORDS: dict[str, list[str]] = {
    "logging": ["log", "logger", "logging", "debug", "trace", "print"],
    "error-handling": ["error", "exception", "try", "catch", "raise", "except", "fail"],
    "validation": ["valid", "validate", "check", "assert", "verify", "sanitize"],
    "auth": ["auth", "permission", "role", "access", "token", "credential", "login"],
    "caching": ["cache", "memo", "memoize", "store", "ttl", "expire", "cached"],
}

2040 

# Suggested patterns for each cross-cutting concern type.
#
# Keys mirror _CROSS_CUTTING_KEYWORDS; the value becomes
# CrossCuttingSmell.suggested_pattern. detect_cross_cutting_smells() falls back
# to "aspect" for a concern type that has no entry here.
_CONCERN_PATTERNS: dict[str, str] = {
    "logging": "decorator",
    "error-handling": "middleware",
    "validation": "decorator",
    "auth": "middleware",
    "caching": "decorator",
}

2049 

2050 

def detect_manual_patterns(issues: list[CompletedIssue]) -> ManualPatternAnalysis:
    """Find manual commands that keep recurring in issue files.

    Each issue file is searched (case-insensitively) with every regex listed
    in ``_MANUAL_PATTERNS``. Pattern types with at least one hit are reported,
    most frequent first. Issues whose file cannot be read are skipped.

    Args:
        issues: List of completed issues

    Returns:
        ManualPatternAnalysis with detected patterns
    """
    if not issues:
        return ManualPatternAnalysis()

    # One accumulator per configured pattern type, in _MANUAL_PATTERNS order.
    tallies: dict[str, dict[str, Any]] = {
        kind: {"count": 0, "issues": [], "commands": [], "config": spec}
        for kind, spec in _MANUAL_PATTERNS.items()
    }

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: an unreadable issue file contributes nothing.
            continue

        for kind, spec in _MANUAL_PATTERNS.items():
            tally = tallies[kind]
            for regex in spec["patterns"]:
                hits = re.findall(regex, text, re.IGNORECASE)
                if not hits:
                    continue

                tally["count"] += len(hits)
                if issue.issue_id not in tally["issues"]:
                    tally["issues"].append(issue.issue_id)
                # Remember each distinct matched command once, in first-seen order
                for hit in hits:
                    if hit not in tally["commands"]:
                        tally["commands"].append(hit)

    # Only pattern types that actually occurred are reported
    patterns: list[ManualPattern] = [
        ManualPattern(
            pattern_type=kind,
            pattern_description=tally["config"]["description"],
            occurrence_count=tally["count"],
            affected_issues=tally["issues"],
            example_commands=tally["commands"][:5],
            suggested_automation=tally["config"]["suggestion"],
            automation_complexity=tally["config"]["complexity"],
        )
        for kind, tally in tallies.items()
        if tally["count"] > 0
    ]

    # Every detected occurrence counts both as an intervention and as automatable
    total_hits = sum(tally["count"] for tally in tallies.values())

    # Most frequent first; the sort is stable, so ties keep configuration order
    patterns.sort(key=lambda found: found.occurrence_count, reverse=True)

    # Only suggest automation for patterns seen at least twice
    suggestions = [found.suggested_automation for found in patterns if found.occurrence_count >= 2]

    return ManualPatternAnalysis(
        patterns=patterns,
        total_manual_interventions=total_hits,
        automatable_count=total_hits,
        automation_suggestions=suggestions[:10],
    )

2127 

2128 

def detect_cross_cutting_smells(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> CrossCuttingAnalysis:
    """Find concerns whose issues sprawl across many directories.

    An issue counts towards a concern (logging, auth, ...) only when it
    references files in at least three distinct directories and its text
    contains one of the concern's keywords from ``_CROSS_CUTTING_KEYWORDS``
    (substring match on the lower-cased text). Issues whose file cannot be
    read are skipped.

    Args:
        issues: List of completed issues
        hotspots: Hotspot analysis results; its directory hotspots seed the
            set of known directories used as the scatter-score denominator

    Returns:
        CrossCuttingAnalysis with detected smells
    """
    if not issues:
        return CrossCuttingAnalysis()

    # Per concern: every directory touched and the issues that touched them
    buckets: dict[str, dict[str, Any]] = {
        concern: {"directories": set(), "issue_ids": []} for concern in _CROSS_CUTTING_KEYWORDS
    }

    # Every directory known so far; grows with each issue scanned below
    known_dirs: set[str] = {spot.path for spot in (hotspots.directory_hotspots or ())}

    for issue in issues:
        try:
            text = issue.path.read_text(encoding="utf-8")
            lowered = text.lower()
        except Exception:
            # Best effort: an unreadable issue file contributes nothing.
            continue

        # Parent directories of every referenced path that has a directory part
        touched = {
            str(Path(ref).parent)
            for ref in _extract_paths_from_issue(text)
            if "/" in ref or "\\" in ref
        }
        known_dirs |= touched

        # Fewer than three directories is not considered scattered
        if len(touched) < 3:
            continue

        for concern, keywords in _CROSS_CUTTING_KEYWORDS.items():
            if not any(word in lowered for word in keywords):
                continue
            bucket = buckets[concern]
            bucket["directories"].update(touched)
            if issue.issue_id not in bucket["issue_ids"]:
                bucket["issue_ids"].append(issue.issue_id)

    # Scatter score = share of all known directories that the concern touches
    denominator = len(known_dirs) or 1
    smells: list[CrossCuttingSmell] = []
    for concern, bucket in buckets.items():
        if not bucket["issue_ids"]:
            continue
        dirs = sorted(bucket["directories"])
        smells.append(
            CrossCuttingSmell(
                concern_type=concern,
                affected_directories=dirs,
                issue_count=len(bucket["issue_ids"]),
                issue_ids=bucket["issue_ids"],
                scatter_score=len(dirs) / denominator,
                suggested_pattern=_CONCERN_PATTERNS.get(concern, "aspect"),
            )
        )

    # Most scattered first; the sort is stable, so ties keep keyword-table order
    smells.sort(key=lambda smell: smell.scatter_score, reverse=True)
    most_scattered = smells[0].concern_type if smells else ""

    # Suggest consolidation once a concern touches at least 30% of known directories
    consolidation_opportunities = [
        f"Centralize {smell.concern_type} ({smell.issue_count} issues would benefit)"
        for smell in smells
        if smell.scatter_score >= 0.3
    ]

    return CrossCuttingAnalysis(
        smells=smells,
        most_scattered_concern=most_scattered,
        consolidation_opportunities=consolidation_opportunities[:10],
    )

2224 

2225 

# Mapping from manual pattern types to configuration solutions.
#
# Keys are pattern types from _MANUAL_PATTERNS. detect_config_gaps() skips any
# pattern type without an entry here ("git" has none). Per entry:
#   "hook_event":       hook name looked up among the keys of the "hooks"
#                       object in hooks/hooks.json
#   "description":      copied to ConfigGap.description
#   "suggested_config": snippet copied verbatim to ConfigGap.suggested_config
_PATTERN_TO_CONFIG: dict[str, dict[str, Any]] = {
    "test": {
        "hook_event": "PostToolUse",
        "description": "Automatic test execution after code changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "pytest tests/ -x -q",
      "timeout": 30000
    }]
  }]""",
    },
    # NOTE(review): the description promises formatting, but the suggested
    # command passes --check -- confirm whether the hook is meant to reformat.
    "lint": {
        "hook_event": "PreToolUse",
        "description": "Automatic formatting before file writes",
        "suggested_config": """hooks/hooks.json:
  "PreToolUse": [{
    "matcher": "Write|Edit",
    "hooks": [{
      "type": "command",
      "command": "ruff format --check .",
      "timeout": 10000
    }]
  }]""",
    },
    # NOTE(review): confirm that `--fast` is an option mypy accepts.
    "type_check": {
        "hook_event": "PostToolUse",
        "description": "Type checking after code modifications",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "mypy --fast .",
      "timeout": 30000
    }]
  }]""",
    },
    "build": {
        "hook_event": "PostToolUse",
        "description": "Build verification after changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "npm run build",
      "timeout": 60000
    }]
  }]""",
    },
}

2281 

2282 

def detect_config_gaps(
    manual_pattern_analysis: ManualPatternAnalysis,
    project_root: Path | None = None,
) -> ConfigGapsAnalysis:
    """Detect configuration gaps based on manual pattern analysis.

    Discovers the project's current hooks (keys of the ``"hooks"`` object in
    ``hooks/hooks.json``), agents (``agents/*.md``) and skills
    (``skills/<name>/SKILL.md``), then reports a gap for every manual pattern
    that has an entry in ``_PATTERN_TO_CONFIG`` and whose hook event is not
    configured yet.

    Args:
        manual_pattern_analysis: Results from detect_manual_patterns()
        project_root: Project root directory (defaults to cwd)

    Returns:
        ConfigGapsAnalysis with identified gaps (high priority first), the
        discovered hooks, the discovered skills and agents in sorted order,
        and a coverage score. The score is the share of recognized patterns
        whose hook event is already configured (1.0 when no pattern is
        recognized).
    """
    if project_root is None:
        project_root = Path.cwd()

    # Load hooks configuration. A missing, unreadable or malformed file means
    # "no hooks configured" rather than an error.
    current_hooks: list[str] = []
    hooks_file = project_root / "hooks" / "hooks.json"
    try:
        with open(hooks_file, encoding="utf-8") as f:
            hooks_data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and undecodable bytes
        hooks_data = None
    hooks_section = hooks_data.get("hooks", {}) if isinstance(hooks_data, dict) else None
    if isinstance(hooks_section, dict):
        current_hooks = list(hooks_section)

    # Directory listings come back in arbitrary order; sort them so the
    # analysis is reproducible across machines and runs.
    current_agents: list[str] = []
    agents_dir = project_root / "agents"
    if agents_dir.is_dir():
        current_agents = sorted(agent_file.stem for agent_file in agents_dir.glob("*.md"))

    current_skills: list[str] = []
    skills_dir = project_root / "skills"
    if skills_dir.is_dir():
        current_skills = sorted(
            skill_dir.name
            for skill_dir in skills_dir.iterdir()
            if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists()
        )

    # Identify gaps from manual patterns
    gaps: list[ConfigGap] = []
    covered_patterns = 0
    recognized_patterns = 0

    for pattern in manual_pattern_analysis.patterns:
        config_mapping = _PATTERN_TO_CONFIG.get(pattern.pattern_type)
        if not config_mapping:
            # No known configuration remedy for this pattern type
            continue

        recognized_patterns += 1

        # Any configured hook for the event counts as coverage
        if config_mapping["hook_event"] in current_hooks:
            covered_patterns += 1
            continue

        # Determine priority based on occurrence count
        if pattern.occurrence_count >= 10:
            priority = "high"
        elif pattern.occurrence_count >= 5:
            priority = "medium"
        else:
            priority = "low"

        gaps.append(
            ConfigGap(
                gap_type="hook",
                description=config_mapping["description"],
                evidence=pattern.affected_issues,
                suggested_config=config_mapping["suggested_config"],
                priority=priority,
                pattern_type=pattern.pattern_type,
            )
        )

    # Calculate coverage score based on recognized patterns only
    coverage_score = covered_patterns / recognized_patterns if recognized_patterns > 0 else 1.0

    # Sort gaps by priority (high first); stable, so equal priorities keep input order
    priority_order = {"high": 0, "medium": 1, "low": 2}
    gaps.sort(key=lambda g: priority_order.get(g.priority, 3))

    return ConfigGapsAnalysis(
        gaps=gaps,
        current_hooks=current_hooks,
        current_skills=current_skills,
        current_agents=current_agents,
        coverage_score=coverage_score,
    )

2377 

2378 

def analyze_agent_effectiveness(issues: list[CompletedIssue]) -> AgentEffectivenessAnalysis:
    """Analyze agent effectiveness across issue types.

    Groups issues by processing agent and issue type, calculating
    success/failure/rejection counts for each combination. Issues whose file
    cannot be read are skipped.

    Args:
        issues: List of completed issues

    Returns:
        AgentEffectivenessAnalysis with outcomes (sorted by agent name, then
        issue type), the best agent per issue type, and the problematic
        agent/type combinations ordered worst success rate first
    """
    if not issues:
        return AgentEffectivenessAnalysis()

    # Track outcomes by (agent, issue_type)
    outcomes_map: dict[tuple[str, str], AgentOutcome] = {}

    for issue in issues:
        try:
            content = issue.path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: an unreadable issue file contributes nothing.
            continue

        # Detect agent (discovered_by may contain source info in some cases)
        agent = _detect_processing_agent(content, issue.discovered_by)
        resolution = _parse_resolution_action(content)

        key = (agent, issue.issue_type)
        outcome = outcomes_map.get(key)
        if outcome is None:
            outcome = outcomes_map[key] = AgentOutcome(
                agent_name=agent,
                issue_type=issue.issue_type,
            )

        # Categorize outcome
        if resolution == "completed":
            outcome.success_count += 1
        elif resolution in ("rejected", "invalid", "duplicate"):
            outcome.rejection_count += 1
        else:  # deferred or other
            outcome.failure_count += 1

    outcomes = list(outcomes_map.values())

    # Determine best agent per issue type
    outcomes_by_type: dict[str, list[AgentOutcome]] = defaultdict(list)
    for outcome in outcomes:
        outcomes_by_type[outcome.issue_type].append(outcome)

    best_agent_by_type: dict[str, str] = {}
    for issue_type, agent_outcomes in outcomes_by_type.items():
        # Require minimum sample size before calling an agent "best"
        significant_outcomes = [o for o in agent_outcomes if o.total_count >= 3]
        if significant_outcomes:
            best = max(significant_outcomes, key=lambda o: o.success_rate)
            best_agent_by_type[issue_type] = best.agent_name

    # Identify problematic combinations (success rate < 50% with >= 5 samples).
    # Order by the actual success rate, worst first. Sorting on the rounded
    # percentage inside the display string would tie rates that differ by
    # less than a percentage point.
    struggling = sorted(
        (o for o in outcomes if o.total_count >= 5 and o.success_rate < 0.5),
        key=lambda o: o.success_rate,
    )
    problematic_combinations: list[tuple[str, str, str]] = [
        (
            o.agent_name,
            o.issue_type,
            f"{o.success_rate * 100:.0f}% success ({o.success_count}/{o.total_count})",
        )
        for o in struggling
    ]

    return AgentEffectivenessAnalysis(
        outcomes=sorted(outcomes, key=lambda o: (o.agent_name, o.issue_type)),
        best_agent_by_type=best_agent_by_type,
        problematic_combinations=problematic_combinations,
    )

2464 

2465 

def scan_active_issues(issues_dir: Path) -> list[tuple[Path, str, str, date | None]]:
    """Scan active issue directories.

    Looks for ``*.md`` files directly inside the ``bugs``, ``features`` and
    ``enhancements`` subdirectories of *issues_dir*. Priority and type are
    taken from the filename; the discovered date is parsed from the file
    content on a best-effort basis.

    Args:
        issues_dir: Path to .issues/ directory

    Returns:
        List of (path, issue_type, priority, discovered_date) tuples, grouped
        by category in the order above and sorted by path within a category.
        issue_type falls back to "UNKNOWN" and priority to "P5" when the
        filename does not carry them; discovered_date stays None when reading
        or parsing the file raises.
    """
    results: list[tuple[Path, str, str, date | None]] = []

    for category_dir in ("bugs", "features", "enhancements"):
        category_path = issues_dir / category_dir
        if not category_path.is_dir():
            continue

        # glob() yields files in arbitrary, filesystem-dependent order; sort
        # so the scan result is reproducible.
        for file_path in sorted(category_path.glob("*.md")):
            filename = file_path.name

            # Priority is the leading "P<digit>" of the filename, if any
            priority_match = re.match(r"^(P\d)", filename)
            priority = priority_match.group(1) if priority_match else "P5"

            # Type is the first BUG/ENH/FEAT marker anywhere in the filename
            type_match = re.search(r"(BUG|ENH|FEAT)", filename)
            issue_type = type_match.group(1) if type_match else "UNKNOWN"

            # Extract discovered date from content; a file that cannot be read
            # or parsed is still listed, just without a date.
            discovered_date = None
            try:
                content = file_path.read_text(encoding="utf-8")
                discovered_date = _parse_discovered_date(content)
            except Exception:
                pass

            results.append((file_path, issue_type, priority, discovered_date))

    return results

2508 

2509 

def _duration_median(values: list[float]) -> float:
    """Return the median of a non-empty list, averaging the two middle values when even."""
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2 == 1:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2


def _build_complexity_proxy(
    path: str,
    entries: list[tuple[str, float]],
    baseline_days: float,
) -> ComplexityProxy:
    """Summarize the (issue_id, days) entries recorded for one file or directory."""
    durations = [days for _, days in entries]
    avg = sum(durations) / len(durations)

    # Normalize complexity score to 0-1: at or below baseline = 0.0, 5x slower = 1.0
    ratio = avg / baseline_days
    complexity_score = max(0.0, min(1.0, (ratio - 1) / 4))

    return ComplexityProxy(
        path=path,
        avg_resolution_days=avg,
        median_resolution_days=_duration_median(durations),
        issue_count=len(entries),
        slowest_issue=max(entries, key=lambda entry: entry[1]),
        complexity_score=complexity_score,
        comparison_to_baseline=f"{ratio:.1f}x baseline" if ratio >= 1.5 else "near baseline",
    )


def analyze_complexity_proxy(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
) -> ComplexityProxyAnalysis:
    """Use issue duration as proxy for code complexity.

    Areas that consistently take longer to resolve suggest higher complexity,
    insufficient documentation, or accumulated technical debt.

    Args:
        issues: List of completed issues with dates
        hotspots: Pre-computed hotspot analysis; accepted for interface
            compatibility, not read by the current implementation

    Returns:
        ComplexityProxyAnalysis with duration-based complexity metrics: the
        ten highest-scoring files and directories, the baseline (median
        resolution time in days, or 1.0 when that median is 0), and up to ten
        files averaging more than twice the baseline
    """
    # Calculate durations for all issues with both dates
    issue_durations: dict[str, float] = {}  # issue_id -> days
    for issue in issues:
        if issue.discovered_date and issue.completed_date:
            days = float((issue.completed_date - issue.discovered_date).days)
            if days >= 0:  # Sanity check: ignore issues "completed" before discovery
                issue_durations[issue.issue_id] = days

    if not issue_durations:
        return ComplexityProxyAnalysis()

    # Calculate baseline (median duration)
    baseline_days = _duration_median(list(issue_durations.values()))
    if baseline_days == 0:
        baseline_days = 1.0  # Avoid division by zero

    # Map issues to their affected files by reading issue content
    issue_to_files: dict[str, list[str]] = {}
    for issue in issues:
        if issue.issue_id not in issue_durations:
            continue
        try:
            content = issue.path.read_text(encoding="utf-8")
            paths = _extract_paths_from_issue(content)
        except Exception:
            # Best effort: an unreadable issue file contributes nothing.
            continue
        if paths:
            issue_to_files[issue.issue_id] = paths

    # Aggregate durations by file: path -> [(issue_id, days), ...]
    file_durations: dict[str, list[tuple[str, float]]] = defaultdict(list)
    for issue_id, files in issue_to_files.items():
        days = issue_durations[issue_id]
        for file_path in files:
            file_durations[file_path].append((issue_id, days))

    # Aggregate durations by directory ("./" for paths without a directory part)
    dir_durations: dict[str, list[tuple[str, float]]] = defaultdict(list)
    for path, entries in file_durations.items():
        dir_path = path.rsplit("/", 1)[0] + "/" if "/" in path else "./"
        dir_durations[dir_path].extend(entries)

    # Build file complexity proxies (need at least 2 data points)
    file_complexity = [
        _build_complexity_proxy(path, entries, baseline_days)
        for path, entries in file_durations.items()
        if len(entries) >= 2
    ]

    # Build directory complexity proxies
    directory_complexity: list[ComplexityProxy] = []
    for dir_path, entries in dir_durations.items():
        # NOTE(review): this threshold counts one entry per (file, issue) pair,
        # i.e. before the per-issue deduplication below -- confirm that a
        # single issue touching three files should qualify a directory.
        if len(entries) < 3:  # Need at least 3 data points for directories
            continue

        # Deduplicate by issue_id for directory-level stats. Every entry of an
        # issue carries the same duration, so collapsing into a dict is lossless.
        unique_entries = dict(entries)
        directory_complexity.append(
            _build_complexity_proxy(dir_path, list(unique_entries.items()), baseline_days)
        )

    # Sort by complexity score descending (stable: ties keep discovery order)
    file_complexity.sort(key=lambda c: c.complexity_score, reverse=True)
    directory_complexity.sort(key=lambda c: c.complexity_score, reverse=True)

    # Identify outliers (>2x baseline)
    complexity_outliers = [
        c.path for c in file_complexity if c.avg_resolution_days > baseline_days * 2
    ]

    return ComplexityProxyAnalysis(
        file_complexity=file_complexity[:10],
        directory_complexity=directory_complexity[:10],
        baseline_days=baseline_days,
        complexity_outliers=complexity_outliers[:10],
    )

2661 

2662 

def _calculate_debt_metrics(
    completed_issues: list[CompletedIssue],
    active_issues: list[tuple[Path, str, str, date | None]],
) -> TechnicalDebtMetrics:
    """Calculate technical debt health metrics.

    Args:
        completed_issues: List of completed issues
        active_issues: List of active issue tuples
            (path, issue_type, priority, discovered_date)

    Returns:
        TechnicalDebtMetrics with backlog size, high-priority and aging
        counts, weekly backlog growth over the last four weeks, and the
        bug-to-feature paydown ratio
    """
    today = date.today()
    metrics = TechnicalDebtMetrics()

    # Backlog size
    metrics.backlog_size = len(active_issues)

    # Count aging and high priority
    for _path, _issue_type, priority, discovered_date in active_issues:
        if priority in ("P0", "P1"):
            metrics.high_priority_open += 1

        if discovered_date:
            age = (today - discovered_date).days
            if age >= 30:
                metrics.aging_30_plus += 1
            if age >= 60:
                metrics.aging_60_plus += 1

    # Calculate backlog growth rate (issues per week)
    # Look at last 4 weeks of completions vs creations
    four_weeks_ago = today - timedelta(days=28)

    recently_completed = [
        i for i in completed_issues if i.completed_date and i.completed_date >= four_weeks_ago
    ]
    completed_recently = len(recently_completed)

    # Creations are issues discovered inside the window, whether they are
    # still open or were already closed again. Leaving out the latter would
    # count every issue opened and closed within the window as a net -1
    # instead of 0.
    still_open_created = sum(1 for _, _, _, d in active_issues if d and d >= four_weeks_ago)
    created_and_closed = sum(
        1
        for i in recently_completed
        if i.discovered_date and i.discovered_date >= four_weeks_ago
    )
    created_recently = still_open_created + created_and_closed

    # Net change per week
    if completed_recently > 0 or created_recently > 0:
        metrics.backlog_growth_rate = (created_recently - completed_recently) / 4.0

    # Debt paydown ratio (bug fixes vs features)
    bug_count = sum(1 for i in completed_issues if i.issue_type == "BUG")
    feat_count = sum(1 for i in completed_issues if i.issue_type == "FEAT")

    if feat_count > 0:
        metrics.debt_paydown_ratio = bug_count / feat_count
    elif bug_count > 0:
        metrics.debt_paydown_ratio = float(bug_count)  # All maintenance

    return metrics

2718 

2719 

2720def calculate_analysis( 

2721 completed_issues: list[CompletedIssue], 

2722 issues_dir: Path | None = None, 

2723 period_type: Literal["weekly", "monthly", "quarterly"] = "monthly", 

2724 compare_days: int | None = None, 

2725 project_root: Path | None = None, 

2726) -> HistoryAnalysis: 

2727 """Calculate comprehensive history analysis. 

2728 

2729 Args: 

2730 completed_issues: List of completed issues 

2731 issues_dir: Path to .issues/ for active issue scanning 

2732 period_type: Grouping period for trend analysis 

2733 compare_days: Days for comparative analysis (e.g., 30 for 30d comparison) 

2734 project_root: Project root for config gap analysis (defaults to cwd) 

2735 

2736 Returns: 

2737 HistoryAnalysis with all metrics 

2738 """ 

2739 today = date.today() 

2740 

2741 # Get base summary 

2742 summary = calculate_summary(completed_issues) 

2743 

2744 # Scan active issues if directory provided 

2745 active_issues: list[tuple[Path, str, str, date | None]] = [] 

2746 if issues_dir: 

2747 active_issues = scan_active_issues(issues_dir) 

2748 

2749 # Calculate period metrics 

2750 period_metrics = _group_by_period(completed_issues, period_type) 

2751 

2752 # Determine velocity trend 

2753 if len(period_metrics) >= 3: 

2754 velocities = [float(p.total_completed) for p in period_metrics] 

2755 velocity_trend = _calculate_trend(velocities) 

2756 else: 

2757 velocity_trend = "stable" 

2758 

2759 # Determine bug ratio trend 

2760 if len(period_metrics) >= 3: 

2761 bug_ratios = [p.bug_ratio or 0.0 for p in period_metrics] 

2762 # For bug ratio, decreasing is good (keep as-is) 

2763 bug_ratio_trend = _calculate_trend(bug_ratios) 

2764 else: 

2765 bug_ratio_trend = "stable" 

2766 

2767 # Subsystem health 

2768 subsystem_health = _analyze_subsystems(completed_issues) 

2769 

2770 # Hotspot analysis 

2771 hotspot_analysis = analyze_hotspots(completed_issues) 

2772 

2773 # Coupling analysis 

2774 coupling_analysis = analyze_coupling(completed_issues) 

2775 

2776 # Regression clustering analysis 

2777 regression_analysis = analyze_regression_clustering(completed_issues) 

2778 

2779 # Test gap analysis 

2780 test_gap_analysis = analyze_test_gaps(completed_issues, hotspot_analysis) 

2781 

2782 # Rejection rate analysis 

2783 rejection_analysis = analyze_rejection_rates(completed_issues) 

2784 

2785 # Manual pattern analysis 

2786 manual_pattern_analysis = detect_manual_patterns(completed_issues) 

2787 

2788 # Agent effectiveness analysis 

2789 agent_effectiveness_analysis = analyze_agent_effectiveness(completed_issues) 

2790 

2791 # Complexity proxy analysis 

2792 complexity_proxy_analysis = analyze_complexity_proxy(completed_issues, hotspot_analysis) 

2793 

2794 # Configuration gaps analysis (depends on manual_pattern_analysis) 

2795 config_gaps_analysis = detect_config_gaps(manual_pattern_analysis, project_root) 

2796 

2797 # Cross-cutting concern analysis (depends on hotspot_analysis) 

2798 cross_cutting_analysis = detect_cross_cutting_smells(completed_issues, hotspot_analysis) 

2799 

2800 # Technical debt metrics 

2801 debt_metrics = _calculate_debt_metrics(completed_issues, active_issues) 

2802 

2803 # Build analysis 

2804 analysis = HistoryAnalysis( 

2805 generated_date=today, 

2806 total_completed=len(completed_issues), 

2807 total_active=len(active_issues), 

2808 date_range_start=summary.earliest_date, 

2809 date_range_end=summary.latest_date, 

2810 summary=summary, 

2811 period_metrics=period_metrics, 

2812 velocity_trend=velocity_trend, 

2813 bug_ratio_trend=bug_ratio_trend, 

2814 subsystem_health=subsystem_health, 

2815 hotspot_analysis=hotspot_analysis, 

2816 coupling_analysis=coupling_analysis, 

2817 regression_analysis=regression_analysis, 

2818 test_gap_analysis=test_gap_analysis, 

2819 rejection_analysis=rejection_analysis, 

2820 manual_pattern_analysis=manual_pattern_analysis, 

2821 agent_effectiveness_analysis=agent_effectiveness_analysis, 

2822 complexity_proxy_analysis=complexity_proxy_analysis, 

2823 config_gaps_analysis=config_gaps_analysis, 

2824 cross_cutting_analysis=cross_cutting_analysis, 

2825 debt_metrics=debt_metrics, 

2826 ) 

2827 

2828 # Comparative analysis 

2829 if compare_days: 

2830 analysis.comparison_period = f"{compare_days}d" 

2831 cutoff = today - timedelta(days=compare_days) 

2832 prev_cutoff = cutoff - timedelta(days=compare_days) 

2833 

2834 current_issues = [ 

2835 i for i in completed_issues if i.completed_date and i.completed_date >= cutoff 

2836 ] 

2837 previous_issues = [ 

2838 i 

2839 for i in completed_issues 

2840 if i.completed_date and prev_cutoff <= i.completed_date < cutoff 

2841 ] 

2842 

2843 if current_issues: 

2844 current_types: dict[str, int] = {} 

2845 for i in current_issues: 

2846 current_types[i.issue_type] = current_types.get(i.issue_type, 0) + 1 

2847 

2848 analysis.current_period = PeriodMetrics( 

2849 period_start=cutoff, 

2850 period_end=today, 

2851 period_label=f"Last {compare_days} days", 

2852 total_completed=len(current_issues), 

2853 type_counts=current_types, 

2854 ) 

2855 

2856 if previous_issues: 

2857 prev_types: dict[str, int] = {} 

2858 for i in previous_issues: 

2859 prev_types[i.issue_type] = prev_types.get(i.issue_type, 0) + 1 

2860 

2861 analysis.previous_period = PeriodMetrics( 

2862 period_start=prev_cutoff, 

2863 period_end=cutoff - timedelta(days=1), 

2864 period_label=f"Previous {compare_days} days", 

2865 total_completed=len(previous_issues), 

2866 type_counts=prev_types, 

2867 ) 

2868 

2869 return analysis 

2870 

2871 

2872# ============================================================================= 

2873# Analysis Formatting Functions (FEAT-110) 

2874# ============================================================================= 

2875 

2876 

def format_analysis_json(analysis: HistoryAnalysis) -> str:
    """Serialize a history analysis to JSON text.

    Args:
        analysis: HistoryAnalysis whose ``to_dict()`` result is serialized

    Returns:
        JSON string, pretty-printed with a two-space indent
    """
    payload = analysis.to_dict()
    return json.dumps(payload, indent=2)

2887 

2888 

def format_analysis_yaml(analysis: HistoryAnalysis) -> str:
    """Serialize a history analysis to YAML text.

    Args:
        analysis: HistoryAnalysis whose ``to_dict()`` result is serialized

    Returns:
        YAML string in block style with keys kept in their original order;
        the JSON rendering is returned instead when ``yaml`` cannot be imported
    """
    try:
        # Imported lazily so the module still loads when PyYAML is absent.
        import yaml

        payload = analysis.to_dict()
        rendered = yaml.dump(payload, default_flow_style=False, sort_keys=False)
    except ImportError:
        # No YAML support available: degrade to the JSON formatter.
        rendered = format_analysis_json(analysis)
    return rendered

2905 

2906 

def format_analysis_text(analysis: HistoryAnalysis) -> str:
    """Format analysis as a human-readable plain-text report.

    Sections are emitted in a fixed order. A section (or sub-list) is left
    out when the data backing it is missing or empty, and long lists are
    truncated to their first few entries.

    Args:
        analysis: HistoryAnalysis to format

    Returns:
        Formatted text string (lines joined with newlines, no trailing newline)
    """
    lines: list[str] = []

    def heading(title: str, rule: str = "-", *, spaced: bool = True) -> None:
        # The underline is derived from the title so the two can never drift
        # apart (hard-coded widths previously mismatched some titles).
        if spaced:
            lines.append("")
        lines.append(title)
        lines.append(rule * len(title))

    def band(score: float, high: float, medium: float) -> str:
        # Bucket a score into a HIGH / MEDIUM / LOW label.
        if score >= high:
            return "HIGH"
        if score >= medium:
            return "MEDIUM"
        return "LOW"

    def preview(items: list[str], limit: int = 3, more: str = ", ...") -> str:
        # Join the first `limit` items, flagging when some were left out.
        text = ", ".join(items[:limit])
        if len(items) > limit:
            text += more
        return text

    heading("Issue History Analysis", "=", spaced=False)
    lines.append(f"Generated: {analysis.generated_date}")
    lines.append(f"Completed: {analysis.total_completed} | Active: {analysis.total_active}")

    if analysis.date_range_start and analysis.date_range_end:
        lines.append(f"Date Range: {analysis.date_range_start} to {analysis.date_range_end}")

    # Summary
    heading("Summary")
    summary = analysis.summary
    if summary.velocity:
        lines.append(f"Velocity: {summary.velocity:.2f} issues/day")
    lines.append(f"Velocity Trend: {analysis.velocity_trend}")
    lines.append(f"Bug Ratio Trend: {analysis.bug_ratio_trend}")

    # Type distribution
    lines.append("")
    lines.append("By Type:")
    total = analysis.total_completed or 1  # avoid dividing by zero
    for issue_type, count in summary.type_counts.items():
        pct = count * 100 // total
        lines.append(f" {issue_type:5}: {count:3} ({pct:2}%)")

    # Period metrics
    if analysis.period_metrics:
        heading("Period Metrics")
        for period in analysis.period_metrics[-6:]:  # Last 6 periods
            # Only a missing ratio is "N/A"; a genuine 0.0 must render as 0%.
            if period.bug_ratio is None:
                bug_pct = "N/A"
            else:
                bug_pct = f"{period.bug_ratio * 100:.0f}%"
            lines.append(
                f" {period.period_label:12}: {period.total_completed:3} completed, {bug_pct} bugs"
            )

    # Subsystem health
    if analysis.subsystem_health:
        heading("Subsystem Health")
        health_symbols = {"improving": "↓", "degrading": "↑", "stable": "→"}
        for sub in analysis.subsystem_health[:5]:
            trend_symbol = health_symbols.get(sub.trend, "?")
            lines.append(
                f" {sub.subsystem:30}: {sub.total_issues:3} total, "
                f"{sub.recent_issues:2} recent {trend_symbol}"
            )

    # Hotspot analysis
    if analysis.hotspot_analysis:
        hotspots = analysis.hotspot_analysis

        if hotspots.file_hotspots:
            heading("File Hotspots")
            for h in hotspots.file_hotspots[:5]:
                types_str = ", ".join(f"{k}:{v}" for k, v in sorted(h.issue_types.items()))
                churn_flag = " [HIGH CHURN]" if h.churn_indicator == "high" else ""
                lines.append(f" {h.path:40}: {h.issue_count:2} issues ({types_str}){churn_flag}")

        if hotspots.bug_magnets:
            heading("Bug Magnets (>60% bugs)")
            for h in hotspots.bug_magnets:
                lines.append(
                    f" {h.path}: {h.bug_ratio * 100:.0f}% bugs "
                    f"({h.issue_types.get('BUG', 0)}/{h.issue_count})"
                )

    # Coupling analysis
    if analysis.coupling_analysis:
        coupling = analysis.coupling_analysis

        if coupling.pairs:
            heading("Coupling Detection")

            lines.append("Highly Coupled File Pairs:")
            for i, p in enumerate(coupling.pairs[:5], 1):
                strength_label = band(p.coupling_strength, 0.7, 0.5)
                lines.append(f" {i}. {p.file_a} <-> {p.file_b}")
                lines.append(
                    f" Co-occurrences: {p.co_occurrence_count}, "
                    f"Strength: {p.coupling_strength:.2f} [{strength_label}]"
                )

        if coupling.clusters:
            lines.append("")
            lines.append("Coupling Clusters:")
            for i, cluster in enumerate(coupling.clusters[:3], 1):
                files_str = ", ".join(cluster[:4])
                if len(cluster) > 4:
                    files_str += f" (+{len(cluster) - 4} more)"
                lines.append(f" {i}. [{files_str}]")

        if coupling.hotspots:
            lines.append("")
            lines.append("Coupling Hotspots (coupled with 3+ files):")
            for f in coupling.hotspots[:5]:
                lines.append(f" - {f}")

    # Regression clustering analysis
    if analysis.regression_analysis:
        regression = analysis.regression_analysis

        if regression.clusters:
            heading("Regression Clustering")
            lines.append(f"Total regression chains detected: {regression.total_regression_chains}")
            lines.append("")
            lines.append("Fragile Code Clusters:")
            for i, c in enumerate(regression.clusters[:5], 1):
                # Only the two most severe levels get an explicit tag.
                severity_flag = (
                    f" [{c.severity.upper()}]" if c.severity in ("critical", "high") else ""
                )
                lines.append(f" {i}. {c.primary_file}{severity_flag}")
                lines.append(f" Regression count: {c.regression_count}")
                lines.append(f" Pattern: {c.time_pattern}")
                if c.fix_bug_pairs:
                    chain = " -> ".join(f"{a} fix -> {b}" for a, b in c.fix_bug_pairs[:3])
                    if len(c.fix_bug_pairs) > 3:
                        chain += " ..."
                    lines.append(f" Chain: {chain}")

    # Test gap analysis
    if analysis.test_gap_analysis:
        tga = analysis.test_gap_analysis

        if tga.gaps:
            heading("Test Gap Correlation")

            # Show correlation stats
            lines.append(f" Files with tests: avg {tga.files_with_tests_avg_bugs:.1f} bugs")
            lines.append(f" Files without tests: avg {tga.files_without_tests_avg_bugs:.1f} bugs")
            lines.append("")

            # Show critical gaps
            critical_gaps = [g for g in tga.gaps if g.priority in ("critical", "high")]
            if critical_gaps:
                lines.append("Critical Test Gaps:")
                for g in critical_gaps[:5]:
                    test_status = g.test_file_path if g.has_test_file else "NO TEST"
                    lines.append(f" {g.source_file} [{g.priority.upper()}]")
                    bug_ids_str = ", ".join(g.bug_ids[:3])
                    lines.append(f" Bugs: {g.bug_count} ({bug_ids_str})")
                    lines.append(f" Test: {test_status}")

            if tga.priority_test_targets:
                lines.append("")
                lines.append("Priority Test Targets:")
                for i, target in enumerate(tga.priority_test_targets[:5], 1):
                    lines.append(f" {i}. {target}")

    # Rejection analysis
    if analysis.rejection_analysis:
        rej = analysis.rejection_analysis
        overall = rej.overall

        if overall.total_closed > 0:
            heading("Rejection Analysis")
            lines.append(
                f" Overall rejection rate: {overall.rejection_rate * 100:.1f}% "
                f"({overall.rejected_count}/{overall.total_closed})"
            )
            lines.append(
                f" Invalid rate: {overall.invalid_rate * 100:.1f}% "
                f"({overall.invalid_count}/{overall.total_closed})"
            )
            if overall.duplicate_count > 0:
                lines.append(f" Duplicates: {overall.duplicate_count}")
            if overall.deferred_count > 0:
                lines.append(f" Deferred: {overall.deferred_count}")

            # By type
            if rej.by_type:
                lines.append("")
                lines.append(" By Type:")
                for issue_type in sorted(rej.by_type):
                    metrics = rej.by_type[issue_type]
                    rate = metrics.rejection_rate + metrics.invalid_rate
                    lines.append(f" {issue_type:5}: {rate * 100:.1f}% non-completion")

            # Trend
            if rej.by_month:
                sorted_months = sorted(rej.by_month)[-6:]
                if len(sorted_months) >= 2:
                    lines.append("")
                    lines.append(" Trend (last 6 months):")
                    trend_parts = []
                    for month in sorted_months:
                        m = rej.by_month[month]
                        rate = (m.rejection_rate + m.invalid_rate) * 100
                        # Keys are shown by their last two characters only.
                        trend_parts.append(f"{month[-2:]}: {rate:.0f}%")
                    lines.append(f" {', '.join(trend_parts)}")
                    trend_symbol = {"improving": "↓", "degrading": "↑", "stable": "→"}.get(
                        rej.trend, "→"
                    )
                    lines.append(f" Direction: {rej.trend} {trend_symbol}")

            # Common reasons
            if rej.common_reasons:
                lines.append("")
                lines.append(" Common Rejection Reasons:")
                for reason, count in rej.common_reasons[:5]:
                    lines.append(f' - "{reason}" ({count})')

    # Manual pattern analysis
    if analysis.manual_pattern_analysis:
        mpa = analysis.manual_pattern_analysis

        if mpa.patterns:
            heading("Manual Pattern Analysis")
            lines.append(f" Total manual interventions: {mpa.total_manual_interventions}")
            lines.append(
                f" Potentially automatable: {mpa.automatable_percentage:.0f}% "
                f"({mpa.automatable_count}/{mpa.total_manual_interventions})"
            )
            lines.append("")
            lines.append(" Recurring Patterns:")

            for i, pattern in enumerate(mpa.patterns[:5], 1):
                lines.append("")
                lines.append(
                    f" {i}. {pattern.pattern_description} ({pattern.occurrence_count} occurrences)"
                )
                lines.append(f" Issues: {preview(pattern.affected_issues)}")
                lines.append(f" Suggestion: {pattern.suggested_automation}")
                lines.append(f" Complexity: {pattern.automation_complexity}")

    # Configuration gaps analysis
    if analysis.config_gaps_analysis:
        cga = analysis.config_gaps_analysis

        heading("Configuration Gaps Analysis")
        lines.append(f" Coverage score: {cga.coverage_score * 100:.0f}%")
        lines.append(f" Current hooks: {', '.join(cga.current_hooks) or 'none'}")
        lines.append(f" Current skills: {len(cga.current_skills)}")
        lines.append(f" Current agents: {len(cga.current_agents)}")

        if cga.gaps:
            lines.append("")
            lines.append(" Identified Gaps:")

            for i, gap in enumerate(cga.gaps[:5], 1):
                lines.append("")
                lines.append(f" {i}. Missing: {gap.gap_type} for {gap.description}")
                lines.append(f" Priority: {gap.priority}")
                lines.append(f" Evidence: {preview(gap.evidence)}")
                if gap.suggested_config:
                    lines.append(" Suggested config:")
                    # Cap the snippet at its first four lines.
                    for config_line in gap.suggested_config.split("\n")[:4]:
                        lines.append(f" {config_line}")

    # Agent effectiveness analysis
    if analysis.agent_effectiveness_analysis:
        aea = analysis.agent_effectiveness_analysis

        if aea.outcomes:
            heading("Agent Effectiveness Analysis")

            # Group by agent
            by_agent: dict[str, list[AgentOutcome]] = {}
            for outcome in aea.outcomes:
                by_agent.setdefault(outcome.agent_name, []).append(outcome)

            for agent in sorted(by_agent):
                lines.append(f" {agent}:")
                for outcome in sorted(by_agent[agent], key=lambda o: o.issue_type):
                    rate_pct = outcome.success_rate * 100
                    # Flag only when there are at least 5 samples and under 50% success.
                    flag = " [!]" if outcome.total_count >= 5 and rate_pct < 50 else ""
                    lines.append(
                        f" {outcome.issue_type:5}: {rate_pct:5.1f}% success "
                        f"({outcome.success_count}/{outcome.total_count}){flag}"
                    )

            # Recommendations
            if aea.best_agent_by_type or aea.problematic_combinations:
                lines.append("")
                lines.append(" Recommendations:")
                for issue_type, best_agent in sorted(aea.best_agent_by_type.items()):
                    lines.append(f" - {issue_type}: best handled by {best_agent}")
                for agent, issue_type, reason in aea.problematic_combinations[:3]:
                    lines.append(f" - {agent} underperforms for {issue_type} ({reason})")

    # Complexity proxy analysis
    if analysis.complexity_proxy_analysis:
        cpa = analysis.complexity_proxy_analysis

        heading("Complexity Proxy Analysis")
        lines.append(f" Baseline resolution time: {cpa.baseline_days:.1f} days (median)")

        if cpa.file_complexity:
            lines.append("")
            lines.append(" High Complexity Files (by resolution time):")
            for i, cp in enumerate(cpa.file_complexity[:5], 1):
                score_label = band(cp.complexity_score, 0.7, 0.4)
                lines.append(f" {i}. {cp.path}")
                lines.append(
                    f" Avg: {cp.avg_resolution_days:.1f} days ({cp.comparison_to_baseline})"
                )
                lines.append(
                    f" Median: {cp.median_resolution_days:.1f} days, Issues: {cp.issue_count}"
                )
                lines.append(
                    f" Slowest: {cp.slowest_issue[0]} ({cp.slowest_issue[1]:.1f} days)"
                )
                lines.append(f" Complexity score: {cp.complexity_score:.2f} [{score_label}]")

        if cpa.directory_complexity:
            lines.append("")
            lines.append(" High Complexity Directories:")
            for cp in cpa.directory_complexity[:5]:
                lines.append(
                    f" {cp.path}: avg {cp.avg_resolution_days:.1f} days ({cp.comparison_to_baseline})"
                )

        if cpa.complexity_outliers:
            lines.append("")
            lines.append(" Complexity Outliers (>2x baseline):")
            for path in cpa.complexity_outliers[:5]:
                lines.append(f" - {path}")

    # Cross-cutting concern analysis
    if analysis.cross_cutting_analysis:
        cca = analysis.cross_cutting_analysis

        if cca.smells:
            heading("Cross-Cutting Concern Analysis")

            for i, smell in enumerate(cca.smells[:5], 1):
                scatter_label = band(smell.scatter_score, 0.6, 0.3)
                lines.append("")
                lines.append(f" {i}. {smell.concern_type.title()} [{scatter_label} SCATTER]")
                lines.append(f" Directories: {preview(smell.affected_directories)}")
                issues_str = preview(smell.issue_ids)
                lines.append(f" Issues: {issues_str} ({smell.issue_count} total)")
                lines.append(f" Scatter score: {smell.scatter_score:.2f}")
                lines.append(f" Suggested pattern: {smell.suggested_pattern}")

            if cca.consolidation_opportunities:
                lines.append("")
                lines.append(" Consolidation Opportunities:")
                for opp in cca.consolidation_opportunities[:5]:
                    lines.append(f" - {opp}")

    # Technical debt
    if analysis.debt_metrics:
        heading("Technical Debt")
        debt = analysis.debt_metrics
        lines.append(f" Backlog Size: {debt.backlog_size}")
        lines.append(f" Growth Rate: {debt.backlog_growth_rate:+.1f} issues/week")
        lines.append(f" High Priority Open (P0-P1): {debt.high_priority_open}")
        lines.append(f" Aging >30 days: {debt.aging_30_plus}")

    # Comparison (needs both windows to have data)
    if analysis.comparison_period and analysis.current_period and analysis.previous_period:
        heading(f"Comparison ({analysis.comparison_period})")
        curr = analysis.current_period
        prev = analysis.previous_period

        if prev.total_completed > 0:
            change = (curr.total_completed - prev.total_completed) / prev.total_completed * 100
            lines.append(
                f" Completed: {prev.total_completed} -> {curr.total_completed} ({change:+.0f}%)"
            )
        else:
            # No baseline to compute a percentage against.
            lines.append(f" Completed: {prev.total_completed} -> {curr.total_completed}")

    return "\n".join(lines)

3341 

3342 

3343def format_analysis_markdown(analysis: HistoryAnalysis) -> str: 

3344 """Format analysis as Markdown report. 

3345 

3346 Args: 

3347 analysis: HistoryAnalysis to format 

3348 

3349 Returns: 

3350 Markdown string 

3351 """ 

3352 lines: list[str] = [] 

3353 

3354 lines.append("# Issue History Analysis Report") 

3355 lines.append("") 

3356 lines.append( 

3357 f"**Generated**: {analysis.generated_date} | " 

3358 f"**Total Completed**: {analysis.total_completed} | " 

3359 f"**Active Issues**: {analysis.total_active}" 

3360 ) 

3361 

3362 if analysis.date_range_start and analysis.date_range_end: 

3363 lines.append(f"**Date Range**: {analysis.date_range_start} to {analysis.date_range_end}") 

3364 

3365 # Executive Summary 

3366 lines.append("") 

3367 lines.append("## Executive Summary") 

3368 lines.append("") 

3369 lines.append("| Metric | Value | Trend |") 

3370 lines.append("|--------|-------|-------|") 

3371 

3372 velocity = f"{analysis.summary.velocity:.2f}/day" if analysis.summary.velocity else "N/A" 

3373 velocity_symbol = {"increasing": "↑", "decreasing": "↓", "stable": "→"}.get( 

3374 analysis.velocity_trend, "" 

3375 ) 

3376 lines.append(f"| Velocity | {velocity} | {velocity_symbol} {analysis.velocity_trend} |") 

3377 

3378 bug_count = analysis.summary.type_counts.get("BUG", 0) 

3379 total = analysis.total_completed or 1 

3380 bug_pct = bug_count * 100 // total 

3381 bug_symbol = {"increasing": "↑ ⚠️", "decreasing": "↓ ✓", "stable": "→"}.get( 

3382 analysis.bug_ratio_trend, "" 

3383 ) 

3384 lines.append(f"| Bug Ratio | {bug_pct}% | {bug_symbol} |") 

3385 

3386 if analysis.debt_metrics: 

3387 growth = analysis.debt_metrics.backlog_growth_rate 

3388 growth_status = "↓ ✓" if growth < 0 else ("→" if growth == 0 else "↑ ⚠️") 

3389 lines.append(f"| Backlog Growth | {growth:+.1f}/week | {growth_status} |") 

3390 

3391 # Type Distribution 

3392 lines.append("") 

3393 lines.append("## Type Distribution") 

3394 lines.append("") 

3395 lines.append("| Type | Count | Percentage |") 

3396 lines.append("|------|-------|------------|") 

3397 for issue_type, count in analysis.summary.type_counts.items(): 

3398 pct = count * 100 // total 

3399 lines.append(f"| {issue_type} | {count} | {pct}% |") 

3400 

3401 # Period Trends 

3402 if analysis.period_metrics: 

3403 lines.append("") 

3404 lines.append("## Period Trends") 

3405 lines.append("") 

3406 lines.append("| Period | Completed | Bug % |") 

3407 lines.append("|--------|-----------|-------|") 

3408 for period in analysis.period_metrics[-8:]: # Last 8 

3409 bug_pct_str = f"{period.bug_ratio * 100:.0f}%" if period.bug_ratio else "N/A" 

3410 lines.append(f"| {period.period_label} | {period.total_completed} | {bug_pct_str} |") 

3411 

3412 # Subsystem Health 

3413 if analysis.subsystem_health: 

3414 lines.append("") 

3415 lines.append("## Subsystem Health") 

3416 lines.append("") 

3417 lines.append("| Subsystem | Total | Recent (30d) | Trend |") 

3418 lines.append("|-----------|-------|--------------|-------|") 

3419 for sub in analysis.subsystem_health: 

3420 trend_symbol = {"improving": "↓ ✓", "degrading": "↑ ⚠️", "stable": "→"}.get( 

3421 sub.trend, "" 

3422 ) 

3423 lines.append( 

3424 f"| `{sub.subsystem}` | {sub.total_issues} | {sub.recent_issues} | {trend_symbol} |" 

3425 ) 

3426 

3427 # Hotspot Analysis 

3428 if analysis.hotspot_analysis: 

3429 hotspots = analysis.hotspot_analysis 

3430 

3431 if hotspots.file_hotspots: 

3432 lines.append("") 

3433 lines.append("## File Hotspots") 

3434 lines.append("") 

3435 lines.append("| File | Issues | Types | Churn |") 

3436 lines.append("|------|--------|-------|-------|") 

3437 for h in hotspots.file_hotspots: 

3438 types_str = ", ".join(f"{k}:{v}" for k, v in sorted(h.issue_types.items())) 

3439 churn_badge = ( 

3440 "🔥" 

3441 if h.churn_indicator == "high" 

3442 else ("⚡" if h.churn_indicator == "medium" else "") 

3443 ) 

3444 lines.append(f"| `{h.path}` | {h.issue_count} | {types_str} | {churn_badge} |") 

3445 

3446 if hotspots.directory_hotspots: 

3447 lines.append("") 

3448 lines.append("## Directory Hotspots") 

3449 lines.append("") 

3450 lines.append("| Directory | Issues | Types |") 

3451 lines.append("|-----------|--------|-------|") 

3452 for h in hotspots.directory_hotspots[:5]: 

3453 types_str = ", ".join(f"{k}:{v}" for k, v in sorted(h.issue_types.items())) 

3454 lines.append(f"| `{h.path}` | {h.issue_count} | {types_str} |") 

3455 

3456 if hotspots.bug_magnets: 

3457 lines.append("") 

3458 lines.append("## Bug Magnets") 

3459 lines.append("") 

3460 lines.append("Files with >60% bug ratio that may need refactoring attention:") 

3461 lines.append("") 

3462 lines.append("| File | Bug Ratio | Bugs/Total |") 

3463 lines.append("|------|-----------|------------|") 

3464 for h in hotspots.bug_magnets: 

3465 lines.append( 

3466 f"| `{h.path}` | {h.bug_ratio * 100:.0f}% | " 

3467 f"{h.issue_types.get('BUG', 0)}/{h.issue_count} |" 

3468 ) 

3469 

3470 # Coupling Analysis 

3471 if analysis.coupling_analysis: 

3472 coupling = analysis.coupling_analysis 

3473 

3474 if coupling.pairs: 

3475 lines.append("") 

3476 lines.append("## Coupling Detection") 

3477 lines.append("") 

3478 lines.append("Files that frequently change together across issues:") 

3479 lines.append("") 

3480 lines.append("| File A | File B | Co-occurrences | Strength |") 

3481 lines.append("|--------|--------|----------------|----------|") 

3482 for p in coupling.pairs[:10]: 

3483 strength_badge = ( 

3484 "🔴" 

3485 if p.coupling_strength >= 0.7 

3486 else ("🟠" if p.coupling_strength >= 0.5 else "🟡") 

3487 ) 

3488 lines.append( 

3489 f"| `{p.file_a}` | `{p.file_b}` | {p.co_occurrence_count} | " 

3490 f"{p.coupling_strength:.2f} {strength_badge} |" 

3491 ) 

3492 

3493 if coupling.clusters: 

3494 lines.append("") 

3495 lines.append("### Coupling Clusters") 

3496 lines.append("") 

3497 lines.append("Groups of tightly coupled files (consider consolidating):") 

3498 lines.append("") 

3499 for i, cluster in enumerate(coupling.clusters[:5], 1): 

3500 files_str = ", ".join(f"`{f}`" for f in cluster[:5]) 

3501 if len(cluster) > 5: 

3502 files_str += f" (+{len(cluster) - 5} more)" 

3503 lines.append(f"{i}. {files_str}") 

3504 

3505 if coupling.hotspots: 

3506 lines.append("") 

3507 lines.append("### Coupling Hotspots") 

3508 lines.append("") 

3509 lines.append("Files coupled with 3+ other files (potential abstraction candidates):") 

3510 lines.append("") 

3511 for f in coupling.hotspots[:5]: 

3512 lines.append(f"- `{f}`") 

3513 

3514 # Regression Clustering Analysis 

3515 if analysis.regression_analysis: 

3516 regression = analysis.regression_analysis 

3517 

3518 if regression.clusters: 

3519 lines.append("") 

3520 lines.append("## Regression Clustering") 

3521 lines.append("") 

3522 lines.append( 

3523 f"**Total regression chains detected**: {regression.total_regression_chains}" 

3524 ) 

3525 lines.append("") 

3526 lines.append("Files where fixes frequently lead to new bugs:") 

3527 lines.append("") 

3528 lines.append("| File | Regressions | Pattern | Severity |") 

3529 lines.append("|------|-------------|---------|----------|") 

3530 for c in regression.clusters: 

3531 severity_badge = ( 

3532 "🔴" if c.severity == "critical" else ("🟠" if c.severity == "high" else "🟡") 

3533 ) 

3534 lines.append( 

3535 f"| `{c.primary_file}` | {c.regression_count} | " 

3536 f"{c.time_pattern} | {severity_badge} |" 

3537 ) 

3538 

3539 if regression.most_fragile_files: 

3540 lines.append("") 

3541 lines.append("### Most Fragile Files") 

3542 lines.append("") 

3543 lines.append("Files requiring architectural attention:") 

3544 lines.append("") 

3545 for f in regression.most_fragile_files: 

3546 lines.append(f"- `{f}`") 

3547 

3548 # Test Gap Analysis 

3549 if analysis.test_gap_analysis: 

3550 tga = analysis.test_gap_analysis 

3551 

3552 if tga.gaps: 

3553 lines.append("") 

3554 lines.append("## Test Gap Correlation") 

3555 lines.append("") 

3556 lines.append("Correlating bug occurrences with test coverage gaps:") 

3557 lines.append("") 

3558 lines.append("| Metric | Value |") 

3559 lines.append("|--------|-------|") 

3560 lines.append(f"| Files with tests | avg {tga.files_with_tests_avg_bugs:.1f} bugs |") 

3561 lines.append( 

3562 f"| Files without tests | avg {tga.files_without_tests_avg_bugs:.1f} bugs |" 

3563 ) 

3564 lines.append("") 

3565 

3566 # Critical gaps table 

3567 critical_gaps = [g for g in tga.gaps if g.priority in ("critical", "high")] 

3568 if critical_gaps: 

3569 lines.append("### Critical Test Gaps") 

3570 lines.append("") 

3571 lines.append("Files with high bug counts but missing tests:") 

3572 lines.append("") 

3573 lines.append("| File | Bugs | Priority | Test Status | Action |") 

3574 lines.append("|------|------|----------|-------------|--------|") 

3575 for g in critical_gaps[:10]: 

3576 priority_badge = "🔴" if g.priority == "critical" else "🟠" 

3577 test_status = f"`{g.test_file_path}`" if g.has_test_file else "NONE" 

3578 action = "Review coverage" if g.has_test_file else "Create test file" 

3579 lines.append( 

3580 f"| `{g.source_file}` | {g.bug_count} | {priority_badge} | " 

3581 f"{test_status} | {action} |" 

3582 ) 

3583 

3584 if tga.priority_test_targets: 

3585 lines.append("") 

3586 lines.append("### Priority Test Targets") 

3587 lines.append("") 

3588 lines.append("Files recommended for new test creation (ordered by bug count):") 

3589 lines.append("") 

3590 for target in tga.priority_test_targets[:10]: 

3591 lines.append(f"- `{target}`") 

3592 

3593 # Rejection Analysis 

3594 if analysis.rejection_analysis: 

3595 rej = analysis.rejection_analysis 

3596 overall = rej.overall 

3597 

3598 if overall.total_closed > 0: 

3599 lines.append("") 

3600 lines.append("## Rejection Analysis") 

3601 lines.append("") 

3602 lines.append( 

3603 f"**Overall rejection rate**: {overall.rejection_rate * 100:.1f}% " 

3604 f"({overall.rejected_count}/{overall.total_closed})" 

3605 ) 

3606 lines.append( 

3607 f"**Invalid rate**: {overall.invalid_rate * 100:.1f}% " 

3608 f"({overall.invalid_count}/{overall.total_closed})" 

3609 ) 

3610 lines.append("") 

3611 

3612 # By type table 

3613 if rej.by_type: 

3614 lines.append("### By Issue Type") 

3615 lines.append("") 

3616 lines.append("| Type | Rejected | Invalid | Total | Rate |") 

3617 lines.append("|------|----------|---------|-------|------|") 

3618 for issue_type in sorted(rej.by_type.keys()): 

3619 m = rej.by_type[issue_type] 

3620 rate = (m.rejection_rate + m.invalid_rate) * 100 

3621 lines.append( 

3622 f"| {issue_type} | {m.rejected_count} | {m.invalid_count} | " 

3623 f"{m.total_closed} | {rate:.1f}% |" 

3624 ) 

3625 lines.append("") 

3626 

3627 # Trend 

3628 if rej.by_month and len(rej.by_month) >= 2: 

3629 lines.append("### Trend") 

3630 lines.append("") 

3631 sorted_months = sorted(rej.by_month.keys())[-6:] 

3632 trend_parts = [] 

3633 for month in sorted_months: 

3634 m = rej.by_month[month] 

3635 rate = (m.rejection_rate + m.invalid_rate) * 100 

3636 trend_parts.append(f"{month}: {rate:.0f}%") 

3637 lines.append(" → ".join(trend_parts)) 

3638 lines.append(f"*Trend: {rej.trend}*") 

3639 lines.append("") 

3640 

3641 # Common reasons 

3642 if rej.common_reasons: 

3643 lines.append("### Common Rejection Reasons") 

3644 lines.append("") 

3645 for reason, count in rej.common_reasons[:5]: 

3646 lines.append(f'- "{reason}" ({count})') 

3647 

3648 # Manual Pattern Analysis 

3649 if analysis.manual_pattern_analysis: 

3650 mpa = analysis.manual_pattern_analysis 

3651 

3652 if mpa.patterns: 

3653 lines.append("") 

3654 lines.append("## Manual Pattern Analysis") 

3655 lines.append("") 

3656 lines.append( 

3657 f"**Total manual interventions detected**: {mpa.total_manual_interventions}" 

3658 ) 

3659 lines.append( 

3660 f"**Potentially automatable**: {mpa.automatable_percentage:.0f}% " 

3661 f"({mpa.automatable_count}/{mpa.total_manual_interventions})" 

3662 ) 

3663 lines.append("") 

3664 lines.append("### Recurring Patterns") 

3665 lines.append("") 

3666 lines.append("| Pattern | Occurrences | Affected Issues | Suggestion | Complexity |") 

3667 lines.append("|---------|-------------|-----------------|------------|------------|") 

3668 

3669 for pattern in mpa.patterns[:10]: 

3670 issues_str = ", ".join(pattern.affected_issues[:3]) 

3671 if len(pattern.affected_issues) > 3: 

3672 issues_str += "..." 

3673 lines.append( 

3674 f"| {pattern.pattern_description} | {pattern.occurrence_count} | " 

3675 f"{issues_str} | {pattern.suggested_automation} | " 

3676 f"{pattern.automation_complexity} |" 

3677 ) 

3678 

3679 if mpa.automation_suggestions: 

3680 lines.append("") 

3681 lines.append("### Automation Suggestions") 

3682 lines.append("") 

3683 lines.append("Based on detected patterns, consider implementing:") 

3684 lines.append("") 

3685 for suggestion in mpa.automation_suggestions[:5]: 

3686 lines.append(f"- {suggestion}") 

3687 

3688 # Configuration Gaps Analysis 

3689 if analysis.config_gaps_analysis: 

3690 cga = analysis.config_gaps_analysis 

3691 

3692 lines.append("") 

3693 lines.append("## Configuration Gaps Analysis") 

3694 lines.append("") 

3695 lines.append(f"**Coverage score**: {cga.coverage_score * 100:.0f}%") 

3696 lines.append("") 

3697 lines.append("### Current Configuration") 

3698 lines.append("") 

3699 lines.append(f"- **Hooks**: {', '.join(cga.current_hooks) or 'none'}") 

3700 lines.append(f"- **Skills**: {len(cga.current_skills)}") 

3701 lines.append(f"- **Agents**: {len(cga.current_agents)}") 

3702 

3703 if cga.gaps: 

3704 lines.append("") 

3705 lines.append("### Identified Gaps") 

3706 lines.append("") 

3707 lines.append("| Priority | Type | Description | Evidence |") 

3708 lines.append("|----------|------|-------------|----------|") 

3709 

3710 for gap in cga.gaps[:10]: 

3711 issues_str = ", ".join(gap.evidence[:3]) 

3712 if len(gap.evidence) > 3: 

3713 issues_str += "..." 

3714 lines.append( 

3715 f"| {gap.priority} | {gap.gap_type} | {gap.description} | {issues_str} |" 

3716 ) 

3717 

3718 lines.append("") 

3719 lines.append("### Suggested Configurations") 

3720 lines.append("") 

3721 for i, gap in enumerate(cga.gaps[:5], 1): 

3722 if gap.suggested_config: 

3723 lines.append(f"**{i}. {gap.description}**") 

3724 lines.append("") 

3725 lines.append("```json") 

3726 lines.append(gap.suggested_config) 

3727 lines.append("```") 

3728 lines.append("") 

3729 

3730 # Agent Effectiveness Analysis 

3731 if analysis.agent_effectiveness_analysis: 

3732 aea = analysis.agent_effectiveness_analysis 

3733 

3734 if aea.outcomes: 

3735 lines.append("") 

3736 lines.append("## Agent Effectiveness Analysis") 

3737 lines.append("") 

3738 lines.append("| Agent | Type | Success Rate | Completed | Rejected | Failed |") 

3739 lines.append("|-------|------|--------------|-----------|----------|--------|") 

3740 

3741 for outcome in sorted(aea.outcomes, key=lambda o: (o.agent_name, o.issue_type)): 

3742 rate_pct = outcome.success_rate * 100 

3743 flag = " ⚠️" if outcome.total_count >= 5 and rate_pct < 50 else "" 

3744 lines.append( 

3745 f"| {outcome.agent_name} | {outcome.issue_type} | " 

3746 f"{rate_pct:.1f}%{flag} | {outcome.success_count} | " 

3747 f"{outcome.rejection_count} | {outcome.failure_count} |" 

3748 ) 

3749 

3750 # Recommendations 

3751 if aea.best_agent_by_type or aea.problematic_combinations: 

3752 lines.append("") 

3753 lines.append("### Recommendations") 

3754 lines.append("") 

3755 for issue_type, best_agent in sorted(aea.best_agent_by_type.items()): 

3756 lines.append(f"- **{issue_type}**: Best handled by `{best_agent}`") 

3757 for agent, issue_type, reason in aea.problematic_combinations[:3]: 

3758 lines.append(f"- **{agent}** underperforms for {issue_type} ({reason})") 

3759 

3760 # Technical Debt 

3761 if analysis.debt_metrics: 

3762 lines.append("") 

3763 lines.append("## Technical Debt Health") 

3764 lines.append("") 

3765 debt = analysis.debt_metrics 

3766 lines.append("| Metric | Value | Assessment |") 

3767 lines.append("|--------|-------|------------|") 

3768 

3769 backlog_status = ( 

3770 "✓ Low" 

3771 if debt.backlog_size < 20 

3772 else ("⚠️ High" if debt.backlog_size > 50 else "Moderate") 

3773 ) 

3774 lines.append(f"| Backlog Size | {debt.backlog_size} | {backlog_status} |") 

3775 

3776 growth_status = ( 

3777 "✓ Shrinking" 

3778 if debt.backlog_growth_rate < 0 

3779 else ("⚠️ Growing" if debt.backlog_growth_rate > 2 else "Stable") 

3780 ) 

3781 lines.append(f"| Growth Rate | {debt.backlog_growth_rate:+.1f}/week | {growth_status} |") 

3782 

3783 hp_status = "✓ Good" if debt.high_priority_open < 3 else "⚠️ Attention needed" 

3784 lines.append(f"| High Priority Open | {debt.high_priority_open} | {hp_status} |") 

3785 

3786 aging_status = ( 

3787 "✓ Healthy" 

3788 if debt.aging_30_plus < 5 

3789 else ("⚠️ Review needed" if debt.aging_30_plus > 10 else "Moderate") 

3790 ) 

3791 lines.append(f"| Aging >30 days | {debt.aging_30_plus} | {aging_status} |") 

3792 

3793 # Comparison 

3794 if analysis.comparison_period and analysis.current_period and analysis.previous_period: 

3795 lines.append("") 

3796 lines.append(f"## Comparative Analysis (Last {analysis.comparison_period})") 

3797 lines.append("") 

3798 curr = analysis.current_period 

3799 prev = analysis.previous_period 

3800 

3801 lines.append("| Metric | Previous | Current | Change |") 

3802 lines.append("|--------|----------|---------|--------|") 

3803 

3804 if prev.total_completed > 0: 

3805 change = (curr.total_completed - prev.total_completed) / prev.total_completed * 100 

3806 change_str = f"{change:+.0f}%" 

3807 else: 

3808 change_str = "N/A" 

3809 lines.append( 

3810 f"| Completed | {prev.total_completed} | {curr.total_completed} | {change_str} |" 

3811 ) 

3812 

3813 prev_bugs = prev.type_counts.get("BUG", 0) 

3814 curr_bugs = curr.type_counts.get("BUG", 0) 

3815 if prev_bugs > 0: 

3816 bug_change = (curr_bugs - prev_bugs) / prev_bugs * 100 

3817 bug_change_str = f"{bug_change:+.0f}%" 

3818 if bug_change < 0: 

3819 bug_change_str += " ✓" 

3820 else: 

3821 bug_change_str = "N/A" 

3822 lines.append(f"| Bugs Fixed | {prev_bugs} | {curr_bugs} | {bug_change_str} |") 

3823 

3824 return "\n".join(lines)