Coverage for little_loops / cli / sprint / run.py: 7%
285 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""ll-sprint run subcommand with signal handling and state management."""
3from __future__ import annotations
5import os
6import signal
7import subprocess
8import sys
9from pathlib import Path
10from types import FrameType
11from typing import TYPE_CHECKING
13from little_loops.cli.sprint._helpers import _build_issue_contents, _render_dependency_analysis
14from little_loops.cli_args import parse_issue_ids, parse_issue_types
15from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention
16from little_loops.logger import Logger, format_duration
17from little_loops.parallel.orchestrator import ParallelOrchestrator
18from little_loops.sprint import SprintManager, SprintState
20if TYPE_CHECKING:
21 import argparse
23 from little_loops.config import BRConfig
# Module-level shutdown flag for ll-sprint signal handling (ENH-183).
# Set to True by _sprint_signal_handler on the first SIGINT/SIGTERM, polled by
# the sprint run loop between waves, and reset to False at the start of each run.
_sprint_shutdown_requested: bool = False
def _sprint_signal_handler(signum: int, frame: FrameType | None) -> None:
    """Two-stage shutdown handler for ll-sprint.

    The first signal only raises the module-level shutdown flag, letting the
    run loop stop cleanly once the current wave finishes. A signal received
    while the flag is already set aborts immediately via ``sys.exit(1)``.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global _sprint_shutdown_requested
    already_requested = _sprint_shutdown_requested
    if not already_requested:
        _sprint_shutdown_requested = True
        print("\nShutdown requested, will exit after current wave...", file=sys.stderr)
        return
    # Repeated signal: the user insists, so bail out right away.
    print("\nForce shutdown requested", file=sys.stderr)
    sys.exit(1)
44def _get_sprint_state_file() -> Path:
45 """Get path to sprint state file."""
46 return Path.cwd() / ".sprint-state.json"
def _load_sprint_state(logger: Logger) -> SprintState | None:
    """Load sprint state from the state file.

    Args:
        logger: Logger used to report a successful load or a failure.

    Returns:
        The deserialized ``SprintState``, or ``None`` when no state file
        exists or it cannot be read or parsed. An unreadable or corrupt file
        is reported as a warning instead of raising, so ``--resume`` falls
        back to a fresh start rather than crashing the sprint.
    """
    import json

    state_file = _get_sprint_state_file()
    try:
        raw = state_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        # No saved state - not an error.
        return None
    except OSError as e:
        # e.g. the path is a directory or is not readable.
        logger.warning(f"Failed to load state: {e}")
        return None

    try:
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
        state = SprintState.from_dict(data)
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError covers fields of the wrong type handed to from_dict.
        logger.warning(f"Failed to load state: {e}")
        return None

    logger.info(f"State loaded from {state_file}")
    return state
def _save_sprint_state(state: SprintState, logger: Logger) -> None:
    """Persist sprint state to the state file, stamping ``last_checkpoint``.

    The JSON is first written to a sibling ``.tmp`` file, which is then
    renamed over the real state file. An interruption mid-write (for example
    the force-exit path of the signal handler) therefore leaves the previous
    state file intact instead of a truncated, unparseable one.

    Args:
        state: Sprint state to serialize; its ``last_checkpoint`` is updated.
        logger: Logger used to report where the state was saved.
    """
    import json
    from datetime import datetime

    state.last_checkpoint = datetime.now().isoformat()
    state_file = _get_sprint_state_file()
    payload = json.dumps(state.to_dict(), indent=2)
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    try:
        tmp_file.write_text(payload, encoding="utf-8")
        # Path.replace overwrites the destination in a single rename step.
        tmp_file.replace(state_file)
    except BaseException:
        try:
            tmp_file.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise
    logger.info(f"State saved to {state_file}")
def _cleanup_sprint_state(logger: Logger) -> None:
    """Delete the sprint state file if it is present; otherwise do nothing.

    Args:
        logger: Logger that records the removal when a file was deleted.
    """
    target = _get_sprint_state_file()
    if not target.exists():
        return
    target.unlink()
    logger.info("Sprint state file cleaned up")
85def _cmd_sprint_run(
86 args: argparse.Namespace,
87 manager: SprintManager,
88 config: BRConfig,
89) -> int:
90 """Execute a sprint with dependency-aware scheduling."""
91 from datetime import datetime
93 logger = Logger(verbose=not args.quiet)
95 # Setup signal handlers for graceful shutdown (ENH-183)
96 global _sprint_shutdown_requested
97 _sprint_shutdown_requested = False # Reset in case of multiple runs
98 signal.signal(signal.SIGINT, _sprint_signal_handler)
99 signal.signal(signal.SIGTERM, _sprint_signal_handler)
101 handoff_threshold = getattr(args, "handoff_threshold", None)
102 if handoff_threshold is not None:
103 os.environ["LL_HANDOFF_THRESHOLD"] = str(handoff_threshold)
105 sprint = manager.load(args.sprint)
106 if not sprint:
107 logger.error(f"Sprint not found: {args.sprint}")
108 return 1
110 # Apply skip filter if provided
111 issues_to_process = list(sprint.issues)
112 skip_ids = parse_issue_ids(args.skip)
113 if skip_ids:
114 original_count = len(issues_to_process)
115 issues_to_process = [i for i in issues_to_process if i not in skip_ids]
116 skipped = original_count - len(issues_to_process)
117 if skipped > 0:
118 logger.info(f"Skipping {skipped} issue(s): {', '.join(sorted(skip_ids))}")
120 # Apply only filter if provided
121 only_ids = parse_issue_ids(getattr(args, "only", None))
122 if only_ids:
123 invalid_only = only_ids - set(sprint.issues)
124 if invalid_only:
125 logger.error(
126 f"Issue(s) not found in sprint definition: {', '.join(sorted(invalid_only))}"
127 )
128 return 1
129 issues_to_process = [i for i in issues_to_process if i in only_ids]
130 logger.info(
131 f"Processing only {len(issues_to_process)} issue(s): {', '.join(sorted(only_ids))}"
132 )
134 # Apply type filter if provided
135 type_prefixes = parse_issue_types(getattr(args, "type", None))
136 if type_prefixes:
137 original_count = len(issues_to_process)
138 issues_to_process = [i for i in issues_to_process if i.split("-", 1)[0] in type_prefixes]
139 filtered = original_count - len(issues_to_process)
140 if filtered > 0:
141 logger.info(f"Filtered {filtered} issue(s) by type: {', '.join(sorted(type_prefixes))}")
143 # Pre-validate: skip issues already moved to completed/ (ENH-581)
144 pre_completed_skipped: list[str] = []
145 if config is not None:
146 completed_dir = config.get_completed_dir()
147 if completed_dir.exists():
148 still_active: list[str] = []
149 for issue_id in issues_to_process:
150 if list(completed_dir.glob(f"*-{issue_id}-*.md")):
151 logger.info(f" {issue_id}: already in completed/, skipping")
152 pre_completed_skipped.append(issue_id)
153 else:
154 still_active.append(issue_id)
155 if pre_completed_skipped:
156 logger.info(
157 f"Pre-validation: {len(pre_completed_skipped)} issue(s) already completed, "
158 f"{len(still_active)} active"
159 )
160 issues_to_process = still_active
162 if pre_completed_skipped and not issues_to_process:
163 logger.info("All sprint issues already completed - nothing to process")
164 return 0
166 # Validate issues exist
167 valid = manager.validate_issues(issues_to_process)
168 invalid = set(issues_to_process) - set(valid.keys())
170 if invalid:
171 logger.error(f"Issue IDs not found: {', '.join(sorted(invalid))}")
172 logger.info("Cannot execute sprint with missing issues")
173 return 1
175 # Load full IssueInfo objects for dependency analysis
176 issue_infos = manager.load_issue_infos(issues_to_process)
177 if not issue_infos:
178 logger.error("No issue files found")
179 return 1
181 # Gather all issue IDs on disk to avoid false "nonexistent" warnings
182 from little_loops.dependency_mapper import gather_all_issue_ids
184 issues_dir = config.project_root / config.issues.base_dir
185 all_known_ids = gather_all_issue_ids(issues_dir, config=config)
187 # Dependency analysis (ENH-301)
188 if not getattr(args, "skip_analysis", False):
189 from little_loops.dependency_mapper import analyze_dependencies
191 issue_contents = _build_issue_contents(issue_infos)
192 dep_config = config.dependency_mapping
193 dep_report = analyze_dependencies(
194 issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config
195 )
196 _render_dependency_analysis(dep_report, logger, config=dep_config)
198 # Build dependency graph
199 dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids)
201 # Detect cycles
202 if dep_graph.has_cycles():
203 cycles = dep_graph.detect_cycles()
204 for cycle in cycles:
205 logger.error(f"Dependency cycle detected: {' -> '.join(cycle)}")
206 return 1
208 # Get execution waves
209 try:
210 waves = dep_graph.get_execution_waves()
211 except ValueError as e:
212 logger.error(str(e))
213 return 1
215 # Refine waves for file overlap (ENH-306)
216 waves, contention_notes = refine_waves_for_contention(waves, config=config.dependency_mapping)
218 # Display execution plan
219 logger.info(f"Running sprint: {sprint.name}")
220 logger.info("Dependency analysis:")
221 for i, wave in enumerate(waves, 1):
222 issue_ids = ", ".join(issue.issue_id for issue in wave)
223 note = contention_notes[i - 1] if contention_notes else None
224 if note:
225 logger.info(
226 f" Wave {i}: {issue_ids}"
227 f" [sub-wave {note.sub_wave_index + 1}/{note.total_sub_waves}]"
228 )
229 else:
230 logger.info(f" Wave {i}: {issue_ids}")
232 if args.dry_run:
233 logger.info("\nDry run mode - no changes will be made")
234 return 0
236 # Initialize or load state
237 state: SprintState
238 start_wave = 1
240 if args.resume:
241 loaded_state = _load_sprint_state(logger)
242 if loaded_state and loaded_state.sprint_name == args.sprint:
243 state = loaded_state
244 # Find first incomplete wave by checking completed issues
245 completed_set = set(state.completed_issues)
246 for i, wave in enumerate(waves, 1):
247 wave_issue_ids = {issue.issue_id for issue in wave}
248 if not wave_issue_ids.issubset(completed_set):
249 start_wave = i
250 break
251 else:
252 # All waves completed
253 logger.info("Sprint already completed - nothing to resume")
254 _cleanup_sprint_state(logger)
255 return 0
256 logger.info(f"Resuming from wave {start_wave}/{len(waves)}")
257 logger.info(f" Previously completed: {len(state.completed_issues)} issues")
258 else:
259 if loaded_state:
260 logger.warning(
261 f"State file is for sprint '{loaded_state.sprint_name}', "
262 f"not '{args.sprint}' - starting fresh"
263 )
264 else:
265 logger.warning("No valid state found - starting fresh")
266 state = SprintState(
267 sprint_name=args.sprint,
268 started_at=datetime.now().isoformat(),
269 )
270 else:
271 # Fresh start - delete any old state
272 _cleanup_sprint_state(logger)
273 state = SprintState(
274 sprint_name=args.sprint,
275 started_at=datetime.now().isoformat(),
276 )
278 # Track exit status for error handling (ENH-185)
279 exit_code = 0
281 try:
282 # Determine max workers
283 max_workers = args.max_workers or (sprint.options.max_workers if sprint.options else 2)
285 # Execute wave by wave
286 completed: set[str] = set(state.completed_issues)
287 failed_waves = 0
288 total_duration = 0.0
289 total_waves = len(waves)
291 for wave_num, wave in enumerate(waves, 1):
292 # Check for shutdown request (ENH-183)
293 if _sprint_shutdown_requested:
294 logger.warning("Shutdown requested - saving state and exiting")
295 _save_sprint_state(state, logger)
296 exit_code = 1
297 return exit_code
299 # Skip already-completed waves when resuming
300 if wave_num < start_wave:
301 continue
303 wave_ids = [issue.issue_id for issue in wave]
304 state.current_wave = wave_num
305 logger.info(f"\nProcessing wave {wave_num}/{total_waves}: {', '.join(wave_ids)}")
307 wave_note = (
308 contention_notes[wave_num - 1]
309 if contention_notes and wave_num - 1 < len(contention_notes)
310 else None
311 )
312 is_contention_subwave = wave_note is not None
314 if len(wave) == 1 or is_contention_subwave:
315 # Single issue OR contention sub-wave — process in-place sequentially
316 # (contention sub-waves are displayed as "serialized steps" so must run that way)
317 from little_loops.issue_manager import process_issue_inplace
319 wave_failed = False
320 for issue in wave:
321 issue_result = process_issue_inplace(
322 info=issue,
323 config=config,
324 logger=logger,
325 dry_run=args.dry_run,
326 )
327 total_duration += issue_result.duration
328 if issue_result.success:
329 completed.add(issue.issue_id)
330 state.completed_issues.append(issue.issue_id)
331 state.timing[issue.issue_id] = {"total": issue_result.duration}
332 logger.success(f" {issue.issue_id}: completed")
333 elif issue_result.was_blocked:
334 completed.add(issue.issue_id)
335 state.skipped_blocked_issues[issue.issue_id] = issue_result.failure_reason
336 logger.warning(f" {issue.issue_id}: skipped (blocked by open dependency)")
337 else:
338 wave_failed = True
339 completed.add(issue.issue_id)
340 state.failed_issues[issue.issue_id] = "Issue processing failed"
341 logger.warning(f" {issue.issue_id}: failed")
342 if wave_failed:
343 failed_waves += 1
344 logger.warning(f"Wave {wave_num}/{total_waves} had failures")
345 else:
346 logger.success(
347 f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
348 )
349 _save_sprint_state(state, logger)
350 if wave_num < total_waves:
351 logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
352 # Check for shutdown before next wave (ENH-183)
353 if _sprint_shutdown_requested:
354 logger.warning("Shutdown requested - exiting after wave completion")
355 exit_code = 1
356 return exit_code
357 else:
358 # Multi-issue — use ParallelOrchestrator with worktrees
359 only_ids = set(wave_ids)
360 # Detect current branch for rebase/merge operations (BUG-439)
361 _br = subprocess.run(
362 ["git", "rev-parse", "--abbrev-ref", "HEAD"],
363 capture_output=True,
364 text=True,
365 cwd=Path.cwd(),
366 )
367 _base_branch = _br.stdout.strip() if _br.returncode == 0 else "main"
368 # Runtime overlap detection disabled for sprints (ENH-512):
369 # refine_waves_for_contention() already splits overlapping
370 # issues into separate sub-waves before dispatch.
371 parallel_config = config.create_parallel_config(
372 max_workers=min(max_workers, len(wave)),
373 only_ids=only_ids,
374 dry_run=args.dry_run,
375 overlap_detection=False,
376 serialize_overlapping=True,
377 base_branch=_base_branch,
378 clean_start=True, # Sprint manages its own state; don't load stale orchestrator state
379 )
381 orchestrator = ParallelOrchestrator(
382 parallel_config, config, Path.cwd(), wave_label=f"Wave {wave_num}/{total_waves}"
383 )
384 result = orchestrator.run()
385 total_duration += orchestrator.execution_duration
387 # Track completed/failed from this wave using per-issue results
388 actually_completed = set(orchestrator.queue.completed_ids)
389 actually_failed = set(orchestrator.queue.failed_ids)
391 for issue_id in wave_ids:
392 if issue_id in actually_completed:
393 completed.add(issue_id)
394 state.completed_issues.append(issue_id)
395 state.timing[issue_id] = {
396 "total": orchestrator.execution_duration / len(wave)
397 }
398 elif issue_id in actually_failed:
399 completed.add(issue_id)
400 state.failed_issues[issue_id] = "Issue failed during wave execution"
401 # else: issue was neither completed nor failed (interrupted/stranded)
402 # — leave untracked so it can be retried on resume
404 # Sequential retry for failed issues (ENH-308)
405 if actually_failed:
406 logger.info(f"Retrying {len(actually_failed)} failed issue(s) sequentially...")
407 from little_loops.issue_manager import process_issue_inplace
409 retried_ok = 0
410 for issue in wave:
411 if issue.issue_id not in actually_failed:
412 continue
413 logger.info(f" Retrying {issue.issue_id} in-place...")
414 retry_result = process_issue_inplace(
415 info=issue,
416 config=config,
417 logger=logger,
418 dry_run=args.dry_run,
419 )
420 total_duration += retry_result.duration
421 if retry_result.success:
422 retried_ok += 1
423 state.failed_issues.pop(issue.issue_id, None)
424 state.completed_issues.append(issue.issue_id)
425 state.timing[issue.issue_id] = {"total": retry_result.duration}
426 logger.success(f" Retry succeeded: {issue.issue_id}")
427 elif retry_result.was_blocked:
428 state.failed_issues.pop(issue.issue_id, None)
429 state.skipped_blocked_issues[issue.issue_id] = (
430 retry_result.failure_reason
431 )
432 logger.warning(
433 f" Retry skipped: {issue.issue_id} (blocked by open dependency)"
434 )
435 else:
436 logger.warning(f" Retry failed: {issue.issue_id}")
437 if retried_ok > 0:
438 logger.info(
439 f"Sequential retry recovered {retried_ok}/{len(actually_failed)} issue(s)"
440 )
442 # Check whether failures remain after retry (ENH-308)
443 remaining_failures = {iid for iid in actually_failed if iid in state.failed_issues}
444 if result == 0 or not remaining_failures:
445 logger.success(
446 f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
447 )
448 else:
449 failed_waves += 1
450 logger.warning(f"Wave {wave_num}/{total_waves} had failures")
451 _save_sprint_state(state, logger)
452 if wave_num < total_waves:
453 logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
454 # Check for shutdown before next wave (ENH-183)
455 if _sprint_shutdown_requested:
456 logger.warning("Shutdown requested - exiting after wave completion")
457 exit_code = 1
458 return exit_code
460 wave_word = "wave" if len(waves) == 1 else "waves"
461 skip_msg = (
462 f", {len(pre_completed_skipped)} already completed (skipped)"
463 if pre_completed_skipped
464 else ""
465 )
466 blocked_msg = (
467 f", {len(state.skipped_blocked_issues)} skipped (blocked)"
468 if state.skipped_blocked_issues
469 else ""
470 )
471 logger.info(
472 f"\nSprint completed: {len(completed)} issues processed "
473 f"({len(waves)} {wave_word}){skip_msg}{blocked_msg}"
474 )
475 logger.timing(f"Total execution time: {format_duration(total_duration)}")
476 if failed_waves > 0:
477 logger.warning(f"{failed_waves} wave(s) had failures")
478 exit_code = 1
479 else:
480 # Clean up state on successful completion
481 _cleanup_sprint_state(logger)
483 except KeyboardInterrupt:
484 # Belt-and-suspenders with signal handler (ENH-185)
485 logger.warning("Sprint interrupted by user (KeyboardInterrupt)")
486 exit_code = 130
488 except Exception as e:
489 # Catch unexpected exceptions (ENH-185)
490 logger.error(f"Sprint failed unexpectedly: {e}")
491 exit_code = 1
493 finally:
494 # Guaranteed state save on any non-success exit (ENH-185)
495 if exit_code != 0:
496 _save_sprint_state(state, logger)
497 logger.info("State saved before exit")
499 return exit_code