Coverage for little_loops / parallel / orchestrator.py: 8%
568 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Main orchestrator for parallel issue processing.
3Coordinates the priority queue, worker pool, and merge coordinator to process
4multiple issues concurrently.
5"""
7from __future__ import annotations
9import json
10import os
11import re
12import shutil
13import signal
14import threading
15import time
16from datetime import datetime
17from pathlib import Path
18from typing import TYPE_CHECKING, Any
20from little_loops.issue_parser import IssueInfo
21from little_loops.logger import Logger, format_duration
22from little_loops.parallel.git_lock import GitLock
23from little_loops.parallel.merge_coordinator import MergeCoordinator
24from little_loops.parallel.overlap_detector import OverlapDetector
25from little_loops.parallel.priority_queue import IssuePriorityQueue
26from little_loops.parallel.types import (
27 OrchestratorState,
28 ParallelConfig,
29 PendingWorktreeInfo,
30 WorkerResult,
31)
32from little_loops.parallel.worker_pool import WorkerPool
34if TYPE_CHECKING:
35 from little_loops.config import BRConfig
38class ParallelOrchestrator:
39 """Main controller for parallel issue processing.
41 Coordinates:
42 - Issue scanning and prioritization
43 - Worker dispatch (P0 sequential, P1-P5 parallel)
44 - Merge coordination
45 - State persistence for resume capability
46 - Graceful shutdown on signals
48 Example:
49 >>> from little_loops.config import BRConfig
50 >>> from little_loops.parallel import ParallelConfig, ParallelOrchestrator
51 >>> br_config = BRConfig(Path.cwd())
52 >>> parallel_config = ParallelConfig(max_workers=2)
53 >>> orchestrator = ParallelOrchestrator(parallel_config, br_config)
54 >>> exit_code = orchestrator.run()
55 """
def __init__(
    self,
    parallel_config: ParallelConfig,
    br_config: BRConfig,
    repo_path: Path | None = None,
    verbose: bool = True,
    wave_label: str | None = None,
) -> None:
    """Build the orchestrator together with the components it coordinates.

    Args:
        parallel_config: Settings that control the parallel run
        br_config: Project-level configuration
        repo_path: Root of the git repository; the current working
            directory is used when omitted
        verbose: Forwarded to the logger to control progress output
        wave_label: Optional label for wave-based execution (e.g., "Wave 1")
    """
    # Plain configuration values
    self.parallel_config = parallel_config
    self.br_config = br_config
    self.repo_path = repo_path if repo_path else Path.cwd()
    self.logger = Logger(verbose=verbose)
    self.wave_label = wave_label
    self._execution_duration: float = 0.0

    # A single git lock is handed to both the worker pool and the merge
    # coordinator so that operations on the main repository are serialized
    # (avoids index.lock races between them).
    self._git_lock = GitLock(self.logger)

    # Components that share the lock above
    self.queue = IssuePriorityQueue()
    self.worker_pool = WorkerPool(
        parallel_config,
        br_config,
        self.logger,
        self.repo_path,
        self._git_lock,
    )
    self.merge_coordinator = MergeCoordinator(
        parallel_config,
        self.logger,
        self.repo_path,
        self._git_lock,
    )

    # Run state and signal bookkeeping
    self.state = OrchestratorState()
    self._state_lock = threading.Lock()
    self._shutdown_requested = False
    self._original_sigint: Any = None
    self._original_sigterm: Any = None

    # Issue metadata by ID, needed to finish the issue lifecycle after merge
    self._issue_info_by_id: dict[str, IssueInfo] = {}
    # Interrupted issues are kept apart from failures (ENH-036)
    self._interrupted_issues: list[str] = []

    # Overlap detection is optional (ENH-143)
    self.overlap_detector: OverlapDetector | None = None
    if parallel_config.overlap_detection:
        self.overlap_detector = OverlapDetector(config=br_config.dependency_mapping)
    # Issues postponed because of an overlap; re-checked when workers finish
    self._deferred_issues: list[IssueInfo] = []

    # Progress reporting throttle and de-duplication (ENH-262)
    self._last_status_time: float = 0.0
    self._last_status_line: str = ""
@property
def execution_duration(self) -> float:
    """Total execution duration, in seconds, as last recorded by the orchestrator."""
    duration = self._execution_duration
    return duration
def run(self) -> int:
    """Entry point: prepare the repository, then dry-run or execute.

    Signal handlers are installed first and always restored at the end,
    and ``_cleanup`` always runs, whatever the outcome.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    try:
        cfg = self.parallel_config
        self._setup_signal_handlers()
        self._ensure_gitignore_entries()

        # Leftovers from earlier runs are only examined when not starting clean
        if not cfg.clean_start:
            pending = self._check_pending_worktrees()
            if any(p.has_pending_work for p in pending):
                if cfg.merge_pending:
                    self._merge_pending_worktrees(pending)
                elif not cfg.ignore_pending:
                    # Default: report only; the cleanup below removes them
                    self.logger.info(
                        "Continuing with cleanup (use --merge-pending to merge)..."
                    )

        self._cleanup_orphaned_worktrees()
        self._load_state()

        action = self._dry_run if cfg.dry_run else self._execute
        return action()
    except KeyboardInterrupt:
        self.logger.warning("Interrupted by user")
        return 1
    except Exception as e:
        self.logger.error(f"Fatal error: {e}")
        return 1
    finally:
        self._cleanup()
        self._restore_signal_handlers()
def _setup_signal_handlers(self) -> None:
    """Route SIGINT and SIGTERM to ``_signal_handler``, remembering the previous handlers."""
    previous = {}
    for signum in (signal.SIGINT, signal.SIGTERM):
        previous[signum] = signal.signal(signum, self._signal_handler)
    self._original_sigint = previous[signal.SIGINT]
    self._original_sigterm = previous[signal.SIGTERM]
def _restore_signal_handlers(self) -> None:
    """Reinstall whichever SIGINT/SIGTERM handlers were saved; skip any that were never saved."""
    saved_handlers = (
        (signal.SIGINT, self._original_sigint),
        (signal.SIGTERM, self._original_sigterm),
    )
    for signum, saved in saved_handlers:
        if saved is None:
            continue
        signal.signal(signum, saved)
def _ensure_gitignore_entries(self) -> None:
    """Ensure .gitignore has entries for parallel processing artifacts.

    Adds entries for:
    - .parallel-manage-state.json (state file)
    - .worktrees/ (git worktree directory)

    This prevents these files from being tracked by git, which would cause
    conflicts during merge operations (state file is continuously updated).

    An entry counts as present only when an active (non-blank, non-comment)
    line of the file names it, ignoring leading/trailing slashes. A plain
    substring test is not used because it would also match commented-out
    lines and unrelated longer paths, leaving the artifact un-ignored.
    """
    gitignore_path = self.repo_path / ".gitignore"
    required_entries = [
        ".parallel-manage-state.json",
        ".worktrees/",
    ]

    existing_content = ""
    if gitignore_path.exists():
        existing_content = gitignore_path.read_text()

    # Collect the active patterns, normalized so that "/.worktrees/",
    # ".worktrees/" and ".worktrees" all compare equal.
    active_patterns = set()
    for raw_line in existing_content.splitlines():
        pattern = raw_line.strip()
        if not pattern or pattern.startswith("#"):
            continue
        active_patterns.add(pattern.strip("/"))

    missing_entries = [
        entry for entry in required_entries if entry.strip("/") not in active_patterns
    ]
    if not missing_entries:
        return

    # Append the missing entries under a marker comment
    addition = "\n# ll-parallel artifacts\n" + "".join(
        f"{entry}\n" for entry in missing_entries
    )

    # Ensure the existing content ends with a newline before adding
    if existing_content and not existing_content.endswith("\n"):
        addition = "\n" + addition

    gitignore_path.write_text(existing_content + addition)
    self.logger.info(f"Added {len(missing_entries)} entries to .gitignore")
def _cleanup_orphaned_worktrees(self) -> None:
    """Remove ``worker-*`` worktrees left behind by earlier, interrupted runs.

    Every ``worker-*`` directory under the configured worktree base is a
    candidate, except those carrying a ``.ll-session-<pid>`` marker whose
    process still exists (BUG-579). For each remaining directory the git
    worktree, the directory itself and the matching ``parallel/...`` branch
    are removed, and finally stale worktree references are pruned.
    """
    base_dir = self.repo_path / self.parallel_config.worktree_base
    if not base_dir.exists():
        return

    def owned_by_live_process(worktree_dir):
        # A marker counts when its PID can be signalled, or when signalling
        # is refused for lack of permission (the process exists either way).
        for marker in worktree_dir.glob(".ll-session-*"):
            try:
                owner_pid = int(marker.name.split("-")[-1])
                os.kill(owner_pid, 0)  # Signal 0: existence check only
            except (ProcessLookupError, ValueError):
                continue
            except PermissionError:
                return True
            else:
                return True
        return False

    stale = []
    for entry in base_dir.iterdir():
        if not (entry.is_dir() and entry.name.startswith("worker-")):
            continue
        if owned_by_live_process(entry):
            self.logger.info(f"Skipping {entry.name}: owned by running process")
            continue
        stale.append(entry)

    if not stale:
        return

    self.logger.info(f"Cleaning up {len(stale)} orphaned worktree(s) from previous run")

    for stale_path in stale:
        try:
            # Preferred route: let git remove the worktree
            self._git_lock.run(
                ["worktree", "remove", "--force", str(stale_path)],
                cwd=self.repo_path,
                timeout=30,
            )

            # Fall back to deleting the directory if it is still there
            if stale_path.exists():
                shutil.rmtree(stale_path, ignore_errors=True)

            # Branch name format: parallel/<issue-id>-<timestamp>
            stale_branch = stale_path.name.replace("worker-", "parallel/")
            self._git_lock.run(
                ["branch", "-D", stale_branch],
                cwd=self.repo_path,
                timeout=10,
            )
        except Exception as e:
            self.logger.warning(f"Failed to clean up {stale_path.name}: {e}")

    # Drop any dangling worktree references
    self._git_lock.run(
        ["worktree", "prune"],
        cwd=self.repo_path,
        timeout=30,
    )
def _inspect_worktree(self, worktree_path: Path) -> PendingWorktreeInfo | None:
    """Inspect a worktree to determine its status.

    Derives the branch name and issue ID from the directory name, counts
    the commits the branch has beyond the configured base branch, and lists
    files with uncommitted changes inside the worktree.

    Args:
        worktree_path: Path to the worktree directory

    Returns:
        PendingWorktreeInfo if inspection succeeded, None if any step raised
        (the failure is logged as a warning)
    """
    try:
        # worker-bug-045-20260117-143022 -> parallel/bug-045-20260117-143022
        branch_name = worktree_path.name.replace("worker-", "parallel/")

        # Pattern: worker-<issue-id>-<timestamp>; bug-045 -> BUG-045.
        # Fall back to the raw directory name when the pattern does not match.
        match = re.match(r"worker-([a-z]+-\d+)-\d{8}-\d{6}", worktree_path.name)
        issue_id = match.group(1).upper() if match else worktree_path.name

        # Commits on the branch that the base branch does not have
        result = self._git_lock.run(
            ["rev-list", "--count", f"{self.parallel_config.base_branch}..{branch_name}"],
            cwd=self.repo_path,
            timeout=10,
        )
        commits_ahead = int(result.stdout.strip()) if result.returncode == 0 else 0

        # Uncommitted changes inside the worktree
        result = self._git_lock.run(
            ["status", "--porcelain"],
            cwd=worktree_path,
            timeout=10,
        )
        changed_files = []
        has_uncommitted = False
        if result.returncode == 0 and result.stdout.strip():
            has_uncommitted = True
            # Each porcelain line is "XY <path>" where X may itself be a
            # space (e.g. " M file"). The output must therefore NOT be
            # stripped as a whole before slicing, or the first entry loses
            # its first path character.
            changed_files = [
                line[3:] for line in result.stdout.splitlines() if line.strip()
            ]

        return PendingWorktreeInfo(
            worktree_path=worktree_path,
            branch_name=branch_name,
            issue_id=issue_id,
            commits_ahead=commits_ahead,
            has_uncommitted_changes=has_uncommitted,
            changed_files=changed_files,
        )
    except Exception as e:
        self.logger.warning(f"Failed to inspect worktree {worktree_path.name}: {e}")
        return None
def _check_pending_worktrees(self) -> list[PendingWorktreeInfo]:
    """Inspect leftover ``worker-*`` worktrees and log what was found.

    Returns:
        Info for every worktree that could be inspected (empty when the
        worktree base does not exist or holds no worker directories)
    """
    base_dir = self.repo_path / self.parallel_config.worktree_base
    if not base_dir.exists():
        return []

    candidates = [
        entry
        for entry in base_dir.iterdir()
        if entry.is_dir() and entry.name.startswith("worker-")
    ]
    if not candidates:
        return []

    self.logger.info("Checking for pending work from previous runs...")

    # Worktrees whose inspection failed (returned nothing) are left out
    inspected: list[PendingWorktreeInfo] = [
        found for found in map(self._inspect_worktree, candidates) if found
    ]

    needs_attention = [entry for entry in inspected if entry.has_pending_work]
    if needs_attention:
        self.logger.warning(f"Found {len(needs_attention)} worktree(s) with pending work:")
        for entry in needs_attention:
            details = []
            if entry.commits_ahead > 0:
                details.append(f"{entry.commits_ahead} commit(s) ahead of main")
            if entry.has_uncommitted_changes:
                details.append(f"{len(entry.changed_files)} uncommitted file(s)")
            summary = ", ".join(details)
            self.logger.warning(f" - {entry.worktree_path.name}: {entry.issue_id} ({summary})")

        # Tell the user which flags control what happens to this work
        self.logger.info("")
        self.logger.info("Options:")
        self.logger.info(" --merge-pending Attempt to merge pending work before continuing")
        self.logger.info(" --clean-start Remove all worktrees and start fresh")
        self.logger.info(
            " --ignore-pending Continue without action (worktrees will be cleaned up)"
        )
    elif inspected:
        self.logger.info(f"Found {len(inspected)} orphaned worktree(s) with no pending work")

    return inspected
def _merge_pending_worktrees(self, pending: list[PendingWorktreeInfo]) -> None:
    """Try to merge work left in worktrees by earlier runs.

    Only entries reporting pending work are processed. Uncommitted changes
    are first committed inside the worktree as a WIP commit; the branch is
    then merged (``--no-ff``) in the main repository. A successful merge
    removes the worktree and branch; a failed one is aborted. Errors for one
    worktree are logged and do not stop the others.

    Args:
        pending: List of pending worktree information
    """
    candidates = [entry for entry in pending if entry.has_pending_work]
    if not candidates:
        return

    self.logger.info(f"Attempting to merge {len(candidates)} pending worktree(s)...")

    git = self._git_lock.run
    for entry in candidates:
        try:
            if entry.has_uncommitted_changes:
                # Capture loose changes so the merge below includes them
                self.logger.info(f" Committing uncommitted changes in {entry.issue_id}...")
                git(["add", "-A"], cwd=entry.worktree_path, timeout=30)
                wip_message = f"WIP: Auto-commit from interrupted session for {entry.issue_id}"
                git(["commit", "-m", wip_message], cwd=entry.worktree_path, timeout=30)

            self.logger.info(f" Merging {entry.issue_id} ({entry.branch_name})...")
            merge_message = f"Merge pending work for {entry.issue_id}"
            outcome = git(
                ["merge", "--no-ff", entry.branch_name, "-m", merge_message],
                cwd=self.repo_path,
                timeout=60,
            )

            if outcome.returncode != 0:
                self.logger.warning(f" Failed to merge {entry.issue_id}: {outcome.stderr}")
                # Leave the main repository out of the half-merged state
                git(["merge", "--abort"], cwd=self.repo_path, timeout=10)
                continue

            self.logger.success(f" Successfully merged {entry.issue_id}")
            # The worktree and its branch are no longer needed
            git(
                ["worktree", "remove", "--force", str(entry.worktree_path)],
                cwd=self.repo_path,
                timeout=30,
            )
            git(["branch", "-D", entry.branch_name], cwd=self.repo_path, timeout=10)
        except Exception as e:
            self.logger.warning(f" Error merging {entry.issue_id}: {e}")
def _signal_handler(self, signum: int, frame: object) -> None:
    """Record a shutdown request, pass it on to the worker pool, and log it.

    Args:
        signum: Number of the signal received (used in the log message)
        frame: Unused
    """
    self._shutdown_requested = True
    # The pool needs the flag as well to detect interrupted workers (ENH-036)
    self.worker_pool.set_shutdown_requested(True)
    notice = f"Received signal {signum}, shutting down gracefully..."
    self.logger.warning(notice)
def _load_state(self) -> None:
    """Restore a previous run's state from the state file, if applicable.

    With a clean start, or when no state file exists, only the start
    timestamp is set. Otherwise the saved state replaces the current one and
    the queue is told which issues already completed or failed. If loading
    fails, a warning is logged and the start timestamp is set instead.
    """
    cfg = self.parallel_config
    start_fresh = cfg.clean_start or not (self.repo_path / cfg.state_file).exists()
    if start_fresh:
        self.state.started_at = datetime.now().isoformat()
        return

    state_path = self.repo_path / cfg.state_file
    try:
        saved = json.loads(state_path.read_text())
        self.state = OrchestratorState.from_dict(saved)

        # Bring the queue in line with the restored state
        self.queue.load_completed(self.state.completed_issues)
        self.queue.load_failed(self.state.failed_issues.keys())

        done_total = len(self.state.completed_issues)
        failed_total = len(self.state.failed_issues)
        self.logger.info(
            f"Resumed from previous state: {done_total} completed, {failed_total} failed"
        )
    except Exception as e:
        self.logger.warning(f"Could not load state: {e}")
        self.state.started_at = datetime.now().isoformat()
def _save_state(self) -> None:
    """Snapshot queue progress into the state object and write it to the state file as JSON."""
    target = self.repo_path / self.parallel_config.state_file
    with self._state_lock:
        state = self.state
        state.last_checkpoint = datetime.now().isoformat()
        state.completed_issues = self.queue.completed_ids
        # Every failed ID is stored with the same generic reason
        state.failed_issues = {issue_id: "Failed" for issue_id in self.queue.failed_ids}
        state.in_progress_issues = self.queue.in_progress_ids

        serialized = json.dumps(state.to_dict(), indent=2)
        target.write_text(serialized)
def _cleanup_state(self) -> None:
    """Delete the state file if one is present."""
    state_path = self.repo_path / self.parallel_config.state_file
    if not state_path.exists():
        return
    state_path.unlink()
def _dry_run(self) -> int:
    """Log the issues that would be processed, and the key settings, without running anything.

    Returns:
        Exit code (always 0 for dry run)
    """
    log = self.logger.info
    issues = self._scan_issues()

    banner = "=" * 60
    log(banner)
    log("DRY RUN - No changes will be made")
    log(banner)
    log("")

    if not issues:
        log("No issues found matching criteria")
        return 0

    log(f"Found {len(issues)} issues to process:")
    log("")

    # Bucket the issues by priority label
    grouped: dict[str, list[IssueInfo]] = {}
    for issue in issues:
        if issue.priority not in grouped:
            grouped[issue.priority] = []
        grouped[issue.priority].append(issue)

    # Walk priorities in the queue's default order, skipping empty buckets
    for priority in IssuePriorityQueue.DEFAULT_PRIORITIES:
        bucket = grouped.get(priority)
        if bucket is None:
            continue

        log(f" {priority} ({len(bucket)} issues):")
        if priority == "P0" and self.parallel_config.p0_sequential:
            mode = "sequential"
        else:
            mode = "parallel"
        for issue in bucket:
            log(f" - {issue.issue_id}: {issue.title} [{mode}]")

    cfg = self.parallel_config
    log("")
    log("Configuration:")
    log(f" Workers: {cfg.max_workers}")
    log(f" P0 Sequential: {cfg.p0_sequential}")
    log(f" Max Issues: {cfg.max_issues or 'unlimited'}")
    log(f" Command Prefix: {cfg.command_prefix}")

    return 0
def _maybe_report_status(self) -> None:
    """Emit a one-line progress summary, at most once every 5 seconds (ENH-262).

    The line lists active/done (and, when non-zero, failed/merging) counts,
    followed by the issue IDs grouped by worker stage. A line identical to
    the previously emitted one is suppressed.
    """
    now = time.time()
    if now - self._last_status_time < 5.0:
        return
    self._last_status_time = now

    segments = []
    if self.wave_label:
        segments.append(f"{self.wave_label}")

    # Queue and merge counters
    active_total = len(self.queue.in_progress_ids)
    done_total = self.queue.completed_count
    failed_total = self.queue.failed_count
    merging_total = self.merge_coordinator.pending_count

    segments.append(f"Active: {active_total}")
    segments.append(f"Done: {done_total}")
    if failed_total > 0:
        segments.append(f"Failed: {failed_total}")
    if merging_total > 0:
        segments.append(f"Merging: {merging_total}")

    # Per-stage worker details, in a fixed stage order; stages outside this
    # list are not shown
    stages = self.worker_pool.get_active_stages()
    if stages:
        for label in ("Validating", "Implementing", "Verifying", "Merging"):
            members = [
                issue_id
                for issue_id, stage in stages.items()
                if stage.value.title() == label
            ]
            if members:
                segments.append(f"{label}: [{', '.join(members)}]")

    status = " | ".join(segments)

    # Nothing new to say since the last report
    if status == self._last_status_line:
        return
    self._last_status_line = status

    if not self.logger.use_color:
        self.logger.info(status)
        return

    # Gray timestamp sets the status line apart from normal log output
    color = self.logger.GRAY
    ts = self.logger._timestamp()
    print(f"{color}[{ts}]{self.logger.RESET} {status}")
def _execute(self) -> int:
    """Queue the scanned issues and drive them through workers and merges.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    started = time.time()

    issues = self._scan_issues()
    if not issues:
        self.logger.info("No issues to process")
        return 0

    # Keep issue metadata at hand for lifecycle completion after merge
    self._issue_info_by_id.update({item.issue_id: item for item in issues})

    queued_total = self.queue.add_many(issues)
    self.logger.info(f"Queued {queued_total} issues for processing")

    self.worker_pool.start()
    self.merge_coordinator.start()

    dispatched = 0
    max_issues = self.parallel_config.max_issues or float("inf")

    while not self._shutdown_requested:
        # Finished once nothing is queued, running, or waiting to merge
        nothing_running = self.queue.empty() and self.worker_pool.active_count == 0
        if nothing_running and self.merge_coordinator.pending_count == 0:
            break

        if dispatched >= max_issues:
            self.logger.info(f"Reached max issues limit ({max_issues})")
            break

        # Hand out the next issue when a worker slot is free
        if self.worker_pool.active_count < self.parallel_config.max_workers:
            entry = self.queue.get(block=False)
            if entry:
                issue = entry.issue_info
                run_blocking = issue.priority == "P0" and self.parallel_config.p0_sequential
                if run_blocking:
                    self._process_sequential(issue)
                else:
                    self._process_parallel(issue)
                dispatched += 1

        self._save_state()
        # Progress visibility (ENH-262)
        self._maybe_report_status()
        # Avoid spinning
        time.sleep(0.1)

    self._wait_for_completion()
    self._report_results(started)

    # The state file is only dropped after a clean, uninterrupted run
    if not self._shutdown_requested and self.queue.failed_count == 0:
        self._cleanup_state()

    if self.queue.failed_count == 0:
        return 0
    return 1
def _scan_issues(self) -> list[IssueInfo]:
    """Scan for issues matching criteria.

    Issues already recorded as completed or failed in the loaded state, and
    any IDs listed in the configuration's ``skip_ids``, are excluded. When a
    positive ``max_issues`` is configured the result is truncated to that
    many issues; a falsy value (0 or None) means no limit, matching how
    ``max_issues`` is interpreted elsewhere in this class.

    Returns:
        List of issues as returned by the priority queue's scanner,
        optionally truncated
    """
    # Combine skip IDs from state and config
    skip_ids = set(self.state.completed_issues) | set(self.state.failed_issues)
    if self.parallel_config.skip_ids:
        # update() accepts any iterable, not just a set
        skip_ids.update(self.parallel_config.skip_ids)

    issues = IssuePriorityQueue.scan_issues(
        self.br_config,
        priority_filter=list(self.parallel_config.priority_filter),
        skip_ids=skip_ids,
        only_ids=self.parallel_config.only_ids,
        type_prefixes=self.parallel_config.type_prefixes,
    )

    # Apply the max issues limit; None is treated like 0 (unlimited)
    max_issues = self.parallel_config.max_issues or 0
    if max_issues > 0:
        issues = issues[:max_issues]

    return issues
def _process_sequential(self, issue: IssueInfo) -> None:
    """Process an issue sequentially (blocking).

    Waits until no worker is active, submits the issue without a completion
    callback, and blocks for the result. The result is then handled the same
    way the parallel completion callback handles one:

    - interrupted: recorded as interrupted, not as failed (ENH-036)
    - should close, or succeeded: handed to ``_merge_sequential``
    - otherwise: logged and marked failed, so the failure shows up in the
      final report and exit code instead of being silently dropped

    Args:
        issue: Issue to process
    """
    self.logger.info(f"Processing {issue.issue_id} sequentially (P0)")

    # Wait for any parallel work to finish
    while self.worker_pool.active_count > 0:
        time.sleep(0.5)

    # No callback here - the result is handled explicitly below so that it
    # is not processed twice (a callback would also queue a merge/close).
    future = self.worker_pool.submit(issue)

    try:
        result = future.result(timeout=self.parallel_config.timeout_per_issue)
        if result.interrupted:
            self.logger.info(f"{result.issue_id} was interrupted during shutdown (can retry)")
            self._interrupted_issues.append(result.issue_id)
        elif result.should_close or result.success:
            # _merge_sequential closes the issue or merges it immediately
            self._merge_sequential(result)
        else:
            self.logger.error(f"{result.issue_id} failed: {result.error}")
            self.queue.mark_failed(result.issue_id)
    except Exception as e:
        self.logger.error(f"Sequential processing failed: {e}")
        self.queue.mark_failed(issue.issue_id)
def _process_parallel(self, issue: IssueInfo) -> None:
    """Hand an issue to the worker pool without waiting for it.

    When overlap detection is enabled (ENH-143), an overlapping issue is
    either deferred (if overlapping issues are serialized) or dispatched
    with a warning; dispatched issues are registered with the detector.

    Args:
        issue: Issue to process
    """
    detector = self.overlap_detector
    if detector:
        overlap = detector.check_overlap(issue)
        if overlap and self.parallel_config.serialize_overlapping:
            self.logger.warning(
                f"Deferring {issue.issue_id} - overlaps with {overlap.overlapping_issues}"
            )
            # Kept for a re-check once active issues complete
            self._deferred_issues.append(issue)
            return
        if overlap:
            self.logger.warning(
                f"Warning: {issue.issue_id} may conflict with {overlap.overlapping_issues}"
            )

        # Mark as active before the worker starts
        detector.register_issue(issue)

    self.logger.info(f"Dispatching {issue.issue_id} to worker pool")
    self.worker_pool.submit(issue, self._on_worker_complete)
def _on_worker_complete(self, result: WorkerResult) -> None:
    """Handle a finished worker: close, merge, or record the failure.

    Args:
        result: Result from the worker
    """
    issue_id = result.issue_id

    # The issue no longer blocks others (ENH-143); give deferred issues
    # another chance to be queued
    if self.overlap_detector:
        self.overlap_detector.unregister_issue(issue_id)
        self._requeue_deferred_issues()

    # Interrupted work is tracked on its own and can be retried on the next
    # run, so it is not counted as a failure (ENH-036)
    if result.interrupted:
        self.logger.info(f"{issue_id} was interrupted during shutdown (can retry)")
        self._interrupted_issues.append(issue_id)
        return

    if result.should_close:
        # Closure path: no merge is needed.
        # Lazy import to avoid circular dependency
        from little_loops.issue_lifecycle import close_issue

        self.logger.info(f"{issue_id} should be closed: {result.close_status}")
        info = self._issue_info_by_id.get(issue_id)
        if not info:
            self.logger.warning(f"No issue info found for {issue_id}")
            self.queue.mark_failed(issue_id)
        elif close_issue(
            info,
            self.br_config,
            self.logger,
            result.close_reason,
            result.close_status,
        ):
            self.queue.mark_completed(issue_id)
        else:
            self.queue.mark_failed(issue_id)
    elif result.success:
        self.logger.success(f"{issue_id} completed in {format_duration(result.duration)}")
        if result.was_corrected:
            self.logger.info(f"{issue_id} was auto-corrected during validation")
            # Corrections are logged and kept for pattern analysis (ENH-010)
            for correction in result.corrections:
                self.logger.info(f" Correction: {correction}")
            if result.corrections:
                with self._state_lock:
                    self.state.corrections[issue_id] = result.corrections
        self.merge_coordinator.queue_merge(result)
        # Block inside the callback until the merge is done so that no new
        # worker is dispatched while a merge is in progress; this avoids the
        # race between worktree creation and merge operations (BUG-140).
        self.merge_coordinator.wait_for_completion(timeout=120)
    else:
        self.logger.error(f"{issue_id} failed: {result.error}")
        self.queue.mark_failed(issue_id)

    # Record how long the issue took
    with self._state_lock:
        self.state.timing[issue_id] = {"total": result.duration}

    # Stage tracking for this issue is no longer needed (ENH-262)
    self.worker_pool.remove_worker_stage(issue_id)
def _requeue_deferred_issues(self) -> None:
    """Put deferred issues back on the queue once they no longer overlap (ENH-143).

    Issues that still overlap stay deferred. Note that when no overlap
    detector is set, entries are neither kept nor re-queued.
    """
    if not self._deferred_issues:
        return

    remaining = []
    for issue in self._deferred_issues:
        if not self.overlap_detector:
            continue
        if self.overlap_detector.check_overlap(issue):
            # Still blocked by an active issue
            remaining.append(issue)
            continue
        self.logger.info(f"Re-queuing {issue.issue_id} - no longer overlapping")
        self.queue.add(issue)

    self._deferred_issues = remaining
def _merge_sequential(self, result: WorkerResult) -> None:
    """Finish a sequential (P0) result right away: close the issue or merge it.

    Args:
        result: Result to merge
    """
    issue_id = result.issue_id

    if result.should_close:
        # Lazy import to avoid circular dependency
        from little_loops.issue_lifecycle import close_issue

        info = self._issue_info_by_id.get(issue_id)
        closed = info and close_issue(
            info,
            self.br_config,
            self.logger,
            result.close_reason,
            result.close_status,
        )
        if closed:
            self.queue.mark_completed(issue_id)
        else:
            self.queue.mark_failed(issue_id)
        return

    # Queue the merge and wait for the coordinator before checking the outcome
    self.merge_coordinator.queue_merge(result)
    self.merge_coordinator.wait_for_completion(timeout=60)

    if issue_id not in self.merge_coordinator.merged_ids:
        self.queue.mark_failed(issue_id)
        return

    self.queue.mark_completed(issue_id)
    self._complete_issue_lifecycle_if_needed(issue_id)
def _wait_for_completion(self) -> None:
    """Block until workers are idle (or time out), then settle merge results.

    The wait is bounded by the orchestrator timeout when it is positive,
    otherwise by the per-issue timeout multiplied by the worker count. On
    timeout all worker processes are terminated. Merge outcomes are then
    copied into the queue as completed or failed.
    """
    self.logger.info("Waiting for workers to complete...")

    cfg = self.parallel_config
    timeout = (
        cfg.orchestrator_timeout
        if cfg.orchestrator_timeout > 0
        else cfg.timeout_per_issue * cfg.max_workers
    )

    began = time.time()
    while self.worker_pool.active_count > 0:
        timed_out = time.time() - began > timeout
        if timed_out:
            self.logger.warning(f"Timeout waiting for workers after {timeout}s")
            self.worker_pool.terminate_all_processes()
            break
        time.sleep(1.0)

    self.logger.info("Waiting for pending merges...")
    self.merge_coordinator.wait_for_completion(timeout=120)

    # Merged issues are completed and get their lifecycle finished
    for merged_id in self.merge_coordinator.merged_ids:
        self.queue.mark_completed(merged_id)
        self._complete_issue_lifecycle_if_needed(merged_id)

    # Failed merges count as failed issues
    for failed_id in self.merge_coordinator.failed_merges:
        self.queue.mark_failed(failed_id)
def _report_results(self, start_time: float) -> None:
    """Log the end-of-run summary and record the run's wall-clock duration.

    Stores the elapsed time in ``self._execution_duration`` and then logs,
    in order: a banner, the total time, the completed/failed/interrupted
    counts, an estimated speedup, the failed and interrupted issue IDs,
    auto-correction statistics, and any stash-pop warnings exposed by the
    merge coordinator.

    Args:
        start_time: ``time.time()`` timestamp taken when processing started.
    """
    total_time = time.time() - start_time
    self._execution_duration = total_time

    banner = "=" * 60
    self.logger.info("")
    self.logger.info(banner)
    if self.wave_label:
        self.logger.info(f"{self.wave_label.upper()} PROCESSING COMPLETE")
    else:
        self.logger.info("PARALLEL ISSUE PROCESSING COMPLETE")
    self.logger.info(banner)
    self.logger.info("")
    self.logger.timing(f"Total time: {format_duration(total_time)}")
    self.logger.info(f"Completed: {self.queue.completed_count}")
    self.logger.info(f"Failed: {self.queue.failed_count}")
    if self._interrupted_issues:
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)}")

    # Copy the state dictionaries while holding the lock so the remainder of
    # the report works on a snapshot.
    with self._state_lock:
        timing_snapshot = dict(self.state.timing)
        corrections_snapshot = dict(self.state.corrections)

    if self.queue.completed_count > 0:
        # Speedup = summed per-issue "total" time / elapsed wall-clock time.
        total_issue_time = sum(t.get("total", 0) for t in timing_snapshot.values())
        # The divisor is guarded too: an elapsed time of 0 (coarse clock) would
        # raise ZeroDivisionError, and a wall clock that stepped backwards
        # would report a meaningless negative speedup.
        if total_issue_time > 0 and total_time > 0:
            speedup = total_issue_time / total_time
            self.logger.info(f"Estimated speedup: {speedup:.2f}x")

    if self.queue.failed_ids:
        self.logger.info("")
        self.logger.warning("Failed issues:")
        for issue_id in self.queue.failed_ids:
            self.logger.warning(f" - {issue_id}")

    # Report interrupted issues separately (ENH-036)
    if self._interrupted_issues:
        self.logger.info("")
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)} (can retry)")
        for issue_id in self._interrupted_issues:
            self.logger.info(f" - {issue_id}")

    # Report correction statistics for quality tracking (ENH-010)
    if corrections_snapshot:
        total_corrected = len(corrections_snapshot)
        total_issues = self.queue.completed_count + self.queue.failed_count
        correction_rate = (total_corrected / total_issues * 100) if total_issues > 0 else 0
        self.logger.info("")
        self.logger.info(
            f"Auto-corrections: {total_corrected}/{total_issues} ({correction_rate:.1f}%)"
        )

        # Group corrections by category (ENH-010 fourth fix)
        from collections import Counter

        all_corrections: list[str] = []
        by_category: Counter[str] = Counter()
        for corrections in corrections_snapshot.values():
            all_corrections.extend(corrections)
            for correction in corrections:
                # A leading "[category]" prefix names the category; anything
                # else is counted as "uncategorized".
                if correction.startswith("[") and "]" in correction:
                    by_category[correction[1 : correction.index("]")]] += 1
                else:
                    by_category["uncategorized"] += 1

        # Log corrections by type/category, most frequent first
        if by_category:
            self.logger.info("Corrections by type:")
            for category, count in sorted(by_category.items(), key=lambda x: -x[1]):
                self.logger.info(f" - {category}: {count}")

        # Log the three most common individual corrections
        if all_corrections:
            self.logger.info("Most common corrections:")
            for correction, count in Counter(all_corrections).most_common(3):
                # Truncate long correction descriptions to 60 characters
                display = (correction[:60] + "...") if len(correction) > 60 else correction
                self.logger.info(f" - {display}: {count}")

    # Report stash pop warnings (local changes need manual recovery)
    stash_warnings = self.merge_coordinator.stash_pop_failures
    if stash_warnings:
        self.logger.info("")
        self.logger.warning("Stash recovery warnings (local changes need manual restoration):")
        for issue_id, message in stash_warnings.items():
            self.logger.warning(f" - {issue_id}: {message}")
        self.logger.warning("")
        self.logger.warning(
            "To recover: Run 'git stash list' to find your changes, "
            "then 'git stash pop' or 'git stash apply stash@{N}'"
        )
1044 def _complete_issue_lifecycle_if_needed(self, issue_id: str) -> bool:
1045 """Complete issue lifecycle if the issue file wasn't moved during merge.
1047 Args:
1048 issue_id: ID of the issue to complete
1050 Returns:
1051 True if lifecycle was completed (or already complete), False on error
1052 """
1053 info = self._issue_info_by_id.get(issue_id)
1054 if not info:
1055 self.logger.warning(f"No issue info found for {issue_id}")
1056 return False
1058 original_path = info.path
1059 completed_dir = self.br_config.get_completed_dir()
1060 completed_path = completed_dir / original_path.name
1062 # Check if already moved to completed
1063 if completed_path.exists():
1064 return True
1066 # Check if still in original location
1067 if not original_path.exists():
1068 return True
1070 # Issue file still in original location - complete lifecycle
1071 self.logger.info(f"Completing lifecycle for {issue_id} (merged but file not moved)")
1073 try:
1074 completed_dir.mkdir(parents=True, exist_ok=True)
1076 # Read original content
1077 content = original_path.read_text()
1079 # Add resolution section if not already present
1080 if "## Resolution" not in content:
1081 action = self.br_config.get_category_action(info.issue_type)
1082 resolution = f"""
1084---
1086## Resolution
1088- **Action**: {action}
1089- **Completed**: {datetime.now().strftime("%Y-%m-%d")}
1090- **Status**: Completed (parallel merge fallback)
1091- **Implementation**: Merged from parallel worker branch
1093### Changes Made
1094- See git history for implementation details
1096### Verification Results
1097- Work verification passed before merge
1099### Commits
1100- See `git log --oneline` for merge commit details
1101"""
1102 content += resolution
1104 # Use git mv if possible (before writing content to avoid "destination exists")
1105 result = self._git_lock.run(
1106 ["mv", str(original_path), str(completed_path)],
1107 cwd=self.repo_path,
1108 )
1110 if result.returncode != 0:
1111 # git mv failed (destination may exist or other error)
1112 self.logger.warning(f"git mv failed for {issue_id}: {result.stderr}")
1113 # Write content to destination (may overwrite existing)
1114 completed_path.write_text(content)
1115 # Remove source if it still exists
1116 original_path.unlink(missing_ok=True)
1117 else:
1118 # git mv succeeded, write updated content
1119 completed_path.write_text(content)
1121 # Stage and commit
1122 self._git_lock.run(
1123 ["add", "-A"],
1124 cwd=self.repo_path,
1125 )
1127 action = self.br_config.get_category_action(info.issue_type)
1128 commit_msg = f"""{action}({info.issue_type}): complete {issue_id} lifecycle
1130Parallel merge fallback - issue file moved to completed.
1132Issue: {issue_id}
1133Type: {info.issue_type}
1134Title: {info.title}
1135"""
1136 commit_result = self._git_lock.run(
1137 ["commit", "-m", commit_msg],
1138 cwd=self.repo_path,
1139 )
1141 if commit_result.returncode != 0:
1142 if "nothing to commit" not in commit_result.stdout.lower():
1143 self.logger.warning(f"git commit failed: {commit_result.stderr}")
1144 else:
1145 commit_hash_match = re.search(r"\[[\w-]+\s+([a-f0-9]+)\]", commit_result.stdout)
1146 if commit_hash_match:
1147 self.logger.success(
1148 f"Completed lifecycle for {issue_id}: {commit_hash_match.group(1)}"
1149 )
1150 else:
1151 self.logger.success(f"Completed lifecycle for {issue_id}")
1153 return True
1155 except Exception as e:
1156 self.logger.error(f"Failed to complete lifecycle for {issue_id}: {e}")
1157 return False
1159 def _cleanup(self) -> None:
1160 """Clean up resources."""
1161 self.logger.info("Cleaning up...")
1163 # Save final state
1164 self._save_state()
1166 # Shutdown components
1167 self.worker_pool.shutdown(wait=True)
1168 self.merge_coordinator.shutdown(wait=True, timeout=30)
1170 # Clean up worktrees if not interrupted
1171 if not self._shutdown_requested:
1172 self.worker_pool.cleanup_all_worktrees()