Source code for papagai.cli

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later

import enum
import logging
import os
import shlex
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path

import click

try:
    import rich.logging

    handler = rich.logging.RichHandler(rich_tracebacks=True)
except ModuleNotFoundError:
    handler = logging.StreamHandler()


from .cmd import run_command
from .markdown import MarkdownInstructions
from .worktree import BRANCH_PREFIX, Worktree, WorktreeOverlayFs

logging.basicConfig(
    level="INFO",
    format="%(message)s",
    datefmt="[%X]",
    handlers=[handler],
)
logger = logging.getLogger("papagai")


[docs] @dataclass class Context: dry_run: bool = False
ALLOWED_TOOLS = [ "Glob", "Grep", "Read", "Bash(git status)", "Bash(git diff:*)", "Bash(git log:*)", "Bash(git show:*)", "Bash(git add:*)", "Bash(git commit:*)", "Bash(uv :*)", "Bash(pytest3 :*)", "Edit(./**)", "Write(./**)", ] PROMPT_SUFFIX = """ # Important Any changes to this repository should be commited into git following git best practices: - use descriptive subject lines - group logical changes together """
[docs] class Isolation(enum.StrEnum): AUTO = "auto" WORKTREE = "worktree" OVERLAYFS = "overlayfs"
[docs] def get_builtin_tasks_dir() -> Path: package_dir = Path(__file__).parent return package_dir / "tasks"
[docs] def get_builtin_primers_dir() -> Path: package_dir = Path(__file__).parent return package_dir / "primers"
[docs] def get_xdg_task_dir() -> Path: return ( Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "papagai" / "tasks" )
[docs] def get_branch(repo_dir: Path, ref: str = "HEAD") -> str: """ Get the branch name for a given ref (commit-ish). Args: repo_dir: Path to git repository ref: Git ref (branch name, HEAD, etc.). Default: HEAD Returns: The branch name Raises: subprocess.CalledProcessError if ref doesn't exist or not a git repo """ result = run_command( ["git", "rev-parse", "--abbrev-ref", "--verify", ref], cwd=repo_dir, ) return result.stdout.strip()
[docs] def branch_exists(repo_dir: Path, branch: str) -> bool: result = run_command( ["git", "rev-parse", "--verify", f"refs/heads/{branch}"], cwd=repo_dir, check=False, ) return result.returncode == 0
[docs] def create_branch_if_not_exists( repo_dir: Path, branch_spec: str | None, base_branch: str ) -> str: # None or "." means use current branch if branch_spec is None or branch_spec == ".": return base_branch if branch_exists(repo_dir, branch_spec): return branch_spec logger.debug(f"Creating new branch: {branch_spec} from {base_branch}") run_command( ["git", "branch", branch_spec, base_branch], cwd=repo_dir, ) return branch_spec
[docs] def merge_into_target_branch(repo_dir: Path, dest: str, src: str) -> int: result = run_command( [ "git", "merge-base", "--is-ancestor", dest, src, ], cwd=repo_dir, check=False, ) if result.returncode != 0: click.secho( f"Error: Cannot fast-forward {dest} to {src}", err=True, fg="red", ) click.secho( "The branches have diverged. Manual merge required.", err=True, fg="red", ) return 1 # Check if target branch is currently checked out try: current_checkout = get_branch(repo_dir, "HEAD") is_checked_out = current_checkout == dest except subprocess.CalledProcessError: is_checked_out = False try: if is_checked_out: # Use git merge if the target branch is currently checked out run_command( ["git", "merge", "--ff-only", src], cwd=repo_dir, ) else: # Use internal fetch if the target branch is not checked out run_command( ["git", "fetch", ".", f"{src}:{dest}"], cwd=repo_dir, ) except subprocess.CalledProcessError as e: click.secho( f"Error: Failed to merge {src} into {dest}: {e}", err=True, fg="red", ) click.secho( f"Work is available in branch {src}", err=True, fg="yellow", ) return 1 return 0
[docs] def purge_branches(repo_dir: Path) -> None: """ Delete all papagai branches from the repository. """ result = run_command( ["git", "branch", "--format=%(refname:short)", "--list", f"{BRANCH_PREFIX}/*"], cwd=repo_dir, ) branches = result.stdout.strip().split("\n") for branch in [b for b in branches if b]: click.echo(f"Deleting branch: {branch}") run_command(["git", "branch", "-D", branch], cwd=repo_dir, check=False)
[docs] def purge_worktrees(repo_dir: Path) -> None: """ Remove any leftover git worktrees created by papagai. """ result = run_command( ["git", "worktree", "list", "--porcelain"], cwd=repo_dir, ) # Parse worktree list output # Format is: worktree <path>\nHEAD <sha>\nbranch <branch>\n\n worktrees = [] current_worktree = {} for line in result.stdout.strip().split("\n"): if line.startswith("worktree ") and len(line) > 0: if current_worktree: worktrees.append(current_worktree) current_worktree = {"path": line.split(" ", 1)[1]} elif line.startswith("branch ") and len(line) > 0: current_worktree["branch"] = line.split(" ", 1)[1] if current_worktree: worktrees.append(current_worktree) for worktree in worktrees: branch = worktree.get("branch", "") if branch.startswith(f"refs/heads/{BRANCH_PREFIX}/"): path = worktree.get("path", "") click.echo(f"Removing worktree: {path} (branch: {branch})") run_command( ["git", "worktree", "remove", "--force", path], cwd=repo_dir, check=False, )
[docs] def purge_overlays(repo_dir: Path) -> None: """ Remove and unmount any leftover overlayfs created by papagai. """ xdg_cache_home = ( Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "papagai" ) overlay_base = xdg_cache_home / repo_dir.name if not overlay_base.exists(): return # Find all overlay directories dirs = list(overlay_base.glob("**/mounted")) for mount_dir in dirs: if not mount_dir.is_dir(): continue click.echo(f"Unmounting overlay: {mount_dir}") result = run_command( ["fusermount", "-u", str(mount_dir)], check=False, ) if result.returncode == 0: click.echo(f"Removing overlay directory: {mount_dir.parent}") shutil.rmtree(mount_dir.parent, ignore_errors=True) else: logger.warning(f"Failed to unmount {mount_dir}, it may not be mounted")
[docs] def run_claude( worktree_dir: Path, instructions: str, dry_run: bool, allowed_tools: list[str] | None = None, ) -> None: """Run the Claude review agent.""" if allowed_tools is None: allowed_tools = [] cmd = [ "claude", "--allowed-tools", " ".join(allowed_tools), "-p", instructions, ] if dry_run: click.echo("Would execute command:") click.echo(f" cd {shlex.quote(str(worktree_dir))}") click.echo(f" {' '.join(shlex.quote(arg) for arg in cmd)}") return click.secho( "Claude is pondering, contemplating, mulling, puzzling, meditating, etc.", fg="blue", ) try: result = run_command(cmd, cwd=worktree_dir) if result.stdout: click.echo(result.stdout) if result.stderr: click.secho(result.stderr, err=True, fg="red") except subprocess.CalledProcessError as e: click.secho(f"Error running claude: {e}", err=True, fg="red") raise
[docs] def claude_run( base_branch: str, instructions: MarkdownInstructions, dry_run: bool, branch_prefix: str = "", isolation: Isolation = Isolation.AUTO, keep: bool = False, target_branch: str | None = None, ) -> int: # Resolve repository directory repo_dir = Path.cwd().resolve() if not repo_dir.is_dir(): click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red") return 1 try: current_branch = get_branch(repo_dir, base_branch) except subprocess.CalledProcessError: click.secho( f"Error: Unable to find branch {base_branch} in this repo", err=True, fg="red", ) return 1 # Handle target branch: create if needed, or use existing try: work_base_branch = create_branch_if_not_exists( repo_dir, target_branch, current_branch ) except subprocess.CalledProcessError as e: click.secho( f"Error: Failed to create or access target branch: {e}", err=True, fg="red", ) return 1 worktree_class = Worktree if isolation in [Isolation.AUTO, Isolation.OVERLAYFS]: if WorktreeOverlayFs.is_supported(): worktree_class = WorktreeOverlayFs elif isolation != Isolation.AUTO: click.secho( "Error: fuse-overlayfs is not available. Please install it or use --isolation=worktree", err=True, fg="red", ) return 1 else: worktree_class = Worktree if isolation == Isolation.AUTO: logger.debug(f"Using isolation {worktree_class.__name__}") elif isolation == Isolation.WORKTREE: worktree_class = Worktree else: raise NotImplementedError(f"Error: Invalid isolation mode {isolation}") allowed_tools = ALLOWED_TOOLS + instructions.tools try: with worktree_class.from_branch( repo_dir, work_base_branch, branch_prefix=f"{BRANCH_PREFIX}/{branch_prefix}", keep=keep, ) as worktree: click.secho( f"Working in branch {worktree.branch} (based off {work_base_branch})", bold=True, ) assert instructions.text insts = instructions.text.replace("{BRANCH}", work_base_branch) insts = insts.replace("{WORKTREE_BRANCH}", worktree.branch) insts = f"{insts}\n{PROMPT_SUFFIX}" run_claude(worktree.worktree_dir, insts, dry_run, allowed_tools) # Save the worktree branch before context manager exits worktree_branch = worktree.branch click.secho( f"My work here is done. Check out branch {work_base_branch if target_branch else worktree_branch}", bold=True, ) # After worktree cleanup, merge work branch into target branch if specified if target_branch is not None: return merge_into_target_branch( repo_dir, dest=work_base_branch, src=worktree_branch ) return 0 except AssertionError as e: raise e except Exception as e: click.secho(f"Error: {e}", err=True, fg="red") return 1
[docs] def list_all_tasks() -> int: """ List all available tasks from the instructions directory. Returns: Exit code (0 for success, 1 for error) """ # Find the instructions directory from the installed package builtin_tasks_dir = get_builtin_tasks_dir() if not builtin_tasks_dir.exists() or not builtin_tasks_dir.is_dir(): click.secho( f"Error: Tasks directory not found at {builtin_tasks_dir}", err=True, fg="red", ) return 1 all_dirs = [builtin_tasks_dir] xdg_dir = get_xdg_task_dir() if xdg_dir.exists(): all_dirs = [xdg_dir] + all_dirs @dataclass class Task: name: str description: str tasks: list[Task] = [] for dir in all_dirs: md_files = sorted(dir.glob("**/*.md")) if not md_files: continue for md_file in md_files: try: md = MarkdownInstructions.from_file(md_file) if md.description: # Get relative path from instructions directory without .md extension rel_path = md_file.relative_to(dir) task_name = str(rel_path.with_suffix("")) tasks.append(Task(task_name, md.description)) else: click.secho( "Found task file {md_file} but it doesn't have a description" ) except Exception as e: click.secho( f"Warning: Failed to parse {md_file}: {e}", err=True, fg="red" ) if not tasks: click.secho("No tasks with descriptions found.", err=True, fg="red") return 1 # Calculate the maximum task name length for alignment max_name_len = max(len(task.name) for task in tasks) # Print tasks with aligned descriptions for task in tasks: click.echo(f"{task.name:<{max_name_len}} ... {task.description}") return 0
@click.group() @click.option("-v", "--verbose", count=True, help="increase verbosity") @click.option( "--dry-run", is_flag=True, help="Show the claude command that would be executed without running it", ) @click.pass_context def papagai(ctx: click.Context, dry_run: bool, verbose: int) -> None: """Papagai: Automate code changes with Claude AI on git worktrees.""" verbose_levels = {0: logging.ERROR, 1: logging.INFO, 2: logging.DEBUG} logger.setLevel(verbose_levels.get(verbose, 0)) logger.debug(f"Verbose level set {logger.getEffectiveLevel()}") # Store context object for subcommands ctx.obj = Context(dry_run=dry_run) @papagai.command("do") @click.argument( "instructions_file", type=click.Path(exists=True, path_type=Path), required=False, ) @click.option( "--base-branch", default="HEAD", help="Branch to base the work on (default: current branch)", ) @click.option( "-b", "--branch", "target_branch", default=None, help="Target branch to work on (creates if needed, merges work into it)", ) @click.option( "--isolation", type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False), default="auto", help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)", ) @click.option( "--keep/--no-keep", default=False, help="Keep the worktree/overlay after completion (default: --no-keep)", ) @click.pass_context def cmd_do( ctx: click.Context, instructions_file: Path | None, base_branch: str, target_branch: str | None, isolation: str, keep: bool, ) -> int: """ Tell Claude to do something non-code related on a work tree. This is the command for non-coding related tasks, and the instructions should include priming Claude for the task at hand. See papagai code for programming tasks. """ if instructions_file is not None: try: instructions = MarkdownInstructions.from_file(instructions_file) except (FileNotFoundError, PermissionError) as e: click.secho(f"Error reading instructions file: {e}", err=True, fg="red") return 1 else: if sys.stdin.isatty(): click.secho( "Please tell me what you want me to do (Ctrl+D to complete)", bold=True ) instructions = MarkdownInstructions(text=sys.stdin.read()) if not instructions.text: click.secho( "Empty instructions. That's it, I can't work under these conditions!", err=True, fg="red", ) return 1 return claude_run( base_branch=base_branch, instructions=instructions, dry_run=ctx.obj.dry_run, isolation=Isolation(isolation), keep=keep, target_branch=target_branch, ) @papagai.command("code") @click.argument( "instructions_file", type=click.Path(exists=True, path_type=Path), # type: ignore[type-var] required=False, ) @click.option( "--base-branch", default="HEAD", help="Branch to base the work on (default: current branch)", ) @click.option( "-b", "--branch", "target_branch", default=None, help="Target branch to work on (creates if needed, merges work into it)", ) @click.option( "--isolation", type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False), default="auto", help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)", ) @click.option( "--keep/--no-keep", default=False, help="Keep the worktree/overlay after completion (default: --no-keep)", ) @click.pass_context def cmd_code( ctx: click.Context, instructions_file: Path | None, base_branch: str, target_branch: str | None, isolation: str, keep: bool, ) -> int: """ Tell Claude to code something on a work tree. This command primes Claude to be software developer with an automatically inserted prompt prefix. The provided instructions thus only need to focus on the actual code. See papagai code for coding tasks. """ if instructions_file is not None: try: instructions = MarkdownInstructions.from_file(instructions_file) except (FileNotFoundError, PermissionError) as e: click.secho(f"Error reading instructions file: {e}", err=True, fg="red") return 1 else: if sys.stdin.isatty(): click.secho( "Please tell me what you want me to do (Ctrl+D to complete)", bold=True ) instructions = MarkdownInstructions(text=sys.stdin.read()) if not instructions.text: click.secho( "Empty instructions. That's it, I can't work under these conditions!", err=True, fg="red", ) return 1 primer = MarkdownInstructions.from_file(get_builtin_primers_dir() / "code.md") instructions = primer.combine(instructions) return claude_run( base_branch=base_branch, instructions=instructions, dry_run=ctx.obj.dry_run, isolation=Isolation(isolation), keep=keep, target_branch=target_branch, ) @papagai.command("purge") @click.option( "--branches/--no-branches", default=True, help="Remove git branches created by papagai (default: --branches)", ) @click.option( "--worktrees/--no-worktrees", default=True, help="Remove leftover git worktrees created by papagai (default: --worktrees)", ) @click.option( "--overlays/--no-overlays", default=True, help="Remove and unmount leftover overlayfs created by papagai (default: --overlays)", ) def cmd_purge(branches: bool, worktrees: bool, overlays: bool) -> int: """ Clean up papagai artifacts: branches, worktrees, and overlayfs. By default, removes all types of artifacts. Use --no-* flags to skip specific types. """ repo_dir = Path.cwd().resolve() if not repo_dir.is_dir(): click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red") return 1 error_occurred = False if branches: try: purge_branches(repo_dir) except subprocess.CalledProcessError as e: click.secho(f"Error purging branches: {e}", err=True, fg="red") error_occurred = True if worktrees: try: purge_worktrees(repo_dir) except subprocess.CalledProcessError as e: click.secho(f"Error purging worktrees: {e}", err=True, fg="red") error_occurred = True if overlays: try: purge_overlays(repo_dir) except Exception as e: click.secho(f"Error purging overlays: {e}", err=True, fg="red") error_occurred = True return 1 if error_occurred else 0 @papagai.command("task") @click.option( "--list", "list_tasks", is_flag=True, help="List all available tasks", ) @click.option( "--base-branch", default="HEAD", help="Branch to base the work on (default: current branch)", ) @click.argument("task_name", required=False) @click.pass_context def cmd_task( ctx: click.Context, list_tasks: bool, base_branch: str, task_name: str | None, ) -> int: """ Run a pre-written task, either from the built-in list or from tasks in XDG_CONFIG_HOME/papagai/tasks/**/*.md. Use --list to see all available tasks. """ if list_tasks: return list_all_tasks() if not task_name: click.secho( "Error: missing task name. Available tasks:", err=True, fg="red", ) return list_all_tasks() # Resolve repository directory repo_dir = Path.cwd().resolve() if not repo_dir.is_dir(): click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red") return 1 # Resolve task to instructions file from installed package task_files = [ get_xdg_task_dir() / f"{task_name}.md", get_builtin_tasks_dir() / f"{task_name}.md", ] task_file = next((f for f in task_files if f.exists()), None) if not task_file: click.secho( f"Error: Task '{task_name}' not found", err=True, fg="red", ) click.secho( "Run 'papagai task --list' to see available tasks", err=True, fg="red", ) return 1 try: instructions = MarkdownInstructions.from_file(task_file) except (FileNotFoundError, PermissionError) as e: click.secho(f"Error reading instructions file: {e}", err=True, fg="red") return 1 return claude_run( base_branch=base_branch, instructions=instructions, dry_run=ctx.obj.dry_run, ) @papagai.command("review") @click.option( "--base-branch", default="HEAD", help="Branch to base the work on (default: current branch)", ) @click.option( "-b", "--branch", "target_branch", default=None, help="Target branch to work on (creates if needed, merges work into it)", ) @click.option( "--isolation", type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False), default="auto", help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)", ) @click.option( "--keep/--no-keep", default=False, help="Keep the worktree/overlay after completion (default: --no-keep)", ) @click.pass_context def cmd_review( ctx: click.Context, base_branch: str, target_branch: str | None, isolation: str, keep: bool, ) -> int: """ Run a code review on the current branch. """ # Resolve repository directory repo_dir = Path.cwd().resolve() if not repo_dir.is_dir(): click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red") return 1 primers_dir = get_builtin_primers_dir() review_task_file = primers_dir / "review.md" if not review_task_file.exists(): click.secho( f"Error: Review task not found at {review_task_file}. This is a bug", err=True, fg="red", ) return 1 try: instructions = MarkdownInstructions.from_file(review_task_file) except (FileNotFoundError, PermissionError) as e: click.secho(f"Error reading review instructions: {e}", err=True, fg="red") return 1 return claude_run( base_branch=base_branch, instructions=instructions, dry_run=ctx.obj.dry_run, branch_prefix="review/", isolation=Isolation(isolation), keep=keep, target_branch=target_branch, ) if __name__ == "__main__": sys.exit(papagai())