Coverage for src / domain / quality_gate.py: 27%

246 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Quality gate for verifying agent work before marking success. 

2 

3Implements Track A4 from 2025-12-26-coordination-plan.md: 

4- Verify commit message contains bd-<issue_id> 

5- Verify validation commands ran (parse JSONL logs) 

6- On failure: mark needs-followup with failure context 

7 

8Evidence Detection: 

9 Production code should use parse_validation_evidence_with_spec() or 

10 check_with_resolution(..., spec=spec) to derive detection patterns from 

11 the ValidationSpec. This ensures spec command changes automatically update 

12 evidence expectations. 

13""" 

14 

15from __future__ import annotations 

16 

17import re 

18from dataclasses import dataclass, field 

19from typing import TYPE_CHECKING, ClassVar 

20 

21from src.core.tool_name_extractor import extract_tool_name 

22 

23from .validation.spec import ( 

24 CommandKind, 

25 IssueResolution, 

26 ResolutionOutcome, 

27 ValidationScope, 

28 build_validation_spec, 

29) 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Iterator 

33 from pathlib import Path 

34 

35 from src.core.protocols import ( 

36 CommandRunnerPort, 

37 IssueResolutionProtocol, 

38 JsonlEntryProtocol, 

39 LogProvider, 

40 ValidationEvidenceProtocol, 

41 ) 

42 

43 from .validation.spec import ValidationSpec 

44 

45 

# Names exported by `from <this module> import *`.
__all__ = [
    "CommitResult",
    "GateResult",
    "QualityGate",
    "ValidationEvidence",
]

# Command kinds that should not be required by the quality gate.
# SETUP commands like `uv sync` are useful for local setup, but should not
# block gate passing if omitted or failed.
# Consulted both when deriving the required evidence kinds from a spec and
# when deciding which failed commands are reported by the gate.
QUALITY_GATE_IGNORED_KINDS: set[CommandKind] = {CommandKind.SETUP}

57 

58 

@dataclass
class ValidationEvidence:
    """Record of the validation commands observed during an agent run.

    Evidence is keyed by ``CommandKind`` in ``commands_ran`` rather than by
    one hardcoded boolean per tool, so supporting a new command kind needs no
    change to this structure.

    Backward Compatibility:
        ``pytest_ran``, ``ruff_check_ran``, ``ruff_format_ran`` and
        ``ty_check_ran`` stay available as read/write properties for callers
        that still use them. Each one is only a view onto the matching
        ``commands_ran`` entry.
    """

    # CommandKind -> True once a command of that kind was seen running.
    commands_ran: dict[CommandKind, bool] = field(default_factory=dict)

    # Display names of validation commands that failed (exited non-zero).
    failed_commands: list[str] = field(default_factory=list)

    # --- Legacy per-tool accessors -------------------------------------

    @property
    def pytest_ran(self) -> bool:
        """Evidence flag for the TEST kind (False when never recorded)."""
        return self.commands_ran.get(CommandKind.TEST, False)

    @pytest_ran.setter
    def pytest_ran(self, value: bool) -> None:
        """Store the evidence flag for the TEST kind."""
        self.commands_ran[CommandKind.TEST] = value

    @property
    def ruff_check_ran(self) -> bool:
        """Evidence flag for the LINT kind (False when never recorded)."""
        return self.commands_ran.get(CommandKind.LINT, False)

    @ruff_check_ran.setter
    def ruff_check_ran(self, value: bool) -> None:
        """Store the evidence flag for the LINT kind."""
        self.commands_ran[CommandKind.LINT] = value

    @property
    def ruff_format_ran(self) -> bool:
        """Evidence flag for the FORMAT kind (False when never recorded)."""
        return self.commands_ran.get(CommandKind.FORMAT, False)

    @ruff_format_ran.setter
    def ruff_format_ran(self, value: bool) -> None:
        """Store the evidence flag for the FORMAT kind."""
        self.commands_ran[CommandKind.FORMAT] = value

    @property
    def ty_check_ran(self) -> bool:
        """Evidence flag for the TYPECHECK kind (False when never recorded)."""
        return self.commands_ran.get(CommandKind.TYPECHECK, False)

    @ty_check_ran.setter
    def ty_check_ran(self, value: bool) -> None:
        """Store the evidence flag for the TYPECHECK kind."""
        self.commands_ran[CommandKind.TYPECHECK] = value

    # --- Queries ---------------------------------------------------------

    def has_any_evidence(self) -> bool:
        """Report whether at least one recorded command kind ran.

        Progress detection uses this to decide whether any validation
        activity happened since the previous check.
        """
        for ran in self.commands_ran.values():
            if ran:
                return True
        return False

    def has_minimum_validation(self) -> bool:
        """Report whether the full legacy validation suite ran.

        All four of these must have run:
        - pytest (tests)
        - ruff check (lint)
        - ruff format (format)
        - ty check (type check)
        """
        tests_ran = self.pytest_ran
        lint_ran = self.ruff_check_ran
        format_ran = self.ruff_format_ran
        types_ran = self.ty_check_ran
        return tests_ran and lint_ran and format_ran and types_ran

    def missing_commands(self) -> list[str]:
        """Return the legacy suite commands that did not run, in fixed order."""
        suite = (
            (self.pytest_ran, "pytest"),
            (self.ruff_check_ran, "ruff check"),
            (self.ruff_format_ran, "ruff format"),
            (self.ty_check_ran, "ty check"),
        )
        return [label for ran, label in suite if not ran]

    def to_evidence_dict(self) -> dict[str, bool]:
        """Return the evidence as a dict keyed by ``CommandKind.value``.

        This is the spec-driven alternative to reading the individual
        properties, intended for building metadata. Only kinds present in
        ``commands_ran`` appear in the result.

        Returns:
            Dict mapping each recorded kind's ``value`` to its ran status.
        """
        serialized: dict[str, bool] = {}
        for kind, ran in self.commands_ran.items():
            serialized[kind.value] = ran
        return serialized

170 

171 

def get_required_evidence_kinds(spec: ValidationSpec) -> set[CommandKind]:
    """Collect the command kinds a ValidationSpec requires evidence for.

    The requirement is derived from the spec's own commands, which makes it
    scope-aware: a per-issue spec that contains no E2E command never demands
    E2E evidence. Kinds listed in ``QUALITY_GATE_IGNORED_KINDS`` are left out.

    Args:
        spec: The ValidationSpec to extract requirements from.

    Returns:
        Set of CommandKind values that must have evidence.
    """
    required: set[CommandKind] = set()
    for command in spec.commands:
        if command.kind in QUALITY_GATE_IGNORED_KINDS:
            continue
        required.add(command.kind)
    return required

188 

189 

def check_evidence_against_spec(
    evidence: ValidationEvidence, spec: ValidationSpec
) -> tuple[bool, list[str]]:
    """Check if evidence satisfies a ValidationSpec's requirements.

    Fully spec-driven: both the required kinds and their display names come
    from ``spec.commands``, so new validation commands need no code change
    here. It is also scope-aware: a per-issue spec that has no E2E command
    never requires E2E evidence.

    Missing names are reported in the order the kinds first appear in
    ``spec.commands``, so the resulting failure message is deterministic.
    (Iterating the set of required kinds directly would not guarantee any
    particular order.)

    Args:
        evidence: The parsed validation evidence.
        spec: The ValidationSpec defining what's required.

    Returns:
        Tuple of (passed, missing_commands) where missing_commands lists
        human-readable names of commands that didn't run. For each kind the
        name of the first spec command of that kind is used.
    """
    required = get_required_evidence_kinds(spec)
    missing: list[str] = []
    seen: set[CommandKind] = set()

    for cmd in spec.commands:
        kind = cmd.kind
        if kind in seen:
            # Only the first command of each kind supplies the display name.
            continue
        seen.add(kind)
        if kind in required and not evidence.commands_ran.get(kind, False):
            missing.append(cmd.name)

    return not missing, missing

227 

228 

@dataclass
class CommitResult:
    """Result of checking for a matching commit.

    Produced by ``QualityGate.check_commit_exists``.
    """

    # True when a matching commit was found and accepted.
    exists: bool
    # Abbreviated hash as printed by git's `%h`; None when nothing matched.
    commit_hash: str | None = None
    # Commit subject (git's `%s`); None when nothing matched or git printed
    # no subject after the hash.
    message: str | None = None

236 

237 

@dataclass
class GateResult:
    """Result of quality gate check.

    Returned by ``QualityGate.check_with_resolution``.
    """

    # Overall verdict of the gate.
    passed: bool
    # Human-readable explanations; empty when the gate passed.
    failure_reasons: list[str] = field(default_factory=list)
    # Hash of the commit that satisfied the commit check, when one was found.
    commit_hash: str | None = None
    # Evidence parsed from the session log; only populated on the normal
    # (non-resolution) path after the commit check succeeded.
    validation_evidence: ValidationEvidence | ValidationEvidenceProtocol | None = None
    # NOTE(review): never set inside this module; presumably filled in by the
    # caller from QualityGate.check_no_progress() - confirm against callers.
    no_progress: bool = False
    # Resolution marker found in the log (no-change / obsolete /
    # already-complete), if any.
    resolution: IssueResolution | IssueResolutionProtocol | None = None

248 

249 

class QualityGate:
    """Quality gate for verifying agent work meets requirements.

    Uses LogProvider for JSONL log parsing and CommandRunnerPort for git
    commands, keeping this class focused on policy checking and validation
    logic.
    """

    # Patterns for detecting issue resolution markers in log text. Group 1
    # captures the rationale that follows the marker.
    # NOTE(review): `\s*` also matches newlines, so a marker with nothing
    # after the colon picks up the next non-blank line as its rationale -
    # confirm this leniency is intended.
    RESOLUTION_PATTERNS: ClassVar[dict[str, re.Pattern[str]]] = {
        "no_change": re.compile(r"ISSUE_NO_CHANGE:\s*(.*)$", re.MULTILINE),
        "obsolete": re.compile(r"ISSUE_OBSOLETE:\s*(.*)$", re.MULTILINE),
        "already_complete": re.compile(
            r"ISSUE_ALREADY_COMPLETE:\s*(.*)$", re.MULTILINE
        ),
    }

    # Map pattern names to resolution outcomes
    PATTERN_TO_OUTCOME: ClassVar[dict[str, ResolutionOutcome]] = {
        "no_change": ResolutionOutcome.NO_CHANGE,
        "obsolete": ResolutionOutcome.OBSOLETE,
        "already_complete": ResolutionOutcome.ALREADY_COMPLETE,
    }

    # Pattern to extract issue ID from ALREADY_COMPLETE rationale
    # Matches: "bd-issue-123", "bd-mala-xyz", etc. in rationale text
    RATIONALE_ISSUE_PATTERN: ClassVar[re.Pattern[str]] = re.compile(
        r"\bbd-([a-zA-Z0-9_-]+)\b"
    )

    def __init__(
        self,
        repo_path: Path,
        log_provider: LogProvider,
        command_runner: CommandRunnerPort,
    ) -> None:
        """Initialize quality gate.

        Args:
            repo_path: Path to the repository for git operations.
            log_provider: LogProvider for reading session logs.
            command_runner: CommandRunnerPort for running git commands.
        """
        self.repo_path = repo_path
        self._log_provider = log_provider
        self._command_runner = command_runner

    def _match_resolution_pattern(self, text: str) -> IssueResolution | None:
        """Check text against all resolution patterns.

        Patterns are tried in the declaration order of RESOLUTION_PATTERNS;
        the first pattern that matches anywhere in the text wins, regardless
        of where in the text other markers appear.

        Args:
            text: Text content to search for patterns.

        Returns:
            IssueResolution if a pattern matches, None otherwise.
        """
        for name, pattern in self.RESOLUTION_PATTERNS.items():
            match = pattern.search(text)
            if match:
                return IssueResolution(
                    outcome=self.PATTERN_TO_OUTCOME[name],
                    rationale=match.group(1).strip(),
                )
        return None

    def _match_spec_pattern_with_kinds(
        self,
        command: str,
        evidence: ValidationEvidence,
        kind_patterns: dict[CommandKind, list[re.Pattern[str]]],
    ) -> list[CommandKind]:
        """Check command against spec-defined patterns and return all matched kinds.

        A command may match multiple kinds (e.g., "ruff" matches both LINT and
        FORMAT patterns). Every matching kind is marked as ran on ``evidence``
        and included in the result.

        Args:
            command: The bash command string.
            evidence: ValidationEvidence to update in place.
            kind_patterns: Mapping of CommandKind to detection patterns.

        Returns:
            List of matched CommandKinds (may be empty if no match).
        """
        matched_kinds: list[CommandKind] = []
        for kind, patterns in kind_patterns.items():
            # One matching pattern is enough to record the kind.
            if any(pattern.search(command) for pattern in patterns):
                evidence.commands_ran[kind] = True
                matched_kinds.append(kind)
        return matched_kinds

    def _build_spec_patterns(
        self, spec: ValidationSpec
    ) -> dict[CommandKind, list[re.Pattern[str]]]:
        """Build pattern mapping from a ValidationSpec.

        Every kind that appears in the spec gets an entry, even when none of
        its commands define a detection pattern (the list is then empty).

        Args:
            spec: The ValidationSpec defining commands and their detection patterns.

        Returns:
            Mapping of CommandKind to list of detection patterns.
        """
        kind_patterns: dict[CommandKind, list[re.Pattern[str]]] = {}
        for cmd in spec.commands:
            patterns = kind_patterns.setdefault(cmd.kind, [])
            if cmd.detection_pattern is not None:
                patterns.append(cmd.detection_pattern)
        return kind_patterns

    def _iter_jsonl_entries(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntryProtocol]:
        """Iterate over parsed JSONL entries from a log file.

        Delegates to LogProvider.iter_events().

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Returns:
            Iterator of JsonlEntryProtocol objects, as produced by the
            log provider.
        """
        return self._log_provider.iter_events(log_path, offset)

    def parse_issue_resolution(self, log_path: Path) -> IssueResolution | None:
        """Parse JSONL log file for issue resolution markers.

        Looks for ISSUE_NO_CHANGE, ISSUE_OBSOLETE or ISSUE_ALREADY_COMPLETE
        markers with rationale, scanning the log from the beginning.

        Args:
            log_path: Path to the JSONL log file from agent session.

        Returns:
            IssueResolution if a marker was found, None otherwise.
        """
        resolution, _ = self.parse_issue_resolution_from_offset(log_path, offset=0)
        return resolution

    def parse_issue_resolution_from_offset(
        self, log_path: Path, offset: int = 0
    ) -> tuple[IssueResolution | None, int]:
        """Parse JSONL log file for issue resolution markers starting at offset.

        Only parses assistant messages to prevent user prompts from triggering
        resolution markers.

        Args:
            log_path: Path to the JSONL log file from agent session.
            offset: Byte offset to start reading from (default 0 = beginning).

        Returns:
            Tuple of (IssueResolution or None, new_offset). On a match,
            new_offset points just past the entry containing the marker; with
            no match it is the end-of-log offset. When the log file is missing
            or reading raises OSError the result is ``(None, 0)``.
        """
        if not log_path.exists():
            return None, 0

        try:
            for entry in self._iter_jsonl_entries(log_path, offset):
                for text in self._log_provider.extract_assistant_text_blocks(entry):
                    resolution = self._match_resolution_pattern(text)
                    if resolution:
                        return resolution, entry.offset + entry.line_len
            # No match found - return EOF position (matches original f.tell())
            return None, self.get_log_end_offset(log_path, offset)
        except OSError:
            return None, 0

    def check_working_tree_clean(self) -> tuple[bool, str]:
        """Check if the git working tree is clean (no uncommitted changes).

        Returns:
            Tuple of (is_clean, status_output). On git failure, returns
            (False, error_message) to treat unknown state as dirty.
        """
        result = self._command_runner.run(["git", "status", "--porcelain"])
        # Treat git failures as dirty/unknown state
        if not result.ok:
            error_msg = result.stderr.strip() or "git status failed"
            return False, f"git error: {error_msg}"
        output = result.stdout.strip()
        return not output, output

    def parse_validation_evidence_with_spec(
        self, log_path: Path, spec: ValidationSpec, offset: int = 0
    ) -> ValidationEvidence:
        """Parse JSONL log for validation evidence using spec-defined patterns.

        Bash commands found in the log are matched against the spec's
        detection patterns to record which kinds ran. Tool results are then
        correlated by tool id to decide which kinds failed.

        Args:
            log_path: Path to the JSONL log file from agent session.
            spec: ValidationSpec supplying the detection patterns.
            offset: Byte offset to start reading from (default 0).

        Returns:
            ValidationEvidence with ``commands_ran`` and ``failed_commands``
            filled in; empty evidence when the log file does not exist.
        """
        evidence = ValidationEvidence()
        if not log_path.exists():
            return evidence

        kind_patterns = self._build_spec_patterns(spec)
        # Track tool_id -> list of (CommandKind, display_name) for proper failure
        # tracking. A command may match multiple kinds (e.g., "ruff" matches
        # LINT and FORMAT).
        tool_id_to_info: dict[str, list[tuple[CommandKind, str]]] = {}
        # Track failures per CommandKind (latest status wins for retries of the
        # same command)
        kind_failed: dict[CommandKind, tuple[bool, str]] = {}

        for entry in self._iter_jsonl_entries(log_path, offset):
            for tool_id, command in self._log_provider.extract_bash_commands(entry):
                matched_kinds = self._match_spec_pattern_with_kinds(
                    command, evidence, kind_patterns
                )
                if matched_kinds:
                    cmd_name = extract_tool_name(command)
                    tool_id_to_info[tool_id] = [
                        (kind, cmd_name) for kind in matched_kinds
                    ]
            for tool_use_id, is_error in self._log_provider.extract_tool_results(entry):
                if tool_use_id in tool_id_to_info:
                    for kind, cmd_name in tool_id_to_info[tool_use_id]:
                        # Latest status for this CommandKind wins (allows
                        # retries to succeed)
                        kind_failed[kind] = (is_error, cmd_name)

        # Build failed_commands from kinds that failed, using display names.
        # Filter out ignored kinds (e.g., SETUP) so they don't block the gate.
        # Deduplicate while keeping order: multiple kinds (LINT, FORMAT) may
        # map to the same tool (ruff).
        evidence.failed_commands = list(
            dict.fromkeys(
                cmd_name
                for kind, (is_failed, cmd_name) in kind_failed.items()
                if is_failed and kind not in QUALITY_GATE_IGNORED_KINDS
            )
        )
        return evidence

    def get_log_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Get the byte offset at the end of a log file.

        Delegates to LogProvider.get_end_offset().

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Byte offset to start from (default 0).

        Returns:
            Whatever the log provider reports as the end offset.
            NOTE(review): the provider is expected to fall back to
            start_offset when the file is missing or unreadable - confirm
            against the LogProvider implementation.
        """
        return self._log_provider.get_end_offset(log_path, start_offset)

    def check_no_progress(
        self,
        log_path: Path,
        log_offset: int,
        previous_commit_hash: str | None,
        current_commit_hash: str | None,
        spec: ValidationSpec | None = None,
        check_validation_evidence: bool = True,
    ) -> bool:
        """Check if no progress was made since the last attempt.

        No progress is detected when ALL of these are true:
        - The commit hash hasn't changed (or both are None)
        - No uncommitted changes in the working tree
        - (Optionally) No new validation evidence was found after the log offset

        Args:
            log_path: Path to the JSONL log file from agent session.
            log_offset: Byte offset marking the end of the previous attempt.
            previous_commit_hash: Commit hash from the previous attempt (None if no commit).
            current_commit_hash: Commit hash from this attempt (None if no commit).
            spec: Optional ValidationSpec for spec-driven evidence detection.
                If not provided, builds a default per-issue spec.
            check_validation_evidence: If True (default), also check for new validation
                evidence. Set to False for review retries where only commit/working-tree
                changes should gate progress.

        Returns:
            True if no progress was made, False if progress was detected.
        """
        # Any change of commit hash is progress. This includes the first
        # successful commit (None -> hash), so no separate check is needed.
        if previous_commit_hash != current_commit_hash:
            return False

        # Uncommitted working tree changes also count as progress.
        if self._has_working_tree_changes():
            return False

        # Review retries skip the evidence check: no commit change and no
        # working tree changes already means no progress.
        if not check_validation_evidence:
            return True

        # Without an explicit spec, fall back to the default per-issue spec
        # for this repository so the same spec-driven patterns are used as in
        # production evidence parsing.
        if spec is None:
            spec = build_validation_spec(
                self.repo_path,
                scope=ValidationScope.PER_ISSUE,
            )

        # Any validation evidence logged after the offset counts as progress.
        evidence = self.parse_validation_evidence_with_spec(log_path, spec, log_offset)
        return not evidence.has_any_evidence()

    def _has_working_tree_changes(self) -> bool:
        """Check if the working tree has uncommitted changes.

        Returns:
            True if `git status --porcelain` prints anything, or if git status
            fails (conservative assumption that changes may exist).
        """
        # Any porcelain output line means the tree is not clean.
        result = self._command_runner.run(["git", "status", "--porcelain"], timeout=5.0)
        if not result.ok:
            # If git status fails, assume changes exist (conservative default).
            # This prevents false "no progress" conclusions when git state is
            # unknown.
            return True

        return bool(result.stdout.strip())

    def extract_issue_from_rationale(self, rationale: str) -> str | None:
        """Extract issue ID from ALREADY_COMPLETE rationale.

        For duplicate issues, the agent may reference a different issue ID
        in the rationale (e.g., "Work committed in 238e17f (bd-mala-xyz: ...)").
        This extracts that referenced issue ID so we can verify the correct commit.
        Only the first ``bd-...`` reference in the text is considered.

        Args:
            rationale: The rationale text from ALREADY_COMPLETE resolution.

        Returns:
            The extracted issue ID (without bd- prefix), or None if not found.
        """
        match = self.RATIONALE_ISSUE_PATTERN.search(rationale)
        return match.group(1) if match else None

    def check_commit_exists(
        self, issue_id: str, baseline_timestamp: int | None = None
    ) -> CommitResult:
        """Check if a commit with bd-<issue_id> exists in recent history.

        Searches commits from the last 30 days to accommodate long-running
        work that may span multiple days. Only the first commit printed by
        `git log` is inspected.

        Args:
            issue_id: The issue ID to search for (without bd- prefix).
            baseline_timestamp: Unix timestamp. If provided, only accepts commits
                created after this time (to reject stale commits from previous runs).

        Returns:
            CommitResult indicating whether a matching commit exists. Git
            failures and unparseable output are reported as "no commit".
        """
        # NOTE(review): --grep treats this as a regular expression matched
        # anywhere in the message, so "bd-12" also matches a commit mentioning
        # "bd-123" - confirm whether stricter matching is wanted.
        pattern = f"bd-{issue_id}"

        # Include commit timestamp in format for baseline comparison
        format_str = "%h %ct %s" if baseline_timestamp is not None else "%h %s"

        result = self._command_runner.run(
            [
                "git",
                "log",
                f"--format={format_str}",
                "--grep",
                pattern,
                "-n",
                "1",
                "--since=30 days ago",
            ]
        )

        if not result.ok:
            return CommitResult(exists=False)

        output = result.stdout.strip()
        if not output:
            return CommitResult(exists=False)

        if baseline_timestamp is None:
            # Format: "hash message"
            parts = output.split(" ", 1)
            return CommitResult(
                exists=True,
                commit_hash=parts[0],
                message=parts[1] if len(parts) > 1 else None,
            )

        # Format: "hash timestamp message"
        parts = output.split(" ", 2)
        if len(parts) < 2:
            return CommitResult(exists=False)

        try:
            commit_timestamp = int(parts[1])
        except ValueError:
            return CommitResult(exists=False)

        # Reject commits created before the baseline
        if commit_timestamp < baseline_timestamp:
            return CommitResult(exists=False)

        return CommitResult(
            exists=True,
            commit_hash=parts[0],
            message=parts[2] if len(parts) > 2 else None,
        )

    def check_with_resolution(
        self,
        issue_id: str,
        log_path: Path,
        baseline_timestamp: int | None = None,
        log_offset: int = 0,
        spec: ValidationSpec | None = None,
    ) -> GateResult:
        """Run quality gate check with support for no-op/obsolete resolutions.

        This method is scope-aware and handles special resolution outcomes:
        - ISSUE_NO_CHANGE: Issue already addressed, no commit needed
        - ISSUE_OBSOLETE: Issue no longer relevant, no commit needed
        - ISSUE_ALREADY_COMPLETE: Work done in previous run, verify commit exists

        For no-op/obsolete resolutions:
        - Gate 2 (commit check) is skipped
        - Gate 3 (validation evidence) is skipped
        - Requires a non-empty rationale. The working tree is NOT checked,
          because parallel agents may have uncommitted changes.

        For already_complete resolutions:
        - Gate 2 (commit check) runs WITHOUT baseline timestamp (accepts stale commits)
        - Gate 3 (validation evidence) is skipped
        - Requires rationale and valid pre-existing commit

        Otherwise the normal flow applies: a commit for the issue must exist
        (honouring ``baseline_timestamp``), and the evidence required by the
        spec must be present with no failed validation commands. Evidence
        requirements are derived from the spec, which ensures:
        - Per-issue scope never requires E2E evidence
        - Disabled validations don't cause failures

        Args:
            issue_id: The issue ID to verify.
            log_path: Path to the JSONL log file from agent session.
            baseline_timestamp: Unix timestamp for commit freshness check.
            log_offset: Byte offset to start parsing from.
            spec: ValidationSpec for scope-aware evidence checking. Required,
                despite the None default in the signature.

        Returns:
            GateResult with pass/fail, failure reasons, and resolution if applicable.

        Raises:
            ValueError: If spec is not provided.
        """
        if spec is None:
            raise ValueError("spec is required for check_with_resolution")

        failure_reasons: list[str] = []

        # First, check for resolution markers
        resolution, _ = self.parse_issue_resolution_from_offset(
            log_path, offset=log_offset
        )

        if resolution is not None:
            # No-op or obsolete resolution - verify requirements
            if resolution.outcome in (
                ResolutionOutcome.NO_CHANGE,
                ResolutionOutcome.OBSOLETE,
            ):
                # Require rationale
                if not resolution.rationale.strip():
                    failure_reasons.append(
                        f"{resolution.outcome.value.upper()} resolution requires a rationale"
                    )
                    return GateResult(
                        passed=False,
                        failure_reasons=failure_reasons,
                        resolution=resolution,
                    )

                # No-op/obsolete with rationale passes
                # (skip working tree check - parallel agents may have
                # uncommitted changes)
                return GateResult(
                    passed=True,
                    resolution=resolution,
                )

            # Already complete resolution - verify pre-existing commit
            if resolution.outcome == ResolutionOutcome.ALREADY_COMPLETE:
                # Require rationale
                if not resolution.rationale.strip():
                    failure_reasons.append(
                        "ALREADY_COMPLETE resolution requires a rationale"
                    )
                    return GateResult(
                        passed=False,
                        failure_reasons=failure_reasons,
                        resolution=resolution,
                    )

                # For duplicate issues, the rationale may reference a different
                # issue ID (e.g., "Work committed in 238e17f (bd-mala-xyz: ...)").
                # Use that ID if present, otherwise fall back to current issue.
                referenced_id = self.extract_issue_from_rationale(resolution.rationale)
                check_issue_id = referenced_id or issue_id

                # Verify commit exists WITHOUT baseline check (accepts stale
                # commits)
                commit_result = self.check_commit_exists(
                    check_issue_id, baseline_timestamp=None
                )
                if not commit_result.exists:
                    if referenced_id and referenced_id != issue_id:
                        failure_reasons.append(
                            f"ALREADY_COMPLETE resolution references bd-{referenced_id} "
                            "but no matching commit was found"
                        )
                    else:
                        failure_reasons.append(
                            f"ALREADY_COMPLETE resolution requires a commit with bd-{issue_id} "
                            "but none was found"
                        )
                    return GateResult(
                        passed=False,
                        failure_reasons=failure_reasons,
                        resolution=resolution,
                    )

                # Already complete with rationale and valid commit passes
                # (skip validation evidence - was validated in prior run)
                return GateResult(
                    passed=True,
                    commit_hash=commit_result.commit_hash,
                    resolution=resolution,
                )

        # Normal flow - require commit and validation evidence
        commit_result = self.check_commit_exists(issue_id, baseline_timestamp)
        if not commit_result.exists:
            if baseline_timestamp is not None:
                failure_reasons.append(
                    f"No commit with bd-{issue_id} found after run baseline "
                    f"(stale commits from previous runs are rejected)"
                )
            else:
                failure_reasons.append(
                    f"No commit with bd-{issue_id} found in the last 30 days"
                )
            return GateResult(
                passed=False,
                failure_reasons=failure_reasons,
            )

        # Gate 3: Check validation evidence (spec-driven)
        evidence = self.parse_validation_evidence_with_spec(log_path, spec, log_offset)

        passed, missing = check_evidence_against_spec(evidence, spec)

        # Check for missing validation commands
        if not passed:
            failure_reasons.append(
                f"Missing validation evidence for: {', '.join(missing)}"
            )

        # Check for failed validation commands
        if evidence.failed_commands:
            passed = False
            failure_reasons.append(
                f"Validation command(s) failed: {', '.join(evidence.failed_commands)}"
            )

        return GateResult(
            passed=passed,
            failure_reasons=failure_reasons,
            commit_hash=commit_result.commit_hash,
            validation_evidence=evidence,
        )