Coverage for src / domain / validation / coverage.py: 22%

259 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Coverage parsing and threshold handling for mala validation. 

2 

3This module provides: 

4- CoverageResult: result of parsing a coverage report 

5- parse_coverage_xml: parse coverage.xml and return CoverageResult 

6- check_coverage_threshold: compare coverage against minimum threshold 

7- get_baseline_coverage: extract coverage percentage from existing baseline file 

8- is_baseline_stale: check if baseline file is older than last commit or repo is dirty 

9- BaselineCoverageService: service for refreshing baseline coverage in isolated worktree 

10""" 

11 

12from __future__ import annotations 

13 

14import os 

15import shutil 

16import tempfile 

17import uuid 

18import xml.etree.ElementTree as ET 

19from contextlib import contextmanager 

20from dataclasses import dataclass 

21from enum import Enum 

22from pathlib import Path 

23from typing import TYPE_CHECKING 

24 

25from .config import YamlCoverageConfig # noqa: TC001 - used at runtime 

26from .coverage_args import rewrite_coverage_command 

27 

28if TYPE_CHECKING: 

29 from collections.abc import Iterator 

30 

31 from src.core.protocols import CommandRunnerPort, EnvConfigPort, LockManagerPort 

32 

33 from .spec import ValidationSpec 

34 

35 

def _infer_coverage_base_command(original_cmd: list[str]) -> list[str]:
    """Pick the launcher prefix to use for ``coverage`` subcommands.

    The prefix mirrors how the original command started pytest, so follow-up
    subcommands such as ``combine`` or ``xml`` run through the same launcher.

    Args:
        original_cmd: The original coverage command, e.g.
            ``['uv', 'run', 'pytest', ...]``.

    Returns:
        ``['uv', 'run', 'coverage']`` when the command starts with ``uv run``,
        ``[<interpreter>, '-m', 'coverage']`` when it is ``<interpreter> -m
        pytest``, and plain ``['coverage']`` otherwise (including commands with
        fewer than three tokens).
    """
    # Both recognised launch styles need at least three tokens.
    if len(original_cmd) < 3:
        return ["coverage"]

    launcher, second, third = original_cmd[:3]
    if launcher == "uv" and second == "run":
        return ["uv", "run", "coverage"]
    if second == "-m" and third == "pytest":
        # Reuse whichever interpreter the caller named.
        return [launcher, "-m", "coverage"]
    return ["coverage"]

58 

59 

class CoverageStatus(Enum):
    """Outcome states a coverage report can be in during validation."""

    # Threshold check ran and was satisfied (or no threshold was required).
    PASSED = "passed"
    # Threshold check ran and coverage fell short.
    FAILED = "failed"
    # The report was missing, unreadable, or malformed.
    ERROR = "error"
    # The report was read successfully, but no threshold check has run yet.
    PARSED = "parsed"

67 

68 

@dataclass(frozen=True)
class CoverageResult:
    """Result of parsing and validating a coverage report.

    Attributes:
        percent: Coverage percentage (0.0-100.0), or None if parsing failed.
        passed: Whether coverage meets the threshold (False until threshold checked).
        status: Status of the coverage check.
        report_path: Path to the coverage report file.
        failure_reason: Explanation for failure/error (None if passed).
        line_rate: Raw line rate from XML (0.0-1.0), or None if unavailable.
        branch_rate: Raw branch rate from XML (0.0-1.0), or None if unavailable.
    """

    percent: float | None
    passed: bool
    status: CoverageStatus
    report_path: Path | None
    failure_reason: str | None = None
    line_rate: float | None = None
    branch_rate: float | None = None

    def short_summary(self) -> str:
        """Return a one-line summary for logs/prompts.

        A passing result always reports its percentage; otherwise the failure
        reason wins when one is set. A missing percentage is rendered as
        ``n/a`` instead of raising while formatting ``None``.
        """
        # percent is None for error results; formatting None with ":.1f"
        # would raise TypeError, so render a placeholder instead.
        shown = "n/a" if self.percent is None else f"{self.percent:.1f}%"

        if self.passed:
            return f"coverage {shown} passed"
        if self.failure_reason:
            return self.failure_reason
        if self.status == CoverageStatus.PARSED:
            return f"coverage {shown} (threshold not checked)"
        return f"coverage {shown} failed"

100 

101 

def parse_coverage_xml(report_path: Path) -> CoverageResult:
    """Parse a coverage.xml file and extract coverage metrics.

    Note: This function returns status=PARSED with passed=False. Callers must
    use check_coverage_threshold() to determine if coverage meets requirements.

    Args:
        report_path: Path to the coverage.xml file.

    Returns:
        CoverageResult with parsed metrics (status=PARSED), or a result with
        status=ERROR and a ``failure_reason`` when the file is missing,
        unreadable, not well-formed XML, lacks a ``<coverage>`` root, or has a
        missing, non-numeric, or non-finite ``line-rate``.
    """
    import math  # function-scope import keeps this block self-contained

    def _error(reason: str) -> CoverageResult:
        # Every failure path produces the same shape of result.
        return CoverageResult(
            percent=None,
            passed=False,
            status=CoverageStatus.ERROR,
            report_path=report_path,
            failure_reason=reason,
        )

    if not report_path.exists():
        return _error(f"Coverage report not found: {report_path}")

    try:
        tree = ET.parse(report_path)
    except ET.ParseError as e:
        return _error(f"Invalid coverage XML: {e}")
    except OSError as e:
        return _error(f"Cannot read coverage report: {e}")

    root = tree.getroot()

    # Check for expected root element
    if root.tag != "coverage":
        return _error(
            f"Invalid coverage XML: expected <coverage> root, got <{root.tag}>"
        )

    line_rate_str = root.get("line-rate")
    if line_rate_str is None:
        return _error("Invalid coverage XML: missing line-rate attribute")

    try:
        line_rate = float(line_rate_str)
    except ValueError:
        return _error(
            f"Invalid coverage XML: line-rate '{line_rate_str}' is not a number"
        )

    # float() accepts "nan" and "inf"; neither is a usable coverage rate and
    # both would silently corrupt later threshold comparisons.
    if not math.isfinite(line_rate):
        return _error(
            f"Invalid coverage XML: line-rate '{line_rate_str}' is not a finite number"
        )

    # Branch rate is optional: an unparsable or non-finite value is dropped
    # rather than treated as an error.
    branch_rate: float | None = None
    branch_rate_str = root.get("branch-rate")
    if branch_rate_str is not None:
        try:
            candidate = float(branch_rate_str)
        except ValueError:
            candidate = None
        if candidate is not None and math.isfinite(candidate):
            branch_rate = candidate

    return CoverageResult(
        # Convert line rate (0.0-1.0) to percentage (0.0-100.0)
        percent=line_rate * 100.0,
        passed=False,  # Must call check_coverage_threshold to set passed=True
        status=CoverageStatus.PARSED,
        report_path=report_path,
        line_rate=line_rate,
        branch_rate=branch_rate,
    )

196 

197 

def check_coverage_threshold(
    result: CoverageResult,
    min_percent: float | None,
) -> CoverageResult:
    """Apply a minimum-coverage threshold to a parsed result.

    Args:
        result: A CoverageResult from parse_coverage_xml.
        min_percent: Minimum required coverage percentage (0.0-100.0), or None
            to skip threshold checking (always passes).

    Returns:
        The input unchanged when it is an error result or has no percentage;
        otherwise a new CoverageResult whose ``passed``/``status``/
        ``failure_reason`` reflect the threshold comparison.
    """
    # Nothing to compare when parsing failed.
    if result.status == CoverageStatus.ERROR or result.percent is None:
        return result

    # A tiny tolerance keeps values such as 88.79999 from failing against a
    # threshold of 88.8 when both display identically.
    tolerance = 1e-9
    meets_threshold = (
        min_percent is None or result.percent >= min_percent - tolerance
    )

    if meets_threshold:
        verdict = CoverageStatus.PASSED
        reason = None
    else:
        verdict = CoverageStatus.FAILED
        reason = f"Coverage {result.percent:.1f}% is below threshold {min_percent:.1f}%"

    return CoverageResult(
        percent=result.percent,
        passed=meets_threshold,
        status=verdict,
        report_path=result.report_path,
        failure_reason=reason,
        line_rate=result.line_rate,
        branch_rate=result.branch_rate,
    )

254 

255 

def parse_and_check_coverage(
    report_path: Path,
    min_percent: float | None,
) -> CoverageResult:
    """Parse a coverage XML report and apply the threshold in a single step.

    Convenience wrapper that feeds the output of parse_coverage_xml straight
    into check_coverage_threshold.

    Args:
        report_path: Path to the coverage.xml file.
        min_percent: Minimum required coverage percentage (0.0-100.0), or None
            to skip threshold checking (always passes).

    Returns:
        CoverageResult with parsing and threshold check results.
    """
    return check_coverage_threshold(parse_coverage_xml(report_path), min_percent)

275 

276 

def check_coverage_from_config(
    coverage_config: YamlCoverageConfig | None,
    cwd: Path,
) -> CoverageResult | None:
    """Run the coverage check described by a YamlCoverageConfig.

    The config supplies both the report location and the threshold.

    Args:
        coverage_config: Coverage configuration from mala.yaml, or None to skip.
        cwd: Working directory to resolve relative paths against.

    Returns:
        None when ``coverage_config`` is None (coverage disabled); otherwise a
        CoverageResult. A missing report file yields a result with ERROR status.
    """
    if coverage_config is None:
        return None

    # Relative report paths are anchored at cwd; absolute ones are used as-is.
    configured = Path(coverage_config.file)
    report_path = configured if configured.is_absolute() else cwd / configured

    if report_path.exists():
        return parse_and_check_coverage(report_path, coverage_config.threshold)

    return CoverageResult(
        percent=None,
        passed=False,
        status=CoverageStatus.ERROR,
        report_path=report_path,
        failure_reason=f"Coverage report not found: {report_path}",
    )

313 

314 

def get_baseline_coverage(report_path: Path) -> float | None:
    """Read the coverage percentage stored in a baseline report.

    The value is used as the minimum threshold for "no decrease" checking.

    Args:
        report_path: Path to the coverage.xml baseline file.

    Returns:
        Coverage percentage (0.0-100.0) if the file exists and is valid, None
        if the file is missing.

    Raises:
        ValueError: If the file exists but cannot be parsed (malformed XML,
            missing required attributes, etc.).
    """
    if not report_path.exists():
        return None

    parsed = parse_coverage_xml(report_path)
    if parsed.status != CoverageStatus.ERROR:
        return parsed.percent

    # A baseline that is present but unreadable is surfaced to the caller.
    raise ValueError(parsed.failure_reason)

341 

342 

def is_baseline_stale(
    report_path: Path,
    repo_path: Path,
    command_runner: CommandRunnerPort,
) -> bool:
    """Decide whether the coverage baseline needs to be regenerated.

    The baseline counts as stale when any of these hold:
    - the baseline file does not exist
    - ``git status --porcelain`` reports uncommitted changes
    - the file's mtime is older than the last commit time
    - a git command fails, or its output cannot be interpreted

    Args:
        report_path: Path to the coverage.xml baseline file.
        repo_path: Path to the git repository root.
        command_runner: CommandRunnerPort for running git commands.

    Returns:
        True if baseline is stale or doesn't exist, False if baseline is fresh.
    """
    if not report_path.exists():
        return True

    try:
        status = command_runner.run(["git", "status", "--porcelain"], cwd=repo_path)
        # A failed git call or any porcelain output (dirty tree) means stale.
        if not status.ok or status.stdout.strip():
            return True

        # %ct prints the committer date as Unix epoch seconds.
        last_commit = command_runner.run(
            ["git", "log", "-1", "--format=%ct"], cwd=repo_path
        )
        if not last_commit.ok:
            return True

        stamp = last_commit.stdout.strip()
        if not stamp:
            # No commits in repo
            return True

        commit_epoch = int(stamp)
        return report_path.stat().st_mtime < commit_epoch
    except (ValueError, OSError):
        # Unparsable timestamp or filesystem error - treat as stale
        return True

403 

404 

# Lock name passed to the lock manager so that only one baseline refresh
# runs at a time.
_BASELINE_LOCK_FILE = "coverage-baseline.lock"

407 

408 

@dataclass
class WorktreeRefreshContext:
    """Bundle yielded to code that runs inside a baseline-refresh worktree.

    Groups the three things a caller needs to execute commands in the
    isolated checkout: where it lives, how to run commands, and which
    environment variables to run them with.
    """

    # Directory of the temporary worktree.
    worktree_path: Path
    # Runner used to execute commands in the worktree.
    runner: CommandRunnerPort
    # Environment variables to pass to those commands.
    env: dict[str, str]

420 

421 

@contextmanager
def baseline_worktree(
    repo_path: Path,
    timeout: float,
    lock_dir: Path,
    command_runner: CommandRunnerPort,
) -> Iterator[WorktreeRefreshContext]:
    """Create and manage a temporary worktree for baseline coverage refresh.

    A fresh temp directory is created to hold a worktree checked out at HEAD.
    On exit the worktree is removed and the temp directory is deleted, even
    when worktree removal itself raises.

    Args:
        repo_path: Path to the main repository.
        timeout: Timeout in seconds for commands. Accepted for interface
            compatibility; this function does not currently use it.
        lock_dir: Lock directory to set in environment (as ``LOCK_DIR``).
        command_runner: CommandRunnerPort for executing commands.

    Yields:
        WorktreeRefreshContext with worktree path, runner, and environment.

    Raises:
        RuntimeError: If worktree creation fails.
    """
    from .worktree import (
        WorktreeConfig,
        WorktreeState,
        create_worktree,
        remove_worktree,
    )

    run_id = f"baseline-{uuid.uuid4().hex[:8]}"
    temp_dir = Path(tempfile.mkdtemp(prefix="mala-baseline-"))
    worktree_config = WorktreeConfig(
        base_dir=temp_dir,
        keep_on_failure=False,
    )

    worktree_ctx = None

    try:
        worktree_ctx = create_worktree(
            repo_path=repo_path,
            commit_sha="HEAD",
            config=worktree_config,
            run_id=run_id,
            issue_id="baseline",
            attempt=1,
            command_runner=command_runner,
        )

        if worktree_ctx.state == WorktreeState.FAILED:
            raise RuntimeError(
                f"Baseline worktree creation failed: {worktree_ctx.error}"
            )

        # Inherit the current environment and add the refresh-specific keys.
        env = {
            **os.environ,
            "AGENT_ID": f"baseline-{run_id}",
            "LOCK_DIR": str(lock_dir),
        }

        yield WorktreeRefreshContext(
            worktree_path=worktree_ctx.path,
            runner=command_runner,
            env=env,
        )
    finally:
        try:
            # Clean up temp worktree
            if worktree_ctx is not None:
                remove_worktree(
                    worktree_ctx, validation_passed=True, command_runner=command_runner
                )
        finally:
            # Always delete the temp directory, even if worktree removal
            # raised; otherwise the mkdtemp directory would be leaked.
            shutil.rmtree(temp_dir, ignore_errors=True)

497 

498 

@dataclass
class BaselineRefreshResult:
    """Outcome of one baseline coverage refresh attempt.

    Attributes:
        percent: The baseline coverage percentage if successful.
        success: Whether the refresh succeeded.
        error: Error message if refresh failed.
    """

    percent: float | None
    success: bool
    error: str | None = None

    @staticmethod
    def ok(percent: float) -> BaselineRefreshResult:
        """Build a successful result carrying ``percent``."""
        return BaselineRefreshResult(percent, True)

    @staticmethod
    def fail(error: str) -> BaselineRefreshResult:
        """Build a failed result carrying ``error`` and no percentage."""
        return BaselineRefreshResult(None, False, error)

522 

523 

class BaselineCoverageService:
    """Service for refreshing baseline coverage in an isolated worktree.

    This service handles:
    - Locking (through the injected lock manager) to prevent concurrent refreshes
    - Temporary worktree creation at HEAD
    - Running coverage command to generate baseline
    - Copying the coverage report back to the main repo

    Usage:
        config = YamlCoverageConfig(command="uv run pytest --cov", ...)
        service = BaselineCoverageService(
            repo_path,
            env_config=env_config,
            command_runner=command_runner,
            lock_manager=lock_manager,
            coverage_config=config,
        )
        result = service.refresh_if_stale(spec)
        if result.success:
            baseline_percent = result.percent

    Note:
        If coverage_config is None or coverage_config.command is None,
        baseline refresh is unavailable and refresh_if_stale will return
        a failure result.
    """

    # Longest time to wait for another holder to release the refresh lock.
    _LOCK_WAIT_TIMEOUT_SECONDS = 300.0
    # Interval between lock acquisition attempts while waiting.
    _LOCK_POLL_INTERVAL_MS = 1000
    # Command timeout used when neither the config nor the caller supplies one.
    _DEFAULT_COMMAND_TIMEOUT_SECONDS = 300.0

    def __init__(
        self,
        repo_path: Path,
        env_config: EnvConfigPort,
        command_runner: CommandRunnerPort,
        lock_manager: LockManagerPort,
        coverage_config: YamlCoverageConfig | None = None,
        step_timeout_seconds: float | None = None,
    ):
        """Initialize the baseline coverage service.

        Args:
            repo_path: Path to the repository (stored resolved).
            env_config: Environment configuration for paths (lock_dir, etc.).
            command_runner: CommandRunnerPort for running commands.
            lock_manager: LockManagerPort for file locking.
            coverage_config: Coverage configuration from mala.yaml. Required for
                baseline refresh - if None or if command is None, refresh is unavailable.
            step_timeout_seconds: Optional fallback timeout for commands (used if
                coverage_config.timeout is None).
        """
        self.repo_path = repo_path.resolve()
        self.coverage_config = coverage_config
        self.step_timeout_seconds = step_timeout_seconds
        self.env_config = env_config
        self.command_runner = command_runner
        self.lock_manager = lock_manager

    def _read_fresh_baseline(self, baseline_path: Path) -> float | None:
        """Return the stored baseline percent if it is fresh and parseable, else None."""
        if is_baseline_stale(
            baseline_path, self.repo_path, command_runner=self.command_runner
        ):
            return None
        try:
            return get_baseline_coverage(baseline_path)
        except ValueError:
            # Malformed baseline - caller needs to refresh
            return None

    def refresh_if_stale(
        self,
        spec: ValidationSpec,
    ) -> BaselineRefreshResult:
        """Refresh the baseline coverage if stale or missing.

        The baseline is checked once before taking the lock and again after
        acquiring it, so that a refresh completed by another agent while this
        one was waiting is reused instead of repeated.

        Args:
            spec: Validation spec with pytest command and coverage config.

        Returns:
            BaselineRefreshResult with the baseline percentage or error.
            Returns failure if coverage_config is None or has no command, or
            if the refresh lock cannot be obtained in time.
        """
        lock_mgr = self.lock_manager

        # Check if baseline refresh is available
        if self.coverage_config is None:
            return BaselineRefreshResult.fail(
                "Baseline refresh unavailable: no coverage configuration"
            )
        if self.coverage_config.command is None:
            return BaselineRefreshResult.fail(
                "Baseline refresh unavailable: no coverage command configured"
            )

        # Determine baseline report path from config
        coverage_file = Path(self.coverage_config.file)
        if coverage_file.is_absolute():
            baseline_path = coverage_file
        else:
            baseline_path = self.repo_path / coverage_file

        # Fast path: a fresh, readable baseline needs no lock and no refresh.
        baseline = self._read_fresh_baseline(baseline_path)
        if baseline is not None:
            return BaselineRefreshResult.ok(baseline)

        # Baseline is stale or missing - try to acquire lock for refresh
        run_id = f"baseline-{uuid.uuid4().hex[:8]}"
        agent_id = f"baseline-refresh-{run_id}"
        repo_namespace = str(self.repo_path)

        # Try to acquire lock (non-blocking first)
        if not lock_mgr.try_lock(_BASELINE_LOCK_FILE, agent_id, repo_namespace):
            # Another agent is refreshing - wait for them
            if not lock_mgr.wait_for_lock(
                _BASELINE_LOCK_FILE,
                agent_id,
                repo_namespace,
                timeout_seconds=self._LOCK_WAIT_TIMEOUT_SECONDS,
                poll_interval_ms=self._LOCK_POLL_INTERVAL_MS,
            ):
                return BaselineRefreshResult.fail(
                    "Timeout waiting for baseline refresh lock"
                )

        # Lock acquired - re-check, since another agent may have refreshed
        try:
            baseline = self._read_fresh_baseline(baseline_path)
            if baseline is not None:
                return BaselineRefreshResult.ok(baseline)

            # Still stale - run refresh in temp worktree
            return self._run_refresh(spec, baseline_path)
        finally:
            # Release lock through the abstraction
            lock_mgr.release_lock(_BASELINE_LOCK_FILE, agent_id, repo_namespace)

    def _run_coverage_with_fallback(
        self,
        runner: CommandRunnerPort,
        coverage_cmd: list[str],
        coverage_file: Path,
        worktree_path: Path,
        env: dict[str, str],
        timeout: float,
    ) -> Path | str:
        """Run coverage command and fallback to combine if XML not generated.

        Args:
            runner: Command runner for executing coverage commands.
            coverage_cmd: The rewritten coverage command to run.
            coverage_file: Path to expected coverage XML file (relative to worktree).
            worktree_path: Path to the worktree directory.
            env: Environment variables for command execution.
            timeout: Timeout in seconds for each command.

        Returns:
            Path to coverage XML on success, or error string on failure.
        """
        # Run coverage command - we ignore the exit code because tests may fail
        # but still generate a valid coverage.xml baseline
        coverage_result = runner.run(
            coverage_cmd, env=env, cwd=worktree_path, timeout=timeout
        )

        worktree_coverage = worktree_path / coverage_file
        if worktree_coverage.exists():
            return worktree_coverage

        # Fallback: combine coverage data if coverage command didn't emit XML
        coverage_data = [
            path
            for path in worktree_path.glob(".coverage*")
            if path.is_file() and not path.name.endswith(".xml")
        ]

        combine_result = None
        xml_result = None

        if coverage_data:
            coverage_base = _infer_coverage_base_command(coverage_cmd)

            combine_result = runner.run(
                [*coverage_base, "combine"],
                env=env,
                cwd=worktree_path,
                timeout=timeout,
            )
            # Only attempt XML export when combine succeeded.
            if combine_result.returncode == 0:
                xml_result = runner.run(
                    [*coverage_base, "xml", "-o", str(worktree_coverage)],
                    env=env,
                    cwd=worktree_path,
                    timeout=timeout,
                )

        if worktree_coverage.exists():
            return worktree_coverage

        # Build detailed error message
        details: list[str] = []
        if coverage_result.timed_out:
            details.append("coverage command timed out")
        elif coverage_result.returncode != 0:
            details.append(f"coverage command exited {coverage_result.returncode}")
            cmd_tail = coverage_result.stderr_tail() or coverage_result.stdout_tail()
            if cmd_tail:
                details.append(f"command output: {cmd_tail}")
        if combine_result is not None and combine_result.returncode != 0:
            combine_tail = combine_result.stderr_tail() or combine_result.stdout_tail()
            if combine_tail:
                details.append(f"coverage combine failed: {combine_tail}")
        if xml_result is not None and xml_result.returncode != 0:
            xml_tail = xml_result.stderr_tail() or xml_result.stdout_tail()
            if xml_tail:
                details.append(f"coverage xml failed: {xml_tail}")

        detail_msg = f" ({'; '.join(details)})" if details else ""
        return f"No {coverage_file} generated during baseline refresh" + detail_msg

    def _publish_baseline(
        self,
        worktree_coverage: Path,
        baseline_path: Path,
    ) -> str | None:
        """Copy the generated report over the baseline; return an error message on failure."""
        # The temp file sits next to the target so the final step is a rename
        # within one directory.
        temp_coverage = baseline_path.with_suffix(".xml.tmp")
        try:
            baseline_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(worktree_coverage, temp_coverage)
            # os.replace overwrites an existing target on every platform;
            # os.rename raises on Windows when the destination exists.
            os.replace(temp_coverage, baseline_path)
        except OSError as e:
            # Best-effort removal of the partial temp file; the original
            # error is what gets reported.
            try:
                temp_coverage.unlink()
            except OSError:
                pass
            return f"Failed to write baseline coverage to {baseline_path}: {e}"
        return None

    def _run_refresh(
        self,
        spec: ValidationSpec,
        baseline_path: Path,
    ) -> BaselineRefreshResult:
        """Run coverage command in temp worktree to refresh baseline coverage.

        Args:
            spec: Validation spec. Not read by this method; kept for interface
                compatibility.
            baseline_path: Where to write the baseline coverage.xml.

        Returns:
            BaselineRefreshResult with the new baseline percentage or error.
            Worktree creation failures, dependency sync failures, a missing
            report, filesystem errors while publishing the report, and an
            unparsable report are all returned as failure results.

        Note:
            Uses self.coverage_config.command for running coverage. This method
            assumes coverage_config and coverage_config.command are validated
            as non-None by the caller (refresh_if_stale).
        """
        # coverage_config.command is validated non-None in refresh_if_stale
        assert self.coverage_config is not None
        assert self.coverage_config.command is not None
        coverage_command = self.coverage_config.command

        # Determine timeout: prefer coverage_config.timeout, then step_timeout_seconds, then default
        timeout = float(
            self.coverage_config.timeout
            or self.step_timeout_seconds
            or self._DEFAULT_COMMAND_TIMEOUT_SECONDS
        )

        # Determine lock directory
        lock_dir = self.env_config.lock_dir

        try:
            with baseline_worktree(
                repo_path=self.repo_path,
                timeout=timeout,
                lock_dir=lock_dir,
                command_runner=self.command_runner,
            ) as ctx:
                worktree_path = ctx.worktree_path
                runner = ctx.runner
                env = ctx.env

                # Run uv sync first to install dependencies
                sync_result = runner.run(
                    ["uv", "sync", "--all-extras"],
                    env=env,
                    cwd=worktree_path,
                    timeout=timeout,
                )
                if sync_result.returncode != 0:
                    return BaselineRefreshResult.fail(
                        f"uv sync failed during baseline refresh: {sync_result.stderr}"
                    )

                # Rewrite coverage command for baseline refresh:
                # - Strip xdist flags for deterministic coverage
                # - Remove fail-under threshold
                # - Normalize marker expression (no e2e)
                # - Set output path for XML coverage
                coverage_file = Path(self.coverage_config.file)
                new_coverage_cmd = rewrite_coverage_command(
                    coverage_command, str(coverage_file)
                )

                # Run coverage with fallback to combine
                result = self._run_coverage_with_fallback(
                    runner, new_coverage_cmd, coverage_file, worktree_path, env, timeout
                )

                if isinstance(result, str):
                    return BaselineRefreshResult.fail(result)

                # Copy the report into the main repo via temp file + replace.
                publish_error = self._publish_baseline(result, baseline_path)
                if publish_error is not None:
                    return BaselineRefreshResult.fail(publish_error)

                # Parse and return the coverage percentage
                try:
                    baseline = get_baseline_coverage(baseline_path)
                    if baseline is None:
                        return BaselineRefreshResult.fail(
                            f"Baseline {coverage_file} exists but has no coverage data"
                        )
                    return BaselineRefreshResult.ok(baseline)
                except ValueError as e:
                    return BaselineRefreshResult.fail(
                        f"Failed to parse baseline coverage: {e}"
                    )
        except RuntimeError as e:
            # Worktree creation failed
            return BaselineRefreshResult.fail(str(e))

833 except RuntimeError as e: 

834 # Worktree creation failed 

835 return BaselineRefreshResult.fail(str(e))