Coverage for little_loops / parallel / merge_coordinator.py: 10%
451 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Merge coordinator for sequential integration of parallel worker changes.
3Handles merging completed worker branches back to main with conflict detection
4and automatic retry capability.
5"""
7from __future__ import annotations
9import shutil
10import subprocess
11import threading
12import time
13from pathlib import Path
14from queue import Empty, Queue
15from typing import TYPE_CHECKING
17from little_loops.parallel.git_lock import GitLock
18from little_loops.parallel.types import (
19 MergeRequest,
20 MergeStatus,
21 ParallelConfig,
22 WorkerResult,
23)
25if TYPE_CHECKING:
26 from little_loops.logger import Logger
29class MergeCoordinator:
30 """Sequential merge queue with conflict handling.
32 Processes merge requests one at a time to avoid conflicts. Supports
33 automatic rebase and retry on merge failures. Handles uncommitted local
34 changes by stashing them before merge operations.
36 Example:
37 >>> coordinator = MergeCoordinator(config, logger, repo_path)
38 >>> coordinator.start()
39 >>> coordinator.queue_merge(worker_result)
40 >>> # ... later ...
41 >>> coordinator.shutdown()
42 """
44 def __init__(
45 self,
46 config: ParallelConfig,
47 logger: Logger,
48 repo_path: Path | None = None,
49 git_lock: GitLock | None = None,
50 ) -> None:
51 """Initialize the merge coordinator.
53 Args:
54 config: Parallel processing configuration
55 logger: Logger for merge output
56 repo_path: Path to the git repository (default: current directory)
57 git_lock: Shared lock for git operations (created if not provided)
58 """
59 self.config = config
60 self.logger = logger
61 self.repo_path = repo_path or Path.cwd()
62 self._git_lock = git_lock or GitLock(logger)
63 self._queue: Queue[MergeRequest] = Queue()
64 self._thread: threading.Thread | None = None
65 self._shutdown_event = threading.Event()
66 self._merged: list[str] = []
67 self._failed: dict[str, str] = {}
68 self._lock = threading.Lock()
69 self._stash_active = False # Track if we have an active stash
70 self._consecutive_failures = 0 # Circuit breaker counter
71 self._paused = False # Set when circuit breaker trips
72 self._assume_unchanged_active = False # Track if state file is marked assume-unchanged
73 self._stash_pop_failures: dict[str, str] = {} # issue_id -> failure message
74 self._current_issue_id: str | None = (
75 None # Track current issue for stash failure attribution
76 )
77 self._problematic_commits: set[str] = (
78 set()
79 ) # Track commits causing repeated rebase conflicts
def start(self) -> None:
    """Launch the background merge thread unless one is already alive."""
    existing = self._thread
    already_running = existing is not None and existing.is_alive()
    if already_running:
        return

    # Clear the stop flag so a restarted loop does not exit immediately.
    self._shutdown_event.clear()
    worker = threading.Thread(
        target=self._merge_loop,
        name="merge-coordinator",
        daemon=True,
    )
    self._thread = worker
    worker.start()
    self.logger.info("Merge coordinator started")
def shutdown(self, wait: bool = True, timeout: float = 30.0) -> None:
    """Signal the merge loop to stop and optionally wait for it.

    Args:
        wait: When true, join the worker thread if it is still alive
        timeout: Upper bound, in seconds, passed to the join
    """
    worker = self._thread
    if worker is None:
        # Never started (or already shut down): nothing to do.
        return

    self._shutdown_event.set()

    should_join = wait and worker.is_alive()
    if should_join:
        worker.join(timeout=timeout)

    self._thread = None
    self.logger.info("Merge coordinator stopped")
def queue_merge(self, worker_result: WorkerResult) -> None:
    """Wrap a finished worker's result in a merge request and enqueue it.

    Args:
        worker_result: Result from a completed worker; its ``issue_id`` and
            ``branch_name`` are reported in the log line
    """
    self._queue.put(MergeRequest(worker_result=worker_result))
    message = f"Queued merge for {worker_result.issue_id} (branch: {worker_result.branch_name})"
    self.logger.info(message)
125 def _stash_local_changes(self) -> bool:
126 """Stash any uncommitted tracked changes in the main repo.
128 Only stashes tracked file modifications. Untracked files are not stashed
129 because git stash pathspec exclusions don't work reliably with -u flag.
130 Untracked file conflicts during merge are handled by _handle_untracked_conflict.
132 The following are explicitly excluded from stashing:
133 1. State file - managed by orchestrator and continuously updated
134 2. Lifecycle file moves - issue files being moved to completed/ directory
135 3. Files in completed directory - lifecycle-managed files
136 4. Claude Code context state file - managed by Claude Code externally
138 These exclusions prevent stash pop conflicts after merge, since the merge
139 may change HEAD and create conflicts with stashed rename operations.
141 Returns:
142 True if changes were stashed, False if working tree was clean
143 """
144 state_file_path = Path(self.config.state_file)
145 state_file_str = str(state_file_path)
146 state_file_name = state_file_path.name
148 # Check if there are any tracked changes to stash.
149 # We only look at tracked files (exclude untracked with grep -v '??')
150 # since we can only reliably stash tracked changes.
151 status_result = self._git_lock.run(
152 ["status", "--porcelain"],
153 cwd=self.repo_path,
154 timeout=30,
155 )
157 # Filter to only tracked changes (lines not starting with ??)
158 # and exclude orchestrator-managed files to prevent stash pop conflicts
159 tracked_changes = []
160 files_to_stash = []
161 # Note: Don't use .strip() on the full output - it removes leading spaces
162 # from the first line which are significant in git status porcelain format
163 for line in status_result.stdout.splitlines():
164 if not line or line.startswith("??"):
165 continue
166 # Skip lifecycle file moves (issue files moved to completed/)
167 # These are managed by orchestrator and cause stash pop conflicts
168 if self._is_lifecycle_file_move(line):
169 self.logger.debug(f"Skipping lifecycle file move from stash: {line}")
170 continue
171 # Extract file path from porcelain format (XY filename or XY -> filename for renames)
172 # Format: XY filename or XY old -> new (XY is exactly 2 chars + 1 space)
173 file_path = line[3:].split(" -> ")[-1].strip()
174 if file_path == state_file_str or file_path.endswith(state_file_name):
175 continue # Skip state file - orchestrator manages it independently
176 # Skip files in completed/deferred directory - these are lifecycle-managed
177 # Handle both .issues/completed/ (with dot) and issues/completed/ (without dot)
178 if (
179 ".issues/completed/" in file_path
180 or file_path.startswith(".issues/completed/")
181 or "issues/completed/" in file_path
182 or file_path.startswith("issues/completed/")
183 or ".issues/deferred/" in file_path
184 or file_path.startswith(".issues/deferred/")
185 or "issues/deferred/" in file_path
186 or file_path.startswith("issues/deferred/")
187 ):
188 self.logger.debug(
189 f"Skipping completed/deferred directory file from stash: {file_path}"
190 )
191 continue
192 # Skip Claude Code context state file - managed externally
193 if file_path.endswith("ll-context-state.json"):
194 self.logger.debug(f"Skipping Claude context state file from stash: {file_path}")
195 continue
196 tracked_changes.append(line)
197 files_to_stash.append(file_path)
199 if not files_to_stash:
200 return False # No tracked changes to stash (excluding state file)
202 # Log files to be stashed for debugging
203 self.logger.debug(f"Tracked files to stash: {tracked_changes[:10]}")
205 # Stash only specific files, explicitly excluding the state file.
206 # Using explicit file list avoids race conditions where the orchestrator
207 # modifies the state file between a checkout and stash-all operation.
208 # Note: gitignored files are never stashed anyway.
209 stash_result = self._git_lock.run(
210 [
211 "stash",
212 "push",
213 "-m",
214 "ll-parallel: auto-stash before merge",
215 "--",
216 *files_to_stash,
217 ],
218 cwd=self.repo_path,
219 timeout=30,
220 )
222 if stash_result.returncode == 0:
223 self._stash_active = True
224 self.logger.info("Stashed local changes before merge")
225 return True
227 self.logger.error(f"Failed to stash local changes: {stash_result.stderr}")
228 return False
230 def _pop_stash(self) -> bool:
231 """Restore stashed changes if any were stashed.
233 Important: This method preserves the merge even if stash pop fails.
234 We never reset --hard HEAD here because that would undo a successful merge.
236 Returns:
237 True if stash was successfully popped or no stash was active,
238 False if pop failed (stash is left for manual recovery).
239 """
240 if not self._stash_active:
241 return True
243 pop_result = self._git_lock.run(
244 ["stash", "pop"],
245 cwd=self.repo_path,
246 timeout=30,
247 )
249 if pop_result.returncode != 0:
250 self.logger.warning(f"Failed to pop stash: {pop_result.stderr.strip()}")
252 # Check if it's a conflict issue - in that case, stash pop may have
253 # partially applied. We need to clean up the index but preserve the merge.
254 status_result = self._git_lock.run(
255 ["status", "--porcelain"],
256 cwd=self.repo_path,
257 timeout=30,
258 )
260 # Check for unmerged entries from the stash pop attempt
261 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD")
262 has_unmerged = any(
263 line[:2] in unmerged_prefixes
264 for line in status_result.stdout.splitlines()
265 if len(line) >= 2
266 )
268 if has_unmerged:
269 # Clean up the conflicted stash pop without affecting the merge
270 # Use checkout to restore conflicted files to their post-merge state
271 self._git_lock.run(
272 ["checkout", "--theirs", "."],
273 cwd=self.repo_path,
274 timeout=30,
275 )
276 self._git_lock.run(
277 ["reset", "HEAD"],
278 cwd=self.repo_path,
279 timeout=30,
280 )
281 self.logger.info("Cleaned up conflicted stash pop, merge preserved")
283 # Leave the stash intact for manual recovery
284 self._stash_active = False
286 # Record this failure for reporting in final summary
287 if self._current_issue_id:
288 with self._lock:
289 self._stash_pop_failures[self._current_issue_id] = (
290 "Local changes could not be restored after merge. "
291 "Run 'git stash list' and 'git stash pop' to recover manually."
292 )
294 self.logger.warning(
295 "Stash could not be restored - your changes are saved in 'git stash list'. "
296 "Run 'git stash show' to view and 'git stash pop' to retry manually."
297 )
298 return False
300 self._stash_active = False
301 self.logger.info("Restored stashed local changes")
302 return True
304 def _mark_state_file_assume_unchanged(self) -> bool:
305 """Mark the state file as assume-unchanged to prevent git from seeing modifications.
307 This allows git pull --rebase to proceed even when the state file is modified,
308 since the orchestrator continuously updates it during processing.
310 Returns:
311 True if successfully marked, False otherwise
312 """
313 state_file = str(self.config.state_file)
315 # Check if file exists and is tracked
316 ls_files = self._git_lock.run(
317 ["ls-files", state_file],
318 cwd=self.repo_path,
319 timeout=10,
320 )
322 if not ls_files.stdout.strip():
323 # File not tracked, nothing to do
324 return True
326 result = self._git_lock.run(
327 ["update-index", "--assume-unchanged", state_file],
328 cwd=self.repo_path,
329 timeout=10,
330 )
332 if result.returncode == 0:
333 self._assume_unchanged_active = True
334 self.logger.debug(f"Marked {state_file} as assume-unchanged")
335 return True
337 self.logger.warning(f"Failed to mark state file assume-unchanged: {result.stderr}")
338 return False
340 def _restore_state_file_tracking(self) -> bool:
341 """Restore normal tracking for the state file.
343 Returns:
344 True if successfully restored, False otherwise
345 """
346 if not self._assume_unchanged_active:
347 return True
349 state_file = str(self.config.state_file)
351 result = self._git_lock.run(
352 ["update-index", "--no-assume-unchanged", state_file],
353 cwd=self.repo_path,
354 timeout=10,
355 )
357 self._assume_unchanged_active = False
359 if result.returncode != 0:
360 self.logger.warning(f"Failed to restore state file tracking: {result.stderr}")
361 return False
363 self.logger.debug(f"Restored tracking for {state_file}")
364 return True
366 def _is_lifecycle_file_move(self, porcelain_line: str) -> bool:
367 """Check if a porcelain status line represents a lifecycle file move.
369 Lifecycle file moves are issue files being moved to completed/ directory.
370 These are managed by the orchestrator and should not be stashed, as they
371 will conflict with the merge when popping.
373 Args:
374 porcelain_line: A line from `git status --porcelain` output
376 Returns:
377 True if this is a lifecycle file move that should be excluded from stash
378 """
379 # Rename entries have format: R old_path -> new_path
380 if not porcelain_line.startswith("R"):
381 return False
383 # Check if it's a move to completed/ directory
384 if " -> " not in porcelain_line:
385 return False
387 # Extract destination path (after " -> ")
388 parts = porcelain_line[3:].split(" -> ")
389 if len(parts) != 2:
390 return False
392 dest_path = parts[1].strip()
394 # Check if destination is in completed or deferred directory (with or without dot prefix)
395 return (
396 ".issues/completed/" in dest_path
397 or dest_path.startswith(".issues/completed/")
398 or "issues/completed/" in dest_path
399 or dest_path.startswith("issues/completed/")
400 or ".issues/deferred/" in dest_path
401 or dest_path.startswith(".issues/deferred/")
402 or "issues/deferred/" in dest_path
403 or dest_path.startswith("issues/deferred/")
404 )
406 def _commit_pending_lifecycle_moves(self) -> bool:
407 """Commit any uncommitted lifecycle file moves.
409 Lifecycle file moves (issue files moved to completed/) are excluded from
410 stashing to prevent stash pop conflicts. However, if they remain uncommitted
411 when a merge starts, they will block the merge. This method commits any such
412 pending moves before the merge proceeds.
414 Returns:
415 True if any lifecycle moves were committed or none existed,
416 False if commit failed
417 """
418 # Check for lifecycle file moves in git status
419 status_result = self._git_lock.run(
420 ["status", "--porcelain"],
421 cwd=self.repo_path,
422 timeout=30,
423 )
425 lifecycle_moves = []
426 for line in status_result.stdout.splitlines():
427 if self._is_lifecycle_file_move(line):
428 lifecycle_moves.append(line)
430 if not lifecycle_moves:
431 return True
433 self.logger.info(
434 f"Found {len(lifecycle_moves)} uncommitted lifecycle file move(s), committing..."
435 )
437 # Stage all changes (lifecycle moves should already be staged from git mv,
438 # but add -A ensures any associated content changes are included)
439 self._git_lock.run(
440 ["add", "-A"],
441 cwd=self.repo_path,
442 timeout=30,
443 )
445 # Commit the lifecycle moves
446 commit_result = self._git_lock.run(
447 [
448 "commit",
449 "-m",
450 "chore(issues): commit pending lifecycle file moves\n\n"
451 "Auto-committed before merge to prevent conflicts.",
452 ],
453 cwd=self.repo_path,
454 timeout=30,
455 )
457 if commit_result.returncode != 0:
458 if "nothing to commit" in commit_result.stdout.lower():
459 # This shouldn't happen since we detected moves, but handle gracefully
460 self.logger.debug("No changes to commit despite detecting lifecycle moves")
461 return True
462 self.logger.error(f"Failed to commit lifecycle moves: {commit_result.stderr}")
463 return False
465 self.logger.info("Committed pending lifecycle file moves")
466 return True
468 def _is_local_changes_error(self, error_output: str) -> bool:
469 """Check if the error is due to uncommitted local changes.
471 Args:
472 error_output: The stderr/stdout from the failed git command
474 Returns:
475 True if the error indicates local changes would be overwritten
476 """
477 indicators = [
478 "Your local changes to the following files would be overwritten",
479 "Please commit your changes or stash them before you merge",
480 "error: cannot pull with rebase: You have unstaged changes",
481 ]
482 return any(indicator in error_output for indicator in indicators)
484 def _is_untracked_files_error(self, error_output: str) -> bool:
485 """Check if the error is due to untracked files blocking merge.
487 Args:
488 error_output: The stderr/stdout from the failed git command
490 Returns:
491 True if the error indicates untracked files would be overwritten
492 """
493 indicators = [
494 "untracked working tree files would be overwritten by merge",
495 "Please move or remove them before you merge",
496 ]
497 return any(indicator in error_output for indicator in indicators)
499 def _is_index_error(self, error_output: str) -> bool:
500 """Check if the error is due to a corrupted git index.
502 Args:
503 error_output: The stderr/stdout from the failed git command
505 Returns:
506 True if the error indicates index problems
507 """
508 indicators = [
509 "you need to resolve your current index first",
510 "fatal: cannot do a partial commit during a merge",
511 "error: you have not concluded your merge",
512 ]
513 return any(indicator in error_output for indicator in indicators)
515 def _is_rebase_in_progress(self) -> bool:
516 """Check if a rebase is currently in progress.
518 Returns:
519 True if rebase is in progress (rebase-merge or rebase-apply exists)
520 """
521 rebase_merge = self.repo_path / ".git" / "rebase-merge"
522 rebase_apply = self.repo_path / ".git" / "rebase-apply"
523 return rebase_merge.exists() or rebase_apply.exists()
525 def _abort_rebase_if_in_progress(self) -> bool:
526 """Abort any in-progress rebase operation.
528 Returns:
529 True if rebase was aborted or none was in progress,
530 False if abort failed
531 """
532 if not self._is_rebase_in_progress():
533 return True
535 self.logger.warning("Detected rebase in progress, aborting...")
536 abort_result = self._git_lock.run(
537 ["rebase", "--abort"],
538 cwd=self.repo_path,
539 timeout=30,
540 )
542 if abort_result.returncode != 0:
543 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}")
544 # Force hard reset as last resort
545 return self._attempt_hard_reset()
547 self.logger.info("Aborted incomplete rebase from pull")
548 return True
550 def _is_unmerged_files_error(self, error_output: str) -> bool:
551 """Check if the error is due to pre-existing unmerged files.
553 Args:
554 error_output: The stderr/stdout from the failed git command
556 Returns:
557 True if the error indicates unmerged files blocking the operation
558 """
559 indicators = [
560 "you have unmerged files",
561 "Merging is not possible because you have unmerged files",
562 "fix conflicts and then commit the result",
563 ]
564 return any(indicator in error_output for indicator in indicators)
566 def _detect_conflict_commit(self, error_output: str) -> str | None:
567 """Extract commit hash from rebase conflict output.
569 Looks for patterns like:
570 - "dropping ae3b85ec1cac501058f6e5da362be37be1c99801 feat(ai): add stall detectio"
572 Args:
573 error_output: The stderr/stdout from the failed git pull --rebase
575 Returns:
576 The 40-character commit hash if found, None otherwise
577 """
578 import re
580 # Pattern: "dropping <40-char-hash>" followed by space and message
581 # Match only full 40-char hashes to avoid false positives
582 match = re.search(r"dropping\s+([a-f0-9]{40})\s+", error_output, re.IGNORECASE)
583 return match.group(1) if match else None
585 def _check_and_recover_index(self) -> bool:
586 """Check git index health and attempt recovery if needed.
588 Returns:
589 True if index is healthy or was recovered, False if unrecoverable
590 """
591 # Check if we're in the middle of a merge
592 merge_head = self.repo_path / ".git" / "MERGE_HEAD"
593 if merge_head.exists():
594 self.logger.warning("Detected incomplete merge, aborting...")
595 abort_result = self._git_lock.run(
596 ["merge", "--abort"],
597 cwd=self.repo_path,
598 timeout=30,
599 )
600 if abort_result.returncode != 0:
601 self.logger.error(f"Failed to abort merge: {abort_result.stderr}")
602 return False
603 self.logger.info("Aborted incomplete merge")
605 # Check if we're in the middle of a rebase
606 rebase_dir = self.repo_path / ".git" / "rebase-merge"
607 rebase_apply = self.repo_path / ".git" / "rebase-apply"
608 if rebase_dir.exists() or rebase_apply.exists():
609 self.logger.warning("Detected incomplete rebase, aborting...")
610 abort_result = self._git_lock.run(
611 ["rebase", "--abort"],
612 cwd=self.repo_path,
613 timeout=30,
614 )
615 if abort_result.returncode != 0:
616 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}")
617 return False
618 self.logger.info("Aborted incomplete rebase")
619 # Force reset after rebase abort - the abort can leave index in dirty state
620 # This is defensive since unmerged file detection below may not trigger
621 if not self._attempt_hard_reset():
622 return False
624 # Check for unmerged files in the index (UU, AA, DD, AU, UA, DU, UD prefixes)
625 # These can persist even after merge --abort in some edge cases
626 status_result = self._git_lock.run(
627 ["status", "--porcelain"],
628 cwd=self.repo_path,
629 timeout=30,
630 )
632 if status_result.returncode != 0:
633 self.logger.error(f"git status failed: {status_result.stderr}")
634 return self._attempt_hard_reset()
636 # Debug logging to diagnose unmerged detection issues
637 if status_result.stdout.strip():
638 self.logger.debug(f"Git status output: {status_result.stdout[:500]}")
640 # Check for unmerged entries (first two chars indicate index/worktree status)
641 # Unmerged states: UU (both modified), AA (both added), DD (both deleted),
642 # AU/UA (added by us/them), DU/UD (deleted by us/them)
643 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD")
644 has_unmerged = any(
645 line[:2] in unmerged_prefixes
646 for line in status_result.stdout.splitlines()
647 if len(line) >= 2
648 )
650 if has_unmerged:
651 self.logger.warning("Detected unmerged files in index, resetting...")
652 if not self._attempt_hard_reset():
653 return False
654 self.logger.info("Cleared unmerged files from index")
656 # Final safety check - if MERGE_HEAD still exists, force reset
657 # This catches edge cases where abort succeeded but state is still dirty
658 if merge_head.exists():
659 self.logger.warning("MERGE_HEAD persists after recovery attempts, forcing reset")
660 if not self._attempt_hard_reset():
661 return False
663 return True
665 def _attempt_hard_reset(self) -> bool:
666 """Attempt a hard reset to recover from index issues.
668 Returns:
669 True if reset succeeded, False otherwise
670 """
671 self.logger.warning("Attempting hard reset to recover...")
672 reset_result = self._git_lock.run(
673 ["reset", "--hard", "HEAD"],
674 cwd=self.repo_path,
675 timeout=30,
676 )
677 if reset_result.returncode != 0:
678 self.logger.error("Hard reset failed - manual intervention required")
679 return False
680 self.logger.info("Hard reset successful")
681 return True
683 def _merge_loop(self) -> None:
684 """Background thread loop for processing merge requests."""
685 while not self._shutdown_event.is_set():
686 try:
687 # Wait for next merge request with timeout
688 try:
689 request = self._queue.get(timeout=1.0)
690 except Empty:
691 continue
693 # Process the merge
694 self._process_merge(request)
696 except Exception as e:
697 self.logger.error(f"Merge loop error: {e}")
698 time.sleep(1.0)
def _process_merge(self, request: MergeRequest) -> None:
    """Process a single merge request end to end.

    Sequence: circuit-breaker check, index health check, mark the state
    file assume-unchanged, commit pending lifecycle moves, stash local
    changes, check out the base branch, ``pull --rebase`` (with a
    merge-strategy fallback for commits that conflicted before), a second
    index check, then ``merge --no-ff`` of the worker branch.

    Outcomes are reported through helper methods rather than a return
    value: ``_finalize_merge`` on success, ``_handle_conflict`` /
    ``_handle_untracked_conflict`` for those merge failures, and
    ``_handle_failure`` for everything else (including any exception
    raised inside the ``try``).

    The ``finally`` clause runs on every exit path, including the early
    ``return`` statements: it pops the stash if one was created, restores
    state-file tracking, and clears the current issue id.

    Args:
        request: Merge request to process
    """
    result = request.worker_result
    # Remember which issue is being merged so a stash-pop failure can be
    # attributed to it.
    with self._lock:
        self._current_issue_id = result.issue_id
    self.logger.info(f"Processing merge for {result.issue_id}")
    had_local_changes = False

    try:
        # Circuit breaker check
        if self._paused:
            self.logger.warning(
                f"Merge coordinator paused due to repeated failures. "
                f"Skipping {result.issue_id}. Manual intervention required."
            )
            self._handle_failure(request, "Merge coordinator paused - circuit breaker tripped")
            return

        request.status = MergeStatus.IN_PROGRESS

        # Health check: ensure git index is clean before proceeding
        if not self._check_and_recover_index():
            # Each failed recovery counts toward the breaker; the third
            # one pauses the coordinator.
            self._consecutive_failures += 1
            if self._consecutive_failures >= 3:
                self._paused = True
                self.logger.error(
                    f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                    "Merge coordinator paused. Manual recovery required."
                )
            self._handle_failure(request, "Git index recovery failed")
            return

        # Mark state file as assume-unchanged to prevent pull --rebase conflicts
        # The orchestrator continuously updates the state file during processing
        # (the return value is intentionally not checked here).
        self._mark_state_file_assume_unchanged()

        # Commit any uncommitted lifecycle file moves before stash/merge
        # These are excluded from stash to prevent pop conflicts, so they must
        # be committed to avoid blocking the merge (BUG-018 fix)
        if not self._commit_pending_lifecycle_moves():
            self._handle_failure(request, "Failed to commit pending lifecycle moves")
            return

        # Stash any local changes before merge operations
        had_local_changes = self._stash_local_changes()

        # Ensure we're on base branch in the main repo
        base = self.config.base_branch
        checkout_result = self._git_lock.run(
            ["checkout", base],
            cwd=self.repo_path,
            timeout=30,
        )

        if checkout_result.returncode != 0:
            error_output = checkout_result.stderr + checkout_result.stdout
            if self._is_local_changes_error(error_output):
                # This shouldn't happen since we stashed, but handle it anyway
                # (warning only; the branch below still decides the outcome).
                self.logger.warning(
                    "Checkout failed due to local changes despite stash attempt"
                )
            if self._is_index_error(error_output):
                # Try recovery
                if self._check_and_recover_index():
                    # Retry checkout
                    checkout_result = self._git_lock.run(
                        ["checkout", base],
                        cwd=self.repo_path,
                        timeout=30,
                    )
                    if checkout_result.returncode == 0:
                        self.logger.info("Recovered from index error, checkout succeeded")
                    else:
                        raise RuntimeError(
                            f"Failed to checkout {base} after recovery: "
                            f"{checkout_result.stderr}"
                        )
                else:
                    raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")
            else:
                # Any checkout failure that is not an index error is fatal
                # for this request.
                raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")

        # Track if merge strategy was used during pull (for conflict handling)
        used_merge_strategy = False

        # Pull latest changes
        pull_result = self._git_lock.run(
            ["pull", "--rebase", "origin", base],
            cwd=self.repo_path,
            timeout=60,
        )

        # Handle pull failures
        if pull_result.returncode != 0:
            error_output = pull_result.stderr + pull_result.stdout

            # Check if rebase conflicted - must abort before continuing
            if self._is_rebase_in_progress():
                conflict_commit = self._detect_conflict_commit(error_output)

                if conflict_commit and conflict_commit in self._problematic_commits:
                    # Known problematic commit - use merge strategy instead
                    self.logger.info(
                        f"Repeated rebase conflict with {conflict_commit[:8]}, "
                        f"using merge strategy (git pull --no-rebase)"
                    )
                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")

                    # Attempt merge strategy pull
                    merge_pull_result = self._git_lock.run(
                        ["pull", "--no-rebase", "origin", base],
                        cwd=self.repo_path,
                        timeout=60,
                    )

                    if merge_pull_result.returncode != 0:
                        self.logger.warning(
                            f"Merge strategy pull also failed: {merge_pull_result.stderr[:200]}"
                        )
                        # Continue anyway - merge may still work or fail appropriately
                    else:
                        self.logger.info(
                            f"Merge strategy pull succeeded for {conflict_commit[:8]}"
                        )
                        used_merge_strategy = True

                else:
                    # First time seeing this conflict or couldn't extract commit
                    if conflict_commit:
                        self._problematic_commits.add(conflict_commit)
                        self.logger.warning(
                            f"New rebase conflict with {conflict_commit[:8]}, "
                            f"tracking for future merges (will use merge strategy on repeat)"
                        )
                    else:
                        # NOTE(review): nothing is added to
                        # _problematic_commits on this path even though the
                        # message says "tracking" - confirm intended wording.
                        self.logger.warning(
                            "Rebase conflict detected (could not extract commit hash), "
                            "tracking for future merges"
                        )

                    self.logger.warning(
                        f"Pull --rebase failed with conflicts: {error_output[:200]}"
                    )

                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")
                    # After aborting rebase, we're back to pre-pull state
                    # Continue without the pull - merge may still work or conflict
                    self.logger.info("Continuing without pull after rebase abort")

            elif self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Pull failed due to local changes, attempting re-stash: {error_output[:200]}"
                )
                # Re-stash any local changes that appeared during pull
                if self._stash_local_changes():
                    self.logger.info("Re-stashed local changes after pull conflict")
                    had_local_changes = True
            # For other pull failures, continue - merge will handle or fail

        # Safety check: ensure no unmerged files before merge attempt
        # This catches edge cases where previous operations left dirty state
        if not self._check_and_recover_index():
            raise RuntimeError(
                "Git index has unresolved conflicts before merge - recovery failed"
            )

        # Attempt merge with no-ff
        merge_result = self._git_lock.run(
            [
                "merge",
                result.branch_name,
                "--no-ff",
                "-m",
                f"feat: parallel merge {result.issue_id}\n\n"
                f"Automated merge from parallel issue processing.",
            ],
            cwd=self.repo_path,
            timeout=60,
        )

        if merge_result.returncode != 0:
            error_output = merge_result.stderr + merge_result.stdout

            # Check for local changes error (shouldn't happen after stash)
            if self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Merge blocked by local changes despite stash: {error_output[:200]}"
                )
                raise RuntimeError(f"Merge failed due to local changes: {error_output[:200]}")

            # Check for untracked files blocking merge
            if self._is_untracked_files_error(error_output):
                self._handle_untracked_conflict(request, error_output)
                return

            # Check for merge conflict (including unmerged files from current merge)
            # Unmerged files at this point are genuine conflicts from the current merge
            # attempt, not leftover state from previous operations (those are cleaned up
            # by _check_and_recover_index() at the start of _process_merge)
            if self._is_unmerged_files_error(error_output) or "CONFLICT" in error_output:
                self._handle_conflict(request, used_merge_strategy)
                return
            else:
                raise RuntimeError(f"Merge failed: {merge_result.stderr}")

        # Merge successful
        self._finalize_merge(request)

    except Exception as e:
        # Any error raised above (including the RuntimeErrors) is converted
        # into a recorded failure for this request.
        self._handle_failure(request, str(e))

    finally:
        # Always restore stashed changes
        if had_local_changes:
            self._pop_stash()
        # Always restore state file tracking
        self._restore_state_file_tracking()
        # Clear current issue tracking
        with self._lock:
            self._current_issue_id = None
def _handle_conflict(self, request: MergeRequest, used_merge_strategy: bool = False) -> None:
    """Handle a merge conflict: abort the merge, then retry via rebase if allowed.

    The failed merge in the main repository is always aborted first. If a
    retry is permitted, the worker branch is rebased inside its worktree onto
    the latest base branch and the request is put back on the merge queue.
    Uncommitted worktree changes are stashed around the rebase, and are only
    popped again when this method actually created a stash entry.

    Args:
        request: The merge request that conflicted. Its ``retry_count`` is
            incremented on every call.
        used_merge_strategy: If True, merge strategy was used during pull and
            rebase retry should be skipped (rebase would fail on same conflicts)
    """
    result = request.worker_result
    request.retry_count += 1
    worktree = result.worktree_path

    def run_git(*args: str, timeout: int) -> subprocess.CompletedProcess[str]:
        """Run a git command inside the worker's worktree, capturing text output."""
        return subprocess.run(
            ["git", *args],
            cwd=worktree,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    def stash_ref() -> str:
        """Return what refs/stash currently resolves to ("" when there is no stash)."""
        return run_git("rev-parse", "-q", "--verify", "refs/stash", timeout=10).stdout.strip()

    # Abort the failed merge
    self._git_lock.run(
        ["merge", "--abort"],
        cwd=self.repo_path,
        timeout=10,
    )

    # Skip rebase retry if merge strategy was used during pull (BUG-079)
    # Rebase would fail on the same commits that caused the initial conflict
    if used_merge_strategy:
        self.logger.warning(
            f"Merge conflict for {result.issue_id}, "
            f"skipping rebase retry (merge strategy was used during pull)"
        )
        self._handle_failure(
            request,
            "Merge conflict - rebase not attempted (would fail on same conflicts "
            "that required merge strategy)",
        )
        return

    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Merge conflict after {request.retry_count} retries",
        )
        return

    # Attempt rebase in the worktree
    self.logger.warning(
        f"Merge conflict for {result.issue_id}, "
        f"attempting rebase (retry {request.retry_count}/{self.config.max_merge_retries})"
    )

    request.status = MergeStatus.RETRYING

    # Check for and stash any uncommitted changes in the worktree before rebase
    worktree_status = run_git("status", "--porcelain", timeout=30)
    worktree_has_changes = bool(worktree_status.stdout.strip())

    # Track whether a stash entry was really created. `git status --porcelain`
    # also reports untracked files, which a plain `git stash push` does not
    # save, and the push itself can fail. Popping in either case would fail or
    # restore an entry this method never made.
    # NOTE(review): refs/stash appears to be shared between linked worktrees,
    # so a blind pop could consume a stash made in the main repo — confirm.
    stash_created = False
    if worktree_has_changes:
        self.logger.debug(
            f"Stashing worktree changes before rebase: {worktree_status.stdout[:200]}"
        )
        stash_before = stash_ref()
        stash_result = run_git(
            "stash", "push", "-m", "ll-parallel: auto-stash before rebase", timeout=30
        )
        if stash_result.returncode != 0:
            self.logger.warning(f"Failed to stash worktree changes: {stash_result.stderr}")
        else:
            stash_created = stash_ref() != stash_before

    # Fetch latest base branch before rebase (BUG-180)
    # Use origin/base if fetch succeeds, fall back to base if no remote
    base = self.config.base_branch
    fetch_result = run_git("fetch", "origin", base, timeout=60)
    rebase_target = f"origin/{base}" if fetch_result.returncode == 0 else base

    # Rebase the branch onto latest base branch (BUG-180)
    rebase_result = run_git("rebase", rebase_target, timeout=120)

    if rebase_result.returncode != 0:
        # Rebase also failed - abort and restore our stash (if any)
        run_git("rebase", "--abort", timeout=10)
        if stash_created:
            pop_result = run_git("stash", "pop", timeout=30)
            if pop_result.returncode != 0:
                self.logger.warning(
                    f"Failed to restore stashed worktree changes for {result.issue_id}: "
                    f"{pop_result.stderr.strip()}"
                )
        self._handle_failure(
            request,
            f"Rebase failed after merge conflict: {rebase_result.stderr}",
        )
        return

    # Rebase succeeded, restore stash if we made one, then retry merge
    if stash_created:
        pop_result = run_git("stash", "pop", timeout=30)
        if pop_result.returncode != 0:
            self.logger.error(
                f"Stash pop failed for {result.issue_id} after rebase: "
                f"{pop_result.stderr.strip()}"
            )
            self._handle_failure(request, "Stash pop conflict after rebase")
            return
    self._queue.put(request)
def _handle_untracked_conflict(self, request: MergeRequest, error_output: str) -> None:
    """Move untracked files that block a merge out of the way and re-queue it.

    The blocking paths are read from git's error text, moved into
    ``<repo>/.ll-backup/<issue_id>/`` (keeping their relative layout), and
    the request is put back on the merge queue with RETRYING status.

    Args:
        request: The merge request that failed; its ``retry_count`` is
            incremented on every call.
        error_output: Git error message containing file list
    """
    worker = request.worker_result
    request.retry_count += 1

    # Give up once the retry budget is spent
    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Untracked file conflict after {request.retry_count} retries",
        )
        return

    # The file list sits between the "would be overwritten" header line and
    # the "Please move or remove them" footer line of git's message.
    blocked_paths = []
    collecting = False
    for raw_line in error_output.splitlines():
        text = raw_line.strip()
        if "untracked working tree files would be overwritten" in text:
            collecting = True
        elif "Please move or remove them" in text:
            collecting = False
        elif collecting and text and not text.startswith("error:"):
            blocked_paths.append(text)

    if not blocked_paths:
        self._handle_failure(
            request,
            f"Could not parse conflicting files from: {error_output[:200]}",
        )
        return

    # Prepare the per-issue backup location
    backup_dir = self.repo_path / ".ll-backup" / worker.issue_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    # Relocate each file that is still present, preserving its relative path
    relocated = []
    for rel_path in blocked_paths:
        origin = self.repo_path / rel_path
        if not origin.exists():
            continue
        target = backup_dir / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(origin), str(target))
        relocated.append(rel_path)

    if relocated:
        self.logger.info(
            f"Backed up {len(relocated)} conflicting untracked file(s) to {backup_dir}"
        )

    # Put the request back on the queue for another merge attempt
    self.logger.warning(
        f"Untracked files conflict for {worker.issue_id}, "
        f"retrying after backup (attempt {request.retry_count}/{self.config.max_merge_retries})"
    )
    request.status = MergeStatus.RETRYING
    self._queue.put(request)
def _finalize_merge(self, request: MergeRequest) -> None:
    """Record a successful merge and clean up after it.

    Marks the request as SUCCESS, zeroes the consecutive-failure counter,
    adds the issue ID to the merged list, removes the worker's worktree and
    branch, and logs the outcome.

    Args:
        request: The completed merge request
    """
    worker = request.worker_result
    request.status = MergeStatus.SUCCESS

    # A success resets the circuit breaker
    self._consecutive_failures = 0

    # The merged list is shared state, so update it under the lock
    with self._lock:
        self._merged.append(worker.issue_id)

    # The worktree and its branch are no longer needed once merged
    self._cleanup_worktree(worker.worktree_path, worker.branch_name)

    self.logger.success(f"Merged {worker.issue_id} successfully")
def _handle_failure(self, request: MergeRequest, error: str) -> None:
    """Record a failed merge on the request and in the coordinator's failure map.

    Args:
        request: The failed merge request
        error: Error message describing the failure
    """
    worker = request.worker_result

    # Mark the request itself as failed
    request.status = MergeStatus.FAILED
    request.error = error

    # The failure map is shared state, so update it under the lock
    with self._lock:
        self._failed[worker.issue_id] = error

    self.logger.error(f"Merge failed for {worker.issue_id}: {error}")
def _cleanup_worktree(self, worktree_path: Path, branch_name: str) -> None:
    """Remove a merged worktree and delete its branch.

    Does nothing at all when the worktree directory no longer exists. Only
    branches whose name starts with ``parallel/`` are deleted.

    Args:
        worktree_path: Path to the worktree
        branch_name: Name of the branch to delete
    """
    if not worktree_path.exists():
        return

    # Ask git to drop the worktree first
    remove_cmd = ["worktree", "remove", "--force", str(worktree_path)]
    self._git_lock.run(remove_cmd, cwd=self.repo_path, timeout=30)

    # If git left the directory behind, delete it directly
    if worktree_path.exists():
        shutil.rmtree(worktree_path, ignore_errors=True)

    # Leave any branch outside the parallel/ namespace alone
    if not branch_name.startswith("parallel/"):
        return

    delete_cmd = ["branch", "-D", branch_name]
    self._git_lock.run(delete_cmd, cwd=self.repo_path, timeout=10)
@property
def merged_ids(self) -> list[str]:
    """Issue IDs merged so far, returned as a copy taken under the lock."""
    with self._lock:
        snapshot = [*self._merged]
    return snapshot
@property
def failed_merges(self) -> dict[str, str]:
    """Failed issue IDs mapped to their error messages, as a copy taken under the lock."""
    with self._lock:
        snapshot = {**self._failed}
    return snapshot
@property
def pending_count(self) -> int:
    """Size of the merge queue as reported by ``qsize()``."""
    merge_queue = self._queue
    return merge_queue.qsize()
@property
def stash_pop_failures(self) -> dict[str, str]:
    """Issue IDs mapped to stash pop failure messages, as a copy taken under the lock.

    Each entry is an issue whose merge succeeded but where the user's
    local changes could not be restored automatically; they need manual
    recovery via 'git stash pop'.
    """
    with self._lock:
        snapshot = {**self._stash_pop_failures}
    return snapshot
def wait_for_completion(self, timeout: float | None = None) -> bool:
    """Wait for all pending merges to complete.

    Polls until both:
    1. The merge queue is empty (no pending requests)
    2. No merge is actively being processed (_current_issue_id is None)

    Args:
        timeout: Maximum time to wait in seconds. None waits forever; 0
            checks once and returns immediately.

    Returns:
        True if all merges completed, False if timeout
    """
    poll_interval = 0.5
    # Use a monotonic deadline so wall-clock adjustments cannot stretch or
    # shorten the wait. `timeout is None` (not falsiness) keeps timeout=0
    # from meaning "wait forever".
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        with self._lock:
            active = self._current_issue_id
        if self._queue.empty() and not active:
            return True

        if deadline is None:
            time.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline
        time.sleep(min(poll_interval, remaining))