Coverage for little_loops / cli / sprint / run.py: 7%

285 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""ll-sprint run subcommand with signal handling and state management.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import signal 

7import subprocess 

8import sys 

9from pathlib import Path 

10from types import FrameType 

11from typing import TYPE_CHECKING 

12 

13from little_loops.cli.sprint._helpers import _build_issue_contents, _render_dependency_analysis 

14from little_loops.cli_args import parse_issue_ids, parse_issue_types 

15from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention 

16from little_loops.logger import Logger, format_duration 

17from little_loops.parallel.orchestrator import ParallelOrchestrator 

18from little_loops.sprint import SprintManager, SprintState 

19 

20if TYPE_CHECKING: 

21 import argparse 

22 

23 from little_loops.config import BRConfig 

24 

25# Module-level shutdown flag for ll-sprint signal handling (ENH-183) 

26_sprint_shutdown_requested: bool = False 

27 

28 

29def _sprint_signal_handler(signum: int, frame: FrameType | None) -> None: 

30 """Handle shutdown signals gracefully for ll-sprint. 

31 

32 First signal: Set shutdown flag for graceful exit after current wave. 

33 Second signal: Force immediate exit. 

34 """ 

35 global _sprint_shutdown_requested 

36 if _sprint_shutdown_requested: 

37 # Second signal - force exit 

38 print("\nForce shutdown requested", file=sys.stderr) 

39 sys.exit(1) 

40 _sprint_shutdown_requested = True 

41 print("\nShutdown requested, will exit after current wave...", file=sys.stderr) 

42 

43 

44def _get_sprint_state_file() -> Path: 

45 """Get path to sprint state file.""" 

46 return Path.cwd() / ".sprint-state.json" 

47 

48 

49def _load_sprint_state(logger: Logger) -> SprintState | None: 

50 """Load sprint state from file.""" 

51 import json 

52 

53 state_file = _get_sprint_state_file() 

54 if not state_file.exists(): 

55 return None 

56 try: 

57 data = json.loads(state_file.read_text()) 

58 state = SprintState.from_dict(data) 

59 logger.info(f"State loaded from {state_file}") 

60 return state 

61 except (json.JSONDecodeError, KeyError) as e: 

62 logger.warning(f"Failed to load state: {e}") 

63 return None 

64 

65 

66def _save_sprint_state(state: SprintState, logger: Logger) -> None: 

67 """Save sprint state to file.""" 

68 import json 

69 from datetime import datetime 

70 

71 state.last_checkpoint = datetime.now().isoformat() 

72 state_file = _get_sprint_state_file() 

73 state_file.write_text(json.dumps(state.to_dict(), indent=2)) 

74 logger.info(f"State saved to {state_file}") 

75 

76 

77def _cleanup_sprint_state(logger: Logger) -> None: 

78 """Remove sprint state file.""" 

79 state_file = _get_sprint_state_file() 

80 if state_file.exists(): 

81 state_file.unlink() 

82 logger.info("Sprint state file cleaned up") 

83 

84 

85def _cmd_sprint_run( 

86 args: argparse.Namespace, 

87 manager: SprintManager, 

88 config: BRConfig, 

89) -> int: 

90 """Execute a sprint with dependency-aware scheduling.""" 

91 from datetime import datetime 

92 

93 logger = Logger(verbose=not args.quiet) 

94 

95 # Setup signal handlers for graceful shutdown (ENH-183) 

96 global _sprint_shutdown_requested 

97 _sprint_shutdown_requested = False # Reset in case of multiple runs 

98 signal.signal(signal.SIGINT, _sprint_signal_handler) 

99 signal.signal(signal.SIGTERM, _sprint_signal_handler) 

100 

101 handoff_threshold = getattr(args, "handoff_threshold", None) 

102 if handoff_threshold is not None: 

103 os.environ["LL_HANDOFF_THRESHOLD"] = str(handoff_threshold) 

104 

105 sprint = manager.load(args.sprint) 

106 if not sprint: 

107 logger.error(f"Sprint not found: {args.sprint}") 

108 return 1 

109 

110 # Apply skip filter if provided 

111 issues_to_process = list(sprint.issues) 

112 skip_ids = parse_issue_ids(args.skip) 

113 if skip_ids: 

114 original_count = len(issues_to_process) 

115 issues_to_process = [i for i in issues_to_process if i not in skip_ids] 

116 skipped = original_count - len(issues_to_process) 

117 if skipped > 0: 

118 logger.info(f"Skipping {skipped} issue(s): {', '.join(sorted(skip_ids))}") 

119 

120 # Apply only filter if provided 

121 only_ids = parse_issue_ids(getattr(args, "only", None)) 

122 if only_ids: 

123 invalid_only = only_ids - set(sprint.issues) 

124 if invalid_only: 

125 logger.error( 

126 f"Issue(s) not found in sprint definition: {', '.join(sorted(invalid_only))}" 

127 ) 

128 return 1 

129 issues_to_process = [i for i in issues_to_process if i in only_ids] 

130 logger.info( 

131 f"Processing only {len(issues_to_process)} issue(s): {', '.join(sorted(only_ids))}" 

132 ) 

133 

134 # Apply type filter if provided 

135 type_prefixes = parse_issue_types(getattr(args, "type", None)) 

136 if type_prefixes: 

137 original_count = len(issues_to_process) 

138 issues_to_process = [i for i in issues_to_process if i.split("-", 1)[0] in type_prefixes] 

139 filtered = original_count - len(issues_to_process) 

140 if filtered > 0: 

141 logger.info(f"Filtered {filtered} issue(s) by type: {', '.join(sorted(type_prefixes))}") 

142 

143 # Pre-validate: skip issues already moved to completed/ (ENH-581) 

144 pre_completed_skipped: list[str] = [] 

145 if config is not None: 

146 completed_dir = config.get_completed_dir() 

147 if completed_dir.exists(): 

148 still_active: list[str] = [] 

149 for issue_id in issues_to_process: 

150 if list(completed_dir.glob(f"*-{issue_id}-*.md")): 

151 logger.info(f" {issue_id}: already in completed/, skipping") 

152 pre_completed_skipped.append(issue_id) 

153 else: 

154 still_active.append(issue_id) 

155 if pre_completed_skipped: 

156 logger.info( 

157 f"Pre-validation: {len(pre_completed_skipped)} issue(s) already completed, " 

158 f"{len(still_active)} active" 

159 ) 

160 issues_to_process = still_active 

161 

162 if pre_completed_skipped and not issues_to_process: 

163 logger.info("All sprint issues already completed - nothing to process") 

164 return 0 

165 

166 # Validate issues exist 

167 valid = manager.validate_issues(issues_to_process) 

168 invalid = set(issues_to_process) - set(valid.keys()) 

169 

170 if invalid: 

171 logger.error(f"Issue IDs not found: {', '.join(sorted(invalid))}") 

172 logger.info("Cannot execute sprint with missing issues") 

173 return 1 

174 

175 # Load full IssueInfo objects for dependency analysis 

176 issue_infos = manager.load_issue_infos(issues_to_process) 

177 if not issue_infos: 

178 logger.error("No issue files found") 

179 return 1 

180 

181 # Gather all issue IDs on disk to avoid false "nonexistent" warnings 

182 from little_loops.dependency_mapper import gather_all_issue_ids 

183 

184 issues_dir = config.project_root / config.issues.base_dir 

185 all_known_ids = gather_all_issue_ids(issues_dir, config=config) 

186 

187 # Dependency analysis (ENH-301) 

188 if not getattr(args, "skip_analysis", False): 

189 from little_loops.dependency_mapper import analyze_dependencies 

190 

191 issue_contents = _build_issue_contents(issue_infos) 

192 dep_config = config.dependency_mapping 

193 dep_report = analyze_dependencies( 

194 issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config 

195 ) 

196 _render_dependency_analysis(dep_report, logger, config=dep_config) 

197 

198 # Build dependency graph 

199 dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids) 

200 

201 # Detect cycles 

202 if dep_graph.has_cycles(): 

203 cycles = dep_graph.detect_cycles() 

204 for cycle in cycles: 

205 logger.error(f"Dependency cycle detected: {' -> '.join(cycle)}") 

206 return 1 

207 

208 # Get execution waves 

209 try: 

210 waves = dep_graph.get_execution_waves() 

211 except ValueError as e: 

212 logger.error(str(e)) 

213 return 1 

214 

215 # Refine waves for file overlap (ENH-306) 

216 waves, contention_notes = refine_waves_for_contention(waves, config=config.dependency_mapping) 

217 

218 # Display execution plan 

219 logger.info(f"Running sprint: {sprint.name}") 

220 logger.info("Dependency analysis:") 

221 for i, wave in enumerate(waves, 1): 

222 issue_ids = ", ".join(issue.issue_id for issue in wave) 

223 note = contention_notes[i - 1] if contention_notes else None 

224 if note: 

225 logger.info( 

226 f" Wave {i}: {issue_ids}" 

227 f" [sub-wave {note.sub_wave_index + 1}/{note.total_sub_waves}]" 

228 ) 

229 else: 

230 logger.info(f" Wave {i}: {issue_ids}") 

231 

232 if args.dry_run: 

233 logger.info("\nDry run mode - no changes will be made") 

234 return 0 

235 

236 # Initialize or load state 

237 state: SprintState 

238 start_wave = 1 

239 

240 if args.resume: 

241 loaded_state = _load_sprint_state(logger) 

242 if loaded_state and loaded_state.sprint_name == args.sprint: 

243 state = loaded_state 

244 # Find first incomplete wave by checking completed issues 

245 completed_set = set(state.completed_issues) 

246 for i, wave in enumerate(waves, 1): 

247 wave_issue_ids = {issue.issue_id for issue in wave} 

248 if not wave_issue_ids.issubset(completed_set): 

249 start_wave = i 

250 break 

251 else: 

252 # All waves completed 

253 logger.info("Sprint already completed - nothing to resume") 

254 _cleanup_sprint_state(logger) 

255 return 0 

256 logger.info(f"Resuming from wave {start_wave}/{len(waves)}") 

257 logger.info(f" Previously completed: {len(state.completed_issues)} issues") 

258 else: 

259 if loaded_state: 

260 logger.warning( 

261 f"State file is for sprint '{loaded_state.sprint_name}', " 

262 f"not '{args.sprint}' - starting fresh" 

263 ) 

264 else: 

265 logger.warning("No valid state found - starting fresh") 

266 state = SprintState( 

267 sprint_name=args.sprint, 

268 started_at=datetime.now().isoformat(), 

269 ) 

270 else: 

271 # Fresh start - delete any old state 

272 _cleanup_sprint_state(logger) 

273 state = SprintState( 

274 sprint_name=args.sprint, 

275 started_at=datetime.now().isoformat(), 

276 ) 

277 

278 # Track exit status for error handling (ENH-185) 

279 exit_code = 0 

280 

281 try: 

282 # Determine max workers 

283 max_workers = args.max_workers or (sprint.options.max_workers if sprint.options else 2) 

284 

285 # Execute wave by wave 

286 completed: set[str] = set(state.completed_issues) 

287 failed_waves = 0 

288 total_duration = 0.0 

289 total_waves = len(waves) 

290 

291 for wave_num, wave in enumerate(waves, 1): 

292 # Check for shutdown request (ENH-183) 

293 if _sprint_shutdown_requested: 

294 logger.warning("Shutdown requested - saving state and exiting") 

295 _save_sprint_state(state, logger) 

296 exit_code = 1 

297 return exit_code 

298 

299 # Skip already-completed waves when resuming 

300 if wave_num < start_wave: 

301 continue 

302 

303 wave_ids = [issue.issue_id for issue in wave] 

304 state.current_wave = wave_num 

305 logger.info(f"\nProcessing wave {wave_num}/{total_waves}: {', '.join(wave_ids)}") 

306 

307 wave_note = ( 

308 contention_notes[wave_num - 1] 

309 if contention_notes and wave_num - 1 < len(contention_notes) 

310 else None 

311 ) 

312 is_contention_subwave = wave_note is not None 

313 

314 if len(wave) == 1 or is_contention_subwave: 

315 # Single issue OR contention sub-wave — process in-place sequentially 

316 # (contention sub-waves are displayed as "serialized steps" so must run that way) 

317 from little_loops.issue_manager import process_issue_inplace 

318 

319 wave_failed = False 

320 for issue in wave: 

321 issue_result = process_issue_inplace( 

322 info=issue, 

323 config=config, 

324 logger=logger, 

325 dry_run=args.dry_run, 

326 ) 

327 total_duration += issue_result.duration 

328 if issue_result.success: 

329 completed.add(issue.issue_id) 

330 state.completed_issues.append(issue.issue_id) 

331 state.timing[issue.issue_id] = {"total": issue_result.duration} 

332 logger.success(f" {issue.issue_id}: completed") 

333 elif issue_result.was_blocked: 

334 completed.add(issue.issue_id) 

335 state.skipped_blocked_issues[issue.issue_id] = issue_result.failure_reason 

336 logger.warning(f" {issue.issue_id}: skipped (blocked by open dependency)") 

337 else: 

338 wave_failed = True 

339 completed.add(issue.issue_id) 

340 state.failed_issues[issue.issue_id] = "Issue processing failed" 

341 logger.warning(f" {issue.issue_id}: failed") 

342 if wave_failed: 

343 failed_waves += 1 

344 logger.warning(f"Wave {wave_num}/{total_waves} had failures") 

345 else: 

346 logger.success( 

347 f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}" 

348 ) 

349 _save_sprint_state(state, logger) 

350 if wave_num < total_waves: 

351 logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...") 

352 # Check for shutdown before next wave (ENH-183) 

353 if _sprint_shutdown_requested: 

354 logger.warning("Shutdown requested - exiting after wave completion") 

355 exit_code = 1 

356 return exit_code 

357 else: 

358 # Multi-issue — use ParallelOrchestrator with worktrees 

359 only_ids = set(wave_ids) 

360 # Detect current branch for rebase/merge operations (BUG-439) 

361 _br = subprocess.run( 

362 ["git", "rev-parse", "--abbrev-ref", "HEAD"], 

363 capture_output=True, 

364 text=True, 

365 cwd=Path.cwd(), 

366 ) 

367 _base_branch = _br.stdout.strip() if _br.returncode == 0 else "main" 

368 # Runtime overlap detection disabled for sprints (ENH-512): 

369 # refine_waves_for_contention() already splits overlapping 

370 # issues into separate sub-waves before dispatch. 

371 parallel_config = config.create_parallel_config( 

372 max_workers=min(max_workers, len(wave)), 

373 only_ids=only_ids, 

374 dry_run=args.dry_run, 

375 overlap_detection=False, 

376 serialize_overlapping=True, 

377 base_branch=_base_branch, 

378 clean_start=True, # Sprint manages its own state; don't load stale orchestrator state 

379 ) 

380 

381 orchestrator = ParallelOrchestrator( 

382 parallel_config, config, Path.cwd(), wave_label=f"Wave {wave_num}/{total_waves}" 

383 ) 

384 result = orchestrator.run() 

385 total_duration += orchestrator.execution_duration 

386 

387 # Track completed/failed from this wave using per-issue results 

388 actually_completed = set(orchestrator.queue.completed_ids) 

389 actually_failed = set(orchestrator.queue.failed_ids) 

390 

391 for issue_id in wave_ids: 

392 if issue_id in actually_completed: 

393 completed.add(issue_id) 

394 state.completed_issues.append(issue_id) 

395 state.timing[issue_id] = { 

396 "total": orchestrator.execution_duration / len(wave) 

397 } 

398 elif issue_id in actually_failed: 

399 completed.add(issue_id) 

400 state.failed_issues[issue_id] = "Issue failed during wave execution" 

401 # else: issue was neither completed nor failed (interrupted/stranded) 

402 # — leave untracked so it can be retried on resume 

403 

404 # Sequential retry for failed issues (ENH-308) 

405 if actually_failed: 

406 logger.info(f"Retrying {len(actually_failed)} failed issue(s) sequentially...") 

407 from little_loops.issue_manager import process_issue_inplace 

408 

409 retried_ok = 0 

410 for issue in wave: 

411 if issue.issue_id not in actually_failed: 

412 continue 

413 logger.info(f" Retrying {issue.issue_id} in-place...") 

414 retry_result = process_issue_inplace( 

415 info=issue, 

416 config=config, 

417 logger=logger, 

418 dry_run=args.dry_run, 

419 ) 

420 total_duration += retry_result.duration 

421 if retry_result.success: 

422 retried_ok += 1 

423 state.failed_issues.pop(issue.issue_id, None) 

424 state.completed_issues.append(issue.issue_id) 

425 state.timing[issue.issue_id] = {"total": retry_result.duration} 

426 logger.success(f" Retry succeeded: {issue.issue_id}") 

427 elif retry_result.was_blocked: 

428 state.failed_issues.pop(issue.issue_id, None) 

429 state.skipped_blocked_issues[issue.issue_id] = ( 

430 retry_result.failure_reason 

431 ) 

432 logger.warning( 

433 f" Retry skipped: {issue.issue_id} (blocked by open dependency)" 

434 ) 

435 else: 

436 logger.warning(f" Retry failed: {issue.issue_id}") 

437 if retried_ok > 0: 

438 logger.info( 

439 f"Sequential retry recovered {retried_ok}/{len(actually_failed)} issue(s)" 

440 ) 

441 

442 # Check whether failures remain after retry (ENH-308) 

443 remaining_failures = {iid for iid in actually_failed if iid in state.failed_issues} 

444 if result == 0 or not remaining_failures: 

445 logger.success( 

446 f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}" 

447 ) 

448 else: 

449 failed_waves += 1 

450 logger.warning(f"Wave {wave_num}/{total_waves} had failures") 

451 _save_sprint_state(state, logger) 

452 if wave_num < total_waves: 

453 logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...") 

454 # Check for shutdown before next wave (ENH-183) 

455 if _sprint_shutdown_requested: 

456 logger.warning("Shutdown requested - exiting after wave completion") 

457 exit_code = 1 

458 return exit_code 

459 

460 wave_word = "wave" if len(waves) == 1 else "waves" 

461 skip_msg = ( 

462 f", {len(pre_completed_skipped)} already completed (skipped)" 

463 if pre_completed_skipped 

464 else "" 

465 ) 

466 blocked_msg = ( 

467 f", {len(state.skipped_blocked_issues)} skipped (blocked)" 

468 if state.skipped_blocked_issues 

469 else "" 

470 ) 

471 logger.info( 

472 f"\nSprint completed: {len(completed)} issues processed " 

473 f"({len(waves)} {wave_word}){skip_msg}{blocked_msg}" 

474 ) 

475 logger.timing(f"Total execution time: {format_duration(total_duration)}") 

476 if failed_waves > 0: 

477 logger.warning(f"{failed_waves} wave(s) had failures") 

478 exit_code = 1 

479 else: 

480 # Clean up state on successful completion 

481 _cleanup_sprint_state(logger) 

482 

483 except KeyboardInterrupt: 

484 # Belt-and-suspenders with signal handler (ENH-185) 

485 logger.warning("Sprint interrupted by user (KeyboardInterrupt)") 

486 exit_code = 130 

487 

488 except Exception as e: 

489 # Catch unexpected exceptions (ENH-185) 

490 logger.error(f"Sprint failed unexpectedly: {e}") 

491 exit_code = 1 

492 

493 finally: 

494 # Guaranteed state save on any non-success exit (ENH-185) 

495 if exit_code != 0: 

496 _save_sprint_state(state, logger) 

497 logger.info("State saved before exit") 

498 

499 return exit_code