Coverage for little_loops / sprint.py: 40%

127 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Sprint and sequence management for issue execution.""" 

2 

3import logging 

4from dataclasses import dataclass, field 

5from datetime import UTC, datetime 

6from pathlib import Path 

7from typing import TYPE_CHECKING, Any 

8 

9import yaml 

10 

11logger = logging.getLogger(__name__) 

12 

13if TYPE_CHECKING: 

14 from little_loops.config import BRConfig 

15 from little_loops.issue_parser import IssueInfo 

16 

17 

18@dataclass 

19class SprintOptions: 

20 """Execution options for sprint runs. 

21 

22 Attributes: 

23 max_iterations: Maximum Claude iterations per issue 

24 timeout: Per-issue timeout in seconds 

25 max_workers: Worker count for parallel execution within waves 

26 """ 

27 

28 max_iterations: int = 100 

29 timeout: int = 3600 

30 max_workers: int = 2 

31 

32 def to_dict(self) -> dict: 

33 """Convert to dictionary for YAML serialization.""" 

34 return { 

35 "max_iterations": self.max_iterations, 

36 "timeout": self.timeout, 

37 "max_workers": self.max_workers, 

38 } 

39 

40 @classmethod 

41 def from_dict(cls, data: dict | None) -> "SprintOptions": 

42 """Create from dictionary (YAML deserialization). 

43 

44 Args: 

45 data: Dictionary from YAML file or None for defaults 

46 

47 Returns: 

48 SprintOptions instance 

49 """ 

50 if data is None: 

51 return cls() 

52 return cls( 

53 max_iterations=data.get("max_iterations", 100), 

54 timeout=data.get("timeout", 3600), 

55 max_workers=data.get("max_workers", 2), 

56 ) 

57 

58 

59@dataclass 

60class SprintState: 

61 """Persistent state for sprint execution. 

62 

63 Enables resume capability after interruption by tracking: 

64 - Sprint name being executed 

65 - Current wave number 

66 - Completed issues 

67 - Failed issues with reasons 

68 - Timing information 

69 

70 Attributes: 

71 sprint_name: Name of the sprint being executed 

72 current_wave: Wave number currently being processed (1-indexed) 

73 completed_issues: List of completed issue IDs 

74 failed_issues: Mapping of issue ID to failure reason 

75 timing: Per-issue timing breakdown 

76 started_at: ISO 8601 timestamp when sprint started 

77 last_checkpoint: ISO 8601 timestamp of last state save 

78 """ 

79 

80 sprint_name: str = "" 

81 current_wave: int = 0 

82 completed_issues: list[str] = field(default_factory=list) 

83 failed_issues: dict[str, str] = field(default_factory=dict) 

84 skipped_blocked_issues: dict[str, str] = field(default_factory=dict) 

85 timing: dict[str, dict[str, float]] = field(default_factory=dict) 

86 started_at: str = "" 

87 last_checkpoint: str = "" 

88 

89 def to_dict(self) -> dict[str, Any]: 

90 """Convert state to dictionary for JSON serialization.""" 

91 return { 

92 "sprint_name": self.sprint_name, 

93 "current_wave": self.current_wave, 

94 "completed_issues": self.completed_issues, 

95 "failed_issues": self.failed_issues, 

96 "skipped_blocked_issues": self.skipped_blocked_issues, 

97 "timing": self.timing, 

98 "started_at": self.started_at, 

99 "last_checkpoint": self.last_checkpoint, 

100 } 

101 

102 @classmethod 

103 def from_dict(cls, data: dict[str, Any]) -> "SprintState": 

104 """Create state from dictionary (JSON deserialization).""" 

105 return cls( 

106 sprint_name=data.get("sprint_name", ""), 

107 current_wave=data.get("current_wave", 0), 

108 completed_issues=data.get("completed_issues", []), 

109 failed_issues=data.get("failed_issues", {}), 

110 skipped_blocked_issues=data.get("skipped_blocked_issues", {}), 

111 timing=data.get("timing", {}), 

112 started_at=data.get("started_at", ""), 

113 last_checkpoint=data.get("last_checkpoint", ""), 

114 ) 

115 

116 

117@dataclass 

118class Sprint: 

119 """A sprint is a named group of issues to execute together. 

120 

121 Sprints allow planning work in batches and executing them as a unit. 

122 Execution is always dependency-aware with parallel waves. 

123 

124 Attributes: 

125 name: Sprint identifier (used as filename) 

126 description: Human-readable purpose 

127 issues: List of issue IDs (e.g., BUG-001, FEAT-010) 

128 created: ISO 8601 timestamp of creation 

129 options: Execution options (timeout, max_workers, etc.) 

130 """ 

131 

132 name: str 

133 description: str 

134 issues: list[str] 

135 created: str 

136 options: SprintOptions | None = None 

137 

138 def to_dict(self) -> dict[str, str | list[str] | dict]: 

139 """Convert to dictionary for YAML serialization. 

140 

141 Returns: 

142 Dictionary representation suitable for yaml.dump() 

143 """ 

144 data: dict[str, str | list[str] | dict] = { 

145 "name": self.name, 

146 "description": self.description, 

147 "created": self.created, 

148 "issues": self.issues, 

149 } 

150 if self.options: 

151 data["options"] = self.options.to_dict() 

152 return data 

153 

154 @classmethod 

155 def from_dict(cls, data: dict) -> "Sprint": 

156 """Create from dictionary (YAML deserialization). 

157 

158 Args: 

159 data: Dictionary from YAML file 

160 

161 Returns: 

162 Sprint instance 

163 """ 

164 return cls( 

165 name=data["name"], 

166 description=data.get("description", ""), 

167 issues=data.get("issues", []), 

168 created=data.get("created", datetime.now(UTC).isoformat()), 

169 options=SprintOptions.from_dict(data.get("options")), 

170 ) 

171 

172 def save(self, sprints_dir: Path) -> Path: 

173 """Save sprint to YAML file. 

174 

175 Args: 

176 sprints_dir: Directory containing sprint definitions 

177 

178 Returns: 

179 Path to saved file 

180 """ 

181 sprints_dir.mkdir(parents=True, exist_ok=True) 

182 sprint_path = sprints_dir / f"{self.name}.yaml" 

183 with open(sprint_path, "w") as f: 

184 yaml.dump(self.to_dict(), f, default_flow_style=False, sort_keys=False) 

185 return sprint_path 

186 

187 @classmethod 

188 def load(cls, sprints_dir: Path, name: str) -> "Sprint | None": 

189 """Load sprint from YAML file. 

190 

191 Args: 

192 sprints_dir: Directory containing sprint definitions 

193 name: Sprint name (without .yaml extension) 

194 

195 Returns: 

196 Sprint instance or None if not found 

197 """ 

198 sprint_path = sprints_dir / f"{name}.yaml" 

199 if not sprint_path.exists(): 

200 return None 

201 with open(sprint_path) as f: 

202 data = yaml.safe_load(f) 

203 return cls.from_dict(data) 

204 

205 

206class SprintManager: 

207 """Manager for sprint CRUD operations. 

208 

209 Provides methods to create, load, list, and delete sprint definitions. 

210 Also validates that issue IDs exist before executing sprints. 

211 """ 

212 

213 def __init__(self, sprints_dir: Path | None = None, config: "BRConfig | None" = None) -> None: 

214 """Initialize SprintManager. 

215 

216 Args: 

217 sprints_dir: Directory for sprint definitions (overrides config) 

218 config: Project configuration for settings and issue validation 

219 """ 

220 self.config = config 

221 # Derive sprints_dir: explicit arg > config > default 

222 if sprints_dir is not None: 

223 self.sprints_dir = sprints_dir 

224 elif config is not None: 

225 self.sprints_dir = Path(config.sprints.sprints_dir) 

226 else: 

227 self.sprints_dir = Path(".sprints") 

228 self.sprints_dir.mkdir(parents=True, exist_ok=True) 

229 

230 def get_default_options(self) -> SprintOptions: 

231 """Get default SprintOptions from config or hardcoded defaults. 

232 

233 Returns: 

234 SprintOptions with values from config if available, else defaults 

235 """ 

236 if self.config is not None: 

237 return SprintOptions( 

238 timeout=self.config.sprints.default_timeout, 

239 max_workers=self.config.sprints.default_max_workers, 

240 ) 

241 return SprintOptions() 

242 

243 def create( 

244 self, 

245 name: str, 

246 issues: list[str], 

247 description: str = "", 

248 options: SprintOptions | None = None, 

249 ) -> Sprint: 

250 """Create a new sprint. 

251 

252 Args: 

253 name: Sprint identifier 

254 issues: List of issue IDs 

255 description: Human-readable description 

256 options: Optional execution options 

257 

258 Returns: 

259 Created Sprint instance 

260 """ 

261 sprint = Sprint( 

262 name=name, 

263 description=description, 

264 issues=[i.strip().upper() for i in issues], 

265 created=datetime.now(UTC).isoformat(), 

266 options=options, 

267 ) 

268 sprint.save(self.sprints_dir) 

269 return sprint 

270 

271 def load(self, name: str) -> Sprint | None: 

272 """Load a sprint by name. 

273 

274 Args: 

275 name: Sprint name 

276 

277 Returns: 

278 Sprint instance or None if not found 

279 """ 

280 return Sprint.load(self.sprints_dir, name) 

281 

282 def list_all(self) -> list[Sprint]: 

283 """List all sprints. 

284 

285 Returns: 

286 List of Sprint instances, sorted by name 

287 """ 

288 sprints = [] 

289 for path in sorted(self.sprints_dir.glob("*.yaml")): 

290 sprint = Sprint.load(self.sprints_dir, path.stem) 

291 if sprint: 

292 sprints.append(sprint) 

293 return sprints 

294 

295 def delete(self, name: str) -> bool: 

296 """Delete a sprint. 

297 

298 Args: 

299 name: Sprint name 

300 

301 Returns: 

302 True if deleted, False if not found 

303 """ 

304 sprint_path = self.sprints_dir / f"{name}.yaml" 

305 if not sprint_path.exists(): 

306 return False 

307 sprint_path.unlink() 

308 return True 

309 

310 def _find_issue_path(self, issue_id: str) -> Path | None: 

311 """Find the filesystem path for an issue ID. 

312 

313 Searches all configured issue categories for a file matching the issue ID. 

314 

315 Args: 

316 issue_id: Issue ID to locate (e.g. "BUG-001") 

317 

318 Returns: 

319 Path to the issue file, or None if not found 

320 """ 

321 if not self.config: 

322 return None 

323 for category in self.config.issue_categories: 

324 issue_dir = self.config.get_issue_dir(category) 

325 for path in issue_dir.glob(f"*-{issue_id}-*.md"): 

326 return path 

327 return None 

328 

329 def validate_issues(self, issues: list[str]) -> dict[str, Path]: 

330 """Validate that issue IDs exist. 

331 

332 Args: 

333 issues: List of issue IDs to validate 

334 

335 Returns: 

336 Dictionary mapping valid issue IDs to their file paths 

337 """ 

338 if not self.config: 

339 # No config provided, skip validation 

340 return {} 

341 

342 valid = {} 

343 for issue_id in issues: 

344 path = self._find_issue_path(issue_id) 

345 if path is not None: 

346 valid[issue_id] = path 

347 return valid 

348 

349 def load_issue_infos(self, issues: list[str]) -> list["IssueInfo"]: 

350 """Load IssueInfo objects for the given issue IDs. 

351 

352 Args: 

353 issues: List of issue IDs to load 

354 

355 Returns: 

356 List of IssueInfo objects (only for issues that exist) 

357 """ 

358 from little_loops.issue_parser import IssueParser 

359 

360 if not self.config: 

361 return [] 

362 

363 parser = IssueParser(self.config) 

364 result: list[IssueInfo] = [] 

365 for issue_id in issues: 

366 path = self._find_issue_path(issue_id) 

367 if path is not None: 

368 try: 

369 info = parser.parse_file(path) 

370 result.append(info) 

371 except Exception as e: 

372 logger.warning("Failed to parse issue file %s: %s", path, e) 

373 return result