Source code for papagai.worktree

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later

"""Git worktree management for papagai."""

import logging
import os
import shutil
import subprocess
import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Self

from .cmd import run_command

# Module-level logger shared by every function and class in this file.
logger = logging.getLogger("papagai.worktree")

# Prefix used to build papagai branch names (in this file: LATEST_BRANCH).
BRANCH_PREFIX = "papagai"
# Name of the branch that repoint_latest_branch() force-updates.
LATEST_BRANCH = f"{BRANCH_PREFIX}/latest"


def repoint_latest_branch(repo_dir: Path, branch: str) -> None:
    """
    Make the papagai/latest branch point at the same commit as ``branch``.

    Runs ``git branch -f`` inside the repository, which creates
    papagai/latest or moves it when it already exists. A failing git
    command is logged and otherwise ignored, so callers never see an
    exception from this step.

    Args:
        repo_dir: Path to the repository root
        branch: Branch name to point papagai/latest to
    """
    git_cmd = ["git", "branch", "-f", LATEST_BRANCH, branch]
    try:
        run_command(git_cmd, cwd=repo_dir, check=True)
    except subprocess.CalledProcessError as err:
        # Best effort: the work branch itself is unaffected by this failure.
        logger.error(f"Warning: Failed to update {LATEST_BRANCH}: {err}")
[docs] @dataclass class Worktree: """ Git worktree context manager for automated branch creation and cleanup. Attributes: worktree_dir: Path to the worktree directory branch: Name of the created branch repo_dir: Path to the repository root keep: If True, skip worktree directory removal during cleanup """ worktree_dir: Path branch: str repo_dir: Path keep: bool = False
[docs] @classmethod def from_branch( cls, repo_dir: Path, base_branch: str, branch_prefix: str | None = None, keep: bool = False, ) -> Self: """ Create a new review branch using git worktree. Args: repo_dir: Path to the repository root base_branch: Branch to base the new branch on branch_prefix: Optional prefix for the branch name keep: If True, skip worktree directory removal during cleanup Returns: Worktree instance Raises: subprocess.CalledProcessError: If git worktree creation fails """ assert base_branch is not None rand = str(uuid.uuid4()).split("-")[0] date = datetime.now().strftime("%Y%m%d-%H%M") branch_prefix = branch_prefix or "" branch = f"{branch_prefix}{base_branch}-{date}-{rand}" worktree_dir = repo_dir / branch run_command( [ "git", "worktree", "add", "--quiet", "-b", branch, str(worktree_dir), base_branch, ], cwd=repo_dir, ) return cls( worktree_dir=worktree_dir, branch=branch, repo_dir=repo_dir, keep=keep )
def __enter__(self) -> Self: """Enter the context manager.""" return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Exit the context manager and cleanup the worktree.""" self._cleanup() def _cleanup(self) -> None: """Clean up the worktree and any empty parent directories.""" try: if ( run_command( ["git", "diff", "--quiet", "--exit-code"], cwd=self.worktree_dir, check=False, ).returncode != 0 ): logger.warning( "Uncommitted changes found in worktree, committing them." ) try: run_command( ["git", "add", "-A"], cwd=self.worktree_dir, check=True, ) run_command( ["git", "commit", "-m", "FIXME: changes left in worktree"], cwd=self.worktree_dir, check=True, ) except subprocess.SubprocessError as e: logger.error(f"Failed to commit uncommitted changes: {e}") logger.error("To clean up manually, run:") logger.error(f" $ git worktree remove --force {self.branch}") return repoint_latest_branch(self.repo_dir, self.branch) # Skip directory cleanup if keep is True if self.keep: logger.info(f"Keeping worktree at {self.worktree_dir}") return run_command( ["git", "worktree", "remove", "--force", str(self.branch)], cwd=self.repo_dir, check=False, ) # Remove the worktree directory if it still exists if self.worktree_dir.exists(): shutil.rmtree(self.worktree_dir, ignore_errors=True) # Remove empty parent directories up to repo_dir current = self.worktree_dir.parent while current != self.repo_dir and current.is_relative_to(self.repo_dir): try: if current.exists() and not any(current.iterdir()): current.rmdir() current = current.parent else: break except OSError: # Directory not empty or other error, stop cleanup break except Exception as e: logger.error(f"Warning during cleanup: {e}")
@dataclass
class WorktreeOverlayFs(Worktree):
    """
    Git worktree using overlay filesystem (fuse-overlayfs).

    This class creates a copy-on-write worktree using fuse-overlayfs,
    where the original repository is the read-only lower layer and
    modifications are stored in an upper layer.

    Attributes:
        worktree_dir: Path to the mounted overlay directory
        branch: Name of the created branch
        repo_dir: Path to the repository root
        keep: If True, skip unmounting and directory removal during cleanup
        overlay_base_dir: Path to the overlay filesystem base directory
        mount_dir: Path to the mounted overlay filesystem
    """

    overlay_base_dir: Path | None = None
    mount_dir: Path | None = None

    def umount(self, check: bool = False) -> None:
        """
        Unmount the overlay filesystem with ``fusermount -u``.

        Does nothing when no mount directory is recorded on the instance.

        Args:
            check: Passed through to run_command; if True a failing
                unmount raises instead of being ignored.
        """
        if self.mount_dir is None:
            # Without this guard the command would target a path named "None".
            return
        run_command(["fusermount", "-u", str(self.mount_dir)], check=check)

    @classmethod
    def is_supported(cls) -> bool:
        """
        Check if fuse-overlayfs is available on the system.

        Returns:
            True if fuse-overlayfs command is available, False otherwise
        """
        return shutil.which("fuse-overlayfs") is not None

    @classmethod
    def from_branch(
        cls,
        repo_dir: Path,
        base_branch: str,
        branch_prefix: str | None = None,
        keep: bool = False,
    ) -> Self:
        """
        Create a new overlay worktree using fuse-overlayfs.

        Directory structure:
        - $XDG_CACHE_HOME/papagai/<project>/<branch>-<date>-<uuid>/
          - upperdir/
          - workdir/   - fuse-overlayfs workdir
          - mounted/   - mount point

        The repo_dir is the read-only lower layer with fuse-overlayfs.
        An unset or empty XDG_CACHE_HOME falls back to ``~/.cache``.

        Args:
            repo_dir: Path to the repository root
            base_branch: Branch to base the new branch on
            branch_prefix: Optional prefix for the branch name
            keep: If True, skip unmounting and directory removal during cleanup

        Returns:
            WorktreeOverlayFs instance with mounted overlay filesystem

        Raises:
            RuntimeError: If mounting the overlay fails (including when
                fuse-overlayfs cannot be executed) or if the git branch
                cannot be created in the mounted directory
        """
        assert base_branch is not None

        # Generate unique directory name using same scheme as Worktree
        rand = str(uuid.uuid4()).split("-")[0]
        date = datetime.now().strftime("%Y%m%d-%H%M")
        # Skip the branch prefix here so we don't nest directories too much
        branch = f"{base_branch}-{date}-{rand}"

        # An empty XDG_CACHE_HOME must be treated like an unset one;
        # otherwise Path("") would place the cache in the current directory.
        cache_root = os.getenv("XDG_CACHE_HOME") or Path.home() / ".cache"
        xdg_cache_home = Path(cache_root) / "papagai"
        overlay_base_dir = xdg_cache_home / repo_dir.name / branch
        overlay_base_dir.mkdir(parents=True, exist_ok=True)

        logger.debug(f"Setting up overlayfs in {overlay_base_dir}")

        upperdir = overlay_base_dir / "upperdir"
        workdir = overlay_base_dir / "workdir"
        mount_dir = overlay_base_dir / "mounted"

        upperdir.mkdir(exist_ok=True)
        workdir.mkdir(exist_ok=True)
        mount_dir.mkdir(exist_ok=True)

        # Now add the branch prefix
        branch = f"{branch_prefix or ''}{branch}"

        try:
            run_command(
                [
                    "fuse-overlayfs",
                    "-o",
                    f"lowerdir={repo_dir},upperdir={upperdir},workdir={workdir}",
                    str(mount_dir),
                ]
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing/non-executable binary (assuming
            # run_command lets subprocess's error propagate - TODO confirm).
            # Cleanup directories if mount fails
            shutil.rmtree(overlay_base_dir, ignore_errors=True)
            raise RuntimeError(
                f"Failed to mount overlay filesystem. Is fuse-overlayfs installed? Error: {e}"
            ) from e

        # Create a new git branch in the mounted directory
        try:
            run_command(
                ["git", "checkout", "-fb", branch, base_branch],
                cwd=mount_dir,
            )
        except subprocess.CalledProcessError as e:
            # Cleanup on failure
            run_command(["fusermount", "-u", str(mount_dir)], check=False)
            shutil.rmtree(overlay_base_dir, ignore_errors=True)
            raise RuntimeError(f"Failed to create git branch: {e}") from e

        return cls(
            worktree_dir=mount_dir,
            branch=branch,
            repo_dir=repo_dir,
            keep=keep,
            overlay_base_dir=overlay_base_dir,
            mount_dir=mount_dir,
        )

    def _cleanup(self) -> None:
        """
        Clean up the overlay filesystem and directories.

        Steps, each of which aborts the cleanup (leaving the overlay in
        place and logging manual instructions) when it fails:
        commit leftover changes, fetch the branch into the main
        repository, repoint papagai/latest, unmount, remove the overlay
        base directory. With ``keep`` set, the last two steps are skipped.
        """
        try:
            # NOTE(review): `git diff` only compares tracked files against
            # the index, so untracked or already-staged changes alone do not
            # trigger the commit below.
            if (
                run_command(
                    ["git", "diff", "--quiet", "--exit-code"],
                    cwd=self.worktree_dir,
                    check=False,
                ).returncode
                != 0
            ):
                logger.warning(
                    "Uncommitted changes found in worktree, committing them."
                )
                try:
                    run_command(
                        ["git", "add", "-A"],
                        cwd=self.worktree_dir,
                        check=True,
                    )
                    run_command(
                        ["git", "commit", "-m", "FIXME: changes left in worktree"],
                        cwd=self.worktree_dir,
                        check=True,
                    )
                except subprocess.SubprocessError as e:
                    logger.error(f"Failed to commit uncommitted changes: {e}")
                    logger.error("To clean up manually, run:")
                    logger.error(f" $ fusermount -u {self.mount_dir}")
                    logger.error(f" $ rm -rf {self.overlay_base_dir}")
                    return

            # Pull the branch from the overlay into the main repository
            # before unmounting and cleaning up
            try:
                run_command(
                    [
                        "git",
                        "fetch",
                        str(self.mount_dir),
                        f"{self.branch}:{self.branch}",
                    ],
                    cwd=self.repo_dir,
                    check=True,
                )
                # Verify the branch exists in the main repository
                run_command(
                    ["git", "rev-parse", "--verify", self.branch],
                    cwd=self.repo_dir,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                logger.error(
                    f"Warning: Failed to pull branch {self.branch} from overlay: {e}"
                )
                logger.error("To clean up manually, run:")
                logger.error(
                    f" $ git fetch {self.mount_dir} {self.branch}:{self.branch}"
                )
                logger.error(f" $ fusermount -u {self.mount_dir}")
                logger.error(f" $ rm -rf {self.overlay_base_dir}")
                return

            repoint_latest_branch(self.repo_dir, self.branch)

            # Skip unmounting and directory cleanup if keep is True
            if self.keep:
                logger.info(f"Keeping overlay mounted at {self.mount_dir}")
                return

            # Unmount the overlay filesystem
            if self.mount_dir and self.mount_dir.exists():
                try:
                    self.umount(check=True)
                except subprocess.CalledProcessError as e:
                    logger.error(f"Warning: Failed to unmount {self.mount_dir}: {e}")
                    logger.error(
                        f"You may need to manually unmount: fusermount -u {self.mount_dir}"
                    )
                    # Do not delete directories underneath a live mount.
                    return

            # Remove the overlay base directory
            if self.overlay_base_dir and self.overlay_base_dir.exists():
                shutil.rmtree(self.overlay_base_dir, ignore_errors=True)

        except Exception as e:
            # Cleanup is best effort; never raise out of __exit__.
            logger.error(f"Warning during cleanup: {e}")