Coverage for little_loops / parallel / orchestrator.py: 8%

568 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Main orchestrator for parallel issue processing. 

2 

3Coordinates the priority queue, worker pool, and merge coordinator to process 

4multiple issues concurrently. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import os 

11import re 

12import shutil 

13import signal 

14import threading 

15import time 

16from datetime import datetime 

17from pathlib import Path 

18from typing import TYPE_CHECKING, Any 

19 

20from little_loops.issue_parser import IssueInfo 

21from little_loops.logger import Logger, format_duration 

22from little_loops.parallel.git_lock import GitLock 

23from little_loops.parallel.merge_coordinator import MergeCoordinator 

24from little_loops.parallel.overlap_detector import OverlapDetector 

25from little_loops.parallel.priority_queue import IssuePriorityQueue 

26from little_loops.parallel.types import ( 

27 OrchestratorState, 

28 ParallelConfig, 

29 PendingWorktreeInfo, 

30 WorkerResult, 

31) 

32from little_loops.parallel.worker_pool import WorkerPool 

33 

34if TYPE_CHECKING: 

35 from little_loops.config import BRConfig 

36 

37 

38class ParallelOrchestrator: 

39 """Main controller for parallel issue processing. 

40 

41 Coordinates: 

42 - Issue scanning and prioritization 

43 - Worker dispatch (P0 sequential, P1-P5 parallel) 

44 - Merge coordination 

45 - State persistence for resume capability 

46 - Graceful shutdown on signals 

47 

48 Example: 

49 >>> from little_loops.config import BRConfig 

50 >>> from little_loops.parallel import ParallelConfig, ParallelOrchestrator 

51 >>> br_config = BRConfig(Path.cwd()) 

52 >>> parallel_config = ParallelConfig(max_workers=2) 

53 >>> orchestrator = ParallelOrchestrator(parallel_config, br_config) 

54 >>> exit_code = orchestrator.run() 

55 """ 

56 

def __init__(
    self,
    parallel_config: ParallelConfig,
    br_config: BRConfig,
    repo_path: Path | None = None,
    verbose: bool = True,
    wave_label: str | None = None,
) -> None:
    """Set up the orchestrator and the components it drives.

    Args:
        parallel_config: Settings for parallel processing.
        br_config: Project-level configuration.
        repo_path: Git repository root; the current working directory is
            used when omitted.
        verbose: Forwarded to the ``Logger`` to control progress output.
        wave_label: Optional label used in status and summary output
            (e.g., "Wave 1").
    """
    if not repo_path:
        repo_path = Path.cwd()

    self.parallel_config = parallel_config
    self.br_config = br_config
    self.repo_path = repo_path
    self.logger = Logger(verbose=verbose)
    self.wave_label = wave_label
    self._execution_duration: float = 0.0

    # A single lock is handed to both the worker pool and the merge
    # coordinator so their git operations on the main repo are serialized
    # (avoids index.lock races between them).
    git_lock = GitLock(self.logger)
    self._git_lock = git_lock

    self.queue = IssuePriorityQueue()
    self.worker_pool = WorkerPool(
        parallel_config,
        br_config,
        self.logger,
        self.repo_path,
        git_lock,
    )
    self.merge_coordinator = MergeCoordinator(
        parallel_config,
        self.logger,
        self.repo_path,
        git_lock,
    )

    # Resume/shutdown bookkeeping.
    self.state = OrchestratorState()
    self._state_lock = threading.Lock()
    self._shutdown_requested = False
    self._original_sigint: Any = None
    self._original_sigterm: Any = None

    # Issue metadata kept so lifecycle completion can run after a merge.
    self._issue_info_by_id: dict[str, IssueInfo] = {}
    # Interrupted issues are tracked apart from failures (ENH-036).
    self._interrupted_issues: list[str] = []

    # Overlap detection is optional (ENH-143).
    detector: OverlapDetector | None = None
    if parallel_config.overlap_detection:
        detector = OverlapDetector(config=br_config.dependency_mapping)
    self.overlap_detector: OverlapDetector | None = detector
    # Issues held back because they overlapped with an active issue.
    self._deferred_issues: list[IssueInfo] = []

    # Throttling / de-duplication state for status lines (ENH-262).
    self._last_status_time: float = 0.0
    self._last_status_line: str = ""

117 

@property
def execution_duration(self) -> float:
    """Seconds recorded for the most recent execution (0.0 until one is recorded)."""
    duration = self._execution_duration
    return duration

122 

def run(self) -> int:
    """Run the parallel issue processor.

    Installs signal handlers, makes sure the processing artifacts are
    git-ignored, deals with worktrees left by earlier runs, loads any saved
    state and then either previews (dry run) or executes the work.

    Returns:
        Exit code (0 = success, 1 = failure). ``KeyboardInterrupt`` and any
        other exception raised while running are logged and reported as 1.
    """
    try:
        self._setup_signal_handlers()
        self._ensure_gitignore_entries()

        # Check for pending work from previous runs (unless clean start)
        if not self.parallel_config.clean_start:
            pending_worktrees = self._check_pending_worktrees()

            if any(p.has_pending_work for p in pending_worktrees):
                if self.parallel_config.merge_pending:
                    self._merge_pending_worktrees(pending_worktrees)
                elif not self.parallel_config.ignore_pending:
                    # Default behavior: just report (cleanup happens below)
                    self.logger.info(
                        "Continuing with cleanup (use --merge-pending to merge)..."
                    )

        self._cleanup_orphaned_worktrees()
        self._load_state()

        if self.parallel_config.dry_run:
            return self._dry_run()

        return self._execute()

    except KeyboardInterrupt:
        self.logger.warning("Interrupted by user")
        return 1
    except Exception as e:
        self.logger.error(f"Fatal error: {e}")
        return 1
    finally:
        # Restore the caller's signal handlers even when cleanup itself
        # raises; otherwise this orchestrator's handlers would stay
        # installed after run() has exited.
        try:
            self._cleanup()
        finally:
            self._restore_signal_handlers()

165 

def _setup_signal_handlers(self) -> None:
    """Install ``_signal_handler`` for SIGINT and SIGTERM, remembering the previous handlers."""
    handler = self._signal_handler
    previous_sigint = signal.signal(signal.SIGINT, handler)
    self._original_sigint = previous_sigint
    previous_sigterm = signal.signal(signal.SIGTERM, handler)
    self._original_sigterm = previous_sigterm

170 

def _restore_signal_handlers(self) -> None:
    """Reinstall the SIGINT/SIGTERM handlers saved earlier; a saved value of ``None`` is skipped."""
    saved_handlers = (
        (signal.SIGINT, self._original_sigint),
        (signal.SIGTERM, self._original_sigterm),
    )
    for signum, previous in saved_handlers:
        if previous is None:
            continue
        signal.signal(signum, previous)

177 

def _ensure_gitignore_entries(self) -> None:
    """Ensure .gitignore has entries for parallel processing artifacts.

    Adds entries for:
    - .parallel-manage-state.json (state file)
    - .worktrees/ (git worktree directory)

    This prevents these files from being tracked by git, which would cause
    conflicts during merge operations (state file is continuously updated).

    An entry counts as present only when an active (non-comment,
    non-negated) line matches it after leading/trailing slashes are
    ignored. Mentions inside comments or longer paths do not count; the
    worst case of this stricter check is a redundant ignore line.
    """
    gitignore_path = self.repo_path / ".gitignore"
    required_entries = [
        ".parallel-manage-state.json",
        ".worktrees/",
    ]

    existing_content = ""
    if gitignore_path.exists():
        existing_content = gitignore_path.read_text()

    # Collect the patterns that are actually in effect. Comment lines ("#")
    # and negations ("!") never ignore anything, so they are skipped.
    active_patterns = set()
    for raw_line in existing_content.splitlines():
        pattern = raw_line.strip()
        if not pattern or pattern.startswith(("#", "!")):
            continue
        # "/.worktrees/", ".worktrees/" and ".worktrees" all cover the
        # directory at the repository root, so compare without the slashes.
        active_patterns.add(pattern.strip("/"))

    missing_entries = [
        entry for entry in required_entries if entry.strip("/") not in active_patterns
    ]
    if not missing_entries:
        return

    # Append missing entries under a marker comment
    addition = "\n# ll-parallel artifacts\n"
    for entry in missing_entries:
        addition += f"{entry}\n"

    # Ensure file ends with newline before adding
    if existing_content and not existing_content.endswith("\n"):
        addition = "\n" + addition

    gitignore_path.write_text(existing_content + addition)
    self.logger.info(f"Added {len(missing_entries)} entries to .gitignore")

219 

def _cleanup_orphaned_worktrees(self) -> None:
    """Clean up worktrees from previous interrupted runs.

    Scans the worktree base directory for ``worker-*`` directories and
    removes those not owned by a live process, together with their
    ``parallel/...`` branches. Ownership is decided from
    ``.ll-session-<pid>`` marker files inside each worktree (BUG-579).
    """
    worktree_base = self.repo_path / self.parallel_config.worktree_base
    if not worktree_base.exists():
        return

    worker_prefix = "worker-"

    # Get list of worktree directories, skipping those owned by live processes
    orphaned = []
    for item in worktree_base.iterdir():
        if not (item.is_dir() and item.name.startswith(worker_prefix)):
            continue

        owned_by_live = False
        for marker in item.glob(".ll-session-*"):
            try:
                pid = int(marker.name.split("-")[-1])
                if pid <= 0:
                    # os.kill() treats non-positive pids as process-group
                    # targets, so they cannot identify a single owner.
                    continue
                # NOTE(review): signal 0 is an existence probe on POSIX; the
                # os.kill docs say that on Windows other signal values
                # terminate the target - confirm the supported platforms.
                os.kill(pid, 0)
                owned_by_live = True
                break
            except (ProcessLookupError, ValueError, OverflowError):
                # Stale or malformed marker (OverflowError: the number does
                # not fit a pid); keep checking the remaining markers.
                pass
            except PermissionError:
                owned_by_live = True  # Process exists, no permission to signal
                break

        if owned_by_live:
            self.logger.info(f"Skipping {item.name}: owned by running process")
            continue
        orphaned.append(item)

    if not orphaned:
        return

    self.logger.info(f"Cleaning up {len(orphaned)} orphaned worktree(s) from previous run")

    for worktree_path in orphaned:
        try:
            # Try git worktree remove first
            self._git_lock.run(
                ["worktree", "remove", "--force", str(worktree_path)],
                cwd=self.repo_path,
                timeout=30,
            )

            # If git worktree remove failed, force delete the directory
            if worktree_path.exists():
                shutil.rmtree(worktree_path, ignore_errors=True)

            # Try to delete the associated branch. Only the leading
            # "worker-" is swapped, so an identifier that happens to contain
            # "worker-" again is left intact.
            # worker-<issue-id>-<timestamp> -> parallel/<issue-id>-<timestamp>
            branch_name = "parallel/" + worktree_path.name[len(worker_prefix):]
            self._git_lock.run(
                ["branch", "-D", branch_name],
                cwd=self.repo_path,
                timeout=10,
            )
        except Exception as e:
            self.logger.warning(f"Failed to clean up {worktree_path.name}: {e}")

    # Also prune git worktree references
    self._git_lock.run(
        ["worktree", "prune"],
        cwd=self.repo_path,
        timeout=30,
    )

288 

def _inspect_worktree(self, worktree_path: Path) -> PendingWorktreeInfo | None:
    """Inspect a worktree to determine its status.

    Derives the branch name and issue ID from the directory name, counts
    the commits the branch has that the configured base branch lacks, and
    lists files with uncommitted changes.

    Args:
        worktree_path: Path to the worktree directory

    Returns:
        PendingWorktreeInfo if inspection succeeded, None if failed
    """
    try:
        # Extract branch name from worktree path (prefix only):
        # worker-bug-045-20260117-143022 -> parallel/bug-045-20260117-143022
        branch_name = worktree_path.name.replace("worker-", "parallel/", 1)

        # Extract issue ID (e.g., bug-045 -> BUG-045)
        # Pattern: worker-<issue-id>-<timestamp>; fall back to the raw name.
        match = re.match(r"worker-([a-z]+-\d+)-\d{8}-\d{6}", worktree_path.name)
        issue_id = match.group(1).upper() if match else worktree_path.name

        # Check commits ahead of the base branch
        result = self._git_lock.run(
            ["rev-list", "--count", f"{self.parallel_config.base_branch}..{branch_name}"],
            cwd=self.repo_path,
            timeout=10,
        )
        commits_ahead = int(result.stdout.strip()) if result.returncode == 0 else 0

        # Check for uncommitted changes in worktree
        result = self._git_lock.run(
            ["status", "--porcelain"],
            cwd=worktree_path,
            timeout=10,
        )
        changed_files = []
        has_uncommitted = False
        if result.returncode == 0 and result.stdout.strip():
            has_uncommitted = True
            # Each porcelain line is "XY <path>", and X may be a space.
            # Split the raw output: stripping it first would eat the leading
            # status column of the first line and shift its path by one.
            changed_files = [
                line[3:] for line in result.stdout.splitlines() if line.strip()
            ]

        return PendingWorktreeInfo(
            worktree_path=worktree_path,
            branch_name=branch_name,
            issue_id=issue_id,
            commits_ahead=commits_ahead,
            has_uncommitted_changes=has_uncommitted,
            changed_files=changed_files,
        )
    except Exception as e:
        self.logger.warning(f"Failed to inspect worktree {worktree_path.name}: {e}")
        return None

339 

def _check_pending_worktrees(self) -> list[PendingWorktreeInfo]:
    """Check for pending worktrees from previous runs and report status.

    Every ``worker-*`` directory under the worktree base is inspected, in
    name order so the report is stable between runs. Worktrees that fail
    inspection are left out of the result.

    Returns:
        List of pending worktree information
    """
    worktree_base = self.repo_path / self.parallel_config.worktree_base
    if not worktree_base.exists():
        return []

    # Find all worker directories (iterdir() order is arbitrary, so sort)
    worktrees = sorted(
        item
        for item in worktree_base.iterdir()
        if item.is_dir() and item.name.startswith("worker-")
    )

    if not worktrees:
        return []

    self.logger.info("Checking for pending work from previous runs...")

    # Inspect each worktree
    pending_info: list[PendingWorktreeInfo] = []
    for worktree_path in worktrees:
        info = self._inspect_worktree(worktree_path)
        if info:
            pending_info.append(info)

    # Report findings
    with_work = [p for p in pending_info if p.has_pending_work]
    if with_work:
        # Name the branch the commit count was measured against.
        base_branch = self.parallel_config.base_branch
        self.logger.warning(f"Found {len(with_work)} worktree(s) with pending work:")
        for info in with_work:
            status_parts = []
            if info.commits_ahead > 0:
                status_parts.append(f"{info.commits_ahead} commit(s) ahead of {base_branch}")
            if info.has_uncommitted_changes:
                status_parts.append(f"{len(info.changed_files)} uncommitted file(s)")
            status = ", ".join(status_parts)
            self.logger.warning(f" - {info.worktree_path.name}: {info.issue_id} ({status})")

        self.logger.info("")
        self.logger.info("Options:")
        self.logger.info(" --merge-pending Attempt to merge pending work before continuing")
        self.logger.info(" --clean-start Remove all worktrees and start fresh")
        self.logger.info(
            " --ignore-pending Continue without action (worktrees will be cleaned up)"
        )
    elif pending_info:
        self.logger.info(f"Found {len(pending_info)} orphaned worktree(s) with no pending work")

    return pending_info

393 

def _merge_pending_worktrees(self, pending: list[PendingWorktreeInfo]) -> None:
    """Attempt to merge pending worktrees from previous runs.

    For each worktree with pending work: uncommitted changes are committed
    as a WIP commit, the branch is merged with ``--no-ff`` from the main
    repo, and on success the worktree and branch are removed. A failed
    merge is aborted. Errors for one worktree do not stop the others.

    Args:
        pending: List of pending worktree information
    """
    with_work = [p for p in pending if p.has_pending_work]
    if not with_work:
        return

    self.logger.info(f"Attempting to merge {len(with_work)} pending worktree(s)...")

    for info in with_work:
        try:
            # If there are uncommitted changes, commit them first
            if info.has_uncommitted_changes:
                self.logger.info(f" Committing uncommitted changes in {info.issue_id}...")
                staged = self._git_lock.run(
                    ["add", "-A"],
                    cwd=info.worktree_path,
                    timeout=30,
                )
                if staged.returncode != 0:
                    # Surface the failure: the merge below would otherwise
                    # proceed without any sign that changes were left out.
                    self.logger.warning(
                        f" Failed to stage changes in {info.issue_id}: {staged.stderr}"
                    )
                committed = self._git_lock.run(
                    [
                        "commit",
                        "-m",
                        f"WIP: Auto-commit from interrupted session for {info.issue_id}",
                    ],
                    cwd=info.worktree_path,
                    timeout=30,
                )
                if committed.returncode != 0:
                    self.logger.warning(
                        f" Failed to commit changes in {info.issue_id}; "
                        f"only already-committed work will be merged: {committed.stderr}"
                    )

            # Attempt merge
            self.logger.info(f" Merging {info.issue_id} ({info.branch_name})...")
            result = self._git_lock.run(
                [
                    "merge",
                    "--no-ff",
                    info.branch_name,
                    "-m",
                    f"Merge pending work for {info.issue_id}",
                ],
                cwd=self.repo_path,
                timeout=60,
            )

            if result.returncode == 0:
                self.logger.success(f" Successfully merged {info.issue_id}")
                # Clean up the worktree after successful merge
                self._git_lock.run(
                    ["worktree", "remove", "--force", str(info.worktree_path)],
                    cwd=self.repo_path,
                    timeout=30,
                )
                self._git_lock.run(
                    ["branch", "-D", info.branch_name],
                    cwd=self.repo_path,
                    timeout=10,
                )
            else:
                self.logger.warning(f" Failed to merge {info.issue_id}: {result.stderr}")
                # Abort the merge if it failed
                self._git_lock.run(
                    ["merge", "--abort"],
                    cwd=self.repo_path,
                    timeout=10,
                )

        except Exception as e:
            self.logger.warning(f" Error merging {info.issue_id}: {e}")

464 

def _signal_handler(self, signum: int, frame: object) -> None:
    """Flag a shutdown request on the orchestrator and its worker pool, then log it.

    Args:
        signum: Number of the received signal (included in the log line).
        frame: Stack frame supplied by the signal machinery; not used.
    """
    self._shutdown_requested = True
    # The pool needs the flag too so it can recognise interrupted workers (ENH-036).
    pool = self.worker_pool
    pool.set_shutdown_requested(True)
    message = f"Received signal {signum}, shutting down gracefully..."
    self.logger.warning(message)

471 

def _load_state(self) -> None:
    """Restore saved state so an earlier run can be resumed.

    With ``clean_start`` set, or when no state file exists, only the start
    timestamp is initialised. Otherwise the JSON state file is parsed, the
    queue is told which issues already completed or failed, and a summary
    is logged. A state file that cannot be loaded is reported as a warning
    and the start timestamp is initialised instead.
    """
    config = self.parallel_config
    state_file = None if config.clean_start else self.repo_path / config.state_file
    if state_file is None or not state_file.exists():
        self.state.started_at = datetime.now().isoformat()
        return

    try:
        payload = json.loads(state_file.read_text())
        self.state = OrchestratorState.from_dict(payload)

        # Bring the queue in line with the restored state.
        self.queue.load_completed(self.state.completed_issues)
        self.queue.load_failed(self.state.failed_issues.keys())

        completed_total = len(self.state.completed_issues)
        failed_total = len(self.state.failed_issues)
        self.logger.info(
            f"Resumed from previous state: {completed_total} completed, {failed_total} failed"
        )
    except Exception as e:
        self.logger.warning(f"Could not load state: {e}")
        self.state.started_at = datetime.now().isoformat()

498 

def _save_state(self) -> None:
    """Write a checkpoint of the current state to the state file.

    While holding the state lock, the checkpoint time and the queue's
    completed / failed / in-progress IDs are copied onto the state object,
    which is then written out as indented JSON.
    """
    with self._state_lock:
        snapshot = self.state
        queue = self.queue

        snapshot.last_checkpoint = datetime.now().isoformat()
        snapshot.completed_issues = queue.completed_ids
        # Every failed ID is stored with the same placeholder reason.
        snapshot.failed_issues = {issue_id: "Failed" for issue_id in queue.failed_ids}
        snapshot.in_progress_issues = queue.in_progress_ids

        target = self.repo_path / self.parallel_config.state_file
        serialized = json.dumps(snapshot.to_dict(), indent=2)
        target.write_text(serialized)

509 

def _cleanup_state(self) -> None:
    """Delete the state file if there is one."""
    state_file = self.repo_path / self.parallel_config.state_file
    if not state_file.exists():
        return
    state_file.unlink()

515 

def _dry_run(self) -> int:
    """Log what a real run would process, without executing anything.

    Issues are listed per priority, in the order given by
    ``IssuePriorityQueue.DEFAULT_PRIORITIES``, followed by the relevant
    configuration values.

    Returns:
        Exit code (always 0 for dry run)
    """
    issues = self._scan_issues()
    log = self.logger.info
    banner = "=" * 60

    log(banner)
    log("DRY RUN - No changes will be made")
    log(banner)
    log("")

    if not issues:
        log("No issues found matching criteria")
        return 0

    log(f"Found {len(issues)} issues to process:")
    log("")

    # Bucket the issues by their priority label.
    by_priority: dict[str, list[IssueInfo]] = {}
    for issue in issues:
        if issue.priority not in by_priority:
            by_priority[issue.priority] = []
        by_priority[issue.priority].append(issue)

    for priority in IssuePriorityQueue.DEFAULT_PRIORITIES:
        priority_issues = by_priority.get(priority)
        if priority_issues is None:
            continue

        log(f" {priority} ({len(priority_issues)} issues):")
        # The mode only depends on the priority, so work it out once per bucket.
        mode = "parallel"
        if priority == "P0" and self.parallel_config.p0_sequential:
            mode = "sequential"
        for issue in priority_issues:
            log(f" - {issue.issue_id}: {issue.title} [{mode}]")

    log("")
    log("Configuration:")
    log(f" Workers: {self.parallel_config.max_workers}")
    log(f" P0 Sequential: {self.parallel_config.p0_sequential}")
    log(f" Max Issues: {self.parallel_config.max_issues or 'unlimited'}")
    log(f" Command Prefix: {self.parallel_config.command_prefix}")

    return 0

563 

def _maybe_report_status(self) -> None:
    """Emit a one-line progress summary, at most once every 5 seconds.

    The line carries the wave label (if any), queue counters, and the
    active issues grouped by worker stage (ENH-262). A line identical to
    the previously emitted one is suppressed.
    """
    now = time.time()
    if now - self._last_status_time < 5.0:
        return
    self._last_status_time = now

    # Optional wave label first, then the counters.
    segments = [f"{self.wave_label}"] if self.wave_label else []

    in_progress = len(self.queue.in_progress_ids)
    completed = self.queue.completed_count
    failed = self.queue.failed_count
    pending_merge = self.merge_coordinator.pending_count

    segments.append(f"Active: {in_progress}")
    segments.append(f"Done: {completed}")
    if failed > 0:
        segments.append(f"Failed: {failed}")
    if pending_merge > 0:
        segments.append(f"Merging: {pending_merge}")

    # Per-stage breakdown of the active workers, in a fixed stage order.
    active_stages = self.worker_pool.get_active_stages()
    if active_stages:
        by_stage: dict[str, list[str]] = {}
        for issue_id, worker_stage in active_stages.items():
            label = worker_stage.value.title()
            if label not in by_stage:
                by_stage[label] = []
            by_stage[label].append(issue_id)

        for label in ("Validating", "Implementing", "Verifying", "Merging"):
            if label in by_stage:
                joined_ids = ", ".join(by_stage[label])
                segments.append(f"{label}: [{joined_ids}]")

    status = " | ".join(segments)

    # Nothing new to say since the last emitted line.
    if status == self._last_status_line:
        return
    self._last_status_line = status

    # Gray output sets status lines apart from normal log lines.
    if not self.logger.use_color:
        self.logger.info(status)
        return
    color = self.logger.GRAY
    ts = self.logger._timestamp()
    print(f"{color}[{ts}]{self.logger.RESET} {status}")

632 

def _execute(self) -> int:
    """Scan, queue and process issues until done, limited or shut down.

    Returns:
        Exit code (0 = success, 1 = failure); failure means the queue
        reports at least one failed issue.
    """
    start_time = time.time()

    issues = self._scan_issues()
    if not issues:
        self.logger.info("No issues to process")
        return 0

    # Remember issue metadata for lifecycle completion after merge.
    self._issue_info_by_id.update((issue.issue_id, issue) for issue in issues)

    added = self.queue.add_many(issues)
    self.logger.info(f"Queued {added} issues for processing")

    self.worker_pool.start()
    self.merge_coordinator.start()

    issues_processed = 0
    max_issues = self.parallel_config.max_issues or float("inf")

    while not self._shutdown_requested:
        # Finished once nothing is queued, running, or waiting to merge.
        nothing_running = self.queue.empty() and self.worker_pool.active_count == 0
        if nothing_running and self.merge_coordinator.pending_count == 0:
            break

        if issues_processed >= max_issues:
            self.logger.info(f"Reached max issues limit ({max_issues})")
            break

        # Hand out the next issue when a worker slot is free.
        if self.worker_pool.active_count < self.parallel_config.max_workers:
            queued = self.queue.get(block=False)
            if queued:
                issue = queued.issue_info
                run_sequentially = (
                    issue.priority == "P0" and self.parallel_config.p0_sequential
                )
                if run_sequentially:
                    self._process_sequential(issue)
                else:
                    self._process_parallel(issue)
                # NOTE(review): an issue that _process_parallel defers is
                # counted here as well - confirm that is intended when
                # max_issues is set.
                issues_processed += 1

        self._save_state()
        self._maybe_report_status()

        # Short pause so the loop does not spin.
        time.sleep(0.1)

    self._wait_for_completion()
    self._report_results(start_time)

    # The state file is only removed after a clean, uninterrupted run.
    if not self._shutdown_requested and self.queue.failed_count == 0:
        self._cleanup_state()

    if self.queue.failed_count == 0:
        return 0
    return 1

708 

def _scan_issues(self) -> list[IssueInfo]:
    """Scan for issues matching criteria.

    Issues already recorded as completed or failed in the loaded state are
    skipped, along with any IDs the configuration asks to skip. The result
    is truncated to ``max_issues`` when that is a positive number; any
    falsy value (0 or ``None``) means "no limit", as elsewhere in this class.

    Returns:
        List of issues as returned by ``IssuePriorityQueue.scan_issues``,
        truncated to the limit.
    """
    # Combine skip_ids from state and config
    skip_ids = set(self.state.completed_issues) | set(self.state.failed_issues)
    if self.parallel_config.skip_ids:
        # update() accepts any iterable, not only sets.
        skip_ids.update(self.parallel_config.skip_ids)

    issues = IssuePriorityQueue.scan_issues(
        self.br_config,
        priority_filter=list(self.parallel_config.priority_filter),
        skip_ids=skip_ids,
        only_ids=self.parallel_config.only_ids,
        type_prefixes=self.parallel_config.type_prefixes,
    )

    # Apply max issues limit; a falsy limit (0 / None) means unlimited.
    max_issues = self.parallel_config.max_issues
    if max_issues and max_issues > 0:
        issues = issues[:max_issues]

    return issues

733 

def _process_sequential(self, issue: IssueInfo) -> None:
    """Process an issue sequentially (blocking).

    Waits until no parallel worker is active, submits the issue, and blocks
    on its result. The outcome is handled the same way the parallel
    completion callback handles it:

    - interrupted: recorded as interrupted, not failed (ENH-036)
    - success or closure requested: handed to ``_merge_sequential``
    - anything else: logged and marked failed

    Args:
        issue: Issue to process
    """
    self.logger.info(f"Processing {issue.issue_id} sequentially (P0)")

    # Wait for any parallel work to finish
    while self.worker_pool.active_count > 0:
        time.sleep(0.5)

    # Note: No callback here - the result is handled explicitly below to
    # avoid double-processing (a callback would also queue merge/close).
    future = self.worker_pool.submit(issue)

    try:
        result = future.result(timeout=self.parallel_config.timeout_per_issue)
        if result.interrupted:
            self.logger.info(f"{result.issue_id} was interrupted during shutdown (can retry)")
            self._interrupted_issues.append(result.issue_id)
        elif result.success or result.should_close:
            # _merge_sequential closes the issue when closure was requested
            # and merges immediately otherwise.
            self._merge_sequential(result)
        else:
            # A failed worker result must count as a failure; dropping it
            # would let the run finish with a success exit code.
            self.logger.error(f"{result.issue_id} failed: {result.error}")
            self.queue.mark_failed(issue.issue_id)
    except Exception as e:
        self.logger.error(f"Sequential processing failed: {e}")
        self.queue.mark_failed(issue.issue_id)

760 

def _process_parallel(self, issue: IssueInfo) -> None:
    """Hand an issue to the worker pool without blocking.

    When overlap detection is enabled (ENH-143) and the issue overlaps with
    an active one, it is either deferred (``serialize_overlapping``) or
    dispatched with a warning.

    Args:
        issue: Issue to process
    """
    detector = self.overlap_detector
    if detector:
        overlap = detector.check_overlap(issue)
        if overlap and self.parallel_config.serialize_overlapping:
            self.logger.warning(
                f"Deferring {issue.issue_id} - overlaps with {overlap.overlapping_issues}"
            )
            # Kept aside; re-checked when an active issue completes.
            self._deferred_issues.append(issue)
            return
        if overlap:
            self.logger.warning(
                f"Warning: {issue.issue_id} may conflict with {overlap.overlapping_issues}"
            )

        # Mark the issue active before it is dispatched.
        detector.register_issue(issue)

    self.logger.info(f"Dispatching {issue.issue_id} to worker pool")
    self.worker_pool.submit(issue, self._on_worker_complete)

788 

def _on_worker_complete(self, result: WorkerResult) -> None:
    """Handle the result delivered by a finished worker.

    Interrupted results are only recorded. Close requests go through
    ``close_issue``; successful results are queued for merge and the merge
    is awaited; everything else is marked failed. Timing and stage
    bookkeeping run for every non-interrupted result.

    Args:
        result: Result from the worker
    """
    issue_id = result.issue_id

    # Release the overlap slot and give deferred issues another chance (ENH-143).
    if self.overlap_detector:
        self.overlap_detector.unregister_issue(issue_id)
        self._requeue_deferred_issues()

    # Interrupted workers are not failures; they can be retried next run (ENH-036).
    if result.interrupted:
        self.logger.info(f"{issue_id} was interrupted during shutdown (can retry)")
        self._interrupted_issues.append(issue_id)
        return

    if result.should_close:
        # Imported here to avoid a circular dependency.
        from little_loops.issue_lifecycle import close_issue

        self.logger.info(f"{issue_id} should be closed: {result.close_status}")
        info = self._issue_info_by_id.get(issue_id)
        if not info:
            self.logger.warning(f"No issue info found for {issue_id}")
            self.queue.mark_failed(issue_id)
        elif close_issue(
            info,
            self.br_config,
            self.logger,
            result.close_reason,
            result.close_status,
        ):
            self.queue.mark_completed(issue_id)
        else:
            self.queue.mark_failed(issue_id)
    elif result.success:
        self.logger.success(f"{issue_id} completed in {format_duration(result.duration)}")
        if result.was_corrected:
            self.logger.info(f"{issue_id} was auto-corrected during validation")
            # Corrections are logged and kept for pattern analysis (ENH-010).
            for correction in result.corrections:
                self.logger.info(f" Correction: {correction}")
            if result.corrections:
                with self._state_lock:
                    self.state.corrections[issue_id] = result.corrections
        self.merge_coordinator.queue_merge(result)
        # Block the callback until the merge has finished so the next
        # worker is not dispatched while a merge is in progress (BUG-140:
        # race between worktree creation and merge operations).
        self.merge_coordinator.wait_for_completion(timeout=120)
    else:
        self.logger.error(f"{issue_id} failed: {result.error}")
        self.queue.mark_failed(issue_id)

    # Record how long the worker took.
    with self._state_lock:
        self.state.timing[issue_id] = {"total": result.duration}

    # Drop the issue from stage tracking once its result is handled (ENH-262).
    self.worker_pool.remove_worker_stage(issue_id)

860 

def _requeue_deferred_issues(self) -> None:
    """Re-queue deferred issues that no longer have overlaps (ENH-143).

    Each deferred issue is checked against the overlap detector again:
    issues that still overlap stay deferred, the rest go back onto the
    queue. Without a detector nothing can overlap, so every deferred issue
    is re-queued rather than dropped.
    """
    if not self._deferred_issues:
        return

    detector = self.overlap_detector

    still_deferred = []
    for issue in self._deferred_issues:
        if detector is not None and detector.check_overlap(issue):
            # Still has overlaps, keep deferred
            still_deferred.append(issue)
            continue

        # No more overlaps, add back to queue
        self.logger.info(f"Re-queuing {issue.issue_id} - no longer overlapping")
        self.queue.add(issue)

    self._deferred_issues = still_deferred

880 

def _merge_sequential(self, result: WorkerResult) -> None:
    """Finish a sequential (P0) result right away.

    A result that asks for closure is closed through ``close_issue``;
    otherwise it is queued for merge and the merge is awaited. The queue is
    marked completed or failed according to the outcome.

    Args:
        result: Result to merge
    """
    issue_id = result.issue_id

    if result.should_close:
        # Imported here to avoid a circular dependency.
        from little_loops.issue_lifecycle import close_issue

        info = self._issue_info_by_id.get(issue_id)
        closed = info and close_issue(
            info,
            self.br_config,
            self.logger,
            result.close_reason,
            result.close_status,
        )
        if closed:
            self.queue.mark_completed(issue_id)
        else:
            self.queue.mark_failed(issue_id)
        return

    coordinator = self.merge_coordinator
    coordinator.queue_merge(result)
    # Block until the coordinator reports completion (or 60s pass).
    coordinator.wait_for_completion(timeout=60)

    if issue_id not in coordinator.merged_ids:
        self.queue.mark_failed(issue_id)
        return
    self.queue.mark_completed(issue_id)
    self._complete_issue_lifecycle_if_needed(issue_id)

914 

def _wait_for_completion(self) -> None:
    """Block until workers and merges are finished, then sync the queue.

    Workers still active after the timeout are terminated. The timeout is
    ``orchestrator_timeout`` when positive, otherwise
    ``timeout_per_issue * max_workers``.
    """
    self.logger.info("Waiting for workers to complete...")

    config = self.parallel_config
    timeout = config.orchestrator_timeout
    if not timeout > 0:
        timeout = config.timeout_per_issue * config.max_workers

    start = time.time()
    while self.worker_pool.active_count > 0:
        waited_too_long = time.time() - start > timeout
        if waited_too_long:
            self.logger.warning(f"Timeout waiting for workers after {timeout}s")
            self.worker_pool.terminate_all_processes()
            break
        time.sleep(1.0)

    self.logger.info("Waiting for pending merges...")
    coordinator = self.merge_coordinator
    coordinator.wait_for_completion(timeout=120)

    # Mirror the merge outcomes onto the queue and finish issue lifecycles.
    for merged_id in coordinator.merged_ids:
        self.queue.mark_completed(merged_id)
        self._complete_issue_lifecycle_if_needed(merged_id)

    for failed_id in coordinator.failed_merges:
        self.queue.mark_failed(failed_id)

944 

def _report_results(self, start_time: float) -> None:
    """Report processing results.

    Logs a banner, the total elapsed time, completed/failed/interrupted
    counts, an estimated speedup, the failed and interrupted issue IDs,
    auto-correction statistics (ENH-010) and stash recovery warnings.
    The elapsed time is also stored in ``self._execution_duration``.

    Args:
        start_time: When processing started (compared against ``time.time()``)
    """
    total_time = time.time() - start_time
    self._execution_duration = total_time

    log = self.logger
    banner = "=" * 60

    log.info("")
    log.info(banner)
    if self.wave_label:
        log.info(f"{self.wave_label.upper()} PROCESSING COMPLETE")
    else:
        log.info("PARALLEL ISSUE PROCESSING COMPLETE")
    log.info(banner)
    log.info("")
    log.timing(f"Total time: {format_duration(total_time)}")
    log.info(f"Completed: {self.queue.completed_count}")
    log.info(f"Failed: {self.queue.failed_count}")
    if self._interrupted_issues:
        log.info(f"Interrupted: {len(self._interrupted_issues)}")

    # Copy the shared dicts while holding the state lock; everything below
    # works on these snapshots only.
    with self._state_lock:
        timing_snapshot = dict(self.state.timing)
        corrections_snapshot = dict(self.state.corrections)

    if self.queue.completed_count > 0:
        total_issue_time = sum(t.get("total", 0) for t in timing_snapshot.values())
        # total_time is checked too: a zero elapsed time would otherwise
        # raise ZeroDivisionError in the middle of the report.
        if total_issue_time > 0 and total_time > 0:
            speedup = total_issue_time / total_time
            log.info(f"Estimated speedup: {speedup:.2f}x")

    failed_ids = self.queue.failed_ids
    if failed_ids:
        log.info("")
        log.warning("Failed issues:")
        for issue_id in failed_ids:
            log.warning(f"  - {issue_id}")

    # Report interrupted issues separately (ENH-036)
    if self._interrupted_issues:
        log.info("")
        log.info(f"Interrupted: {len(self._interrupted_issues)} (can retry)")
        for issue_id in self._interrupted_issues:
            log.info(f"  - {issue_id}")

    # Report correction statistics for quality tracking (ENH-010)
    if corrections_snapshot:
        from collections import Counter

        total_corrected = len(corrections_snapshot)
        total_issues = self.queue.completed_count + self.queue.failed_count
        correction_rate = (total_corrected / total_issues * 100) if total_issues > 0 else 0
        log.info("")
        log.info(
            f"Auto-corrections: {total_corrected}/{total_issues} ({correction_rate:.1f}%)"
        )

        # Flatten the per-issue correction lists, preserving order
        all_corrections: list[str] = []
        for corrections in corrections_snapshot.values():
            all_corrections.extend(corrections)

        # Group corrections by category (ENH-010 fourth fix)
        by_category: Counter[str] = Counter()
        for correction in all_corrections:
            # Extract category from [category] prefix if present
            if correction.startswith("[") and "]" in correction:
                by_category[correction[1 : correction.index("]")]] += 1
            else:
                by_category["uncategorized"] += 1

        # Log corrections by type/category, most frequent first
        if by_category:
            log.info("Corrections by type:")
            for category, count in sorted(by_category.items(), key=lambda x: -x[1]):
                log.info(f"  - {category}: {count}")

        # Log most common individual corrections
        if all_corrections:
            log.info("Most common corrections:")
            for correction, count in Counter(all_corrections).most_common(3):
                # Truncate long correction descriptions
                display = correction[:60] + "..." if len(correction) > 60 else correction
                log.info(f"  - {display}: {count}")

    # Report stash pop warnings (local changes need manual recovery)
    stash_warnings = self.merge_coordinator.stash_pop_failures
    if stash_warnings:
        log.info("")
        log.warning("Stash recovery warnings (local changes need manual restoration):")
        for issue_id, message in stash_warnings.items():
            log.warning(f"  - {issue_id}: {message}")
        log.warning("")
        log.warning(
            "To recover: Run 'git stash list' to find your changes, "
            "then 'git stash pop' or 'git stash apply stash@{N}'"
        )

1043 

def _complete_issue_lifecycle_if_needed(self, issue_id: str) -> bool:
    """Complete issue lifecycle if the issue file wasn't moved during merge.

    If the issue file is still at its original path and not yet in the
    completed directory, a "## Resolution" section is appended (unless one
    is already present), the file is moved with ``git mv`` (falling back to
    a plain write + unlink when that fails), and the change is staged and
    committed.

    Args:
        issue_id: ID of the issue to complete

    Returns:
        True if lifecycle was completed (or already complete), False when
        no issue info is known for ``issue_id`` or an exception was raised.
        A failing ``git commit`` is only logged as a warning; the method
        still returns True in that case.
    """
    info = self._issue_info_by_id.get(issue_id)
    if not info:
        self.logger.warning(f"No issue info found for {issue_id}")
        return False

    original_path = info.path
    completed_dir = self.br_config.get_completed_dir()
    completed_path = completed_dir / original_path.name

    # Already moved to completed, or no longer at the original location:
    # either way there is nothing left to do here.
    if completed_path.exists() or not original_path.exists():
        return True

    # Issue file still in original location - complete lifecycle
    self.logger.info(f"Completing lifecycle for {issue_id} (merged but file not moved)")

    try:
        completed_dir.mkdir(parents=True, exist_ok=True)

        # Read original content
        content = original_path.read_text()

        # Looked up once; used for both the resolution text and the commit
        action = self.br_config.get_category_action(info.issue_type)

        # Add resolution section if not already present
        if "## Resolution" not in content:
            resolution = f"""

---

## Resolution

- **Action**: {action}
- **Completed**: {datetime.now().strftime("%Y-%m-%d")}
- **Status**: Completed (parallel merge fallback)
- **Implementation**: Merged from parallel worker branch

### Changes Made
- See git history for implementation details

### Verification Results
- Work verification passed before merge

### Commits
- See `git log --oneline` for merge commit details
"""
            content += resolution

        # Use git mv if possible (before writing content to avoid "destination exists")
        move_result = self._git_lock.run(
            ["mv", str(original_path), str(completed_path)],
            cwd=self.repo_path,
        )
        move_failed = move_result.returncode != 0

        if move_failed:
            # git mv failed (destination may exist or other error)
            self.logger.warning(f"git mv failed for {issue_id}: {move_result.stderr}")

        # Write updated content to destination (may overwrite existing)
        completed_path.write_text(content)

        if move_failed:
            # Remove source if it still exists
            original_path.unlink(missing_ok=True)

        # Stage and commit
        # NOTE(review): `add -A` presumably stages every pending change under
        # repo_path, not just this issue file - confirm that is intended.
        self._git_lock.run(
            ["add", "-A"],
            cwd=self.repo_path,
        )

        commit_msg = f"""{action}({info.issue_type}): complete {issue_id} lifecycle

Parallel merge fallback - issue file moved to completed.

Issue: {issue_id}
Type: {info.issue_type}
Title: {info.title}
"""
        commit_result = self._git_lock.run(
            ["commit", "-m", commit_msg],
            cwd=self.repo_path,
        )

        if commit_result.returncode != 0:
            # "nothing to commit" is not worth a warning; anything else is
            if "nothing to commit" not in commit_result.stdout.lower():
                self.logger.warning(f"git commit failed: {commit_result.stderr}")
        else:
            # Pull the short hash out of git's "[branch hash] subject" line
            commit_hash_match = re.search(r"\[[\w-]+\s+([a-f0-9]+)\]", commit_result.stdout)
            if commit_hash_match:
                self.logger.success(
                    f"Completed lifecycle for {issue_id}: {commit_hash_match.group(1)}"
                )
            else:
                self.logger.success(f"Completed lifecycle for {issue_id}")

        return True

    except Exception as e:
        self.logger.error(f"Failed to complete lifecycle for {issue_id}: {e}")
        return False

1158 

1159 def _cleanup(self) -> None: 

1160 """Clean up resources.""" 

1161 self.logger.info("Cleaning up...") 

1162 

1163 # Save final state 

1164 self._save_state() 

1165 

1166 # Shutdown components 

1167 self.worker_pool.shutdown(wait=True) 

1168 self.merge_coordinator.shutdown(wait=True, timeout=30) 

1169 

1170 # Clean up worktrees if not interrupted 

1171 if not self._shutdown_requested: 

1172 self.worker_pool.cleanup_all_worktrees()