Coverage for little_loops / issue_history / summary.py: 0%

116 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Issue history summary and period metrics analysis.""" 

2 

3from __future__ import annotations 

4 

5import statistics 

6from collections import defaultdict 

7from datetime import date, timedelta 

8from pathlib import Path 

9from typing import Literal 

10 

11from little_loops.issue_history._utils import get_issue_content 

12from little_loops.issue_history.models import ( 

13 CompletedIssue, 

14 HistorySummary, 

15 PeriodMetrics, 

16 SubsystemHealth, 

17) 

18from little_loops.issue_history.parsing import _extract_subsystem 

19 

20 

def calculate_summary(issues: list[CompletedIssue]) -> HistorySummary:
    """Aggregate headline statistics over a collection of completed issues.

    Args:
        issues: List of CompletedIssue objects

    Returns:
        HistorySummary holding the total number of issues, per-type and
        per-priority counts (ordered by key), per-discovery-source counts
        (ordered by descending count, then by source name), and the
        earliest/latest completion dates. Both dates are None when no issue
        has a completion date.
    """
    by_type: defaultdict[str, int] = defaultdict(int)
    by_priority: defaultdict[str, int] = defaultdict(int)
    by_source: defaultdict[str, int] = defaultdict(int)

    for entry in issues:
        by_type[entry.issue_type] += 1
        by_priority[entry.priority] += 1
        # Issues with no recorded discovery source are bucketed together.
        by_source[entry.discovered_by or "unknown"] += 1

    completed_on: list[date] = [
        entry.completed_date for entry in issues if entry.completed_date
    ]
    earliest = min(completed_on) if completed_on else None
    latest = max(completed_on) if completed_on else None

    # Most frequent discovery source first; ties broken alphabetically.
    ranked_sources = sorted(by_source.items(), key=lambda pair: (-pair[1], pair[0]))

    return HistorySummary(
        total_count=len(issues),
        type_counts=dict(sorted(by_type.items())),
        priority_counts=dict(sorted(by_priority.items())),
        discovery_counts=dict(ranked_sources),
        earliest_date=earliest,
        latest_date=latest,
    )

63 

64 

65def _calculate_period_label(start: date, period_type: str) -> str: 

66 """Generate human-readable period label. 

67 

68 Args: 

69 start: Period start date 

70 period_type: "weekly", "monthly", "quarterly" 

71 

72 Returns: 

73 Label like "Q1 2025", "Jan 2025", "Week 3 2025" 

74 """ 

75 if period_type == "quarterly": 

76 quarter = (start.month - 1) // 3 + 1 

77 return f"Q{quarter} {start.year}" 

78 elif period_type == "monthly": 

79 return start.strftime("%b %Y") 

80 else: # weekly 

81 week_num = start.isocalendar()[1] 

82 return f"Week {week_num} {start.year}" 

83 

84 

def _group_by_period(
    issues: list[CompletedIssue],
    period_type: Literal["weekly", "monthly", "quarterly"] = "monthly",
) -> list[PeriodMetrics]:
    """Bucket completed issues into calendar periods and summarize each one.

    Issues without a completion date are ignored. Weekly periods run from
    Monday through Sunday; monthly and quarterly periods run from the first
    to the last day of the month/quarter.

    Args:
        issues: List of completed issues with dates
        period_type: Grouping period

    Returns:
        List of PeriodMetrics sorted by date ascending (empty when no issue
        has a completion date)
    """
    with_dates = sorted(
        (entry for entry in issues if entry.completed_date),
        key=lambda entry: entry.completed_date,  # type: ignore
    )
    if not with_dates:
        return []

    # Buckets are keyed by the ISO string of the period's first day, so a
    # plain lexical sort of the keys is also a chronological sort.
    buckets: dict[str, list[CompletedIssue]] = defaultdict(list)
    for entry in with_dates:
        done = entry.completed_date
        assert done is not None

        if period_type == "quarterly":
            first_month = ((done.month - 1) // 3) * 3 + 1
            bucket_start = date(done.year, first_month, 1)
        elif period_type == "monthly":
            bucket_start = date(done.year, done.month, 1)
        else:  # weekly
            # weekday() is 0 on Monday, so this rewinds to the week's Monday.
            bucket_start = done - timedelta(days=done.weekday())

        buckets[bucket_start.isoformat()].append(entry)

    metrics: list[PeriodMetrics] = []
    for iso_key in sorted(buckets):
        members = buckets[iso_key]
        bucket_start = date.fromisoformat(iso_key)

        if period_type == "quarterly" or period_type == "monthly":
            # Last day of the period = day before the first day of the next one.
            span_months = 3 if period_type == "quarterly" else 1
            next_month = bucket_start.month + span_months
            next_year = bucket_start.year
            if next_month > 12:
                next_month = 1
                next_year += 1
            bucket_end = date(next_year, next_month, 1) - timedelta(days=1)
        else:  # weekly
            bucket_end = bucket_start + timedelta(days=6)

        by_type: dict[str, int] = defaultdict(int)
        by_priority: dict[str, int] = defaultdict(int)
        for member in members:
            by_type[member.issue_type] += 1
            by_priority[member.priority] += 1

        metrics.append(
            PeriodMetrics(
                period_start=bucket_start,
                period_end=bucket_end,
                period_label=_calculate_period_label(bucket_start, period_type),
                total_completed=len(members),
                type_counts=dict(sorted(by_type.items())),
                priority_counts=dict(sorted(by_priority.items())),
            )
        )

    return metrics

169 

170 

171def _calculate_trend(values: list[float]) -> str: 

172 """Determine trend from a series of values. 

173 

174 Args: 

175 values: Time-ordered series of values 

176 

177 Returns: 

178 "increasing", "decreasing", or "stable" 

179 """ 

180 if len(values) < 3: 

181 return "stable" 

182 

183 n = len(values) 

184 slope = statistics.linear_regression(range(n), values).slope 

185 

186 # Normalize slope by average value 

187 avg = sum(values) / n 

188 if avg == 0: 

189 avg = 1 

190 normalized_slope = slope / avg 

191 

192 if normalized_slope > 0.05: 

193 return "increasing" 

194 elif normalized_slope < -0.05: 

195 return "decreasing" 

196 return "stable" 

197 

198 

def _analyze_subsystems(
    issues: list[CompletedIssue],
    recent_days: int = 30,
    contents: dict[Path, str] | None = None,
) -> list[SubsystemHealth]:
    """Tally issues per subsystem and flag subsystems by recent activity.

    Issues whose content cannot be obtained, or for which no subsystem can
    be extracted from the content, are skipped.

    Args:
        issues: List of completed issues
        recent_days: Days to consider "recent"
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        At most the ten SubsystemHealth entries with the most issues,
        sorted by total issues descending
    """
    by_name: dict[str, SubsystemHealth] = {}
    recent_cutoff = date.today() - timedelta(days=recent_days)

    for issue in issues:
        text = get_issue_content(issue, contents)
        if text is None:
            continue

        name = _extract_subsystem(text)
        if not name:
            continue

        record = by_name.get(name)
        if record is None:
            record = SubsystemHealth(subsystem=name)
            by_name[name] = record

        record.total_issues += 1
        record.issue_ids.append(issue.issue_id)

        finished = issue.completed_date
        if finished and finished >= recent_cutoff:
            record.recent_issues += 1

    # Only subsystems with at least five issues get a trend assigned; the
    # rest keep whatever trend SubsystemHealth was constructed with.
    for record in by_name.values():
        if record.total_issues < 5:
            continue
        recent_share = record.recent_issues / record.total_issues
        if recent_share > 0.5:
            record.trend = "degrading"
        elif recent_share < 0.2:
            record.trend = "improving"

    # The sort is stable, so subsystems with equal totals stay in the order
    # they were first encountered.
    ranked = sorted(
        by_name.values(),
        key=lambda record: record.total_issues,
        reverse=True,
    )
    return ranked[:10]  # Top 10