Coverage for little_loops / parallel / worker_pool.py: 11%

466 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Worker pool for parallel issue processing with git worktree isolation. 

2 

3Each worker operates in an isolated git worktree, allowing concurrent issue 

4processing without file conflicts. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import os 

11import re 

12import shutil 

13import subprocess 

14import sys 

15import threading 

16import time 

17from collections.abc import Callable 

18from concurrent.futures import Future, ThreadPoolExecutor 

19from datetime import datetime 

20from pathlib import Path 

21from typing import TYPE_CHECKING, Any, cast 

22 

23from little_loops.output_parsing import parse_ready_issue_output 

24from little_loops.parallel.git_lock import GitLock 

25from little_loops.parallel.types import ParallelConfig, WorkerResult, WorkerStage 

26from little_loops.subprocess_utils import ( 

27 detect_context_handoff, 

28 read_continuation_prompt, 

29) 

30from little_loops.subprocess_utils import ( 

31 run_claude_command as _run_claude_base, 

32) 

33from little_loops.work_verification import EXCLUDED_DIRECTORIES, verify_work_was_done 

34 

35if TYPE_CHECKING: 

36 from little_loops.config import BRConfig 

37 from little_loops.issue_parser import IssueInfo 

38 from little_loops.logger import Logger 

39 

40 

41class WorkerPool: 

42 """Thread pool for processing issues in isolated git worktrees. 

43 

44 Each worker: 

45 1. Creates a dedicated git worktree and branch 

46 2. Runs issue validation and implementation via Claude CLI 

47 3. Commits changes locally 

48 4. Returns results for merge coordination 

49 

50 Example: 

51 >>> pool = WorkerPool(parallel_config, br_config, logger) 

52 >>> pool.start() 

53 >>> future = pool.submit(issue_info) 

54 >>> result = future.result() # WorkerResult 

55 >>> pool.shutdown() 

56 """ 

57 

58 def __init__( 

59 self, 

60 parallel_config: ParallelConfig, 

61 br_config: BRConfig, 

62 logger: Logger, 

63 repo_path: Path | None = None, 

64 git_lock: GitLock | None = None, 

65 ) -> None: 

66 """Initialize the worker pool. 

67 

68 Args: 

69 parallel_config: Parallel processing configuration 

70 br_config: Project configuration (for category actions) 

71 logger: Logger for worker output 

72 repo_path: Path to the git repository (default: current directory) 

73 git_lock: Shared lock for git operations (created if not provided) 

74 """ 

75 self.parallel_config = parallel_config 

76 self.br_config = br_config 

77 self.logger = logger 

78 self.repo_path = repo_path or Path.cwd() 

79 self._git_lock = git_lock or GitLock(logger) 

80 self._executor: ThreadPoolExecutor | None = None 

81 self._active_workers: dict[str, Future[WorkerResult]] = {} 

82 # Track active subprocesses for forceful termination on shutdown 

83 self._active_processes: dict[str, subprocess.Popen[str]] = {} 

84 # Track active worktree paths to prevent cleanup while in use (BUG-142) 

85 self._active_worktrees: set[Path] = set() 

86 self._process_lock = threading.Lock() 

87 # Track callbacks currently executing 

88 self._pending_callbacks: set[str] = set() 

89 self._callback_lock = threading.Lock() 

90 # Shutdown tracking for interrupted worker detection (ENH-036) 

91 self._shutdown_requested = False 

92 self._terminated_during_shutdown: set[str] = set() 

93 # Track worker processing stages for progress visibility (ENH-262) 

94 self._worker_stages: dict[str, WorkerStage] = {} 

95 

96 def start(self) -> None: 

97 """Start the worker pool.""" 

98 if self._executor is not None: 

99 return 

100 

101 # Ensure worktree base directory exists 

102 worktree_base = self.repo_path / self.parallel_config.worktree_base 

103 worktree_base.mkdir(parents=True, exist_ok=True) 

104 

105 self._executor = ThreadPoolExecutor( 

106 max_workers=self.parallel_config.max_workers, 

107 thread_name_prefix="issue-worker", 

108 ) 

109 self.logger.info(f"Worker pool started with {self.parallel_config.max_workers} workers") 

110 

111 def shutdown(self, wait: bool = True) -> None: 

112 """Shutdown the worker pool. 

113 

114 Args: 

115 wait: Whether to wait for pending tasks to complete 

116 """ 

117 if self._executor is None: 

118 return 

119 

120 self.logger.info("Shutting down worker pool...") 

121 

122 # First, terminate all active subprocesses to unblock worker threads 

123 if not wait: 

124 self.terminate_all_processes() 

125 

126 self._executor.shutdown(wait=wait) 

127 self._executor = None 

128 

129 def set_shutdown_requested(self, value: bool = True) -> None: 

130 """Set the shutdown flag. 

131 

132 Called by orchestrator during shutdown to enable tracking of 

133 workers that are terminated due to shutdown vs. actual failures. 

134 """ 

135 self._shutdown_requested = value 

136 

137 def terminate_all_processes(self) -> None: 

138 """Forcefully terminate all active subprocesses. 

139 

140 Called when we need to abort workers immediately, 

141 such as on timeout or shutdown. 

142 """ 

143 with self._process_lock: 

144 for issue_id, process in list(self._active_processes.items()): 

145 if process.poll() is None: # Still running 

146 self.logger.warning( 

147 f"Terminating subprocess for {issue_id} (PID {process.pid})" 

148 ) 

149 # Track issues terminated during shutdown for interrupted detection (ENH-036) 

150 if self._shutdown_requested: 

151 self._terminated_during_shutdown.add(issue_id) 

152 try: 

153 # Send SIGTERM first for graceful termination 

154 process.terminate() 

155 try: 

156 process.wait(timeout=5) 

157 except subprocess.TimeoutExpired: 

158 # Force kill if SIGTERM didn't work 

159 self.logger.warning(f"Force killing {issue_id} (PID {process.pid})") 

160 process.kill() 

161 process.wait(timeout=2) 

162 except Exception as e: 

163 self.logger.error(f"Failed to terminate {issue_id}: {e}") 

164 self._active_processes.clear() 

165 

166 def submit( 

167 self, 

168 issue: IssueInfo, 

169 on_complete: Callable[[WorkerResult], None] | None = None, 

170 ) -> Future[WorkerResult]: 

171 """Submit an issue for processing. 

172 

173 Args: 

174 issue: Issue to process 

175 on_complete: Optional callback when processing completes 

176 

177 Returns: 

178 Future that will contain the WorkerResult 

179 """ 

180 if self._executor is None: 

181 raise RuntimeError("Worker pool not started") 

182 

183 future = self._executor.submit(self._process_issue, issue) 

184 with self._process_lock: 

185 self._active_workers[issue.issue_id] = future 

186 

187 if on_complete: 

188 future.add_done_callback( 

189 lambda f: self._handle_completion(f, on_complete, issue.issue_id) 

190 ) 

191 

192 return future 

193 

194 def _handle_completion( 

195 self, 

196 future: Future[WorkerResult], 

197 callback: Callable[[WorkerResult], None], 

198 issue_id: str, 

199 ) -> None: 

200 """Handle worker completion and invoke callback.""" 

201 with self._callback_lock: 

202 self._pending_callbacks.add(issue_id) 

203 try: 

204 try: 

205 result = future.result() 

206 except Exception as e: 

207 self.logger.error(f"Worker future failed for {issue_id}: {e}") 

208 result = WorkerResult( 

209 issue_id=issue_id, 

210 success=False, 

211 branch_name="", 

212 worktree_path=Path(), 

213 error=f"Worker future failed: {e}", 

214 ) 

215 # Set final stage based on result (ENH-262) 

216 if result.success: 

217 self.set_worker_stage(issue_id, WorkerStage.COMPLETED) 

218 elif result.interrupted: 

219 self.set_worker_stage(issue_id, WorkerStage.INTERRUPTED) 

220 else: 

221 self.set_worker_stage(issue_id, WorkerStage.FAILED) 

222 try: 

223 callback(result) 

224 except Exception as e: 

225 self.logger.error(f"Worker completion callback failed for {issue_id}: {e}") 

226 finally: 

227 with self._callback_lock: 

228 self._pending_callbacks.discard(issue_id) 

229 

    def _process_issue(self, issue: IssueInfo) -> WorkerResult:
        """Process a single issue in an isolated worktree.

        Flow, as implemented below: capture a baseline of the main repo,
        create a worktree on a fresh ``parallel/<id>-<timestamp>`` branch, run
        the ready-issue command and act on its verdict, run the manage-issue
        command (with continuation support), collect changed files, detect and
        clean up files or commits that leaked into the main repo, verify that
        work was done, and finally rebase the branch onto the latest base.

        Any exception raised inside the ``try`` block is converted into a
        failed WorkerResult. The two baseline calls run before the ``try``,
        so an exception from them propagates to the caller.

        Args:
            issue: Issue to process

        Returns:
            WorkerResult with processing outcome
        """
        start_time = time.time()
        # Second-resolution timestamp, shared by the branch and worktree names.
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        branch_name = f"parallel/{issue.issue_id.lower()}-{timestamp}"
        worktree_path = (
            self.repo_path
            / self.parallel_config.worktree_base
            / f"worker-{issue.issue_id.lower()}-{timestamp}"
        )

        # Set initial stage for progress tracking (ENH-262)
        self.set_worker_stage(issue.issue_id, WorkerStage.SETUP)

        # Capture baseline of main repo status before worker starts
        # Used to detect files incorrectly written to main repo
        baseline_status = self._get_main_repo_baseline()
        # Capture main HEAD SHA before worker starts to detect committed leaks
        baseline_head_sha = self._get_main_head_sha()

        try:
            # Step 1: Create worktree with new branch
            self._setup_worktree(worktree_path, branch_name)

            # Register worktree as active to prevent cleanup while in use (BUG-142)
            with self._process_lock:
                self._active_worktrees.add(worktree_path)

            # Update stage for progress tracking (ENH-262)
            self.set_worker_stage(issue.issue_id, WorkerStage.VALIDATING)

            # Step 2: Run ready-issue validation
            ready_cmd = self.parallel_config.get_ready_command(issue.issue_id)
            ready_result = self._run_claude_command(
                ready_cmd,
                worktree_path,
                issue_id=issue.issue_id,
            )

            # Check if worker was terminated during shutdown (ENH-036)
            if issue.issue_id in self._terminated_during_shutdown:
                self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED)
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    interrupted=True,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    error="Interrupted during shutdown",
                    stdout=ready_result.stdout,
                    stderr=ready_result.stderr,
                )

            if ready_result.returncode != 0:
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    error=f"ready-issue failed: {ready_result.stderr}",
                    stdout=ready_result.stdout,
                    stderr=ready_result.stderr,
                )

            # Step 3: Parse ready-issue output and check verdict
            ready_parsed = parse_ready_issue_output(ready_result.stdout)

            # Handle CLOSE verdict - issue should not be implemented
            if ready_parsed.get("should_close"):
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=True,  # Closure is a valid outcome
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    should_close=True,
                    close_reason=ready_parsed.get("close_reason"),
                    close_status=ready_parsed.get("close_status"),
                    stdout=ready_result.stdout,
                    stderr=ready_result.stderr,
                )

            # Handle BLOCKED verdict - issue has open dependencies
            if ready_parsed.get("is_blocked"):
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    was_blocked=True,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    error="ready-issue verdict: BLOCKED - open dependency detected",
                    stdout=ready_result.stdout,
                    stderr=ready_result.stderr,
                )

            # Handle NOT_READY verdict. The error message prefers the parsed
            # concerns, then an output snippet for UNKNOWN verdicts, then a
            # generic fallback.
            if not ready_parsed["is_ready"]:
                concerns = ready_parsed.get("concerns", [])
                if concerns:
                    concern_msg = "; ".join(concerns)
                elif ready_parsed["verdict"] == "UNKNOWN":
                    # For UNKNOWN verdicts, show a snippet of output for debugging
                    raw_out = (ready_result.stdout or "")[:200].strip()
                    concern_msg = (
                        f"Could not parse verdict. Output: {raw_out}..."
                        if raw_out
                        else "No output from ready-issue"
                    )
                else:
                    concern_msg = "Issue not ready"
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    error=f"ready-issue verdict: {ready_parsed['verdict']} - {concern_msg}",
                    stdout=ready_result.stdout,
                    stderr=ready_result.stderr,
                )

            # Track if issue was corrected (corrections stay in worktree).
            # These two values are only attached to the final success result.
            was_corrected = ready_parsed.get("was_corrected", False)
            corrections = ready_parsed.get("corrections", [])

            # Update stage for progress tracking (ENH-262)
            self.set_worker_stage(issue.issue_id, WorkerStage.IMPLEMENTING)

            # Step 4: Get action from BRConfig
            action = self.br_config.get_category_action(issue.issue_type)

            # Step 5: Run manage-issue implementation (with continuation support)
            manage_cmd = self.parallel_config.get_manage_command(
                issue.issue_type, action, issue.issue_id
            )
            manage_result = self._run_with_continuation(
                manage_cmd,
                worktree_path,
                issue_id=issue.issue_id,
            )

            # Update stage for progress tracking (ENH-262)
            self.set_worker_stage(issue.issue_id, WorkerStage.VERIFYING)

            # Check if worker was terminated during shutdown (ENH-036)
            if issue.issue_id in self._terminated_during_shutdown:
                self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED)
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    interrupted=True,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    duration=time.time() - start_time,
                    error="Interrupted during shutdown",
                    stdout=manage_result.stdout,
                    stderr=manage_result.stderr,
                )

            # Step 6: Get list of changed files in worktree
            changed_files = self._get_changed_files(worktree_path)

            # Step 7: Detect files leaked to main repo instead of worktree (unstaged)
            leaked_files = self._detect_main_repo_leaks(issue.issue_id, baseline_status)
            if leaked_files:
                self.logger.warning(
                    f"{issue.issue_id} leaked {len(leaked_files)} file(s) to main repo: "
                    f"{leaked_files}"
                )
                # Clean up leaked files to prevent stash conflicts during merge.
                # The actual work is preserved in the worktree branch.
                self._cleanup_leaked_files(leaked_files)

            # Step 7b: Detect commits made directly to main instead of worktree branch.
            # If Claude committed to main (not the worktree), worktree will have no diff,
            # causing work verification to fail. Attempt to recover by cherry-picking
            # the leaked commits to the worktree and resetting main. (BUG-580)
            committed_leaks = self._detect_committed_leaks(baseline_head_sha)
            if committed_leaks:
                self.logger.warning(
                    f"{issue.issue_id} committed {len(committed_leaks)} commit(s) directly "
                    f"to main instead of worktree: {[sha[:8] for sha in committed_leaks]}"
                )
                # Recovery is only attempted when the worktree itself shows no changes.
                if not changed_files:
                    recovered = self._recover_committed_leaks(
                        committed_leaks, worktree_path, baseline_head_sha, issue.issue_id
                    )
                    if recovered:
                        changed_files = self._get_changed_files(worktree_path)

            # Step 8: Verify actual work was done (after potential committed-leak recovery)
            # Pass full filename for better doc-only keyword matching
            issue_filename = issue.path.stem if issue.path else ""
            work_verified, verification_error = self._verify_work_was_done(
                changed_files, issue.issue_id, issue_filename
            )

            # A failing manage-issue exit code takes precedence over the
            # verification outcome computed above.
            if manage_result.returncode != 0:
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    changed_files=changed_files,
                    leaked_files=leaked_files,
                    duration=time.time() - start_time,
                    error=f"manage-issue failed: {manage_result.stderr}",
                    stdout=manage_result.stdout,
                    stderr=manage_result.stderr,
                )

            if not work_verified:
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    changed_files=changed_files,
                    leaked_files=leaked_files,
                    duration=time.time() - start_time,
                    error=verification_error,
                    stdout=manage_result.stdout,
                    stderr=manage_result.stderr,
                )

            # Step 9: Update branch base before merge (BUG-180)
            # Fetch origin/main and rebase to ensure branch is based on latest main
            base_updated, base_error = self._update_branch_base(worktree_path, issue.issue_id)

            # Update stage for progress tracking (ENH-262).
            # NOTE(review): this is set even when the rebase above failed.
            self.set_worker_stage(issue.issue_id, WorkerStage.MERGING)

            if not base_updated:
                return WorkerResult(
                    issue_id=issue.issue_id,
                    success=False,
                    branch_name=branch_name,
                    worktree_path=worktree_path,
                    changed_files=changed_files,
                    leaked_files=leaked_files,
                    duration=time.time() - start_time,
                    error=base_error,
                    stdout=manage_result.stdout,
                    stderr=manage_result.stderr,
                )

            return WorkerResult(
                issue_id=issue.issue_id,
                success=True,
                branch_name=branch_name,
                worktree_path=worktree_path,
                changed_files=changed_files,
                leaked_files=leaked_files,
                duration=time.time() - start_time,
                error=None,
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
                was_corrected=was_corrected,
                corrections=corrections,
            )

        except Exception as e:
            # Any failure in the steps above becomes a failed result carrying
            # the exception text; nothing is re-raised.
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error=str(e),
            )
        finally:
            # Unregister worktree as no longer active (BUG-142)
            with self._process_lock:
                self._active_worktrees.discard(worktree_path)

514 

515 def _setup_worktree(self, worktree_path: Path, branch_name: str) -> None: 

516 """Create a git worktree with a new branch. 

517 

518 Args: 

519 worktree_path: Path for the new worktree 

520 branch_name: Name of the new branch 

521 """ 

522 # Remove existing worktree if present 

523 if worktree_path.exists(): 

524 self._cleanup_worktree(worktree_path) 

525 

526 # Create new worktree with branch 

527 result = self._git_lock.run( 

528 ["worktree", "add", "-b", branch_name, str(worktree_path)], 

529 cwd=self.repo_path, 

530 timeout=60, 

531 ) 

532 

533 if result.returncode != 0: 

534 raise RuntimeError(f"Failed to create worktree: {result.stderr}") 

535 

536 # Copy git identity from main repo 

537 for config_key in ["user.email", "user.name"]: 

538 value_result = self._git_lock.run( 

539 ["config", config_key], 

540 cwd=self.repo_path, 

541 ) 

542 if value_result.returncode == 0 and value_result.stdout.strip(): 

543 # Worktree config operations don't need the main repo lock 

544 subprocess.run( 

545 ["git", "config", config_key, value_result.stdout.strip()], 

546 cwd=worktree_path, 

547 capture_output=True, 

548 ) 

549 

550 # Copy .claude/ directory to establish project root for Claude Code (BUG-007) 

551 # Claude Code uses .claude/ directory as highest priority for project root detection. 

552 # Without this, Claude may detect the main repo as project root in worktrees, 

553 # causing file writes to leak to the main repository. 

554 claude_dir = self.repo_path / ".claude" 

555 if claude_dir.exists() and claude_dir.is_dir(): 

556 dest_claude_dir = worktree_path / ".claude" 

557 if dest_claude_dir.exists(): 

558 shutil.rmtree(dest_claude_dir) 

559 shutil.copytree(claude_dir, dest_claude_dir) 

560 self.logger.info("Copied .claude/ directory to worktree") 

561 

562 # Copy additional configured files from main repo to worktree 

563 for file_path in self.parallel_config.worktree_copy_files: 

564 if file_path.startswith(".claude/"): 

565 continue # Already copied with full .claude/ directory above 

566 src = self.repo_path / file_path 

567 if src.exists(): 

568 if src.is_dir(): 

569 self.logger.warning( 

570 f"Skipping '{file_path}' in worktree_copy_files: " 

571 "is a directory (use symlinks or copytree for directories)" 

572 ) 

573 continue 

574 dest = worktree_path / file_path 

575 dest.parent.mkdir(parents=True, exist_ok=True) 

576 shutil.copy2(src, dest) 

577 self.logger.info(f"Copied {file_path} to worktree") 

578 else: 

579 self.logger.debug(f"Skipped {file_path} (not found in main repo)") 

580 

581 self.logger.info(f"Created worktree at {worktree_path} on branch {branch_name}") 

582 

583 # Write session marker so concurrent orchestrators can identify this process's 

584 # worktrees and skip them during orphan cleanup (BUG-579) 

585 if worktree_path.exists(): 

586 marker_path = worktree_path / f".ll-session-{os.getpid()}" 

587 marker_path.write_text(str(os.getpid())) 

588 

589 # Verify model if --show-model flag is set (requires API call) 

590 if self.parallel_config.show_model: 

591 model = self._detect_worktree_model_via_api(worktree_path) 

592 if model: 

593 self.logger.info(f" Using model: {model}") 

594 else: 

595 self.logger.warning(" Could not detect Claude CLI model") 

596 

597 def _detect_worktree_model_via_api(self, worktree_path: Path) -> str | None: 

598 """Detect the model Claude will use by making an API call. 

599 

600 Runs a minimal Claude command with JSON output and parses the modelUsage 

601 field to verify settings.local.json is being respected. 

602 

603 Args: 

604 worktree_path: Path to the worktree to test 

605 

606 Returns: 

607 Model name (e.g., "claude-sonnet-4-20250514") or None if unable to detect 

608 """ 

609 try: 

610 # Set environment to keep Claude in the project working directory (BUG-007) 

611 # This ensures the first Claude CLI invocation in the worktree has the same 

612 # project root behavior as subsequent invocations via run_claude_command() 

613 env = os.environ.copy() 

614 env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1" 

615 

616 # Use a minimal prompt that requires an API call to get modelUsage 

617 result = subprocess.run( 

618 [ 

619 "claude", 

620 "-p", 

621 "reply with just 'ok'", 

622 "--output-format", 

623 "json", 

624 ], 

625 cwd=worktree_path, 

626 capture_output=True, 

627 text=True, 

628 timeout=30, 

629 env=env, 

630 ) 

631 if result.returncode == 0 and result.stdout.strip(): 

632 data: dict[str, Any] = json.loads(result.stdout.strip()) 

633 model_usage: dict[str, Any] = data.get("modelUsage", {}) 

634 # Return the first (primary) model from modelUsage 

635 if model_usage: 

636 return cast(str, next(iter(model_usage.keys()))) 

637 except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): 

638 pass 

639 return None 

640 

641 def _cleanup_worktree(self, worktree_path: Path) -> None: 

642 """Remove a git worktree and its associated branch. 

643 

644 Args: 

645 worktree_path: Path to the worktree to remove 

646 """ 

647 if not worktree_path.exists(): 

648 return 

649 

650 # Skip cleanup if worktree is actively in use by a running worker (BUG-142) 

651 with self._process_lock: 

652 if worktree_path in self._active_worktrees: 

653 self.logger.warning( 

654 f"Skipping cleanup of {worktree_path.name}: worktree is in active use" 

655 ) 

656 return 

657 

658 # Get branch name before removing worktree (worktree operation, no lock needed) 

659 branch_result = subprocess.run( 

660 ["git", "rev-parse", "--abbrev-ref", "HEAD"], 

661 cwd=worktree_path, 

662 capture_output=True, 

663 text=True, 

664 ) 

665 branch_name = branch_result.stdout.strip() if branch_result.returncode == 0 else None 

666 

667 # Remove worktree (main repo operation) 

668 self._git_lock.run( 

669 ["worktree", "remove", "--force", str(worktree_path)], 

670 cwd=self.repo_path, 

671 timeout=30, 

672 ) 

673 

674 # If worktree removal failed, force delete directory 

675 if worktree_path.exists(): 

676 shutil.rmtree(worktree_path, ignore_errors=True) 

677 

678 # Delete the branch if it was a parallel branch (main repo operation) 

679 if branch_name and branch_name.startswith("parallel/"): 

680 self._git_lock.run( 

681 ["branch", "-D", branch_name], 

682 cwd=self.repo_path, 

683 timeout=10, 

684 ) 

685 

686 def _run_claude_command( 

687 self, 

688 command: str, 

689 working_dir: Path, 

690 issue_id: str | None = None, 

691 ) -> subprocess.CompletedProcess[str]: 

692 """Run a Claude CLI command with real-time output streaming. 

693 

694 Args: 

695 command: The command to run (e.g., "/ll:ready-issue BUG-123") 

696 working_dir: Directory to run the command in 

697 issue_id: Optional issue ID for subprocess tracking 

698 

699 Returns: 

700 CompletedProcess with stdout and stderr 

701 """ 

702 stream_output = self.parallel_config.stream_subprocess_output 

703 

704 def stream_callback(line: str, is_stderr: bool) -> None: 

705 if stream_output: 

706 if is_stderr: 

707 print(f" {line}", file=sys.stderr) 

708 else: 

709 self.logger.info(f" {line}") 

710 

711 def on_start(process: subprocess.Popen[str]) -> None: 

712 if issue_id: 

713 with self._process_lock: 

714 self._active_processes[issue_id] = process 

715 

716 def on_end(process: subprocess.Popen[str]) -> None: 

717 if issue_id: 

718 with self._process_lock: 

719 self._active_processes.pop(issue_id, None) 

720 

721 return _run_claude_base( 

722 command=command, 

723 timeout=self.parallel_config.timeout_per_issue, 

724 working_dir=working_dir, 

725 stream_callback=stream_callback if stream_output else None, 

726 on_process_start=on_start if issue_id else None, 

727 on_process_end=on_end if issue_id else None, 

728 idle_timeout=self.parallel_config.idle_timeout_per_issue, 

729 ) 

730 

731 def _run_with_continuation( 

732 self, 

733 command: str, 

734 working_dir: Path, 

735 issue_id: str | None = None, 

736 max_continuations: int = 3, 

737 ) -> subprocess.CompletedProcess[str]: 

738 """Run a Claude command with automatic continuation on context handoff. 

739 

740 If the command signals CONTEXT_HANDOFF, reads the continuation prompt 

741 from the worktree and spawns a fresh Claude session to continue. 

742 

743 Args: 

744 command: The command to run 

745 working_dir: Directory (worktree) to run the command in 

746 issue_id: Optional issue ID for subprocess tracking 

747 max_continuations: Maximum number of continuation attempts 

748 

749 Returns: 

750 Combined CompletedProcess with all session outputs 

751 """ 

752 all_stdout: list[str] = [] 

753 all_stderr: list[str] = [] 

754 current_command = command 

755 continuation_count = 0 

756 result: subprocess.CompletedProcess[str] = subprocess.CompletedProcess( 

757 args=[], returncode=1, stdout="", stderr="" 

758 ) 

759 

760 while continuation_count <= max_continuations: 

761 result = self._run_claude_command( 

762 current_command, 

763 working_dir, 

764 issue_id=issue_id, 

765 ) 

766 

767 all_stdout.append(result.stdout) 

768 all_stderr.append(result.stderr) 

769 

770 # Check for context handoff signal 

771 if detect_context_handoff(result.stdout): 

772 self.logger.info(f"[{issue_id}] Detected CONTEXT_HANDOFF signal") 

773 

774 # Read continuation prompt from worktree 

775 prompt_content = read_continuation_prompt(working_dir) 

776 if not prompt_content: 

777 self.logger.warning( 

778 f"[{issue_id}] Context handoff signaled but no continuation prompt found" 

779 ) 

780 break 

781 

782 if continuation_count >= max_continuations: 

783 self.logger.warning( 

784 f"[{issue_id}] Reached max continuations ({max_continuations}), stopping" 

785 ) 

786 break 

787 

788 continuation_count += 1 

789 self.logger.info( 

790 f"[{issue_id}] Starting continuation session #{continuation_count}" 

791 ) 

792 

793 # Re-invoke the original command with --resume flag so the skill 

794 # lifecycle (including completion/file-move) runs in the new session. 

795 current_command = f"{command} --resume" 

796 continue 

797 

798 # No handoff signal, we're done 

799 break 

800 

801 return subprocess.CompletedProcess( 

802 args=result.args, 

803 returncode=result.returncode, 

804 stdout="\n---CONTINUATION---\n".join(all_stdout), 

805 stderr="\n---CONTINUATION---\n".join(all_stderr), 

806 ) 

807 

808 def _get_changed_files(self, worktree_path: Path) -> list[str]: 

809 """Get list of files changed in the worktree. 

810 

811 Args: 

812 worktree_path: Path to the worktree 

813 

814 Returns: 

815 List of changed file paths relative to repo root 

816 """ 

817 result = subprocess.run( 

818 ["git", "diff", "--name-only", self.parallel_config.base_branch, "HEAD"], 

819 cwd=worktree_path, 

820 capture_output=True, 

821 text=True, 

822 timeout=30, 

823 ) 

824 

825 if result.returncode != 0: 

826 return [] 

827 

828 return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] 

829 

830 def _update_branch_base(self, worktree_path: Path, issue_id: str) -> tuple[bool, str]: 

831 """Fetch origin/main and rebase worker branch onto it. 

832 

833 This ensures the worker branch is based on the latest main before 

834 merge coordination, preventing conflicts when main advances during 

835 sprint execution (BUG-180). 

836 

837 Args: 

838 worktree_path: Path to the worker's worktree 

839 issue_id: Issue ID for logging 

840 

841 Returns: 

842 Tuple of (success, error_message) 

843 """ 

844 # Fetch latest base branch from origin 

845 base = self.parallel_config.base_branch 

846 fetch_result = subprocess.run( 

847 ["git", "fetch", "origin", base], 

848 cwd=worktree_path, 

849 capture_output=True, 

850 text=True, 

851 timeout=60, 

852 ) 

853 

854 if fetch_result.returncode != 0: 

855 return False, f"Failed to fetch origin/{base}: {fetch_result.stderr}" 

856 

857 # Rebase current branch onto origin base branch 

858 rebase_result = subprocess.run( 

859 ["git", "rebase", f"origin/{base}"], 

860 cwd=worktree_path, 

861 capture_output=True, 

862 text=True, 

863 timeout=120, 

864 ) 

865 

866 if rebase_result.returncode != 0: 

867 # Abort the failed rebase 

868 subprocess.run( 

869 ["git", "rebase", "--abort"], 

870 cwd=worktree_path, 

871 capture_output=True, 

872 timeout=10, 

873 ) 

874 return False, f"Failed to rebase onto origin/{base}: {rebase_result.stderr}" 

875 

876 self.logger.info(f"[{issue_id}] Rebased branch onto origin/{base}") 

877 return True, "" 

878 

def _verify_work_was_done(
    self, changed_files: list[str], issue_id: str, issue_filename: str = ""
) -> tuple[bool, str]:
    """Decide whether a worker produced real implementation changes.

    An empty change list always fails. When the parallel config does not
    require code changes, any non-empty change list passes. Otherwise the
    decision is delegated to the shared verify_work_was_done() function, and
    on rejection an error message naming the excluded paths is built.

    Args:
        changed_files: List of files changed during processing
        issue_id: The issue ID being processed (unused, kept for compatibility)
        issue_filename: Full issue filename (unused, kept for compatibility)

    Returns:
        Tuple of (success, error_message); error_message is "" on success
    """
    if not changed_files:
        return False, "No files were changed during implementation"

    # Pass when code changes are optional, or when the shared verifier
    # accepts the change list (evaluated in that order).
    if not self.parallel_config.require_code_changes or verify_work_was_done(
        self.logger, changed_files
    ):
        return True, ""

    # Collect the changed paths that live under an excluded directory so the
    # error message can name them.
    excluded_prefixes = tuple(EXCLUDED_DIRECTORIES)
    offenders = [
        path for path in changed_files if path and path.startswith(excluded_prefixes)
    ]
    if not offenders:
        return False, "Only excluded files modified (e.g., .issues/, thoughts/)"

    # Show at most five paths and summarise the remainder.
    preview = ", ".join(offenders[:5])
    overflow = len(offenders) - 5
    if overflow > 0:
        preview += f" (+{overflow} more)"
    return False, f"Only excluded files modified: {preview}"

918 

def _has_other_issue_id(self, file_lower: str, current_issue_id_lower: str) -> bool:
    """Tell whether a path names only issue IDs other than the current one.

    Used to avoid cross-worker contamination: when several workers run in
    parallel, each should only act on files that are its own (or that carry
    no issue ID at all).

    Args:
        file_lower: Lowercase file path to check
        current_issue_id_lower: Lowercase issue ID of the current worker

    Returns:
        True if the path contains at least one issue ID and none of them is
        the current worker's; False if the current ID is present or the path
        contains no recognizable issue ID
    """
    # Recognised ID shapes: bug-123, enh-456, feat-789. The group is
    # non-capturing so findall yields the whole match.
    found_ids = re.findall(r"(?:bug|enh|feat)-\d+", file_lower)

    # No IDs -> not owned by anyone; own ID present -> owned by this worker.
    return bool(found_ids) and current_issue_id_lower not in found_ids

949 

def _detect_main_repo_leaks(self, issue_id: str, baseline_status: set[str]) -> list[str]:
    """Detect files incorrectly written to main repo instead of worktree.

    Claude Code may write files to the main repository instead of the
    worktree due to project root detection issues (see GitHub #8771).
    This method detects such leaks by comparing main repo status before
    and after worker execution.

    Args:
        issue_id: ID of the issue being processed (for pattern matching)
        baseline_status: Set of file paths from git status before worker started

    Returns:
        Sorted list of file paths that were leaked to main repo. Empty when
        ``git status`` fails or nothing new matches the leak heuristics.
    """
    # Get current status of main repo
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )

    if result.returncode != 0:
        return []

    current_files: set[str] = set()
    # Split the raw output. Stripping it first would remove the leading blank
    # status column of the first entry (" M path"), shifting the fixed-width
    # slice below and truncating that path by one character.
    for line in result.stdout.splitlines():
        if len(line) < 3:
            continue
        # Extract file path (after the two status codes and the space)
        file_path = line[3:].strip()
        # Handle renamed files (old -> new): keep the new name
        if " -> " in file_path:
            file_path = file_path.split(" -> ")[-1]
        current_files.add(file_path)

    # Find new files that appeared during worker execution
    new_files = current_files - baseline_status

    # Filter to files likely related to this issue
    issue_id_lower = issue_id.lower()
    leaked_files: list[str] = []

    # Sorted so the result order is deterministic rather than set-dependent.
    for file_path in sorted(new_files):
        # Skip state file (managed by orchestrator)
        if file_path.endswith(".parallel-manage-state.json"):
            continue
        # Skip .gitignore (may be modified by ll-parallel)
        if file_path == ".gitignore":
            continue

        file_lower = file_path.lower()
        if issue_id_lower in file_lower:
            # Path mentions this issue's ID
            leaked_files.append(file_path)
        elif file_path.startswith(("backend/", "src/", "lib/", "tests/")):
            # Source files that shouldn't be modified in main
            leaked_files.append(file_path)
        elif file_path.startswith("thoughts/"):
            # thoughts/plans files
            leaked_files.append(file_path)
        elif file_path.startswith((".issues/", "issues/")):
            # Issue files in either directory variant. Files carrying a
            # different issue ID belong to other parallel workers and are
            # left for them to handle.
            if not self._has_other_issue_id(file_lower, issue_id_lower):
                leaked_files.append(file_path)

    return leaked_files

def _cleanup_leaked_files(self, leaked_files: list[str]) -> int:
    """Discard leaked files from main repo working directory.

    Claude Code sometimes writes files to the main repo instead of the
    worktree. These files cause stash conflicts during merge operations.
    Since the actual work is preserved in the worktree branch, we can
    safely discard these leaked changes from the main repo.

    Args:
        leaked_files: List of file paths leaked to main repo

    Returns:
        Number of files successfully cleaned up. 0 when the list is empty or
        when ``git status`` fails (nothing is touched in that case).
    """
    if not leaked_files:
        return 0

    cleaned = 0

    # Get status to determine which files are tracked vs untracked
    status_result = self._git_lock.run(
        ["status", "--porcelain", "--"] + leaked_files,
        cwd=self.repo_path,
        timeout=30,
    )

    # Without a trustworthy status we cannot tell tracked from untracked or
    # ignored files. Falling through would send every path to the
    # delete-from-disk fallback below, including tracked files.
    if status_result.returncode != 0:
        self.logger.warning(
            f"Failed to get status for leaked files: {status_result.stderr}"
        )
        return 0

    tracked_files: list[str] = []
    untracked_files: list[str] = []

    for line in status_result.stdout.splitlines():
        if not line or len(line) < 3:
            continue
        status_code = line[:2]
        file_path = line[3:].split(" -> ")[-1].strip()

        if status_code.startswith("?"):
            # Untracked file - need to delete
            untracked_files.append(file_path)
        else:
            # Tracked file - can use git checkout to discard
            tracked_files.append(file_path)

    # Discard changes to tracked files
    if tracked_files:
        checkout_result = self._git_lock.run(
            ["checkout", "--"] + tracked_files,
            cwd=self.repo_path,
            timeout=30,
        )
        if checkout_result.returncode == 0:
            cleaned += len(tracked_files)
        else:
            self.logger.warning(
                f"Failed to discard tracked leaked files: {checkout_result.stderr}"
            )

    # Delete untracked files
    for file_path in untracked_files:
        full_path = self.repo_path / file_path
        try:
            if full_path.exists():
                full_path.unlink()
                cleaned += 1
        except OSError as e:
            self.logger.warning(f"Failed to delete leaked file {file_path}: {e}")

    # Fallback: directly delete files not reported by git status
    # This handles gitignored files that git status --porcelain doesn't show
    accounted_files = set(tracked_files + untracked_files)
    for file_path in leaked_files:
        if file_path in accounted_files:
            continue
        full_path = self.repo_path / file_path
        if full_path.exists():
            try:
                full_path.unlink()
                cleaned += 1
                self.logger.info(f"Deleted gitignored leaked file: {file_path}")
            except OSError as e:
                self.logger.warning(
                    f"Failed to delete gitignored leaked file {file_path}: {e}"
                )
        else:
            self.logger.debug(f"Leaked file not found (may have been moved): {file_path}")

    if cleaned > 0:
        self.logger.info(f"Cleaned up {cleaned} leaked file(s) from main repo")

    return cleaned

def _get_main_repo_baseline(self) -> set[str]:
    """Get baseline of modified/untracked files in main repo.

    Returns:
        Set of file paths currently showing in git status (for renames, the
        new path). Empty set if ``git status`` fails.
    """
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )

    if result.returncode != 0:
        return set()

    files: set[str] = set()
    # Split the raw output. Stripping it first would remove the leading blank
    # status column of the first entry and truncate that path by one character.
    for line in result.stdout.splitlines():
        if len(line) < 3:
            continue
        file_path = line[3:].strip()
        if " -> " in file_path:
            file_path = file_path.split(" -> ")[-1]
        files.add(file_path)

    return files

1135 

def _get_main_head_sha(self) -> str:
    """Look up the commit SHA that HEAD points to in the main repo.

    Runs ``git rev-parse HEAD`` through the shared git lock with the main
    repo as working directory.

    Returns:
        The whitespace-trimmed SHA on success, or "" when the command exits
        non-zero
    """
    rev_parse = self._git_lock.run(
        ["rev-parse", "HEAD"],
        cwd=self.repo_path,
        timeout=10,
    )
    return rev_parse.stdout.strip() if rev_parse.returncode == 0 else ""

1150 

def _detect_committed_leaks(self, baseline_head_sha: str) -> list[str]:
    """List commits that appeared on the main repo since a baseline SHA.

    When Claude commits to the main repo instead of the worktree branch,
    the commits land on main's history while the worktree has no changes.
    Such commits are found by comparing main's current HEAD with the SHA
    captured before the worker ran and, if they differ, listing the range
    ``baseline..HEAD``.

    Args:
        baseline_head_sha: HEAD SHA captured before worker started

    Returns:
        Commit SHAs in the order ``git log`` prints them (newest first).
        Empty list when no baseline was given, HEAD is unavailable or
        unchanged, or ``git log`` fails.
    """
    if not baseline_head_sha:
        return []

    head_now = self._get_main_head_sha()
    if not head_now or head_now == baseline_head_sha:
        return []

    # Enumerate everything reachable from HEAD but not from the baseline.
    log_result = self._git_lock.run(
        ["log", "--format=%H", f"{baseline_head_sha}..HEAD"],
        cwd=self.repo_path,
        timeout=30,
    )
    if log_result.returncode != 0:
        return []

    # One SHA per line; drop blank lines and surrounding whitespace.
    return [
        sha
        for raw_line in log_result.stdout.strip().split("\n")
        if (sha := raw_line.strip())
    ]

1184 

def _recover_committed_leaks(
    self,
    leaked_commits: list[str],
    worktree_path: Path,
    baseline_head_sha: str,
    issue_id: str,
) -> bool:
    """Attempt to recover committed leaks by cherry-picking to worktree.

    When Claude commits directly to main instead of the worktree branch,
    we attempt to:
    1. Cherry-pick the leaked commits onto the worktree branch
    2. Reset main back to the baseline SHA (if safe to do so)

    This preserves the implementation work in the worktree while
    cleaning up the incorrect commits on main.

    Args:
        leaked_commits: Commit SHAs that leaked to main (newest first)
        worktree_path: Path to the worker's worktree
        baseline_head_sha: Main HEAD SHA before worker started
        issue_id: Issue ID for logging

    Returns:
        True if every cherry-pick succeeded (main reset is attempted but
        not required for a True return value). False if a cherry-pick
        failed, or if ``leaked_commits`` is empty (nothing to recover).
    """
    # Guard: with no commits there is nothing to pick, and indexing
    # leaked_commits[0] further down would raise IndexError.
    if not leaked_commits:
        self.logger.debug(f"[{issue_id}] No leaked commits to recover")
        return False

    self.logger.info(
        f"[{issue_id}] Attempting recovery: cherry-picking {len(leaked_commits)} "
        f"commit(s) to worktree"
    )

    # Cherry-pick in chronological order (oldest first = reverse of log output)
    for sha in reversed(leaked_commits):
        result = subprocess.run(
            ["git", "cherry-pick", sha],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        if result.returncode != 0:
            # Leave the worktree out of the in-progress cherry-pick state.
            subprocess.run(
                ["git", "cherry-pick", "--abort"],
                cwd=worktree_path,
                capture_output=True,
                timeout=10,
            )
            self.logger.warning(
                f"[{issue_id}] Cherry-pick of {sha[:8]} failed: {result.stderr.strip()}"
            )
            return False

    # Attempt to reset main to baseline (only if main hasn't advanced further)
    current_main_sha = self._get_main_head_sha()
    most_recent_leaked = leaked_commits[0]  # Newest first
    if current_main_sha == most_recent_leaked:
        reset_result = self._git_lock.run(
            ["reset", "--hard", baseline_head_sha],
            cwd=self.repo_path,
            timeout=30,
        )
        if reset_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Reset main to baseline {baseline_head_sha[:8]}")
        else:
            self.logger.warning(
                f"[{issue_id}] Cherry-pick succeeded but failed to reset main: "
                f"{reset_result.stderr.strip()}"
            )
    else:
        # Main's HEAD is no longer the newest leaked commit, so a hard reset
        # would also drop whatever else moved it; leave main untouched.
        self.logger.warning(
            f"[{issue_id}] Main has additional commits beyond leaked ones "
            f"({current_main_sha[:8]} != {most_recent_leaked[:8]}) — "
            f"skipping main reset, manual cleanup may be needed"
        )

    self.logger.info(
        f"[{issue_id}] Recovered {len(leaked_commits)} commit(s): "
        f"cherry-picked to worktree branch"
    )
    return True

1266 

@property
def active_count(self) -> int:
    """Number of currently active workers.

    Counts workers whose futures have not finished yet, plus entries still
    listed in the pending-callback collection (futures that are done but
    whose callbacks haven't completed).
    """
    # NOTE(review): the two locks are taken one after the other here, not
    # nested; confirm against the original indentation.
    with self._process_lock:
        unfinished = sum(not fut.done() for fut in self._active_workers.values())
    with self._callback_lock:
        awaiting_callback = len(self._pending_callbacks)
    return unfinished + awaiting_callback

1279 

def set_worker_stage(self, issue_id: str, stage: WorkerStage) -> None:
    """Record the stage a worker has reached.

    The stage table is written while holding the process lock; an existing
    entry for the same issue is overwritten.

    Args:
        issue_id: Issue ID being processed
        stage: New stage value
    """
    with self._process_lock:
        self._worker_stages.update({issue_id: stage})

1289 

def get_worker_stage(self, issue_id: str) -> WorkerStage | None:
    """Look up the recorded stage for a worker.

    The stage table is read while holding the process lock.

    Args:
        issue_id: Issue ID being processed

    Returns:
        Current stage, or None if issue not being tracked
    """
    with self._process_lock:
        recorded_stage = self._worker_stages.get(issue_id)
    return recorded_stage

1301 

def get_active_stages(self) -> dict[str, WorkerStage]:
    """Snapshot the stages of workers that are currently registered as active.

    Stage entries whose issue ID is not a key of the active-worker table are
    left out. Both tables are read while holding the process lock.

    Returns:
        Dictionary mapping issue_id to current stage for active workers
    """
    with self._process_lock:
        # Freeze the set of active IDs first, then filter the stage table.
        live_ids = frozenset(self._active_workers)
        snapshot: dict[str, WorkerStage] = {}
        for worker_id, worker_stage in self._worker_stages.items():
            if worker_id in live_ids:
                snapshot[worker_id] = worker_stage
        return snapshot

1316 

def remove_worker_stage(self, issue_id: str) -> None:
    """Drop a worker's entry from stage tracking.

    Does nothing when the issue has no recorded stage. The stage table is
    modified while holding the process lock.

    Args:
        issue_id: Issue ID to remove
    """
    with self._process_lock:
        if issue_id in self._worker_stages:
            del self._worker_stages[issue_id]

1325 

def cleanup_all_worktrees(self) -> None:
    """Clean up every worker worktree under the configured base directory.

    Each directory directly inside the worktree base whose name starts with
    "worker-" is handed to ``_cleanup_worktree``. When the base directory
    does not exist the method returns without doing or logging anything.
    """
    base_dir = self.repo_path / self.parallel_config.worktree_base
    if not base_dir.exists():
        return

    for entry in base_dir.iterdir():
        # Only directories are candidates.
        if not entry.is_dir():
            continue
        # Only worker-owned worktrees are removed.
        if not entry.name.startswith("worker-"):
            continue
        self._cleanup_worktree(entry)

    self.logger.info("Cleaned up all worker worktrees")