Coverage for src / domain / validation / worktree.py: 29%

168 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Git worktree utilities for clean-room validation. 

2 

3Provides deterministic worktree creation and cleanup with state tracking. 

4Worktree paths follow the format: {base_dir}/{run_id}/{issue_id}/{attempt}/ 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10import shutil 

11from dataclasses import dataclass, field 

12from enum import Enum 

13from typing import TYPE_CHECKING 

14 

15if TYPE_CHECKING: 

16 from pathlib import Path 

17 

18 from src.core.protocols import CommandResultProtocol, CommandRunnerPort 

19 

20 

# Pattern for a single safe path component: ASCII letters, digits, dot,
# underscore and dash only.
# The first character must be alphanumeric, which rules out hidden entries
# (leading dot), the "." and ".." components, and option-like names (leading dash).
# The pattern is anchored with \Z rather than $ because $ also matches just
# before a trailing newline, which would let a value such as "run-1\n" through.
_SAFE_PATH_COMPONENT = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]*\Z")

24 

25 

class WorktreeState(Enum):
    """Lifecycle states a validation worktree can be in."""

    # The worktree has not been created yet.
    PENDING = "pending"

    # The worktree was created successfully.
    CREATED = "created"

    # The worktree was removed successfully.
    REMOVED = "removed"

    # Creating or removing the worktree failed.
    FAILED = "failed"

    # The worktree was left on disk after a failure (--keep-worktrees).
    KEPT = "kept"

34 

35 

@dataclass
class WorktreeConfig:
    """Settings that control how validation worktrees are created and removed.

    Attributes:
        base_dir: Root directory under which every worktree is placed
            (e.g., /tmp/mala-worktrees).
        keep_on_failure: When True, a worktree whose validation failed is
            left in place for debugging.
        force_remove: When True, worktree removal is run with --force.
    """

    base_dir: Path
    keep_on_failure: bool = False
    force_remove: bool = True

48 

49 

@dataclass
class WorktreeResult:
    """Outcome of a worktree operation, for consumption outside this module.

    Attributes:
        path: Location of the worktree directory.
        state: State the worktree is in after the operation.
        error: Message describing the failure, or None when nothing failed.
    """

    path: Path
    state: WorktreeState
    error: str | None = None

62 

63 

@dataclass
class WorktreeContext:
    """Context for a validation worktree with state tracking.

    Holds the inputs that determine where the worktree lives
    ({base_dir}/{run_id}/{issue_id}/{attempt}) together with the current
    state and the most recent error message, if any.
    """

    config: WorktreeConfig
    repo_path: Path
    run_id: str
    issue_id: str
    attempt: int

    state: WorktreeState = field(default=WorktreeState.PENDING)
    error: str | None = None
    # Resolved worktree path, computed lazily and cached by the ``path`` property.
    _path: Path | None = field(default=None, repr=False)
    # Set once run_id, issue_id and attempt have passed validation.
    _validated: bool = field(default=False, repr=False)

    def _validate_path_components(self) -> None:
        """Validate that path components are safe and don't escape base_dir.

        The result is cached, so the checks run at most once per instance.

        Raises:
            ValueError: If any path component is unsafe.
        """
        if self._validated:
            return

        # fullmatch() rather than match(): a "$" anchor also matches just
        # before a trailing newline, so match() could accept "run-1\n".
        if not _SAFE_PATH_COMPONENT.fullmatch(self.run_id):
            raise ValueError(
                f"Invalid run_id '{self.run_id}': must be alphanumeric with ._- allowed"
            )

        if not _SAFE_PATH_COMPONENT.fullmatch(self.issue_id):
            raise ValueError(
                f"Invalid issue_id '{self.issue_id}': must be alphanumeric with ._- allowed"
            )

        # Attempts are 1-indexed.
        if self.attempt < 1:
            raise ValueError(f"Invalid attempt '{self.attempt}': must be >= 1")

        self._validated = True

    @property
    def path(self) -> Path:
        """Get the deterministic worktree path.

        The path is resolved on first access and cached afterwards.

        Raises:
            ValueError: If path components are unsafe, or if the resolved
                path is not located inside the resolved base_dir.
        """
        if self._path is None:
            # Validate before constructing the path.
            self._validate_path_components()

            candidate = (
                self.config.base_dir / self.run_id / self.issue_id / str(self.attempt)
            )

            # Resolve both sides so symlinks cannot move the result outside
            # base_dir unnoticed.
            resolved_base = self.config.base_dir.resolve()
            resolved_path = candidate.resolve()

            # Compare whole path components instead of string prefixes: this
            # does not depend on the platform separator and also works when
            # base_dir is the filesystem root. The path must be strictly
            # inside base_dir (base_dir itself is not accepted).
            if resolved_base not in resolved_path.parents:
                raise ValueError(
                    f"Computed path '{resolved_path}' escapes base_dir '{resolved_base}'"
                )

            self._path = resolved_path
        return self._path

    def to_result(self) -> WorktreeResult:
        """Convert to a WorktreeResult for external use.

        Raises:
            ValueError: If the path cannot be computed (see ``path``).
        """
        return WorktreeResult(path=self.path, state=self.state, error=self.error)

137 

138 

def create_worktree(
    repo_path: Path,
    commit_sha: str,
    config: WorktreeConfig,
    run_id: str,
    issue_id: str,
    attempt: int,
    command_runner: CommandRunnerPort,
) -> WorktreeContext:
    """Create a git worktree for validation.

    Failures are reported through the returned context (state FAILED plus an
    error message) rather than by raising.

    Args:
        repo_path: Path to the main git repository.
        commit_sha: Commit SHA to checkout in the worktree.
        config: Worktree configuration.
        run_id: Unique identifier for this validation run.
        issue_id: Issue identifier being validated.
        attempt: Attempt number (1-indexed).
        command_runner: Command runner for executing git commands.

    Returns:
        WorktreeContext with state CREATED on success, FAILED otherwise.
    """
    ctx = WorktreeContext(
        config=config,
        repo_path=repo_path.resolve(),
        run_id=run_id,
        issue_id=issue_id,
        attempt=attempt,
    )

    # Validate and get path (raises ValueError for unsafe inputs).
    try:
        worktree_path = ctx.path
    except ValueError as e:
        ctx.state = WorktreeState.FAILED
        ctx.error = str(e)
        return ctx

    # Ensure parent directories exist. A filesystem error here is reported
    # like every other failure in this function instead of escaping as an
    # exception.
    try:
        worktree_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        ctx.state = WorktreeState.FAILED
        ctx.error = f"Failed to create worktree parent directory: {e}"
        return ctx

    # Remove any existing path (stale from crashed run).
    if worktree_path.exists():
        if worktree_path.is_file():
            ctx.state = WorktreeState.FAILED
            ctx.error = f"Path exists as file, not directory: {worktree_path}"
            return ctx
        try:
            shutil.rmtree(worktree_path)
        except OSError as e:
            ctx.state = WorktreeState.FAILED
            ctx.error = f"Failed to remove stale worktree: {e}"
            return ctx

        # The deleted directory may still be registered with git; without a
        # prune, "git worktree add" refuses a path that is registered but
        # missing. Best effort: if pruning fails, the add below reports it.
        command_runner.run(
            ["git", "worktree", "prune"],
            cwd=ctx.repo_path,
        )

    result = command_runner.run(
        ["git", "worktree", "add", "--detach", str(worktree_path), commit_sha],
        cwd=ctx.repo_path,
    )

    if result.returncode != 0:
        ctx.state = WorktreeState.FAILED
        ctx.error = _format_git_error("git worktree add", result)
        # Clean up any partial directory left behind by the failed command.
        if worktree_path.exists():
            shutil.rmtree(worktree_path, ignore_errors=True)
        return ctx

    ctx.state = WorktreeState.CREATED
    return ctx

209 

210 

def remove_worktree(
    ctx: WorktreeContext,
    validation_passed: bool,
    command_runner: CommandRunnerPort,
) -> WorktreeContext:
    """Remove a git worktree, respecting keep_on_failure setting.

    Args:
        ctx: Worktree context from create_worktree.
        validation_passed: Whether validation succeeded.
        command_runner: Command runner for executing git commands.

    Returns:
        The same WorktreeContext, updated in place: KEPT when the worktree
        is retained for debugging, REMOVED on success, FAILED (with
        ``error`` set) when removal failed. A context whose state is not
        CREATED or KEPT is returned unchanged.
    """
    # Only a worktree that actually exists (CREATED, or KEPT by an earlier
    # call) can be kept or removed. This check comes first so that a
    # PENDING/FAILED/REMOVED context is never relabelled KEPT, which would
    # hide the earlier outcome and make a later call try to remove a
    # worktree that does not exist.
    if ctx.state not in (WorktreeState.CREATED, WorktreeState.KEPT):
        return ctx

    # Honor keep_on_failure for debugging failed validations.
    if not validation_passed and ctx.config.keep_on_failure:
        ctx.state = WorktreeState.KEPT
        return ctx

    # Try git worktree remove first.
    cmd = ["git", "worktree", "remove"]
    if ctx.config.force_remove:
        cmd.append("--force")
    cmd.append(str(ctx.path))

    result = command_runner.run(
        cmd,
        cwd=ctx.repo_path,
    )

    # Track git command failure.
    git_failed = result.returncode != 0
    git_error = _format_git_error("git worktree remove", result) if git_failed else None

    # Only attempt directory cleanup if:
    # 1. git worktree remove succeeded, OR
    # 2. force_remove is True (user explicitly requested forced cleanup).
    # This protects uncommitted changes when force_remove=False and git
    # remove fails.
    dir_cleanup_failed = False
    should_cleanup_dir = not git_failed or ctx.config.force_remove

    if should_cleanup_dir and ctx.path.exists():
        try:
            shutil.rmtree(ctx.path)
        except OSError as e:
            dir_cleanup_failed = True
            if git_error:
                git_error = f"{git_error}; directory cleanup also failed: {e}"
            else:
                git_error = f"Directory cleanup failed: {e}"

    # Prune the worktree list to clean up stale git metadata. The result is
    # intentionally not inspected (best effort).
    command_runner.run(
        ["git", "worktree", "prune"],
        cwd=ctx.repo_path,
    )

    # Report failure if the git command failed or directory cleanup failed.
    if git_failed or dir_cleanup_failed:
        ctx.state = WorktreeState.FAILED
        ctx.error = git_error
        return ctx

    ctx.state = WorktreeState.REMOVED
    return ctx

281 

282 

def cleanup_stale_worktrees(
    repo_path: Path,
    config: WorktreeConfig,
    command_runner: CommandRunnerPort,
    run_id: str | None = None,
) -> int:
    """Clean up stale worktrees from previous runs.

    Args:
        repo_path: Path to the main git repository.
        config: Worktree configuration.
        command_runner: Command runner for executing git commands.
        run_id: If provided, only clean up worktrees for this run.
            If None, clean up all worktrees under base_dir.
            A run_id that is not a safe path component (for example an
            empty string or one containing a path separator) matches
            nothing and results in 0.

    Returns:
        Number of worktrees cleaned up.
    """
    base = config.base_dir

    # is_dir() rather than exists(): a stray file at base_dir means there is
    # nothing to clean, and iterating it would raise.
    if not base.is_dir():
        return 0

    if run_id is not None:
        # run_id is joined onto base_dir and everything below the result is
        # deleted, so anything that is not a plain, safe component is
        # refused (e.g. "../.." would otherwise escape base_dir). An empty
        # string is also refused rather than being treated as "all runs".
        if not _SAFE_PATH_COMPONENT.fullmatch(run_id):
            return 0
        run_dirs = [base / run_id]
    else:
        # Snapshot the listing first: the directories are removed while
        # being processed.
        run_dirs = list(base.iterdir())

    cleaned = sum(
        _cleanup_run_dir(repo_path, run_dir, command_runner)
        for run_dir in run_dirs
        if run_dir.is_dir()
    )

    # Prune the worktree list so git forgets the directories removed above.
    command_runner.run(
        ["git", "worktree", "prune"],
        cwd=repo_path,
    )

    return cleaned

325 

326 

def _cleanup_run_dir(
    repo_path: Path, run_dir: Path, command_runner: CommandRunnerPort
) -> int:
    """Remove every attempt worktree found under ``run_dir``.

    ``run_dir`` is walked as ``run_dir/<issue>/<attempt>``. Each attempt
    directory is first handed to ``git worktree remove --force`` (run from
    ``repo_path``) and then deleted from disk if it is still present.
    Afterwards, empty issue directories and an empty ``run_dir`` are removed
    on a best-effort basis.

    Returns:
        How many attempt directories were cleaned up. An attempt directory
        that could not be deleted is not counted.
    """
    removed_count = 0

    for issue_entry in run_dir.iterdir():
        if issue_entry.is_dir():
            for attempt_entry in issue_entry.iterdir():
                if not attempt_entry.is_dir():
                    continue

                # Let git drop its bookkeeping first; the outcome is not
                # inspected because the directory is deleted below anyway.
                remove_cmd = ["git", "worktree", "remove", "--force", str(attempt_entry)]
                command_runner.run(remove_cmd, cwd=repo_path)

                still_on_disk = attempt_entry.exists()
                if still_on_disk:
                    try:
                        shutil.rmtree(attempt_entry)
                    except OSError:
                        # Could not delete it: leave it and do not count it.
                        continue

                removed_count = removed_count + 1

    # Tidy up directories that are now empty. Failures here are ignored on
    # purpose; the worktrees themselves are already gone.
    try:
        for issue_entry in run_dir.iterdir():
            if issue_entry.is_dir():
                if next(issue_entry.iterdir(), None) is None:
                    issue_entry.rmdir()
        if run_dir.exists():
            if next(run_dir.iterdir(), None) is None:
                run_dir.rmdir()
    except OSError:
        pass

    return removed_count

366 

367 

def _format_git_error(cmd_name: str, result: CommandResultProtocol) -> str:
    """Format a git command error message.

    Args:
        cmd_name: Human-readable name of the command, used as the prefix.
        result: Command result; its ``returncode`` and ``stderr`` are read.

    Returns:
        ``"<cmd_name> exited <returncode>"``, followed by ``": <stderr>"``
        when stderr contains non-whitespace text. Stderr is stripped and
        truncated to 200 characters plus ``"..."``.
    """
    msg = f"{cmd_name} exited {result.returncode}"

    # Tolerate a missing (None) stderr instead of failing while building an
    # error message. NOTE(review): whether CommandResultProtocol permits
    # None here is not visible from this file.
    stderr = (result.stderr or "").strip()
    if not stderr:
        return msg

    # Truncate long stderr so the message stays readable.
    if len(stderr) > 200:
        stderr = stderr[:200] + "..."
    return f"{msg}: {stderr}"