Coverage for little_loops / issue_history / summary.py: 0%
116 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Issue history summary and period metrics analysis."""
3from __future__ import annotations
5import statistics
6from collections import defaultdict
7from datetime import date, timedelta
8from pathlib import Path
9from typing import Literal
11from little_loops.issue_history._utils import get_issue_content
12from little_loops.issue_history.models import (
13 CompletedIssue,
14 HistorySummary,
15 PeriodMetrics,
16 SubsystemHealth,
17)
18from little_loops.issue_history.parsing import _extract_subsystem
21def calculate_summary(issues: list[CompletedIssue]) -> HistorySummary:
22 """Calculate summary statistics from issues.
24 Args:
25 issues: List of CompletedIssue objects
27 Returns:
28 HistorySummary with calculated statistics
29 """
30 type_counts: dict[str, int] = {}
31 priority_counts: dict[str, int] = {}
32 discovery_counts: dict[str, int] = {}
33 dates: list[date] = []
35 for issue in issues:
36 # Count by type
37 type_counts[issue.issue_type] = type_counts.get(issue.issue_type, 0) + 1
39 # Count by priority
40 priority_counts[issue.priority] = priority_counts.get(issue.priority, 0) + 1
42 # Count by discovery source
43 source = issue.discovered_by or "unknown"
44 discovery_counts[source] = discovery_counts.get(source, 0) + 1
46 # Collect dates
47 if issue.completed_date:
48 dates.append(issue.completed_date)
50 # Sort counts for consistent output
51 type_counts = dict(sorted(type_counts.items()))
52 priority_counts = dict(sorted(priority_counts.items()))
53 discovery_counts = dict(sorted(discovery_counts.items(), key=lambda x: (-x[1], x[0])))
55 return HistorySummary(
56 total_count=len(issues),
57 type_counts=type_counts,
58 priority_counts=priority_counts,
59 discovery_counts=discovery_counts,
60 earliest_date=min(dates) if dates else None,
61 latest_date=max(dates) if dates else None,
62 )
65def _calculate_period_label(start: date, period_type: str) -> str:
66 """Generate human-readable period label.
68 Args:
69 start: Period start date
70 period_type: "weekly", "monthly", "quarterly"
72 Returns:
73 Label like "Q1 2025", "Jan 2025", "Week 3 2025"
74 """
75 if period_type == "quarterly":
76 quarter = (start.month - 1) // 3 + 1
77 return f"Q{quarter} {start.year}"
78 elif period_type == "monthly":
79 return start.strftime("%b %Y")
80 else: # weekly
81 week_num = start.isocalendar()[1]
82 return f"Week {week_num} {start.year}"
85def _group_by_period(
86 issues: list[CompletedIssue],
87 period_type: Literal["weekly", "monthly", "quarterly"] = "monthly",
88) -> list[PeriodMetrics]:
89 """Group issues by time period and calculate metrics.
91 Args:
92 issues: List of completed issues with dates
93 period_type: Grouping period
95 Returns:
96 List of PeriodMetrics sorted by date ascending
97 """
98 # Filter issues with dates
99 dated_issues = [i for i in issues if i.completed_date]
100 if not dated_issues:
101 return []
103 # Sort by date
104 dated_issues.sort(key=lambda i: i.completed_date) # type: ignore
106 # Determine period boundaries
107 periods: dict[str, list[CompletedIssue]] = defaultdict(list)
109 for issue in dated_issues:
110 completed = issue.completed_date
111 assert completed is not None
113 if period_type == "quarterly":
114 quarter = (completed.month - 1) // 3
115 period_start = date(completed.year, quarter * 3 + 1, 1)
116 elif period_type == "monthly":
117 period_start = date(completed.year, completed.month, 1)
118 else: # weekly
119 # Start of week (Monday)
120 period_start = completed - timedelta(days=completed.weekday())
122 key = period_start.isoformat()
123 periods[key].append(issue)
125 # Calculate metrics for each period
126 result: list[PeriodMetrics] = []
127 for period_key in sorted(periods.keys()):
128 period_issues = periods[period_key]
129 period_start = date.fromisoformat(period_key)
131 # Calculate period end
132 if period_type == "quarterly":
133 month = period_start.month + 3
134 year = period_start.year
135 if month > 12:
136 month = 1
137 year += 1
138 period_end = date(year, month, 1) - timedelta(days=1)
139 elif period_type == "monthly":
140 month = period_start.month + 1
141 year = period_start.year
142 if month > 12:
143 month = 1
144 year += 1
145 period_end = date(year, month, 1) - timedelta(days=1)
146 else: # weekly
147 period_end = period_start + timedelta(days=6)
149 # Count types and priorities
150 type_counts: dict[str, int] = {}
151 priority_counts: dict[str, int] = {}
153 for issue in period_issues:
154 type_counts[issue.issue_type] = type_counts.get(issue.issue_type, 0) + 1
155 priority_counts[issue.priority] = priority_counts.get(issue.priority, 0) + 1
157 result.append(
158 PeriodMetrics(
159 period_start=period_start,
160 period_end=period_end,
161 period_label=_calculate_period_label(period_start, period_type),
162 total_completed=len(period_issues),
163 type_counts=dict(sorted(type_counts.items())),
164 priority_counts=dict(sorted(priority_counts.items())),
165 )
166 )
168 return result
171def _calculate_trend(values: list[float]) -> str:
172 """Determine trend from a series of values.
174 Args:
175 values: Time-ordered series of values
177 Returns:
178 "increasing", "decreasing", or "stable"
179 """
180 if len(values) < 3:
181 return "stable"
183 n = len(values)
184 slope = statistics.linear_regression(range(n), values).slope
186 # Normalize slope by average value
187 avg = sum(values) / n
188 if avg == 0:
189 avg = 1
190 normalized_slope = slope / avg
192 if normalized_slope > 0.05:
193 return "increasing"
194 elif normalized_slope < -0.05:
195 return "decreasing"
196 return "stable"
199def _analyze_subsystems(
200 issues: list[CompletedIssue],
201 recent_days: int = 30,
202 contents: dict[Path, str] | None = None,
203) -> list[SubsystemHealth]:
204 """Analyze health by subsystem/directory.
206 Args:
207 issues: List of completed issues
208 recent_days: Days to consider "recent"
209 contents: Pre-loaded issue file contents (path -> content)
211 Returns:
212 List of SubsystemHealth sorted by total issues descending
213 """
214 subsystems: dict[str, SubsystemHealth] = {}
215 cutoff = date.today() - timedelta(days=recent_days)
217 for issue in issues:
218 content = get_issue_content(issue, contents)
219 if content is None:
220 continue
222 subsystem = _extract_subsystem(content)
223 if not subsystem:
224 continue
226 if subsystem not in subsystems:
227 subsystems[subsystem] = SubsystemHealth(subsystem=subsystem)
229 health = subsystems[subsystem]
230 health.total_issues += 1
231 health.issue_ids.append(issue.issue_id)
233 if issue.completed_date and issue.completed_date >= cutoff:
234 health.recent_issues += 1
236 # Calculate trends based on recent vs historical ratio
237 for health in subsystems.values():
238 if health.total_issues >= 5:
239 recent_ratio = health.recent_issues / health.total_issues
240 if recent_ratio > 0.5:
241 health.trend = "degrading"
242 elif recent_ratio < 0.2:
243 health.trend = "improving"
245 # Sort by total issues descending
246 result = sorted(subsystems.values(), key=lambda s: -s.total_issues)
247 return result[:10] # Top 10