#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
import enum
import logging
import os
import shlex
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import click
try:
import rich.logging
handler = rich.logging.RichHandler(rich_tracebacks=True)
except ModuleNotFoundError:
handler = logging.StreamHandler()
from .cmd import run_command
from .markdown import MarkdownInstructions
from .worktree import BRANCH_PREFIX, Worktree, WorktreeOverlayFs
logging.basicConfig(
level="INFO",
format="%(message)s",
datefmt="[%X]",
handlers=[handler],
)
logger = logging.getLogger("papagai")
[docs]
@dataclass
class Context:
dry_run: bool = False
ALLOWED_TOOLS = [
"Glob",
"Grep",
"Read",
"Bash(git status)",
"Bash(git diff:*)",
"Bash(git log:*)",
"Bash(git show:*)",
"Bash(git add:*)",
"Bash(git commit:*)",
"Bash(uv :*)",
"Bash(pytest3 :*)",
"Edit(./**)",
"Write(./**)",
]
PROMPT_SUFFIX = """
# Important
Any changes to this repository should be commited into git following git best
practices:
- use descriptive subject lines
- group logical changes together
"""
[docs]
class Isolation(enum.StrEnum):
AUTO = "auto"
WORKTREE = "worktree"
OVERLAYFS = "overlayfs"
[docs]
def get_builtin_tasks_dir() -> Path:
package_dir = Path(__file__).parent
return package_dir / "tasks"
[docs]
def get_builtin_primers_dir() -> Path:
package_dir = Path(__file__).parent
return package_dir / "primers"
[docs]
def get_xdg_task_dir() -> Path:
return (
Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config"))
/ "papagai"
/ "tasks"
)
[docs]
def get_branch(repo_dir: Path, ref: str = "HEAD") -> str:
"""
Get the branch name for a given ref (commit-ish).
Args:
repo_dir: Path to git repository
ref: Git ref (branch name, HEAD, etc.). Default: HEAD
Returns:
The branch name
Raises:
subprocess.CalledProcessError if ref doesn't exist or not a git repo
"""
result = run_command(
["git", "rev-parse", "--abbrev-ref", "--verify", ref],
cwd=repo_dir,
)
return result.stdout.strip()
[docs]
def branch_exists(repo_dir: Path, branch: str) -> bool:
result = run_command(
["git", "rev-parse", "--verify", f"refs/heads/{branch}"],
cwd=repo_dir,
check=False,
)
return result.returncode == 0
[docs]
def create_branch_if_not_exists(
repo_dir: Path, branch_spec: str | None, base_branch: str
) -> str:
# None or "." means use current branch
if branch_spec is None or branch_spec == ".":
return base_branch
if branch_exists(repo_dir, branch_spec):
return branch_spec
logger.debug(f"Creating new branch: {branch_spec} from {base_branch}")
run_command(
["git", "branch", branch_spec, base_branch],
cwd=repo_dir,
)
return branch_spec
[docs]
def merge_into_target_branch(repo_dir: Path, dest: str, src: str) -> int:
result = run_command(
[
"git",
"merge-base",
"--is-ancestor",
dest,
src,
],
cwd=repo_dir,
check=False,
)
if result.returncode != 0:
click.secho(
f"Error: Cannot fast-forward {dest} to {src}",
err=True,
fg="red",
)
click.secho(
"The branches have diverged. Manual merge required.",
err=True,
fg="red",
)
return 1
# Check if target branch is currently checked out
try:
current_checkout = get_branch(repo_dir, "HEAD")
is_checked_out = current_checkout == dest
except subprocess.CalledProcessError:
is_checked_out = False
try:
if is_checked_out:
# Use git merge if the target branch is currently checked out
run_command(
["git", "merge", "--ff-only", src],
cwd=repo_dir,
)
else:
# Use internal fetch if the target branch is not checked out
run_command(
["git", "fetch", ".", f"{src}:{dest}"],
cwd=repo_dir,
)
except subprocess.CalledProcessError as e:
click.secho(
f"Error: Failed to merge {src} into {dest}: {e}",
err=True,
fg="red",
)
click.secho(
f"Work is available in branch {src}",
err=True,
fg="yellow",
)
return 1
return 0
[docs]
def purge_branches(repo_dir: Path) -> None:
"""
Delete all papagai branches from the repository.
"""
result = run_command(
["git", "branch", "--format=%(refname:short)", "--list", f"{BRANCH_PREFIX}/*"],
cwd=repo_dir,
)
branches = result.stdout.strip().split("\n")
for branch in [b for b in branches if b]:
click.echo(f"Deleting branch: {branch}")
run_command(["git", "branch", "-D", branch], cwd=repo_dir, check=False)
[docs]
def purge_worktrees(repo_dir: Path) -> None:
"""
Remove any leftover git worktrees created by papagai.
"""
result = run_command(
["git", "worktree", "list", "--porcelain"],
cwd=repo_dir,
)
# Parse worktree list output
# Format is: worktree <path>\nHEAD <sha>\nbranch <branch>\n\n
worktrees = []
current_worktree = {}
for line in result.stdout.strip().split("\n"):
if line.startswith("worktree ") and len(line) > 0:
if current_worktree:
worktrees.append(current_worktree)
current_worktree = {"path": line.split(" ", 1)[1]}
elif line.startswith("branch ") and len(line) > 0:
current_worktree["branch"] = line.split(" ", 1)[1]
if current_worktree:
worktrees.append(current_worktree)
for worktree in worktrees:
branch = worktree.get("branch", "")
if branch.startswith(f"refs/heads/{BRANCH_PREFIX}/"):
path = worktree.get("path", "")
click.echo(f"Removing worktree: {path} (branch: {branch})")
run_command(
["git", "worktree", "remove", "--force", path],
cwd=repo_dir,
check=False,
)
[docs]
def purge_overlays(repo_dir: Path) -> None:
"""
Remove and unmount any leftover overlayfs created by papagai.
"""
xdg_cache_home = (
Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "papagai"
)
overlay_base = xdg_cache_home / repo_dir.name
if not overlay_base.exists():
return
# Find all overlay directories
dirs = list(overlay_base.glob("**/mounted"))
for mount_dir in dirs:
if not mount_dir.is_dir():
continue
click.echo(f"Unmounting overlay: {mount_dir}")
result = run_command(
["fusermount", "-u", str(mount_dir)],
check=False,
)
if result.returncode == 0:
click.echo(f"Removing overlay directory: {mount_dir.parent}")
shutil.rmtree(mount_dir.parent, ignore_errors=True)
else:
logger.warning(f"Failed to unmount {mount_dir}, it may not be mounted")
[docs]
def run_claude(
worktree_dir: Path,
instructions: str,
dry_run: bool,
allowed_tools: list[str] | None = None,
) -> None:
"""Run the Claude review agent."""
if allowed_tools is None:
allowed_tools = []
cmd = [
"claude",
"--allowed-tools",
" ".join(allowed_tools),
"-p",
instructions,
]
if dry_run:
click.echo("Would execute command:")
click.echo(f" cd {shlex.quote(str(worktree_dir))}")
click.echo(f" {' '.join(shlex.quote(arg) for arg in cmd)}")
return
click.secho(
"Claude is pondering, contemplating, mulling, puzzling, meditating, etc.",
fg="blue",
)
try:
result = run_command(cmd, cwd=worktree_dir)
if result.stdout:
click.echo(result.stdout)
if result.stderr:
click.secho(result.stderr, err=True, fg="red")
except subprocess.CalledProcessError as e:
click.secho(f"Error running claude: {e}", err=True, fg="red")
raise
[docs]
def claude_run(
base_branch: str,
instructions: MarkdownInstructions,
dry_run: bool,
branch_prefix: str = "",
isolation: Isolation = Isolation.AUTO,
keep: bool = False,
target_branch: str | None = None,
) -> int:
# Resolve repository directory
repo_dir = Path.cwd().resolve()
if not repo_dir.is_dir():
click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red")
return 1
try:
current_branch = get_branch(repo_dir, base_branch)
except subprocess.CalledProcessError:
click.secho(
f"Error: Unable to find branch {base_branch} in this repo",
err=True,
fg="red",
)
return 1
# Handle target branch: create if needed, or use existing
try:
work_base_branch = create_branch_if_not_exists(
repo_dir, target_branch, current_branch
)
except subprocess.CalledProcessError as e:
click.secho(
f"Error: Failed to create or access target branch: {e}",
err=True,
fg="red",
)
return 1
worktree_class = Worktree
if isolation in [Isolation.AUTO, Isolation.OVERLAYFS]:
if WorktreeOverlayFs.is_supported():
worktree_class = WorktreeOverlayFs
elif isolation != Isolation.AUTO:
click.secho(
"Error: fuse-overlayfs is not available. Please install it or use --isolation=worktree",
err=True,
fg="red",
)
return 1
else:
worktree_class = Worktree
if isolation == Isolation.AUTO:
logger.debug(f"Using isolation {worktree_class.__name__}")
elif isolation == Isolation.WORKTREE:
worktree_class = Worktree
else:
raise NotImplementedError(f"Error: Invalid isolation mode {isolation}")
allowed_tools = ALLOWED_TOOLS + instructions.tools
try:
with worktree_class.from_branch(
repo_dir,
work_base_branch,
branch_prefix=f"{BRANCH_PREFIX}/{branch_prefix}",
keep=keep,
) as worktree:
click.secho(
f"Working in branch {worktree.branch} (based off {work_base_branch})",
bold=True,
)
assert instructions.text
insts = instructions.text.replace("{BRANCH}", work_base_branch)
insts = insts.replace("{WORKTREE_BRANCH}", worktree.branch)
insts = f"{insts}\n{PROMPT_SUFFIX}"
run_claude(worktree.worktree_dir, insts, dry_run, allowed_tools)
# Save the worktree branch before context manager exits
worktree_branch = worktree.branch
click.secho(
f"My work here is done. Check out branch {work_base_branch if target_branch else worktree_branch}",
bold=True,
)
# After worktree cleanup, merge work branch into target branch if specified
if target_branch is not None:
return merge_into_target_branch(
repo_dir, dest=work_base_branch, src=worktree_branch
)
return 0
except AssertionError as e:
raise e
except Exception as e:
click.secho(f"Error: {e}", err=True, fg="red")
return 1
[docs]
def list_all_tasks() -> int:
"""
List all available tasks from the instructions directory.
Returns:
Exit code (0 for success, 1 for error)
"""
# Find the instructions directory from the installed package
builtin_tasks_dir = get_builtin_tasks_dir()
if not builtin_tasks_dir.exists() or not builtin_tasks_dir.is_dir():
click.secho(
f"Error: Tasks directory not found at {builtin_tasks_dir}",
err=True,
fg="red",
)
return 1
all_dirs = [builtin_tasks_dir]
xdg_dir = get_xdg_task_dir()
if xdg_dir.exists():
all_dirs = [xdg_dir] + all_dirs
@dataclass
class Task:
name: str
description: str
tasks: list[Task] = []
for dir in all_dirs:
md_files = sorted(dir.glob("**/*.md"))
if not md_files:
continue
for md_file in md_files:
try:
md = MarkdownInstructions.from_file(md_file)
if md.description:
# Get relative path from instructions directory without .md extension
rel_path = md_file.relative_to(dir)
task_name = str(rel_path.with_suffix(""))
tasks.append(Task(task_name, md.description))
else:
click.secho(
"Found task file {md_file} but it doesn't have a description"
)
except Exception as e:
click.secho(
f"Warning: Failed to parse {md_file}: {e}", err=True, fg="red"
)
if not tasks:
click.secho("No tasks with descriptions found.", err=True, fg="red")
return 1
# Calculate the maximum task name length for alignment
max_name_len = max(len(task.name) for task in tasks)
# Print tasks with aligned descriptions
for task in tasks:
click.echo(f"{task.name:<{max_name_len}} ... {task.description}")
return 0
@click.group()
@click.option("-v", "--verbose", count=True, help="increase verbosity")
@click.option(
"--dry-run",
is_flag=True,
help="Show the claude command that would be executed without running it",
)
@click.pass_context
def papagai(ctx: click.Context, dry_run: bool, verbose: int) -> None:
"""Papagai: Automate code changes with Claude AI on git worktrees."""
verbose_levels = {0: logging.ERROR, 1: logging.INFO, 2: logging.DEBUG}
logger.setLevel(verbose_levels.get(verbose, 0))
logger.debug(f"Verbose level set {logger.getEffectiveLevel()}")
# Store context object for subcommands
ctx.obj = Context(dry_run=dry_run)
@papagai.command("do")
@click.argument(
"instructions_file",
type=click.Path(exists=True, path_type=Path),
required=False,
)
@click.option(
"--base-branch",
default="HEAD",
help="Branch to base the work on (default: current branch)",
)
@click.option(
"-b",
"--branch",
"target_branch",
default=None,
help="Target branch to work on (creates if needed, merges work into it)",
)
@click.option(
"--isolation",
type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False),
default="auto",
help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)",
)
@click.option(
"--keep/--no-keep",
default=False,
help="Keep the worktree/overlay after completion (default: --no-keep)",
)
@click.pass_context
def cmd_do(
ctx: click.Context,
instructions_file: Path | None,
base_branch: str,
target_branch: str | None,
isolation: str,
keep: bool,
) -> int:
"""
Tell Claude to do something non-code related on a work tree.
This is the command for non-coding related tasks, and the instructions
should include priming Claude for the task at hand.
See papagai code for programming tasks.
"""
if instructions_file is not None:
try:
instructions = MarkdownInstructions.from_file(instructions_file)
except (FileNotFoundError, PermissionError) as e:
click.secho(f"Error reading instructions file: {e}", err=True, fg="red")
return 1
else:
if sys.stdin.isatty():
click.secho(
"Please tell me what you want me to do (Ctrl+D to complete)", bold=True
)
instructions = MarkdownInstructions(text=sys.stdin.read())
if not instructions.text:
click.secho(
"Empty instructions. That's it, I can't work under these conditions!",
err=True,
fg="red",
)
return 1
return claude_run(
base_branch=base_branch,
instructions=instructions,
dry_run=ctx.obj.dry_run,
isolation=Isolation(isolation),
keep=keep,
target_branch=target_branch,
)
@papagai.command("code")
@click.argument(
"instructions_file",
type=click.Path(exists=True, path_type=Path), # type: ignore[type-var]
required=False,
)
@click.option(
"--base-branch",
default="HEAD",
help="Branch to base the work on (default: current branch)",
)
@click.option(
"-b",
"--branch",
"target_branch",
default=None,
help="Target branch to work on (creates if needed, merges work into it)",
)
@click.option(
"--isolation",
type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False),
default="auto",
help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)",
)
@click.option(
"--keep/--no-keep",
default=False,
help="Keep the worktree/overlay after completion (default: --no-keep)",
)
@click.pass_context
def cmd_code(
ctx: click.Context,
instructions_file: Path | None,
base_branch: str,
target_branch: str | None,
isolation: str,
keep: bool,
) -> int:
"""
Tell Claude to code something on a work tree.
This command primes Claude to be software developer with an automatically
inserted prompt prefix. The provided instructions thus only need to focus on
the actual code.
See papagai code for coding tasks.
"""
if instructions_file is not None:
try:
instructions = MarkdownInstructions.from_file(instructions_file)
except (FileNotFoundError, PermissionError) as e:
click.secho(f"Error reading instructions file: {e}", err=True, fg="red")
return 1
else:
if sys.stdin.isatty():
click.secho(
"Please tell me what you want me to do (Ctrl+D to complete)", bold=True
)
instructions = MarkdownInstructions(text=sys.stdin.read())
if not instructions.text:
click.secho(
"Empty instructions. That's it, I can't work under these conditions!",
err=True,
fg="red",
)
return 1
primer = MarkdownInstructions.from_file(get_builtin_primers_dir() / "code.md")
instructions = primer.combine(instructions)
return claude_run(
base_branch=base_branch,
instructions=instructions,
dry_run=ctx.obj.dry_run,
isolation=Isolation(isolation),
keep=keep,
target_branch=target_branch,
)
@papagai.command("purge")
@click.option(
"--branches/--no-branches",
default=True,
help="Remove git branches created by papagai (default: --branches)",
)
@click.option(
"--worktrees/--no-worktrees",
default=True,
help="Remove leftover git worktrees created by papagai (default: --worktrees)",
)
@click.option(
"--overlays/--no-overlays",
default=True,
help="Remove and unmount leftover overlayfs created by papagai (default: --overlays)",
)
def cmd_purge(branches: bool, worktrees: bool, overlays: bool) -> int:
"""
Clean up papagai artifacts: branches, worktrees, and overlayfs.
By default, removes all types of artifacts. Use --no-* flags to skip specific types.
"""
repo_dir = Path.cwd().resolve()
if not repo_dir.is_dir():
click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red")
return 1
error_occurred = False
if branches:
try:
purge_branches(repo_dir)
except subprocess.CalledProcessError as e:
click.secho(f"Error purging branches: {e}", err=True, fg="red")
error_occurred = True
if worktrees:
try:
purge_worktrees(repo_dir)
except subprocess.CalledProcessError as e:
click.secho(f"Error purging worktrees: {e}", err=True, fg="red")
error_occurred = True
if overlays:
try:
purge_overlays(repo_dir)
except Exception as e:
click.secho(f"Error purging overlays: {e}", err=True, fg="red")
error_occurred = True
return 1 if error_occurred else 0
@papagai.command("task")
@click.option(
"--list",
"list_tasks",
is_flag=True,
help="List all available tasks",
)
@click.option(
"--base-branch",
default="HEAD",
help="Branch to base the work on (default: current branch)",
)
@click.argument("task_name", required=False)
@click.pass_context
def cmd_task(
ctx: click.Context,
list_tasks: bool,
base_branch: str,
task_name: str | None,
) -> int:
"""
Run a pre-written task, either from the built-in list or from tasks in
XDG_CONFIG_HOME/papagai/tasks/**/*.md.
Use --list to see all available tasks.
"""
if list_tasks:
return list_all_tasks()
if not task_name:
click.secho(
"Error: missing task name. Available tasks:",
err=True,
fg="red",
)
return list_all_tasks()
# Resolve repository directory
repo_dir = Path.cwd().resolve()
if not repo_dir.is_dir():
click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red")
return 1
# Resolve task to instructions file from installed package
task_files = [
get_xdg_task_dir() / f"{task_name}.md",
get_builtin_tasks_dir() / f"{task_name}.md",
]
task_file = next((f for f in task_files if f.exists()), None)
if not task_file:
click.secho(
f"Error: Task '{task_name}' not found",
err=True,
fg="red",
)
click.secho(
"Run 'papagai task --list' to see available tasks",
err=True,
fg="red",
)
return 1
try:
instructions = MarkdownInstructions.from_file(task_file)
except (FileNotFoundError, PermissionError) as e:
click.secho(f"Error reading instructions file: {e}", err=True, fg="red")
return 1
return claude_run(
base_branch=base_branch,
instructions=instructions,
dry_run=ctx.obj.dry_run,
)
@papagai.command("review")
@click.option(
"--base-branch",
default="HEAD",
help="Branch to base the work on (default: current branch)",
)
@click.option(
"-b",
"--branch",
"target_branch",
default=None,
help="Target branch to work on (creates if needed, merges work into it)",
)
@click.option(
"--isolation",
type=click.Choice(["auto", "worktree", "overlayfs"], case_sensitive=False),
default="auto",
help="Worktree isolation mode: auto (try overlayfs, fall back to worktree), worktree (git worktree), or overlayfs (fuse-overlayfs)",
)
@click.option(
"--keep/--no-keep",
default=False,
help="Keep the worktree/overlay after completion (default: --no-keep)",
)
@click.pass_context
def cmd_review(
ctx: click.Context,
base_branch: str,
target_branch: str | None,
isolation: str,
keep: bool,
) -> int:
"""
Run a code review on the current branch.
"""
# Resolve repository directory
repo_dir = Path.cwd().resolve()
if not repo_dir.is_dir():
click.secho(f"Error: {repo_dir} is not a directory", err=True, fg="red")
return 1
primers_dir = get_builtin_primers_dir()
review_task_file = primers_dir / "review.md"
if not review_task_file.exists():
click.secho(
f"Error: Review task not found at {review_task_file}. This is a bug",
err=True,
fg="red",
)
return 1
try:
instructions = MarkdownInstructions.from_file(review_task_file)
except (FileNotFoundError, PermissionError) as e:
click.secho(f"Error reading review instructions: {e}", err=True, fg="red")
return 1
return claude_run(
base_branch=base_branch,
instructions=instructions,
dry_run=ctx.obj.dry_run,
branch_prefix="review/",
isolation=Isolation(isolation),
keep=keep,
target_branch=target_branch,
)
if __name__ == "__main__":
sys.exit(papagai())