Coverage for little_loops / parallel / worker_pool.py: 11%
466 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Worker pool for parallel issue processing with git worktree isolation.
3Each worker operates in an isolated git worktree, allowing concurrent issue
4processing without file conflicts.
5"""
7from __future__ import annotations
9import json
10import os
11import re
12import shutil
13import subprocess
14import sys
15import threading
16import time
17from collections.abc import Callable
18from concurrent.futures import Future, ThreadPoolExecutor
19from datetime import datetime
20from pathlib import Path
21from typing import TYPE_CHECKING, Any, cast
23from little_loops.output_parsing import parse_ready_issue_output
24from little_loops.parallel.git_lock import GitLock
25from little_loops.parallel.types import ParallelConfig, WorkerResult, WorkerStage
26from little_loops.subprocess_utils import (
27 detect_context_handoff,
28 read_continuation_prompt,
29)
30from little_loops.subprocess_utils import (
31 run_claude_command as _run_claude_base,
32)
33from little_loops.work_verification import EXCLUDED_DIRECTORIES, verify_work_was_done
35if TYPE_CHECKING:
36 from little_loops.config import BRConfig
37 from little_loops.issue_parser import IssueInfo
38 from little_loops.logger import Logger
41class WorkerPool:
42 """Thread pool for processing issues in isolated git worktrees.
44 Each worker:
45 1. Creates a dedicated git worktree and branch
46 2. Runs issue validation and implementation via Claude CLI
47 3. Commits changes locally
48 4. Returns results for merge coordination
50 Example:
51 >>> pool = WorkerPool(parallel_config, br_config, logger)
52 >>> pool.start()
53 >>> future = pool.submit(issue_info)
54 >>> result = future.result() # WorkerResult
55 >>> pool.shutdown()
56 """
58 def __init__(
59 self,
60 parallel_config: ParallelConfig,
61 br_config: BRConfig,
62 logger: Logger,
63 repo_path: Path | None = None,
64 git_lock: GitLock | None = None,
65 ) -> None:
66 """Initialize the worker pool.
68 Args:
69 parallel_config: Parallel processing configuration
70 br_config: Project configuration (for category actions)
71 logger: Logger for worker output
72 repo_path: Path to the git repository (default: current directory)
73 git_lock: Shared lock for git operations (created if not provided)
74 """
75 self.parallel_config = parallel_config
76 self.br_config = br_config
77 self.logger = logger
78 self.repo_path = repo_path or Path.cwd()
79 self._git_lock = git_lock or GitLock(logger)
80 self._executor: ThreadPoolExecutor | None = None
81 self._active_workers: dict[str, Future[WorkerResult]] = {}
82 # Track active subprocesses for forceful termination on shutdown
83 self._active_processes: dict[str, subprocess.Popen[str]] = {}
84 # Track active worktree paths to prevent cleanup while in use (BUG-142)
85 self._active_worktrees: set[Path] = set()
86 self._process_lock = threading.Lock()
87 # Track callbacks currently executing
88 self._pending_callbacks: set[str] = set()
89 self._callback_lock = threading.Lock()
90 # Shutdown tracking for interrupted worker detection (ENH-036)
91 self._shutdown_requested = False
92 self._terminated_during_shutdown: set[str] = set()
93 # Track worker processing stages for progress visibility (ENH-262)
94 self._worker_stages: dict[str, WorkerStage] = {}
def start(self) -> None:
    """Create the worktree base directory and the thread pool.

    Calling this on an already-started pool is a no-op.
    """
    if self._executor is not None:
        return

    config = self.parallel_config

    # Workers create their worktrees under this directory, so it must exist.
    (self.repo_path / config.worktree_base).mkdir(parents=True, exist_ok=True)

    self._executor = ThreadPoolExecutor(
        thread_name_prefix="issue-worker",
        max_workers=config.max_workers,
    )
    self.logger.info(f"Worker pool started with {self.parallel_config.max_workers} workers")
def shutdown(self, wait: bool = True) -> None:
    """Stop the pool and drop the executor.

    Does nothing when the pool was never started (or is already shut down).

    Args:
        wait: Whether to wait for pending tasks to complete
    """
    executor = self._executor
    if executor is None:
        return

    self.logger.info("Shutting down worker pool...")

    if not wait:
        # Kill running subprocesses first so blocked worker threads can return.
        self.terminate_all_processes()

    executor.shutdown(wait=wait)
    self._executor = None
def set_shutdown_requested(self, value: bool = True) -> None:
    """Record whether a shutdown is in progress.

    While the flag is set, ``terminate_all_processes()`` remembers which
    issues it terminated, so those workers can be reported as interrupted
    rather than failed.

    Args:
        value: New state of the shutdown flag
    """
    self._shutdown_requested = value
def terminate_all_processes(self) -> None:
    """Stop every tracked subprocess that is still running.

    Each live process gets ``terminate()`` and up to 5 seconds to exit; if
    it is still alive it is killed and given 2 more seconds. Failures are
    logged and do not stop the remaining processes from being handled.
    The process registry is emptied afterwards.
    """
    with self._process_lock:
        tracked = list(self._active_processes.items())
        for issue_id, proc in tracked:
            if proc.poll() is not None:
                # Already exited; nothing to stop.
                continue

            self.logger.warning(f"Terminating subprocess for {issue_id} (PID {proc.pid})")

            # Remember shutdown-time terminations for interrupted detection (ENH-036).
            if self._shutdown_requested:
                self._terminated_during_shutdown.add(issue_id)

            try:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Graceful termination was ignored; escalate.
                    self.logger.warning(f"Force killing {issue_id} (PID {proc.pid})")
                    proc.kill()
                    proc.wait(timeout=2)
            except Exception as exc:
                self.logger.error(f"Failed to terminate {issue_id}: {exc}")

        self._active_processes.clear()
def submit(
    self,
    issue: IssueInfo,
    on_complete: Callable[[WorkerResult], None] | None = None,
) -> Future[WorkerResult]:
    """Queue an issue for processing on the pool.

    Args:
        issue: Issue to process
        on_complete: Optional callback when processing completes

    Returns:
        Future that will contain the WorkerResult

    Raises:
        RuntimeError: If the pool has not been started
    """
    executor = self._executor
    if executor is None:
        raise RuntimeError("Worker pool not started")

    future = executor.submit(self._process_issue, issue)
    with self._process_lock:
        self._active_workers[issue.issue_id] = future

    if on_complete:

        def _notify(done: Future[WorkerResult]) -> None:
            # Route through _handle_completion so failures are wrapped and logged.
            self._handle_completion(done, on_complete, issue.issue_id)

        future.add_done_callback(_notify)

    return future
def _handle_completion(
    self,
    future: Future[WorkerResult],
    callback: Callable[[WorkerResult], None],
    issue_id: str,
) -> None:
    """Resolve a finished future, record its final stage and notify the caller.

    The issue ID is listed in ``_pending_callbacks`` for as long as this
    method runs. A future that raised is converted into a failed
    ``WorkerResult``; an exception from ``callback`` is logged, not raised.

    Args:
        future: Completed future returned by the executor
        callback: Function to invoke with the worker's result
        issue_id: ID of the issue the future belongs to
    """
    with self._callback_lock:
        self._pending_callbacks.add(issue_id)

    try:
        try:
            outcome = future.result()
        except Exception as exc:
            self.logger.error(f"Worker future failed for {issue_id}: {exc}")
            outcome = WorkerResult(
                issue_id=issue_id,
                success=False,
                branch_name="",
                worktree_path=Path(),
                error=f"Worker future failed: {exc}",
            )

        # Final stage mirrors the outcome (ENH-262).
        if outcome.success:
            final_stage = WorkerStage.COMPLETED
        elif outcome.interrupted:
            final_stage = WorkerStage.INTERRUPTED
        else:
            final_stage = WorkerStage.FAILED
        self.set_worker_stage(issue_id, final_stage)

        try:
            callback(outcome)
        except Exception as exc:
            self.logger.error(f"Worker completion callback failed for {issue_id}: {exc}")
    finally:
        with self._callback_lock:
            self._pending_callbacks.discard(issue_id)
def _process_issue(self, issue: IssueInfo) -> WorkerResult:
    """Run the full worker pipeline for one issue inside its own worktree.

    The pipeline is: create worktree -> ready-issue validation -> manage-issue
    implementation -> leak detection/recovery -> work verification -> rebase
    onto the latest base branch. Every exit path returns a ``WorkerResult``;
    any unexpected exception is converted into a failed result.

    Args:
        issue: Issue to process

    Returns:
        WorkerResult with processing outcome
    """
    started = time.time()
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    issue_id = issue.issue_id
    slug = f"{issue_id.lower()}-{stamp}"
    branch_name = f"parallel/{slug}"
    worktree_path = self.repo_path / self.parallel_config.worktree_base / f"worker-{slug}"

    def outcome(*, success: bool, **fields: Any) -> WorkerResult:
        # Every result shares identity, location and elapsed time; callers
        # supply only what differs.
        return WorkerResult(
            issue_id=issue_id,
            success=success,
            branch_name=branch_name,
            worktree_path=worktree_path,
            duration=time.time() - started,
            **fields,
        )

    # Initial stage for progress tracking (ENH-262).
    self.set_worker_stage(issue_id, WorkerStage.SETUP)

    # Snapshots of the main repo taken before the worker runs: the status
    # is used to spot files written to main, the HEAD SHA to spot commits
    # made on main.
    baseline_status = self._get_main_repo_baseline()
    baseline_head_sha = self._get_main_head_sha()

    try:
        # Step 1: worktree on a fresh branch, registered as active so it is
        # not cleaned up while in use (BUG-142).
        self._setup_worktree(worktree_path, branch_name)
        with self._process_lock:
            self._active_worktrees.add(worktree_path)

        self.set_worker_stage(issue_id, WorkerStage.VALIDATING)

        # Step 2: ready-issue validation.
        ready_result = self._run_claude_command(
            self.parallel_config.get_ready_command(issue_id),
            worktree_path,
            issue_id=issue_id,
        )

        def ready_outcome(*, success: bool, **fields: Any) -> WorkerResult:
            return outcome(
                success=success,
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
                **fields,
            )

        # Terminated by shutdown while validating (ENH-036).
        if issue_id in self._terminated_during_shutdown:
            self.set_worker_stage(issue_id, WorkerStage.INTERRUPTED)
            return ready_outcome(
                success=False,
                interrupted=True,
                error="Interrupted during shutdown",
            )

        if ready_result.returncode != 0:
            return ready_outcome(
                success=False,
                error=f"ready-issue failed: {ready_result.stderr}",
            )

        # Step 3: interpret the ready-issue verdict.
        ready_parsed = parse_ready_issue_output(ready_result.stdout)

        # CLOSE: the issue should not be implemented; closing is a valid outcome.
        if ready_parsed.get("should_close"):
            return ready_outcome(
                success=True,
                should_close=True,
                close_reason=ready_parsed.get("close_reason"),
                close_status=ready_parsed.get("close_status"),
            )

        # BLOCKED: the issue has open dependencies.
        if ready_parsed.get("is_blocked"):
            return ready_outcome(
                success=False,
                was_blocked=True,
                error="ready-issue verdict: BLOCKED - open dependency detected",
            )

        # NOT_READY (or unparseable verdict).
        if not ready_parsed["is_ready"]:
            concerns = ready_parsed.get("concerns", [])
            if concerns:
                concern_msg = "; ".join(concerns)
            elif ready_parsed["verdict"] == "UNKNOWN":
                # Include a snippet of the raw output to help debugging.
                raw_out = (ready_result.stdout or "")[:200].strip()
                if raw_out:
                    concern_msg = f"Could not parse verdict. Output: {raw_out}..."
                else:
                    concern_msg = "No output from ready-issue"
            else:
                concern_msg = "Issue not ready"
            return ready_outcome(
                success=False,
                error=f"ready-issue verdict: {ready_parsed['verdict']} - {concern_msg}",
            )

        # Corrections made by ready-issue stay in the worktree; just report them.
        was_corrected = ready_parsed.get("was_corrected", False)
        corrections = ready_parsed.get("corrections", [])

        self.set_worker_stage(issue_id, WorkerStage.IMPLEMENTING)

        # Step 4: implement via manage-issue (with continuation support).
        action = self.br_config.get_category_action(issue.issue_type)
        manage_cmd = self.parallel_config.get_manage_command(issue.issue_type, action, issue_id)
        manage_result = self._run_with_continuation(
            manage_cmd,
            worktree_path,
            issue_id=issue_id,
        )

        self.set_worker_stage(issue_id, WorkerStage.VERIFYING)

        # Terminated by shutdown while implementing (ENH-036).
        if issue_id in self._terminated_during_shutdown:
            self.set_worker_stage(issue_id, WorkerStage.INTERRUPTED)
            return outcome(
                success=False,
                interrupted=True,
                error="Interrupted during shutdown",
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
            )

        # Step 5: files changed in the worktree.
        changed_files = self._get_changed_files(worktree_path)

        # Step 6: files written to the main repo instead of the worktree.
        leaked_files = self._detect_main_repo_leaks(issue_id, baseline_status)
        if leaked_files:
            self.logger.warning(
                f"{issue_id} leaked {len(leaked_files)} file(s) to main repo: "
                f"{leaked_files}"
            )
            # Remove them to prevent stash conflicts during merge; the real
            # work is preserved on the worktree branch.
            self._cleanup_leaked_files(leaked_files)

        # Step 7: commits made directly on main instead of the worktree
        # branch. When the worktree shows no changes, try to recover by
        # moving those commits to the worktree and resetting main (BUG-580).
        committed_leaks = self._detect_committed_leaks(baseline_head_sha)
        if committed_leaks:
            self.logger.warning(
                f"{issue_id} committed {len(committed_leaks)} commit(s) directly "
                f"to main instead of worktree: {[sha[:8] for sha in committed_leaks]}"
            )
            if not changed_files:
                recovered = self._recover_committed_leaks(
                    committed_leaks, worktree_path, baseline_head_sha, issue_id
                )
                if recovered:
                    changed_files = self._get_changed_files(worktree_path)

        # Step 8: verify real work was done (after any recovery above).
        issue_filename = issue.path.stem if issue.path else ""
        work_verified, verification_error = self._verify_work_was_done(
            changed_files, issue_id, issue_filename
        )

        def manage_outcome(*, success: bool, error: str | None, **fields: Any) -> WorkerResult:
            return outcome(
                success=success,
                changed_files=changed_files,
                leaked_files=leaked_files,
                error=error,
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
                **fields,
            )

        if manage_result.returncode != 0:
            return manage_outcome(
                success=False,
                error=f"manage-issue failed: {manage_result.stderr}",
            )

        if not work_verified:
            return manage_outcome(success=False, error=verification_error)

        # Step 9: rebase onto the latest base branch before merge (BUG-180).
        base_updated, base_error = self._update_branch_base(worktree_path, issue_id)

        self.set_worker_stage(issue_id, WorkerStage.MERGING)

        if not base_updated:
            return manage_outcome(success=False, error=base_error)

        return manage_outcome(
            success=True,
            error=None,
            was_corrected=was_corrected,
            corrections=corrections,
        )

    except Exception as exc:
        return outcome(success=False, error=str(exc))
    finally:
        # The worktree is no longer in use by this worker (BUG-142).
        with self._process_lock:
            self._active_worktrees.discard(worktree_path)
515 def _setup_worktree(self, worktree_path: Path, branch_name: str) -> None:
516 """Create a git worktree with a new branch.
518 Args:
519 worktree_path: Path for the new worktree
520 branch_name: Name of the new branch
521 """
522 # Remove existing worktree if present
523 if worktree_path.exists():
524 self._cleanup_worktree(worktree_path)
526 # Create new worktree with branch
527 result = self._git_lock.run(
528 ["worktree", "add", "-b", branch_name, str(worktree_path)],
529 cwd=self.repo_path,
530 timeout=60,
531 )
533 if result.returncode != 0:
534 raise RuntimeError(f"Failed to create worktree: {result.stderr}")
536 # Copy git identity from main repo
537 for config_key in ["user.email", "user.name"]:
538 value_result = self._git_lock.run(
539 ["config", config_key],
540 cwd=self.repo_path,
541 )
542 if value_result.returncode == 0 and value_result.stdout.strip():
543 # Worktree config operations don't need the main repo lock
544 subprocess.run(
545 ["git", "config", config_key, value_result.stdout.strip()],
546 cwd=worktree_path,
547 capture_output=True,
548 )
550 # Copy .claude/ directory to establish project root for Claude Code (BUG-007)
551 # Claude Code uses .claude/ directory as highest priority for project root detection.
552 # Without this, Claude may detect the main repo as project root in worktrees,
553 # causing file writes to leak to the main repository.
554 claude_dir = self.repo_path / ".claude"
555 if claude_dir.exists() and claude_dir.is_dir():
556 dest_claude_dir = worktree_path / ".claude"
557 if dest_claude_dir.exists():
558 shutil.rmtree(dest_claude_dir)
559 shutil.copytree(claude_dir, dest_claude_dir)
560 self.logger.info("Copied .claude/ directory to worktree")
562 # Copy additional configured files from main repo to worktree
563 for file_path in self.parallel_config.worktree_copy_files:
564 if file_path.startswith(".claude/"):
565 continue # Already copied with full .claude/ directory above
566 src = self.repo_path / file_path
567 if src.exists():
568 if src.is_dir():
569 self.logger.warning(
570 f"Skipping '{file_path}' in worktree_copy_files: "
571 "is a directory (use symlinks or copytree for directories)"
572 )
573 continue
574 dest = worktree_path / file_path
575 dest.parent.mkdir(parents=True, exist_ok=True)
576 shutil.copy2(src, dest)
577 self.logger.info(f"Copied {file_path} to worktree")
578 else:
579 self.logger.debug(f"Skipped {file_path} (not found in main repo)")
581 self.logger.info(f"Created worktree at {worktree_path} on branch {branch_name}")
583 # Write session marker so concurrent orchestrators can identify this process's
584 # worktrees and skip them during orphan cleanup (BUG-579)
585 if worktree_path.exists():
586 marker_path = worktree_path / f".ll-session-{os.getpid()}"
587 marker_path.write_text(str(os.getpid()))
589 # Verify model if --show-model flag is set (requires API call)
590 if self.parallel_config.show_model:
591 model = self._detect_worktree_model_via_api(worktree_path)
592 if model:
593 self.logger.info(f" Using model: {model}")
594 else:
595 self.logger.warning(" Could not detect Claude CLI model")
597 def _detect_worktree_model_via_api(self, worktree_path: Path) -> str | None:
598 """Detect the model Claude will use by making an API call.
600 Runs a minimal Claude command with JSON output and parses the modelUsage
601 field to verify settings.local.json is being respected.
603 Args:
604 worktree_path: Path to the worktree to test
606 Returns:
607 Model name (e.g., "claude-sonnet-4-20250514") or None if unable to detect
608 """
609 try:
610 # Set environment to keep Claude in the project working directory (BUG-007)
611 # This ensures the first Claude CLI invocation in the worktree has the same
612 # project root behavior as subsequent invocations via run_claude_command()
613 env = os.environ.copy()
614 env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1"
616 # Use a minimal prompt that requires an API call to get modelUsage
617 result = subprocess.run(
618 [
619 "claude",
620 "-p",
621 "reply with just 'ok'",
622 "--output-format",
623 "json",
624 ],
625 cwd=worktree_path,
626 capture_output=True,
627 text=True,
628 timeout=30,
629 env=env,
630 )
631 if result.returncode == 0 and result.stdout.strip():
632 data: dict[str, Any] = json.loads(result.stdout.strip())
633 model_usage: dict[str, Any] = data.get("modelUsage", {})
634 # Return the first (primary) model from modelUsage
635 if model_usage:
636 return cast(str, next(iter(model_usage.keys())))
637 except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError):
638 pass
639 return None
641 def _cleanup_worktree(self, worktree_path: Path) -> None:
642 """Remove a git worktree and its associated branch.
644 Args:
645 worktree_path: Path to the worktree to remove
646 """
647 if not worktree_path.exists():
648 return
650 # Skip cleanup if worktree is actively in use by a running worker (BUG-142)
651 with self._process_lock:
652 if worktree_path in self._active_worktrees:
653 self.logger.warning(
654 f"Skipping cleanup of {worktree_path.name}: worktree is in active use"
655 )
656 return
658 # Get branch name before removing worktree (worktree operation, no lock needed)
659 branch_result = subprocess.run(
660 ["git", "rev-parse", "--abbrev-ref", "HEAD"],
661 cwd=worktree_path,
662 capture_output=True,
663 text=True,
664 )
665 branch_name = branch_result.stdout.strip() if branch_result.returncode == 0 else None
667 # Remove worktree (main repo operation)
668 self._git_lock.run(
669 ["worktree", "remove", "--force", str(worktree_path)],
670 cwd=self.repo_path,
671 timeout=30,
672 )
674 # If worktree removal failed, force delete directory
675 if worktree_path.exists():
676 shutil.rmtree(worktree_path, ignore_errors=True)
678 # Delete the branch if it was a parallel branch (main repo operation)
679 if branch_name and branch_name.startswith("parallel/"):
680 self._git_lock.run(
681 ["branch", "-D", branch_name],
682 cwd=self.repo_path,
683 timeout=10,
684 )
def _run_claude_command(
    self,
    command: str,
    working_dir: Path,
    issue_id: str | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run one Claude CLI command through the shared subprocess helper.

    Output is echoed line by line only when
    ``parallel_config.stream_subprocess_output`` is set. When an issue ID
    is given, the subprocess is registered in ``_active_processes`` while
    it runs so it can be terminated from another thread.

    Args:
        command: The command to run (e.g., "/ll:ready-issue BUG-123")
        working_dir: Directory to run the command in
        issue_id: Optional issue ID for subprocess tracking

    Returns:
        CompletedProcess with stdout and stderr
    """
    config = self.parallel_config
    streaming = config.stream_subprocess_output

    def echo_line(line: str, is_stderr: bool) -> None:
        # Only handed to the runner when streaming is enabled.
        if not streaming:
            return
        if is_stderr:
            print(f" {line}", file=sys.stderr)
        else:
            self.logger.info(f" {line}")

    def register(process: subprocess.Popen[str]) -> None:
        if not issue_id:
            return
        with self._process_lock:
            self._active_processes[issue_id] = process

    def unregister(process: subprocess.Popen[str]) -> None:
        if not issue_id:
            return
        with self._process_lock:
            self._active_processes.pop(issue_id, None)

    return _run_claude_base(
        command=command,
        working_dir=working_dir,
        timeout=config.timeout_per_issue,
        idle_timeout=config.idle_timeout_per_issue,
        stream_callback=echo_line if streaming else None,
        on_process_start=register if issue_id else None,
        on_process_end=unregister if issue_id else None,
    )
731 def _run_with_continuation(
732 self,
733 command: str,
734 working_dir: Path,
735 issue_id: str | None = None,
736 max_continuations: int = 3,
737 ) -> subprocess.CompletedProcess[str]:
738 """Run a Claude command with automatic continuation on context handoff.
740 If the command signals CONTEXT_HANDOFF, reads the continuation prompt
741 from the worktree and spawns a fresh Claude session to continue.
743 Args:
744 command: The command to run
745 working_dir: Directory (worktree) to run the command in
746 issue_id: Optional issue ID for subprocess tracking
747 max_continuations: Maximum number of continuation attempts
749 Returns:
750 Combined CompletedProcess with all session outputs
751 """
752 all_stdout: list[str] = []
753 all_stderr: list[str] = []
754 current_command = command
755 continuation_count = 0
756 result: subprocess.CompletedProcess[str] = subprocess.CompletedProcess(
757 args=[], returncode=1, stdout="", stderr=""
758 )
760 while continuation_count <= max_continuations:
761 result = self._run_claude_command(
762 current_command,
763 working_dir,
764 issue_id=issue_id,
765 )
767 all_stdout.append(result.stdout)
768 all_stderr.append(result.stderr)
770 # Check for context handoff signal
771 if detect_context_handoff(result.stdout):
772 self.logger.info(f"[{issue_id}] Detected CONTEXT_HANDOFF signal")
774 # Read continuation prompt from worktree
775 prompt_content = read_continuation_prompt(working_dir)
776 if not prompt_content:
777 self.logger.warning(
778 f"[{issue_id}] Context handoff signaled but no continuation prompt found"
779 )
780 break
782 if continuation_count >= max_continuations:
783 self.logger.warning(
784 f"[{issue_id}] Reached max continuations ({max_continuations}), stopping"
785 )
786 break
788 continuation_count += 1
789 self.logger.info(
790 f"[{issue_id}] Starting continuation session #{continuation_count}"
791 )
793 # Re-invoke the original command with --resume flag so the skill
794 # lifecycle (including completion/file-move) runs in the new session.
795 current_command = f"{command} --resume"
796 continue
798 # No handoff signal, we're done
799 break
801 return subprocess.CompletedProcess(
802 args=result.args,
803 returncode=result.returncode,
804 stdout="\n---CONTINUATION---\n".join(all_stdout),
805 stderr="\n---CONTINUATION---\n".join(all_stderr),
806 )
808 def _get_changed_files(self, worktree_path: Path) -> list[str]:
809 """Get list of files changed in the worktree.
811 Args:
812 worktree_path: Path to the worktree
814 Returns:
815 List of changed file paths relative to repo root
816 """
817 result = subprocess.run(
818 ["git", "diff", "--name-only", self.parallel_config.base_branch, "HEAD"],
819 cwd=worktree_path,
820 capture_output=True,
821 text=True,
822 timeout=30,
823 )
825 if result.returncode != 0:
826 return []
828 return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()]
830 def _update_branch_base(self, worktree_path: Path, issue_id: str) -> tuple[bool, str]:
831 """Fetch origin/main and rebase worker branch onto it.
833 This ensures the worker branch is based on the latest main before
834 merge coordination, preventing conflicts when main advances during
835 sprint execution (BUG-180).
837 Args:
838 worktree_path: Path to the worker's worktree
839 issue_id: Issue ID for logging
841 Returns:
842 Tuple of (success, error_message)
843 """
844 # Fetch latest base branch from origin
845 base = self.parallel_config.base_branch
846 fetch_result = subprocess.run(
847 ["git", "fetch", "origin", base],
848 cwd=worktree_path,
849 capture_output=True,
850 text=True,
851 timeout=60,
852 )
854 if fetch_result.returncode != 0:
855 return False, f"Failed to fetch origin/{base}: {fetch_result.stderr}"
857 # Rebase current branch onto origin base branch
858 rebase_result = subprocess.run(
859 ["git", "rebase", f"origin/{base}"],
860 cwd=worktree_path,
861 capture_output=True,
862 text=True,
863 timeout=120,
864 )
866 if rebase_result.returncode != 0:
867 # Abort the failed rebase
868 subprocess.run(
869 ["git", "rebase", "--abort"],
870 cwd=worktree_path,
871 capture_output=True,
872 timeout=10,
873 )
874 return False, f"Failed to rebase onto origin/{base}: {rebase_result.stderr}"
876 self.logger.info(f"[{issue_id}] Rebased branch onto origin/{base}")
877 return True, ""
879 def _verify_work_was_done(
880 self, changed_files: list[str], issue_id: str, issue_filename: str = ""
881 ) -> tuple[bool, str]:
882 """Verify that actual implementation work was done.
884 Uses the shared verify_work_was_done() function to check that changed
885 files include meaningful work, not just issue files or other artifacts.
887 Args:
888 changed_files: List of files changed during processing
889 issue_id: The issue ID being processed (unused, kept for compatibility)
890 issue_filename: Full issue filename (unused, kept for compatibility)
892 Returns:
893 Tuple of (success, error_message)
894 """
895 if not changed_files:
896 return False, "No files were changed during implementation"
898 # Check if code changes are required
899 if not self.parallel_config.require_code_changes:
900 return True, ""
902 # Use shared verification function
903 if verify_work_was_done(self.logger, changed_files):
904 return True, ""
906 # Generate descriptive error with actual excluded files
907 excluded_files = [
908 f
909 for f in changed_files
910 if f and any(f.startswith(excl) for excl in EXCLUDED_DIRECTORIES)
911 ]
912 if excluded_files:
913 files_preview = ", ".join(excluded_files[:5])
914 if len(excluded_files) > 5:
915 files_preview += f" (+{len(excluded_files) - 5} more)"
916 return False, f"Only excluded files modified: {files_preview}"
917 return False, "Only excluded files modified (e.g., .issues/, thoughts/)"
919 def _has_other_issue_id(self, file_lower: str, current_issue_id_lower: str) -> bool:
920 """Check if file contains a different issue ID than the current worker's.
922 This prevents cross-worker contamination where worker A detects worker B's
923 leaked file. When multiple workers run in parallel, their leaked files may
924 both appear in the main repo. Each worker should only clean up its own leaks.
926 Args:
927 file_lower: Lowercase file path to check
928 current_issue_id_lower: Lowercase issue ID of the current worker
930 Returns:
931 True if the file contains a different issue ID (belongs to another worker),
932 False if the file contains the current issue ID or no recognizable issue ID
933 """
934 # Pattern matches common issue ID formats: BUG-123, ENH-456, FEAT-789
935 # Use non-capturing group (?:...) so findall returns full match, not group
936 matches = re.findall(r"(?:bug|enh|feat)-\d+", file_lower)
938 if not matches:
939 # No issue ID found - file doesn't belong to any specific worker
940 return False
942 # Check if any of the found issue IDs match the current worker
943 for match in matches:
944 if match == current_issue_id_lower:
945 return False # File belongs to current worker
947 # File has issue ID(s) but none match current worker - belongs to another worker
948 return True
950 def _detect_main_repo_leaks(self, issue_id: str, baseline_status: set[str]) -> list[str]:
951 """Detect files incorrectly written to main repo instead of worktree.
953 Claude Code may write files to the main repository instead of the
954 worktree due to project root detection issues (see GitHub #8771).
955 This method detects such leaks by comparing main repo status before
956 and after worker execution.
958 Args:
959 issue_id: ID of the issue being processed (for pattern matching)
960 baseline_status: Set of file paths from git status before worker started
962 Returns:
963 List of file paths that were leaked to main repo
964 """
965 # Get current status of main repo
966 result = self._git_lock.run(
967 ["status", "--porcelain"],
968 cwd=self.repo_path,
969 timeout=30,
970 )
972 if result.returncode != 0:
973 return []
975 current_files: set[str] = set()
976 for line in result.stdout.strip().split("\n"):
977 if not line or len(line) < 3:
978 continue
979 # Extract file path (after status codes and space)
980 file_path = line[3:].strip()
981 # Handle renamed files (old -> new)
982 if " -> " in file_path:
983 file_path = file_path.split(" -> ")[-1]
984 current_files.add(file_path)
986 # Find new files that appeared during worker execution
987 new_files = current_files - baseline_status
989 # Filter to files likely related to this issue
990 issue_id_lower = issue_id.lower()
991 leaked_files: list[str] = []
993 for file_path in new_files:
994 # Skip state file (managed by orchestrator)
995 if file_path.endswith(".parallel-manage-state.json"):
996 continue
997 # Skip .gitignore (may be modified by ll-parallel)
998 if file_path == ".gitignore":
999 continue
1001 # Check if file is related to this issue
1002 file_lower = file_path.lower()
1003 if issue_id_lower in file_lower:
1004 leaked_files.append(file_path)
1005 # Also catch source files that shouldn't be modified in main
1006 elif file_path.startswith(("backend/", "src/", "lib/", "tests/")):
1007 leaked_files.append(file_path)
1008 # Catch thoughts/plans files
1009 elif file_path.startswith("thoughts/"):
1010 leaked_files.append(file_path)
1011 # Catch issue files in any issue directory variant
1012 # Handles both .issues/ (with dot) and issues/ (without dot)
1013 # Only include files without a different issue ID - files WITH other issue IDs
1014 # belong to other workers running in parallel (cross-worker contamination)
1015 elif file_path.startswith((".issues/", "issues/")):
1016 if not self._has_other_issue_id(file_lower, issue_id_lower):
1017 leaked_files.append(file_path)
1019 return leaked_files
1021 def _cleanup_leaked_files(self, leaked_files: list[str]) -> int:
1022 """Discard leaked files from main repo working directory.
1024 Claude Code sometimes writes files to the main repo instead of the
1025 worktree. These files cause stash conflicts during merge operations.
1026 Since the actual work is preserved in the worktree branch, we can
1027 safely discard these leaked changes from the main repo.
1029 Args:
1030 leaked_files: List of file paths leaked to main repo
1032 Returns:
1033 Number of files successfully cleaned up
1034 """
1035 if not leaked_files:
1036 return 0
1038 cleaned = 0
1040 # Get status to determine which files are tracked vs untracked
1041 status_result = self._git_lock.run(
1042 ["status", "--porcelain", "--"] + leaked_files,
1043 cwd=self.repo_path,
1044 timeout=30,
1045 )
1047 tracked_files: list[str] = []
1048 untracked_files: list[str] = []
1050 for line in status_result.stdout.splitlines():
1051 if not line or len(line) < 3:
1052 continue
1053 status_code = line[:2]
1054 file_path = line[3:].split(" -> ")[-1].strip()
1056 if status_code.startswith("?"):
1057 # Untracked file - need to delete
1058 untracked_files.append(file_path)
1059 else:
1060 # Tracked file - can use git checkout to discard
1061 tracked_files.append(file_path)
1063 # Discard changes to tracked files
1064 if tracked_files:
1065 checkout_result = self._git_lock.run(
1066 ["checkout", "--"] + tracked_files,
1067 cwd=self.repo_path,
1068 timeout=30,
1069 )
1070 if checkout_result.returncode == 0:
1071 cleaned += len(tracked_files)
1072 else:
1073 self.logger.warning(
1074 f"Failed to discard tracked leaked files: {checkout_result.stderr}"
1075 )
1077 # Delete untracked files
1078 for file_path in untracked_files:
1079 full_path = self.repo_path / file_path
1080 try:
1081 if full_path.exists():
1082 full_path.unlink()
1083 cleaned += 1
1084 except OSError as e:
1085 self.logger.warning(f"Failed to delete leaked file {file_path}: {e}")
1087 # Fallback: directly delete files not reported by git status
1088 # This handles gitignored files that git status --porcelain doesn't show
1089 accounted_files = set(tracked_files + untracked_files)
1090 for file_path in leaked_files:
1091 if file_path not in accounted_files:
1092 full_path = self.repo_path / file_path
1093 if full_path.exists():
1094 try:
1095 full_path.unlink()
1096 cleaned += 1
1097 self.logger.info(f"Deleted gitignored leaked file: {file_path}")
1098 except OSError as e:
1099 self.logger.warning(
1100 f"Failed to delete gitignored leaked file {file_path}: {e}"
1101 )
1102 else:
1103 self.logger.debug(f"Leaked file not found (may have been moved): {file_path}")
1105 if cleaned > 0:
1106 self.logger.info(f"Cleaned up {cleaned} leaked file(s) from main repo")
1108 return cleaned
1110 def _get_main_repo_baseline(self) -> set[str]:
1111 """Get baseline of modified/untracked files in main repo.
1113 Returns:
1114 Set of file paths currently showing in git status
1115 """
1116 result = self._git_lock.run(
1117 ["status", "--porcelain"],
1118 cwd=self.repo_path,
1119 timeout=30,
1120 )
1122 if result.returncode != 0:
1123 return set()
1125 files: set[str] = set()
1126 for line in result.stdout.strip().split("\n"):
1127 if not line or len(line) < 3:
1128 continue
1129 file_path = line[3:].strip()
1130 if " -> " in file_path:
1131 file_path = file_path.split(" -> ")[-1]
1132 files.add(file_path)
1134 return files
1136 def _get_main_head_sha(self) -> str:
1137 """Get the current HEAD SHA of the main repo.
1139 Returns:
1140 HEAD SHA string, or empty string if unavailable
1141 """
1142 result = self._git_lock.run(
1143 ["rev-parse", "HEAD"],
1144 cwd=self.repo_path,
1145 timeout=10,
1146 )
1147 if result.returncode == 0:
1148 return result.stdout.strip()
1149 return ""
1151 def _detect_committed_leaks(self, baseline_head_sha: str) -> list[str]:
1152 """Detect commits made directly to main repo during worker execution.
1154 When Claude commits to the main repo instead of the worktree branch,
1155 the commits appear on main's history but the worktree has no changes.
1156 This method detects such leaked commits by comparing main's HEAD SHA
1157 before and after worker execution.
1159 Args:
1160 baseline_head_sha: HEAD SHA captured before worker started
1162 Returns:
1163 List of commit SHAs committed to main during worker execution,
1164 newest first. Empty list if no committed leaks detected.
1165 """
1166 if not baseline_head_sha:
1167 return []
1169 current_sha = self._get_main_head_sha()
1170 if not current_sha or current_sha == baseline_head_sha:
1171 return []
1173 # Get list of new commits on main since baseline
1174 result = self._git_lock.run(
1175 ["log", "--format=%H", f"{baseline_head_sha}..HEAD"],
1176 cwd=self.repo_path,
1177 timeout=30,
1178 )
1179 if result.returncode != 0:
1180 return []
1182 commits = [sha.strip() for sha in result.stdout.strip().split("\n") if sha.strip()]
1183 return commits
def _recover_committed_leaks(
    self,
    leaked_commits: list[str],
    worktree_path: Path,
    baseline_head_sha: str,
    issue_id: str,
) -> bool:
    """Attempt to recover committed leaks by cherry-picking to worktree.

    When Claude commits directly to main instead of the worktree branch,
    we attempt to:
    1. Cherry-pick the leaked commits onto the worktree branch
    2. Reset main back to the baseline SHA (if safe to do so)

    This preserves the implementation work in the worktree while
    cleaning up the incorrect commits on main.

    Args:
        leaked_commits: Commit SHAs that leaked to main (newest first)
        worktree_path: Path to the worker's worktree
        baseline_head_sha: Main HEAD SHA before worker started
        issue_id: Issue ID for logging

    Returns:
        True if every cherry-pick succeeded (main reset is attempted but
        not required for a True return value). False as soon as one
        cherry-pick fails; main is left untouched in that case.
    """
    # NOTE(review): an empty leaked_commits list skips the loop below and then
    # fails on leaked_commits[0] with IndexError - presumably callers only
    # pass a non-empty list; confirm.
    self.logger.info(
        f"[{issue_id}] Attempting recovery: cherry-picking {len(leaked_commits)} "
        f"commit(s) to worktree"
    )

    # Cherry-pick in chronological order (oldest first = reverse of log output)
    # These git calls run inside the worktree via subprocess.run directly,
    # not through self._git_lock like the main-repo calls further down.
    for sha in reversed(leaked_commits):
        result = subprocess.run(
            ["git", "cherry-pick", sha],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        if result.returncode != 0:
            # Abort the in-progress cherry-pick; the abort's own result is
            # not inspected (best effort).
            # NOTE(review): commits already cherry-picked by earlier loop
            # iterations appear to stay on the worktree branch - confirm
            # that a partial recovery is acceptable to callers.
            subprocess.run(
                ["git", "cherry-pick", "--abort"],
                cwd=worktree_path,
                capture_output=True,
                timeout=10,
            )
            self.logger.warning(
                f"[{issue_id}] Cherry-pick of {sha[:8]} failed: {result.stderr.strip()}"
            )
            return False

    # Attempt to reset main to baseline (only if main hasn't advanced further)
    current_main_sha = self._get_main_head_sha()
    most_recent_leaked = leaked_commits[0]  # Newest first
    if current_main_sha == most_recent_leaked:
        # HEAD is still exactly the newest leaked commit, so nothing else has
        # landed on top of it since detection; hard-reset main to the baseline.
        reset_result = self._git_lock.run(
            ["reset", "--hard", baseline_head_sha],
            cwd=self.repo_path,
            timeout=30,
        )
        if reset_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Reset main to baseline {baseline_head_sha[:8]}")
        else:
            # A failed reset is only logged; the recovery still counts as
            # successful because the work now lives on the worktree branch.
            self.logger.warning(
                f"[{issue_id}] Cherry-pick succeeded but failed to reset main: "
                f"{reset_result.stderr.strip()}"
            )
    else:
        # HEAD differs from the newest leaked commit (or could not be read):
        # leave main alone rather than risk discarding unrelated commits.
        self.logger.warning(
            f"[{issue_id}] Main has additional commits beyond leaked ones "
            f"({current_main_sha[:8]} != {most_recent_leaked[:8]}) — "
            f"skipping main reset, manual cleanup may be needed"
        )

    self.logger.info(
        f"[{issue_id}] Recovered {len(leaked_commits)} commit(s): "
        f"cherry-picked to worktree branch"
    )
    return True
@property
def active_count(self) -> int:
    """Count the workers that are still in flight.

    A worker counts as active while its future has not finished, and also
    while its completion callback is still pending.

    Returns:
        Number of unfinished futures plus number of pending callbacks.
    """
    with self._process_lock:
        unfinished = [fut for fut in self._active_workers.values() if not fut.done()]
    with self._callback_lock:
        awaiting_callbacks = len(self._pending_callbacks)
    return len(unfinished) + awaiting_callbacks
1280 def set_worker_stage(self, issue_id: str, stage: WorkerStage) -> None:
1281 """Update the stage of a worker.
1283 Args:
1284 issue_id: Issue ID being processed
1285 stage: New stage value
1286 """
1287 with self._process_lock:
1288 self._worker_stages[issue_id] = stage
1290 def get_worker_stage(self, issue_id: str) -> WorkerStage | None:
1291 """Get the current stage of a worker.
1293 Args:
1294 issue_id: Issue ID being processed
1296 Returns:
1297 Current stage, or None if issue not being tracked
1298 """
1299 with self._process_lock:
1300 return self._worker_stages.get(issue_id)
1302 def get_active_stages(self) -> dict[str, WorkerStage]:
1303 """Get all active worker stages.
1305 Returns:
1306 Dictionary mapping issue_id to current stage for active workers
1307 """
1308 with self._process_lock:
1309 # Only return workers that are actually active
1310 active_ids = set(self._active_workers.keys())
1311 return {
1312 issue_id: stage
1313 for issue_id, stage in self._worker_stages.items()
1314 if issue_id in active_ids
1315 }
def remove_worker_stage(self, issue_id: str) -> None:
    """Stop tracking the stage of a worker; unknown IDs are ignored.

    Args:
        issue_id: Issue ID whose stage entry should be dropped.
    """
    with self._process_lock:
        if issue_id in self._worker_stages:
            del self._worker_stages[issue_id]
def cleanup_all_worktrees(self) -> None:
    """Remove every worker worktree found under the configured base directory.

    Only directories whose name starts with "worker-" are passed to
    ``_cleanup_worktree``. Nothing happens, and nothing is logged, when the
    base directory does not exist.
    """
    base_dir = self.repo_path / self.parallel_config.worktree_base
    if not base_dir.exists():
        return

    # Lazy filter: each entry is examined and cleaned up before the next one
    # is pulled from the directory listing.
    worker_dirs = (
        entry
        for entry in base_dir.iterdir()
        if entry.is_dir() and entry.name.startswith("worker-")
    )
    for worker_dir in worker_dirs:
        self._cleanup_worktree(worker_dir)

    self.logger.info("Cleaned up all worker worktrees")