Coverage for little_loops / parallel / merge_coordinator.py: 10%

451 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Merge coordinator for sequential integration of parallel worker changes. 

2 

3Handles merging completed worker branches back to main with conflict detection 

4and automatic retry capability. 

5""" 

6 

7from __future__ import annotations 

8 

9import shutil 

10import subprocess 

11import threading 

12import time 

13from pathlib import Path 

14from queue import Empty, Queue 

15from typing import TYPE_CHECKING 

16 

17from little_loops.parallel.git_lock import GitLock 

18from little_loops.parallel.types import ( 

19 MergeRequest, 

20 MergeStatus, 

21 ParallelConfig, 

22 WorkerResult, 

23) 

24 

25if TYPE_CHECKING: 

26 from little_loops.logger import Logger 

27 

28 

29class MergeCoordinator: 

30 """Sequential merge queue with conflict handling. 

31 

32 Processes merge requests one at a time to avoid conflicts. Supports 

33 automatic rebase and retry on merge failures. Handles uncommitted local 

34 changes by stashing them before merge operations. 

35 

36 Example: 

37 >>> coordinator = MergeCoordinator(config, logger, repo_path) 

38 >>> coordinator.start() 

39 >>> coordinator.queue_merge(worker_result) 

40 >>> # ... later ... 

41 >>> coordinator.shutdown() 

42 """ 

43 

44 def __init__( 

45 self, 

46 config: ParallelConfig, 

47 logger: Logger, 

48 repo_path: Path | None = None, 

49 git_lock: GitLock | None = None, 

50 ) -> None: 

51 """Initialize the merge coordinator. 

52 

53 Args: 

54 config: Parallel processing configuration 

55 logger: Logger for merge output 

56 repo_path: Path to the git repository (default: current directory) 

57 git_lock: Shared lock for git operations (created if not provided) 

58 """ 

59 self.config = config 

60 self.logger = logger 

61 self.repo_path = repo_path or Path.cwd() 

62 self._git_lock = git_lock or GitLock(logger) 

63 self._queue: Queue[MergeRequest] = Queue() 

64 self._thread: threading.Thread | None = None 

65 self._shutdown_event = threading.Event() 

66 self._merged: list[str] = [] 

67 self._failed: dict[str, str] = {} 

68 self._lock = threading.Lock() 

69 self._stash_active = False # Track if we have an active stash 

70 self._consecutive_failures = 0 # Circuit breaker counter 

71 self._paused = False # Set when circuit breaker trips 

72 self._assume_unchanged_active = False # Track if state file is marked assume-unchanged 

73 self._stash_pop_failures: dict[str, str] = {} # issue_id -> failure message 

74 self._current_issue_id: str | None = ( 

75 None # Track current issue for stash failure attribution 

76 ) 

77 self._problematic_commits: set[str] = ( 

78 set() 

79 ) # Track commits causing repeated rebase conflicts 

80 

81 def start(self) -> None: 

82 """Start the merge coordinator background thread.""" 

83 if self._thread is not None and self._thread.is_alive(): 

84 return 

85 

86 self._shutdown_event.clear() 

87 self._thread = threading.Thread( 

88 target=self._merge_loop, 

89 name="merge-coordinator", 

90 daemon=True, 

91 ) 

92 self._thread.start() 

93 self.logger.info("Merge coordinator started") 

94 

95 def shutdown(self, wait: bool = True, timeout: float = 30.0) -> None: 

96 """Shutdown the merge coordinator. 

97 

98 Args: 

99 wait: Whether to wait for pending merges to complete 

100 timeout: Maximum time to wait for shutdown 

101 """ 

102 if self._thread is None: 

103 return 

104 

105 self._shutdown_event.set() 

106 

107 if wait and self._thread.is_alive(): 

108 self._thread.join(timeout=timeout) 

109 

110 self._thread = None 

111 self.logger.info("Merge coordinator stopped") 

112 

def queue_merge(self, worker_result: WorkerResult) -> None:
    """Wrap a finished worker's result in a merge request and enqueue it.

    Args:
        worker_result: Result from a completed worker; its ``issue_id``
            and ``branch_name`` are included in the log line.
    """
    self._queue.put(MergeRequest(worker_result=worker_result))

    issue = worker_result.issue_id
    branch = worker_result.branch_name
    self.logger.info(f"Queued merge for {issue} (branch: {branch})")

124 

125 def _stash_local_changes(self) -> bool: 

126 """Stash any uncommitted tracked changes in the main repo. 

127 

128 Only stashes tracked file modifications. Untracked files are not stashed 

129 because git stash pathspec exclusions don't work reliably with -u flag. 

130 Untracked file conflicts during merge are handled by _handle_untracked_conflict. 

131 

132 The following are explicitly excluded from stashing: 

133 1. State file - managed by orchestrator and continuously updated 

134 2. Lifecycle file moves - issue files being moved to completed/ directory 

135 3. Files in completed directory - lifecycle-managed files 

136 4. Claude Code context state file - managed by Claude Code externally 

137 

138 These exclusions prevent stash pop conflicts after merge, since the merge 

139 may change HEAD and create conflicts with stashed rename operations. 

140 

141 Returns: 

142 True if changes were stashed, False if working tree was clean 

143 """ 

144 state_file_path = Path(self.config.state_file) 

145 state_file_str = str(state_file_path) 

146 state_file_name = state_file_path.name 

147 

148 # Check if there are any tracked changes to stash. 

149 # We only look at tracked files (exclude untracked with grep -v '??') 

150 # since we can only reliably stash tracked changes. 

151 status_result = self._git_lock.run( 

152 ["status", "--porcelain"], 

153 cwd=self.repo_path, 

154 timeout=30, 

155 ) 

156 

157 # Filter to only tracked changes (lines not starting with ??) 

158 # and exclude orchestrator-managed files to prevent stash pop conflicts 

159 tracked_changes = [] 

160 files_to_stash = [] 

161 # Note: Don't use .strip() on the full output - it removes leading spaces 

162 # from the first line which are significant in git status porcelain format 

163 for line in status_result.stdout.splitlines(): 

164 if not line or line.startswith("??"): 

165 continue 

166 # Skip lifecycle file moves (issue files moved to completed/) 

167 # These are managed by orchestrator and cause stash pop conflicts 

168 if self._is_lifecycle_file_move(line): 

169 self.logger.debug(f"Skipping lifecycle file move from stash: {line}") 

170 continue 

171 # Extract file path from porcelain format (XY filename or XY -> filename for renames) 

172 # Format: XY filename or XY old -> new (XY is exactly 2 chars + 1 space) 

173 file_path = line[3:].split(" -> ")[-1].strip() 

174 if file_path == state_file_str or file_path.endswith(state_file_name): 

175 continue # Skip state file - orchestrator manages it independently 

176 # Skip files in completed/deferred directory - these are lifecycle-managed 

177 # Handle both .issues/completed/ (with dot) and issues/completed/ (without dot) 

178 if ( 

179 ".issues/completed/" in file_path 

180 or file_path.startswith(".issues/completed/") 

181 or "issues/completed/" in file_path 

182 or file_path.startswith("issues/completed/") 

183 or ".issues/deferred/" in file_path 

184 or file_path.startswith(".issues/deferred/") 

185 or "issues/deferred/" in file_path 

186 or file_path.startswith("issues/deferred/") 

187 ): 

188 self.logger.debug( 

189 f"Skipping completed/deferred directory file from stash: {file_path}" 

190 ) 

191 continue 

192 # Skip Claude Code context state file - managed externally 

193 if file_path.endswith("ll-context-state.json"): 

194 self.logger.debug(f"Skipping Claude context state file from stash: {file_path}") 

195 continue 

196 tracked_changes.append(line) 

197 files_to_stash.append(file_path) 

198 

199 if not files_to_stash: 

200 return False # No tracked changes to stash (excluding state file) 

201 

202 # Log files to be stashed for debugging 

203 self.logger.debug(f"Tracked files to stash: {tracked_changes[:10]}") 

204 

205 # Stash only specific files, explicitly excluding the state file. 

206 # Using explicit file list avoids race conditions where the orchestrator 

207 # modifies the state file between a checkout and stash-all operation. 

208 # Note: gitignored files are never stashed anyway. 

209 stash_result = self._git_lock.run( 

210 [ 

211 "stash", 

212 "push", 

213 "-m", 

214 "ll-parallel: auto-stash before merge", 

215 "--", 

216 *files_to_stash, 

217 ], 

218 cwd=self.repo_path, 

219 timeout=30, 

220 ) 

221 

222 if stash_result.returncode == 0: 

223 self._stash_active = True 

224 self.logger.info("Stashed local changes before merge") 

225 return True 

226 

227 self.logger.error(f"Failed to stash local changes: {stash_result.stderr}") 

228 return False 

229 

230 def _pop_stash(self) -> bool: 

231 """Restore stashed changes if any were stashed. 

232 

233 Important: This method preserves the merge even if stash pop fails. 

234 We never reset --hard HEAD here because that would undo a successful merge. 

235 

236 Returns: 

237 True if stash was successfully popped or no stash was active, 

238 False if pop failed (stash is left for manual recovery). 

239 """ 

240 if not self._stash_active: 

241 return True 

242 

243 pop_result = self._git_lock.run( 

244 ["stash", "pop"], 

245 cwd=self.repo_path, 

246 timeout=30, 

247 ) 

248 

249 if pop_result.returncode != 0: 

250 self.logger.warning(f"Failed to pop stash: {pop_result.stderr.strip()}") 

251 

252 # Check if it's a conflict issue - in that case, stash pop may have 

253 # partially applied. We need to clean up the index but preserve the merge. 

254 status_result = self._git_lock.run( 

255 ["status", "--porcelain"], 

256 cwd=self.repo_path, 

257 timeout=30, 

258 ) 

259 

260 # Check for unmerged entries from the stash pop attempt 

261 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD") 

262 has_unmerged = any( 

263 line[:2] in unmerged_prefixes 

264 for line in status_result.stdout.splitlines() 

265 if len(line) >= 2 

266 ) 

267 

268 if has_unmerged: 

269 # Clean up the conflicted stash pop without affecting the merge 

270 # Use checkout to restore conflicted files to their post-merge state 

271 self._git_lock.run( 

272 ["checkout", "--theirs", "."], 

273 cwd=self.repo_path, 

274 timeout=30, 

275 ) 

276 self._git_lock.run( 

277 ["reset", "HEAD"], 

278 cwd=self.repo_path, 

279 timeout=30, 

280 ) 

281 self.logger.info("Cleaned up conflicted stash pop, merge preserved") 

282 

283 # Leave the stash intact for manual recovery 

284 self._stash_active = False 

285 

286 # Record this failure for reporting in final summary 

287 if self._current_issue_id: 

288 with self._lock: 

289 self._stash_pop_failures[self._current_issue_id] = ( 

290 "Local changes could not be restored after merge. " 

291 "Run 'git stash list' and 'git stash pop' to recover manually." 

292 ) 

293 

294 self.logger.warning( 

295 "Stash could not be restored - your changes are saved in 'git stash list'. " 

296 "Run 'git stash show' to view and 'git stash pop' to retry manually." 

297 ) 

298 return False 

299 

300 self._stash_active = False 

301 self.logger.info("Restored stashed local changes") 

302 return True 

303 

304 def _mark_state_file_assume_unchanged(self) -> bool: 

305 """Mark the state file as assume-unchanged to prevent git from seeing modifications. 

306 

307 This allows git pull --rebase to proceed even when the state file is modified, 

308 since the orchestrator continuously updates it during processing. 

309 

310 Returns: 

311 True if successfully marked, False otherwise 

312 """ 

313 state_file = str(self.config.state_file) 

314 

315 # Check if file exists and is tracked 

316 ls_files = self._git_lock.run( 

317 ["ls-files", state_file], 

318 cwd=self.repo_path, 

319 timeout=10, 

320 ) 

321 

322 if not ls_files.stdout.strip(): 

323 # File not tracked, nothing to do 

324 return True 

325 

326 result = self._git_lock.run( 

327 ["update-index", "--assume-unchanged", state_file], 

328 cwd=self.repo_path, 

329 timeout=10, 

330 ) 

331 

332 if result.returncode == 0: 

333 self._assume_unchanged_active = True 

334 self.logger.debug(f"Marked {state_file} as assume-unchanged") 

335 return True 

336 

337 self.logger.warning(f"Failed to mark state file assume-unchanged: {result.stderr}") 

338 return False 

339 

340 def _restore_state_file_tracking(self) -> bool: 

341 """Restore normal tracking for the state file. 

342 

343 Returns: 

344 True if successfully restored, False otherwise 

345 """ 

346 if not self._assume_unchanged_active: 

347 return True 

348 

349 state_file = str(self.config.state_file) 

350 

351 result = self._git_lock.run( 

352 ["update-index", "--no-assume-unchanged", state_file], 

353 cwd=self.repo_path, 

354 timeout=10, 

355 ) 

356 

357 self._assume_unchanged_active = False 

358 

359 if result.returncode != 0: 

360 self.logger.warning(f"Failed to restore state file tracking: {result.stderr}") 

361 return False 

362 

363 self.logger.debug(f"Restored tracking for {state_file}") 

364 return True 

365 

366 def _is_lifecycle_file_move(self, porcelain_line: str) -> bool: 

367 """Check if a porcelain status line represents a lifecycle file move. 

368 

369 Lifecycle file moves are issue files being moved to completed/ directory. 

370 These are managed by the orchestrator and should not be stashed, as they 

371 will conflict with the merge when popping. 

372 

373 Args: 

374 porcelain_line: A line from `git status --porcelain` output 

375 

376 Returns: 

377 True if this is a lifecycle file move that should be excluded from stash 

378 """ 

379 # Rename entries have format: R old_path -> new_path 

380 if not porcelain_line.startswith("R"): 

381 return False 

382 

383 # Check if it's a move to completed/ directory 

384 if " -> " not in porcelain_line: 

385 return False 

386 

387 # Extract destination path (after " -> ") 

388 parts = porcelain_line[3:].split(" -> ") 

389 if len(parts) != 2: 

390 return False 

391 

392 dest_path = parts[1].strip() 

393 

394 # Check if destination is in completed or deferred directory (with or without dot prefix) 

395 return ( 

396 ".issues/completed/" in dest_path 

397 or dest_path.startswith(".issues/completed/") 

398 or "issues/completed/" in dest_path 

399 or dest_path.startswith("issues/completed/") 

400 or ".issues/deferred/" in dest_path 

401 or dest_path.startswith(".issues/deferred/") 

402 or "issues/deferred/" in dest_path 

403 or dest_path.startswith("issues/deferred/") 

404 ) 

405 

406 def _commit_pending_lifecycle_moves(self) -> bool: 

407 """Commit any uncommitted lifecycle file moves. 

408 

409 Lifecycle file moves (issue files moved to completed/) are excluded from 

410 stashing to prevent stash pop conflicts. However, if they remain uncommitted 

411 when a merge starts, they will block the merge. This method commits any such 

412 pending moves before the merge proceeds. 

413 

414 Returns: 

415 True if any lifecycle moves were committed or none existed, 

416 False if commit failed 

417 """ 

418 # Check for lifecycle file moves in git status 

419 status_result = self._git_lock.run( 

420 ["status", "--porcelain"], 

421 cwd=self.repo_path, 

422 timeout=30, 

423 ) 

424 

425 lifecycle_moves = [] 

426 for line in status_result.stdout.splitlines(): 

427 if self._is_lifecycle_file_move(line): 

428 lifecycle_moves.append(line) 

429 

430 if not lifecycle_moves: 

431 return True 

432 

433 self.logger.info( 

434 f"Found {len(lifecycle_moves)} uncommitted lifecycle file move(s), committing..." 

435 ) 

436 

437 # Stage all changes (lifecycle moves should already be staged from git mv, 

438 # but add -A ensures any associated content changes are included) 

439 self._git_lock.run( 

440 ["add", "-A"], 

441 cwd=self.repo_path, 

442 timeout=30, 

443 ) 

444 

445 # Commit the lifecycle moves 

446 commit_result = self._git_lock.run( 

447 [ 

448 "commit", 

449 "-m", 

450 "chore(issues): commit pending lifecycle file moves\n\n" 

451 "Auto-committed before merge to prevent conflicts.", 

452 ], 

453 cwd=self.repo_path, 

454 timeout=30, 

455 ) 

456 

457 if commit_result.returncode != 0: 

458 if "nothing to commit" in commit_result.stdout.lower(): 

459 # This shouldn't happen since we detected moves, but handle gracefully 

460 self.logger.debug("No changes to commit despite detecting lifecycle moves") 

461 return True 

462 self.logger.error(f"Failed to commit lifecycle moves: {commit_result.stderr}") 

463 return False 

464 

465 self.logger.info("Committed pending lifecycle file moves") 

466 return True 

467 

468 def _is_local_changes_error(self, error_output: str) -> bool: 

469 """Check if the error is due to uncommitted local changes. 

470 

471 Args: 

472 error_output: The stderr/stdout from the failed git command 

473 

474 Returns: 

475 True if the error indicates local changes would be overwritten 

476 """ 

477 indicators = [ 

478 "Your local changes to the following files would be overwritten", 

479 "Please commit your changes or stash them before you merge", 

480 "error: cannot pull with rebase: You have unstaged changes", 

481 ] 

482 return any(indicator in error_output for indicator in indicators) 

483 

484 def _is_untracked_files_error(self, error_output: str) -> bool: 

485 """Check if the error is due to untracked files blocking merge. 

486 

487 Args: 

488 error_output: The stderr/stdout from the failed git command 

489 

490 Returns: 

491 True if the error indicates untracked files would be overwritten 

492 """ 

493 indicators = [ 

494 "untracked working tree files would be overwritten by merge", 

495 "Please move or remove them before you merge", 

496 ] 

497 return any(indicator in error_output for indicator in indicators) 

498 

499 def _is_index_error(self, error_output: str) -> bool: 

500 """Check if the error is due to a corrupted git index. 

501 

502 Args: 

503 error_output: The stderr/stdout from the failed git command 

504 

505 Returns: 

506 True if the error indicates index problems 

507 """ 

508 indicators = [ 

509 "you need to resolve your current index first", 

510 "fatal: cannot do a partial commit during a merge", 

511 "error: you have not concluded your merge", 

512 ] 

513 return any(indicator in error_output for indicator in indicators) 

514 

515 def _is_rebase_in_progress(self) -> bool: 

516 """Check if a rebase is currently in progress. 

517 

518 Returns: 

519 True if rebase is in progress (rebase-merge or rebase-apply exists) 

520 """ 

521 rebase_merge = self.repo_path / ".git" / "rebase-merge" 

522 rebase_apply = self.repo_path / ".git" / "rebase-apply" 

523 return rebase_merge.exists() or rebase_apply.exists() 

524 

525 def _abort_rebase_if_in_progress(self) -> bool: 

526 """Abort any in-progress rebase operation. 

527 

528 Returns: 

529 True if rebase was aborted or none was in progress, 

530 False if abort failed 

531 """ 

532 if not self._is_rebase_in_progress(): 

533 return True 

534 

535 self.logger.warning("Detected rebase in progress, aborting...") 

536 abort_result = self._git_lock.run( 

537 ["rebase", "--abort"], 

538 cwd=self.repo_path, 

539 timeout=30, 

540 ) 

541 

542 if abort_result.returncode != 0: 

543 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}") 

544 # Force hard reset as last resort 

545 return self._attempt_hard_reset() 

546 

547 self.logger.info("Aborted incomplete rebase from pull") 

548 return True 

549 

550 def _is_unmerged_files_error(self, error_output: str) -> bool: 

551 """Check if the error is due to pre-existing unmerged files. 

552 

553 Args: 

554 error_output: The stderr/stdout from the failed git command 

555 

556 Returns: 

557 True if the error indicates unmerged files blocking the operation 

558 """ 

559 indicators = [ 

560 "you have unmerged files", 

561 "Merging is not possible because you have unmerged files", 

562 "fix conflicts and then commit the result", 

563 ] 

564 return any(indicator in error_output for indicator in indicators) 

565 

566 def _detect_conflict_commit(self, error_output: str) -> str | None: 

567 """Extract commit hash from rebase conflict output. 

568 

569 Looks for patterns like: 

570 - "dropping ae3b85ec1cac501058f6e5da362be37be1c99801 feat(ai): add stall detectio" 

571 

572 Args: 

573 error_output: The stderr/stdout from the failed git pull --rebase 

574 

575 Returns: 

576 The 40-character commit hash if found, None otherwise 

577 """ 

578 import re 

579 

580 # Pattern: "dropping <40-char-hash>" followed by space and message 

581 # Match only full 40-char hashes to avoid false positives 

582 match = re.search(r"dropping\s+([a-f0-9]{40})\s+", error_output, re.IGNORECASE) 

583 return match.group(1) if match else None 

584 

585 def _check_and_recover_index(self) -> bool: 

586 """Check git index health and attempt recovery if needed. 

587 

588 Returns: 

589 True if index is healthy or was recovered, False if unrecoverable 

590 """ 

591 # Check if we're in the middle of a merge 

592 merge_head = self.repo_path / ".git" / "MERGE_HEAD" 

593 if merge_head.exists(): 

594 self.logger.warning("Detected incomplete merge, aborting...") 

595 abort_result = self._git_lock.run( 

596 ["merge", "--abort"], 

597 cwd=self.repo_path, 

598 timeout=30, 

599 ) 

600 if abort_result.returncode != 0: 

601 self.logger.error(f"Failed to abort merge: {abort_result.stderr}") 

602 return False 

603 self.logger.info("Aborted incomplete merge") 

604 

605 # Check if we're in the middle of a rebase 

606 rebase_dir = self.repo_path / ".git" / "rebase-merge" 

607 rebase_apply = self.repo_path / ".git" / "rebase-apply" 

608 if rebase_dir.exists() or rebase_apply.exists(): 

609 self.logger.warning("Detected incomplete rebase, aborting...") 

610 abort_result = self._git_lock.run( 

611 ["rebase", "--abort"], 

612 cwd=self.repo_path, 

613 timeout=30, 

614 ) 

615 if abort_result.returncode != 0: 

616 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}") 

617 return False 

618 self.logger.info("Aborted incomplete rebase") 

619 # Force reset after rebase abort - the abort can leave index in dirty state 

620 # This is defensive since unmerged file detection below may not trigger 

621 if not self._attempt_hard_reset(): 

622 return False 

623 

624 # Check for unmerged files in the index (UU, AA, DD, AU, UA, DU, UD prefixes) 

625 # These can persist even after merge --abort in some edge cases 

626 status_result = self._git_lock.run( 

627 ["status", "--porcelain"], 

628 cwd=self.repo_path, 

629 timeout=30, 

630 ) 

631 

632 if status_result.returncode != 0: 

633 self.logger.error(f"git status failed: {status_result.stderr}") 

634 return self._attempt_hard_reset() 

635 

636 # Debug logging to diagnose unmerged detection issues 

637 if status_result.stdout.strip(): 

638 self.logger.debug(f"Git status output: {status_result.stdout[:500]}") 

639 

640 # Check for unmerged entries (first two chars indicate index/worktree status) 

641 # Unmerged states: UU (both modified), AA (both added), DD (both deleted), 

642 # AU/UA (added by us/them), DU/UD (deleted by us/them) 

643 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD") 

644 has_unmerged = any( 

645 line[:2] in unmerged_prefixes 

646 for line in status_result.stdout.splitlines() 

647 if len(line) >= 2 

648 ) 

649 

650 if has_unmerged: 

651 self.logger.warning("Detected unmerged files in index, resetting...") 

652 if not self._attempt_hard_reset(): 

653 return False 

654 self.logger.info("Cleared unmerged files from index") 

655 

656 # Final safety check - if MERGE_HEAD still exists, force reset 

657 # This catches edge cases where abort succeeded but state is still dirty 

658 if merge_head.exists(): 

659 self.logger.warning("MERGE_HEAD persists after recovery attempts, forcing reset") 

660 if not self._attempt_hard_reset(): 

661 return False 

662 

663 return True 

664 

665 def _attempt_hard_reset(self) -> bool: 

666 """Attempt a hard reset to recover from index issues. 

667 

668 Returns: 

669 True if reset succeeded, False otherwise 

670 """ 

671 self.logger.warning("Attempting hard reset to recover...") 

672 reset_result = self._git_lock.run( 

673 ["reset", "--hard", "HEAD"], 

674 cwd=self.repo_path, 

675 timeout=30, 

676 ) 

677 if reset_result.returncode != 0: 

678 self.logger.error("Hard reset failed - manual intervention required") 

679 return False 

680 self.logger.info("Hard reset successful") 

681 return True 

682 

683 def _merge_loop(self) -> None: 

684 """Background thread loop for processing merge requests.""" 

685 while not self._shutdown_event.is_set(): 

686 try: 

687 # Wait for next merge request with timeout 

688 try: 

689 request = self._queue.get(timeout=1.0) 

690 except Empty: 

691 continue 

692 

693 # Process the merge 

694 self._process_merge(request) 

695 

696 except Exception as e: 

697 self.logger.error(f"Merge loop error: {e}") 

698 time.sleep(1.0) 

699 

def _process_merge(self, request: MergeRequest) -> None:
    """Process a single merge request end to end.

    Sequence, as implemented below:

    1. Refuse the request if the circuit breaker has tripped.
    2. Run the index health check; three consecutive failures of this
       check trip the circuit breaker.
    3. Mark the state file assume-unchanged, commit pending lifecycle
       moves, and stash remaining tracked changes.
    4. Check out the base branch (one recovery-and-retry on index errors).
    5. ``git pull --rebase``; on a rebase conflict either fall back to a
       merge-strategy pull (commit already seen) or abort the rebase and
       continue without pulling (first occurrence).
    6. Re-run the index health check, then ``git merge --no-ff`` the
       worker branch and dispatch on the outcome.

    Any exception is routed to ``_handle_failure``. The ``finally`` block
    always pops the stash (when one was made), restores state-file
    tracking and clears the current-issue marker.

    Args:
        request: Merge request to process
    """
    result = request.worker_result
    # Record which issue is being merged so a stash-pop failure can be
    # attributed to it in _pop_stash
    with self._lock:
        self._current_issue_id = result.issue_id
    self.logger.info(f"Processing merge for {result.issue_id}")
    had_local_changes = False

    try:
        # Circuit breaker check
        if self._paused:
            self.logger.warning(
                f"Merge coordinator paused due to repeated failures. "
                f"Skipping {result.issue_id}. Manual intervention required."
            )
            self._handle_failure(request, "Merge coordinator paused - circuit breaker tripped")
            return

        request.status = MergeStatus.IN_PROGRESS

        # Health check: ensure git index is clean before proceeding
        if not self._check_and_recover_index():
            # Only this health-check failure feeds the breaker counter in
            # this method
            self._consecutive_failures += 1
            if self._consecutive_failures >= 3:
                self._paused = True
                self.logger.error(
                    f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                    "Merge coordinator paused. Manual recovery required."
                )
            self._handle_failure(request, "Git index recovery failed")
            return

        # Mark state file as assume-unchanged to prevent pull --rebase conflicts
        # The orchestrator continuously updates the state file during processing
        # (the return value is intentionally not checked: best effort)
        self._mark_state_file_assume_unchanged()

        # Commit any uncommitted lifecycle file moves before stash/merge
        # These are excluded from stash to prevent pop conflicts, so they must
        # be committed to avoid blocking the merge (BUG-018 fix)
        if not self._commit_pending_lifecycle_moves():
            self._handle_failure(request, "Failed to commit pending lifecycle moves")
            return

        # Stash any local changes before merge operations
        had_local_changes = self._stash_local_changes()

        # Ensure we're on base branch in the main repo
        base = self.config.base_branch
        checkout_result = self._git_lock.run(
            ["checkout", base],
            cwd=self.repo_path,
            timeout=30,
        )

        if checkout_result.returncode != 0:
            error_output = checkout_result.stderr + checkout_result.stdout
            if self._is_local_changes_error(error_output):
                # This shouldn't happen since we stashed, but handle it anyway
                # (only logged here; unless it is also an index error the
                # else-branch below raises)
                self.logger.warning(
                    "Checkout failed due to local changes despite stash attempt"
                )
            if self._is_index_error(error_output):
                # Try recovery
                if self._check_and_recover_index():
                    # Retry checkout
                    checkout_result = self._git_lock.run(
                        ["checkout", base],
                        cwd=self.repo_path,
                        timeout=30,
                    )
                    if checkout_result.returncode == 0:
                        self.logger.info("Recovered from index error, checkout succeeded")
                    else:
                        raise RuntimeError(
                            f"Failed to checkout {base} after recovery: "
                            f"{checkout_result.stderr}"
                        )
                else:
                    raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")
            else:
                raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")

        # Track if merge strategy was used during pull (for conflict handling)
        used_merge_strategy = False

        # Pull latest changes
        pull_result = self._git_lock.run(
            ["pull", "--rebase", "origin", base],
            cwd=self.repo_path,
            timeout=60,
        )

        # Handle pull failures
        if pull_result.returncode != 0:
            error_output = pull_result.stderr + pull_result.stdout

            # Check if rebase conflicted - must abort before continuing
            if self._is_rebase_in_progress():
                conflict_commit = self._detect_conflict_commit(error_output)

                if conflict_commit and conflict_commit in self._problematic_commits:
                    # Known problematic commit - use merge strategy instead
                    self.logger.info(
                        f"Repeated rebase conflict with {conflict_commit[:8]}, "
                        f"using merge strategy (git pull --no-rebase)"
                    )
                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")

                    # Attempt merge strategy pull
                    merge_pull_result = self._git_lock.run(
                        ["pull", "--no-rebase", "origin", base],
                        cwd=self.repo_path,
                        timeout=60,
                    )

                    if merge_pull_result.returncode != 0:
                        self.logger.warning(
                            f"Merge strategy pull also failed: {merge_pull_result.stderr[:200]}"
                        )
                        # Continue anyway - merge may still work or fail appropriately
                    else:
                        self.logger.info(
                            f"Merge strategy pull succeeded for {conflict_commit[:8]}"
                        )
                        used_merge_strategy = True

                else:
                    # First time seeing this conflict or couldn't extract commit
                    if conflict_commit:
                        self._problematic_commits.add(conflict_commit)
                        self.logger.warning(
                            f"New rebase conflict with {conflict_commit[:8]}, "
                            f"tracking for future merges (will use merge strategy on repeat)"
                        )
                    else:
                        # Nothing is added to _problematic_commits in this
                        # branch, since there is no hash to record
                        self.logger.warning(
                            "Rebase conflict detected (could not extract commit hash), "
                            "tracking for future merges"
                        )

                    self.logger.warning(
                        f"Pull --rebase failed with conflicts: {error_output[:200]}"
                    )

                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")
                    # After aborting rebase, we're back to pre-pull state
                    # Continue without the pull - merge may still work or conflict
                    self.logger.info("Continuing without pull after rebase abort")

            elif self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Pull failed due to local changes, attempting re-stash: {error_output[:200]}"
                )
                # Re-stash any local changes that appeared during pull
                # NOTE(review): if the first stash also succeeded, this
                # pushes a second stash while the finally block pops only
                # once - confirm the earlier stash is not left behind.
                if self._stash_local_changes():
                    self.logger.info("Re-stashed local changes after pull conflict")
                    had_local_changes = True
            # For other pull failures, continue - merge will handle or fail

        # Safety check: ensure no unmerged files before merge attempt
        # This catches edge cases where previous operations left dirty state
        if not self._check_and_recover_index():
            raise RuntimeError(
                "Git index has unresolved conflicts before merge - recovery failed"
            )

        # Attempt merge with no-ff
        merge_result = self._git_lock.run(
            [
                "merge",
                result.branch_name,
                "--no-ff",
                "-m",
                f"feat: parallel merge {result.issue_id}\n\n"
                f"Automated merge from parallel issue processing.",
            ],
            cwd=self.repo_path,
            timeout=60,
        )

        if merge_result.returncode != 0:
            error_output = merge_result.stderr + merge_result.stdout

            # Check for local changes error (shouldn't happen after stash)
            if self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Merge blocked by local changes despite stash: {error_output[:200]}"
                )
                raise RuntimeError(f"Merge failed due to local changes: {error_output[:200]}")

            # Check for untracked files blocking merge
            if self._is_untracked_files_error(error_output):
                self._handle_untracked_conflict(request, error_output)
                return

            # Check for merge conflict (including unmerged files from current merge)
            # Unmerged files at this point are genuine conflicts from the current merge
            # attempt, not leftover state from previous operations (those are cleaned up
            # by _check_and_recover_index() at the start of _process_merge)
            if self._is_unmerged_files_error(error_output) or "CONFLICT" in error_output:
                self._handle_conflict(request, used_merge_strategy)
                return
            else:
                raise RuntimeError(f"Merge failed: {merge_result.stderr}")

        # Merge successful
        self._finalize_merge(request)

    except Exception as e:
        # Every RuntimeError raised above (and any unexpected error) ends
        # up here and is reported as a failed merge
        self._handle_failure(request, str(e))

    finally:
        # Always restore stashed changes
        if had_local_changes:
            self._pop_stash()
        # Always restore state file tracking
        self._restore_state_file_tracking()
        # Clear current issue tracking
        with self._lock:
            self._current_issue_id = None

929 

def _handle_conflict(self, request: MergeRequest, used_merge_strategy: bool = False) -> None:
    """Handle a merge conflict with retry logic.

    Aborts the failed merge in the main repository. Unless the merge
    strategy was already used during pull, or the retry budget is exhausted,
    the worker branch is rebased inside its worktree onto the latest base
    branch and the request is put back on the merge queue.

    Args:
        request: The merge request that conflicted
        used_merge_strategy: If True, merge strategy was used during pull and
            rebase retry should be skipped (rebase would fail on same conflicts)
    """
    result = request.worker_result
    request.retry_count += 1

    # Abort the failed merge (best effort; the exit status is not inspected)
    self._git_lock.run(
        ["merge", "--abort"],
        cwd=self.repo_path,
        timeout=10,
    )

    # Skip rebase retry if merge strategy was used during pull (BUG-079)
    # Rebase would fail on the same commits that caused the initial conflict
    if used_merge_strategy:
        self.logger.warning(
            f"Merge conflict for {result.issue_id}, "
            f"skipping rebase retry (merge strategy was used during pull)"
        )
        self._handle_failure(
            request,
            "Merge conflict - rebase not attempted (would fail on same conflicts "
            "that required merge strategy)",
        )
        return

    # Retry budget exhausted: record the failure and stop
    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Merge conflict after {request.retry_count} retries",
        )
        return

    # Attempt rebase in the worktree
    self.logger.warning(
        f"Merge conflict for {result.issue_id}, "
        f"attempting rebase (retry {request.retry_count}/{self.config.max_merge_retries})"
    )

    request.status = MergeStatus.RETRYING

    # Check for and stash any unstaged changes in the worktree before rebase
    worktree_status = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=30,
    )
    worktree_has_changes = bool(worktree_status.stdout.strip())

    # Track whether *this call* really created a stash entry. The porcelain
    # status also lists untracked files, which a plain "git stash push" does
    # not save, and the push itself can fail. Popping without an entry of our
    # own would either fail or pop an unrelated entry (the stash stack is
    # shared between the worktrees of a repository).
    stash_created = False
    if worktree_has_changes:
        self.logger.debug(
            f"Stashing worktree changes before rebase: {worktree_status.stdout[:200]}"
        )
        stash_before = self._read_stash_ref(result.worktree_path)
        stash_result = subprocess.run(
            ["git", "stash", "push", "-m", "ll-parallel: auto-stash before rebase"],
            cwd=result.worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if stash_result.returncode != 0:
            self.logger.warning(f"Failed to stash worktree changes: {stash_result.stderr}")
        else:
            stash_after = self._read_stash_ref(result.worktree_path)
            stash_created = stash_after is not None and stash_after != stash_before

    # Fetch latest base branch before rebase (BUG-180)
    # Use origin/base if fetch succeeds, fall back to base if no remote
    base = self.config.base_branch
    fetch_result = subprocess.run(
        ["git", "fetch", "origin", base],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=60,
    )
    rebase_target = f"origin/{base}" if fetch_result.returncode == 0 else base

    # Rebase the branch onto latest base branch (BUG-180)
    rebase_result = subprocess.run(
        ["git", "rebase", rebase_target],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=120,
    )

    if rebase_result.returncode != 0:
        # Rebase also failed - abort and restore stash
        subprocess.run(
            ["git", "rebase", "--abort"],
            cwd=result.worktree_path,
            capture_output=True,
            timeout=10,
        )
        if stash_created:
            restore_result = subprocess.run(
                ["git", "stash", "pop"],
                cwd=result.worktree_path,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if restore_result.returncode != 0:
                # The request is failing anyway; surface the leftover stash
                self.logger.warning(
                    f"Could not restore stashed worktree changes for {result.issue_id}: "
                    f"{restore_result.stderr.strip()}"
                )
        self._handle_failure(
            request,
            f"Rebase failed after merge conflict: {rebase_result.stderr}",
        )
        return

    # Rebase succeeded, restore stash if we made one, then retry merge
    if stash_created:
        pop_result = subprocess.run(
            ["git", "stash", "pop"],
            cwd=result.worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if pop_result.returncode != 0:
            self.logger.error(
                f"Stash pop failed for {result.issue_id} after rebase: "
                f"{pop_result.stderr.strip()}"
            )
            self._handle_failure(request, "Stash pop conflict after rebase")
            return
    self._queue.put(request)

def _read_stash_ref(self, cwd: Path) -> str | None:
    """Return the commit that refs/stash resolves to in ``cwd``, or None if unresolved."""
    probe = subprocess.run(
        ["git", "rev-parse", "--quiet", "--verify", "refs/stash"],
        cwd=cwd,
        capture_output=True,
        text=True,
        timeout=30,
    )
    if probe.returncode != 0:
        return None
    return probe.stdout.strip() or None

1058 

def _handle_untracked_conflict(self, request: MergeRequest, error_output: str) -> None:
    """Recover from a merge blocked by untracked working-tree files.

    The blocking paths are parsed out of git's error text, moved into a
    per-issue directory under ``.ll-backup`` in the repository, and the
    request is put back on the merge queue for another attempt.

    Args:
        request: The merge request that failed
        error_output: Git error message containing file list
    """
    worker = request.worker_result
    request.retry_count += 1
    retry_limit = self.config.max_merge_retries

    if request.retry_count > retry_limit:
        self._handle_failure(
            request,
            f"Untracked file conflict after {request.retry_count} retries",
        )
        return

    # Git prints a header line ("...untracked working tree files would be
    # overwritten..."), then one path per line, then a "Please move or remove
    # them..." footer. Collect only the lines between header and footer.
    blocked_paths = []
    collecting = False
    for raw_line in error_output.splitlines():
        text = raw_line.strip()
        if "untracked working tree files would be overwritten" in text:
            collecting = True
        elif "Please move or remove them" in text:
            collecting = False
        elif collecting and text and not text.startswith("error:"):
            blocked_paths.append(text)

    if not blocked_paths:
        self._handle_failure(
            request,
            f"Could not parse conflicting files from: {error_output[:200]}",
        )
        return

    # Per-issue backup location inside the repository
    backup_root = self.repo_path / ".ll-backup" / worker.issue_id
    backup_root.mkdir(parents=True, exist_ok=True)

    # Relocate each path that is still present, keeping its relative layout
    relocated = []
    for rel_path in blocked_paths:
        origin = self.repo_path / rel_path
        if not origin.exists():
            continue
        target = backup_root / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(origin), str(target))
        relocated.append(rel_path)

    if relocated:
        self.logger.info(
            f"Backed up {len(relocated)} conflicting untracked file(s) to {backup_root}"
        )

    # Re-queue the request so the merge is attempted again
    self.logger.warning(
        f"Untracked files conflict for {worker.issue_id}, "
        f"retrying after backup (attempt {request.retry_count}/{retry_limit})"
    )
    request.status = MergeStatus.RETRYING
    self._queue.put(request)

1127 

def _finalize_merge(self, request: MergeRequest) -> None:
    """Record a successful merge and clean up after it.

    Args:
        request: The completed merge request
    """
    worker = request.worker_result
    request.status = MergeStatus.SUCCESS

    # A success resets the consecutive-failure counter (circuit breaker)
    self._consecutive_failures = 0

    # Record the merged issue under the lock
    with self._lock:
        self._merged.append(worker.issue_id)

    # The worktree and its branch are no longer needed once merged
    self._cleanup_worktree(worker.worktree_path, worker.branch_name)

    self.logger.success(f"Merged {worker.issue_id} successfully")

1147 

def _handle_failure(self, request: MergeRequest, error: str) -> None:
    """Mark a merge request as failed and record why.

    Args:
        request: The failed merge request
        error: Error message describing the failure
    """
    worker = request.worker_result

    # Stamp the request itself with the outcome
    request.status = MergeStatus.FAILED
    request.error = error

    # Record the failure by issue ID under the lock
    with self._lock:
        self._failed[worker.issue_id] = error

    self.logger.error(f"Merge failed for {worker.issue_id}: {error}")

1163 

def _cleanup_worktree(self, worktree_path: Path, branch_name: str) -> None:
    """Remove a merged worktree and delete its branch.

    Does nothing when the worktree directory is already gone. Only branches
    whose name starts with ``parallel/`` are deleted.

    Args:
        worktree_path: Path to the worktree
        branch_name: Name of the branch to delete
    """
    worktree_present = worktree_path.exists()
    if not worktree_present:
        return

    # Ask git to drop the worktree first
    remove_cmd = ["worktree", "remove", "--force", str(worktree_path)]
    self._git_lock.run(remove_cmd, cwd=self.repo_path, timeout=30)

    # If the directory survived, delete it from disk directly
    if worktree_path.exists():
        shutil.rmtree(worktree_path, ignore_errors=True)

    # Branches outside the parallel/ namespace are left alone
    if not branch_name.startswith("parallel/"):
        return

    delete_cmd = ["branch", "-D", branch_name]
    self._git_lock.run(delete_cmd, cwd=self.repo_path, timeout=10)

1192 

@property
def merged_ids(self) -> list[str]:
    """Snapshot of the issue IDs merged successfully so far."""
    with self._lock:
        # Copy under the lock so callers never hold the live list
        snapshot = [*self._merged]
    return snapshot

1198 

@property
def failed_merges(self) -> dict[str, str]:
    """Snapshot mapping each failed issue ID to its error message."""
    with self._lock:
        # Copy under the lock so callers never hold the live dict
        snapshot = {**self._failed}
    return snapshot

1204 

@property
def pending_count(self) -> int:
    """Size of the merge queue as reported by ``qsize()``."""
    backlog = self._queue.qsize()
    return backlog

1209 

@property
def stash_pop_failures(self) -> dict[str, str]:
    """Snapshot mapping issue IDs to stash pop failure messages.

    Each entry is an issue whose merge succeeded but whose user's local
    changes could not be restored automatically; they need manual recovery
    via 'git stash pop'.
    """
    with self._lock:
        # Copy under the lock so callers never hold the live dict
        snapshot = {**self._stash_pop_failures}
    return snapshot

1220 

def wait_for_completion(self, timeout: float | None = None) -> bool:
    """Wait for all pending merges to complete.

    Waits until both:
    1. The merge queue is empty (no pending requests)
    2. No merge is actively being processed (_current_issue_id is None)

    Args:
        timeout: Maximum time to wait in seconds (None = forever). A value
            of 0 (or less) checks the state once and returns immediately.

    Returns:
        True if all merges completed, False if timeout
    """
    poll_interval = 0.5
    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    # "is None" (not truthiness) so that timeout=0 is an immediate check
    # instead of being mistaken for "no timeout".
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        with self._lock:
            active = self._current_issue_id
        if self._queue.empty() and not active:
            return True

        if deadline is None:
            time.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline
        time.sleep(min(poll_interval, remaining))