Coverage for src / domain / validation / worktree.py: 29%
168 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Git worktree utilities for clean-room validation.
3Provides deterministic worktree creation and cleanup with state tracking.
4Worktree paths follow the format: {base_dir}/{run_id}/{issue_id}/{attempt}/
5"""
7from __future__ import annotations
9import re
10import shutil
11from dataclasses import dataclass, field
12from enum import Enum
13from typing import TYPE_CHECKING
15if TYPE_CHECKING:
16 from pathlib import Path
18 from src.core.protocols import CommandResultProtocol, CommandRunnerPort
# Pattern for a single safe path component: ASCII letters, digits, dash,
# underscore and dot. The first character must be alphanumeric, which rules
# out hidden files/directories as well as the special names "." and "..".
# The pattern is anchored with \Z instead of $ because $ also matches just
# before a trailing newline, which would let "abc\n" pass a .match() check.
_SAFE_PATH_COMPONENT = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]*\Z")
class WorktreeState(Enum):
    """Lifecycle states a validation worktree can be in."""

    # The worktree has not been created yet.
    PENDING = "pending"
    # The worktree was created successfully.
    CREATED = "created"
    # The worktree was removed successfully.
    REMOVED = "removed"
    # Creating or removing the worktree failed.
    FAILED = "failed"
    # The worktree was kept after a failure (--keep-worktrees).
    KEPT = "kept"
@dataclass
class WorktreeConfig:
    """Settings that control how worktrees are created and removed.

    Attributes:
        base_dir: Base directory for all worktrees
            (e.g., /tmp/mala-worktrees).
        keep_on_failure: If True, keep worktrees on validation failure so
            they can be inspected for debugging.
        force_remove: If True, use --force when removing worktrees.
    """

    base_dir: Path
    keep_on_failure: bool = False
    force_remove: bool = True
@dataclass
class WorktreeResult:
    """Outcome of a worktree operation, suitable for handing to callers.

    Attributes:
        path: Path to the worktree directory.
        state: Current state of the worktree.
        error: Error message if the operation failed, otherwise None.
    """

    path: Path
    state: WorktreeState
    error: str | None = None
@dataclass
class WorktreeContext:
    """Context for a validation worktree with state tracking.

    Attributes:
        config: Worktree configuration; supplies ``base_dir``.
        repo_path: Path to the repository the worktree belongs to.
        run_id: Run identifier, used as the first path component.
        issue_id: Issue identifier, used as the second path component.
        attempt: Attempt number (must be >= 1), used as the last component.
        state: Current lifecycle state; starts as PENDING.
        error: Error message of the last failed operation, if any.
    """

    config: WorktreeConfig
    repo_path: Path
    run_id: str
    issue_id: str
    attempt: int

    state: WorktreeState = field(default=WorktreeState.PENDING)
    error: str | None = None
    # Cached resolved worktree path; filled in on first access to ``path``.
    _path: Path | None = field(default=None, repr=False)
    # Set once the path components passed validation, so it runs only once.
    _validated: bool = field(default=False, repr=False)

    def _validate_path_components(self) -> None:
        """Validate that path components are safe and don't escape base_dir.

        Raises:
            ValueError: If any path component is unsafe.
        """
        if self._validated:
            return

        # fullmatch (rather than match) requires the whole string to be
        # consumed, so a trailing newline cannot slip past a ``$`` anchor.
        for label, value in (("run_id", self.run_id), ("issue_id", self.issue_id)):
            if not _SAFE_PATH_COMPONENT.fullmatch(value):
                raise ValueError(
                    f"Invalid {label} '{value}': must be alphanumeric with ._- allowed"
                )

        # Validate attempt is positive
        if self.attempt < 1:
            raise ValueError(f"Invalid attempt '{self.attempt}': must be >= 1")

        self._validated = True

    @property
    def path(self) -> Path:
        """Get the deterministic worktree path.

        The path is ``{base_dir}/{run_id}/{issue_id}/{attempt}``, resolved to
        an absolute path. It is computed on first access and cached.

        Raises:
            ValueError: If path components are unsafe or the resolved path
                is not located inside the resolved base directory.
        """
        if self._path is None:
            # Validate before constructing path
            self._validate_path_components()

            candidate = (
                self.config.base_dir / self.run_id / self.issue_id / str(self.attempt)
            )

            # Resolve both sides so the comparison is between real locations.
            resolved_base = self.config.base_dir.resolve()
            resolved_path = candidate.resolve()

            # Compare path components instead of string prefixes: this is
            # independent of the platform's separator and also works when
            # base_dir resolves to the filesystem root.
            if resolved_base not in resolved_path.parents:
                raise ValueError(
                    f"Computed path '{resolved_path}' escapes base_dir '{resolved_base}'"
                )

            self._path = resolved_path
        return self._path

    def to_result(self) -> WorktreeResult:
        """Convert to a WorktreeResult for external use.

        Raises:
            ValueError: If the worktree path cannot be computed (see ``path``).
        """
        return WorktreeResult(path=self.path, state=self.state, error=self.error)
def create_worktree(
    repo_path: Path,
    commit_sha: str,
    config: WorktreeConfig,
    run_id: str,
    issue_id: str,
    attempt: int,
    command_runner: CommandRunnerPort,
) -> WorktreeContext:
    """Create a git worktree for validation.

    Failures are reported through the returned context (``state`` is set to
    FAILED and ``error`` describes the problem) rather than by raising.

    Args:
        repo_path: Path to the main git repository.
        commit_sha: Commit SHA to checkout in the worktree.
        config: Worktree configuration.
        run_id: Unique identifier for this validation run.
        issue_id: Issue identifier being validated.
        attempt: Attempt number (1-indexed).
        command_runner: Command runner for executing git commands.

    Returns:
        WorktreeContext with state tracking: CREATED on success, FAILED
        otherwise.
    """
    ctx = WorktreeContext(
        config=config,
        repo_path=repo_path.resolve(),
        run_id=run_id,
        issue_id=issue_id,
        attempt=attempt,
    )

    # Validate and get path (may raise ValueError for unsafe inputs)
    try:
        worktree_path = ctx.path
    except ValueError as e:
        ctx.state = WorktreeState.FAILED
        ctx.error = str(e)
        return ctx

    # Ensure parent directories exist. A filesystem error here is reported
    # through the context, like every other failure in this function.
    try:
        worktree_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        ctx.state = WorktreeState.FAILED
        ctx.error = f"Failed to create worktree parent directory: {e}"
        return ctx

    # Remove any existing path (stale from crashed run)
    if worktree_path.exists():
        if worktree_path.is_file():
            ctx.state = WorktreeState.FAILED
            ctx.error = f"Path exists as file, not directory: {worktree_path}"
            return ctx
        try:
            shutil.rmtree(worktree_path)
        except OSError as e:
            ctx.state = WorktreeState.FAILED
            ctx.error = f"Failed to remove stale worktree: {e}"
            return ctx
        # Deleting the directory by hand leaves git's registration of the
        # worktree behind, and "git worktree add" refuses a path that is
        # registered but missing. Prune so the add below can reuse the path.
        # Best effort: a real problem will surface in the add command.
        command_runner.run(
            ["git", "worktree", "prune"],
            cwd=ctx.repo_path,
        )

    result = command_runner.run(
        ["git", "worktree", "add", "--detach", str(worktree_path), commit_sha],
        cwd=ctx.repo_path,
    )

    if result.returncode != 0:
        ctx.state = WorktreeState.FAILED
        ctx.error = _format_git_error("git worktree add", result)
        # Clean up any partial directory
        if worktree_path.exists():
            shutil.rmtree(worktree_path, ignore_errors=True)
        return ctx

    ctx.state = WorktreeState.CREATED
    return ctx
def remove_worktree(
    ctx: WorktreeContext,
    validation_passed: bool,
    command_runner: CommandRunnerPort,
) -> WorktreeContext:
    """Remove a git worktree, respecting keep_on_failure setting.

    Args:
        ctx: Worktree context from create_worktree.
        validation_passed: Whether validation succeeded.
        command_runner: Command runner for executing git commands.

    Returns:
        The same context, updated in place. The state becomes KEPT when the
        worktree is preserved for debugging, REMOVED on success and FAILED
        (with ``error`` set) when removal failed. A context whose worktree
        does not exist (state other than CREATED or KEPT) is returned
        unchanged.
    """
    # Skip everything if there is no worktree on disk to act on. This check
    # comes first so that a context in FAILED/PENDING/REMOVED state is never
    # relabelled as KEPT, which would hide the real state.
    if ctx.state not in (WorktreeState.CREATED, WorktreeState.KEPT):
        return ctx

    # Honor keep_on_failure for debugging failed validations
    if not validation_passed and ctx.config.keep_on_failure:
        ctx.state = WorktreeState.KEPT
        return ctx

    # Try git worktree remove first
    cmd = ["git", "worktree", "remove"]
    if ctx.config.force_remove:
        cmd.append("--force")
    cmd.append(str(ctx.path))

    result = command_runner.run(
        cmd,
        cwd=ctx.repo_path,
    )

    # Track git command failure
    git_failed = result.returncode != 0
    git_error = _format_git_error("git worktree remove", result) if git_failed else None

    # Only attempt directory cleanup if:
    # 1. Git worktree remove succeeded, OR
    # 2. force_remove is True (user explicitly requested forced cleanup)
    # This protects uncommitted changes when force_remove=False and git remove fails
    dir_cleanup_failed = False
    should_cleanup_dir = not git_failed or ctx.config.force_remove

    if should_cleanup_dir and ctx.path.exists():
        try:
            shutil.rmtree(ctx.path)
        except OSError as e:
            dir_cleanup_failed = True
            if git_error:
                git_error = f"{git_error}; directory cleanup also failed: {e}"
            else:
                git_error = f"Directory cleanup failed: {e}"

    # Prune the worktree list to clean up stale git metadata.
    # The result is intentionally ignored: pruning is best effort.
    command_runner.run(
        ["git", "worktree", "prune"],
        cwd=ctx.repo_path,
    )

    # Report failure if git command failed or directory cleanup failed
    if git_failed or dir_cleanup_failed:
        ctx.state = WorktreeState.FAILED
        ctx.error = git_error
        return ctx

    ctx.state = WorktreeState.REMOVED
    return ctx
def cleanup_stale_worktrees(
    repo_path: Path,
    config: WorktreeConfig,
    command_runner: CommandRunnerPort,
    run_id: str | None = None,
) -> int:
    """Clean up stale worktrees from previous runs.

    Args:
        repo_path: Path to the main git repository.
        config: Worktree configuration.
        command_runner: Command runner for executing git commands.
        run_id: If provided, only clean up worktrees for this run.
            If None, clean up all worktrees under base_dir. A run_id that
            is not a safe single path component (empty, containing a path
            separator, "..", ...) matches no run and nothing is removed.

    Returns:
        Number of worktrees cleaned up.
    """
    cleaned = 0
    base = config.base_dir

    if not base.exists():
        return 0

    # Compare against None explicitly: an empty string must not be treated
    # as "no run_id given", which would wipe every run under base_dir.
    if run_id is not None:
        # Only accept a run_id that is a safe single path component.
        # Without this check a value such as "../x" or an absolute path
        # would point the deletion below outside of base_dir.
        if _SAFE_PATH_COMPONENT.fullmatch(run_id):
            run_dir = base / run_id
            # is_dir() (not exists()) so a stray file is skipped, matching
            # the filter used when cleaning all runs.
            if run_dir.is_dir():
                cleaned += _cleanup_run_dir(repo_path, run_dir, command_runner)
    else:
        # Clean up all runs
        for run_dir in base.iterdir():
            if run_dir.is_dir():
                cleaned += _cleanup_run_dir(repo_path, run_dir, command_runner)

    # Prune the worktree list so git forgets the removed worktrees.
    command_runner.run(
        ["git", "worktree", "prune"],
        cwd=repo_path,
    )

    return cleaned
def _cleanup_run_dir(
    repo_path: Path, run_dir: Path, command_runner: CommandRunnerPort
) -> int:
    """Remove every attempt worktree found under one run directory.

    The expected layout is ``run_dir/<issue>/<attempt>``; entries that are
    not directories are skipped at both levels.

    Args:
        repo_path: Repository used as working directory for git commands.
        run_dir: Directory of a single run.
        command_runner: Command runner for executing git commands.

    Returns:
        Number of attempt directories that were cleaned up.
    """
    removed_count = 0

    # Lazily walk run_dir/<issue>/<attempt>, directories only.
    attempt_dirs = (
        attempt
        for issue in run_dir.iterdir()
        if issue.is_dir()
        for attempt in issue.iterdir()
        if attempt.is_dir()
    )

    for worktree_dir in attempt_dirs:
        # Ask git to drop the worktree first; the outcome is not inspected
        # because the directory is deleted by hand below if it survives.
        command_runner.run(
            ["git", "worktree", "remove", "--force", str(worktree_dir)],
            cwd=repo_path,
        )

        if worktree_dir.exists():
            try:
                shutil.rmtree(worktree_dir)
            except OSError:
                # Could not delete it; do not count this one.
                continue

        removed_count += 1

    # Tidy up issue directories, and the run directory itself, once empty.
    try:
        for issue in run_dir.iterdir():
            if issue.is_dir() and not any(issue.iterdir()):
                issue.rmdir()
        if run_dir.exists() and not any(run_dir.iterdir()):
            run_dir.rmdir()
    except OSError:
        # Best effort only: leftover empty directories are harmless.
        pass

    return removed_count
def _format_git_error(cmd_name: str, result: CommandResultProtocol) -> str:
    """Build a one-line description of a failed git command.

    Args:
        cmd_name: Human-readable name of the command, used as the prefix.
        result: Command result providing ``returncode`` and ``stderr``.

    Returns:
        ``"<cmd_name> exited <returncode>"``, followed by ``": <stderr>"``
        when stderr is non-empty after stripping. Stderr longer than 200
        characters is cut to 200 characters plus ``"..."``.
    """
    max_stderr_chars = 200
    summary = f"{cmd_name} exited {result.returncode}"

    detail = result.stderr.strip()
    if not detail:
        return summary

    # Keep the message short: long stderr output is truncated.
    if len(detail) > max_stderr_chars:
        detail = detail[:max_stderr_chars] + "..."
    return f"{summary}: {detail}"