Coverage for src / domain / quality_gate.py: 27%
246 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Quality gate for verifying agent work before marking success.
3Implements Track A4 from 2025-12-26-coordination-plan.md:
4- Verify commit message contains bd-<issue_id>
5- Verify validation commands ran (parse JSONL logs)
6- On failure: mark needs-followup with failure context
8Evidence Detection:
9 Production code should use parse_validation_evidence_with_spec() or
10 check_with_resolution(..., spec=spec) to derive detection patterns from
11 the ValidationSpec. This ensures spec command changes automatically update
12 evidence expectations.
13"""
15from __future__ import annotations
17import re
18from dataclasses import dataclass, field
19from typing import TYPE_CHECKING, ClassVar
21from src.core.tool_name_extractor import extract_tool_name
23from .validation.spec import (
24 CommandKind,
25 IssueResolution,
26 ResolutionOutcome,
27 ValidationScope,
28 build_validation_spec,
29)
31if TYPE_CHECKING:
32 from collections.abc import Iterator
33 from pathlib import Path
35 from src.core.protocols import (
36 CommandRunnerPort,
37 IssueResolutionProtocol,
38 JsonlEntryProtocol,
39 LogProvider,
40 ValidationEvidenceProtocol,
41 )
43 from .validation.spec import ValidationSpec
46__all__ = [
47 "CommitResult",
48 "GateResult",
49 "QualityGate",
50 "ValidationEvidence",
51]
# Command kinds that should not be required by the quality gate.
# SETUP commands like `uv sync` are useful for local setup, but should not
# block gate passing if omitted or failed. Kinds listed here are excluded both
# from the required-evidence set and from the reported failed commands.
QUALITY_GATE_IGNORED_KINDS: set[CommandKind] = {CommandKind.SETUP}
@dataclass
class ValidationEvidence:
    """Record of the validation commands observed during an agent run.

    Evidence is stored per ``CommandKind`` in ``commands_ran`` instead of one
    hardcoded boolean per tool, so new command kinds need no change to this
    structure.

    Backward compatibility:
        The ``pytest_ran``, ``ruff_check_ran``, ``ruff_format_ran`` and
        ``ty_check_ran`` properties are read/write views onto ``commands_ran``
        for callers that still use the tool-specific names.
    """

    # CommandKind -> whether a command of that kind was seen running.
    commands_ran: dict[CommandKind, bool] = field(default_factory=dict)

    # Validation commands that failed (exited non-zero).
    failed_commands: list[str] = field(default_factory=list)

    def _ran(self, kind: CommandKind) -> bool:
        """Return the stored flag for ``kind``; False when nothing is stored."""
        return self.commands_ran.get(kind, False)

    # --- Backward-compatible, tool-named views onto ``commands_ran`` ---

    @property
    def pytest_ran(self) -> bool:
        """Whether a TEST command (pytest) ran."""
        return self._ran(CommandKind.TEST)

    @pytest_ran.setter
    def pytest_ran(self, value: bool) -> None:
        """Store TEST evidence."""
        self.commands_ran[CommandKind.TEST] = value

    @property
    def ruff_check_ran(self) -> bool:
        """Whether a LINT command (ruff check) ran."""
        return self._ran(CommandKind.LINT)

    @ruff_check_ran.setter
    def ruff_check_ran(self, value: bool) -> None:
        """Store LINT evidence."""
        self.commands_ran[CommandKind.LINT] = value

    @property
    def ruff_format_ran(self) -> bool:
        """Whether a FORMAT command (ruff format) ran."""
        return self._ran(CommandKind.FORMAT)

    @ruff_format_ran.setter
    def ruff_format_ran(self, value: bool) -> None:
        """Store FORMAT evidence."""
        self.commands_ran[CommandKind.FORMAT] = value

    @property
    def ty_check_ran(self) -> bool:
        """Whether a TYPECHECK command (ty check) ran."""
        return self._ran(CommandKind.TYPECHECK)

    @ty_check_ran.setter
    def ty_check_ran(self, value: bool) -> None:
        """Store TYPECHECK evidence."""
        self.commands_ran[CommandKind.TYPECHECK] = value

    def has_any_evidence(self) -> bool:
        """Report whether at least one validation command ran.

        Progress detection uses this to decide whether new validation
        activity happened since the previous check.
        """
        for ran in self.commands_ran.values():
            if ran:
                return True
        return False

    def has_minimum_validation(self) -> bool:
        """Report whether the full validation suite was performed.

        All four must have run: pytest (tests), ruff check (lint),
        ruff format (format) and ty check (type check).
        """
        return all(
            (
                self.pytest_ran,
                self.ruff_check_ran,
                self.ruff_format_ran,
                self.ty_check_ran,
            )
        )

    def missing_commands(self) -> list[str]:
        """List the validation commands that did not run, in a fixed order."""
        suite = (
            (self.pytest_ran, "pytest"),
            (self.ruff_check_ran, "ruff check"),
            (self.ruff_format_ran, "ruff format"),
            (self.ty_check_ran, "ty check"),
        )
        return [label for ran, label in suite if not ran]

    def to_evidence_dict(self) -> dict[str, bool]:
        """Convert the evidence to a serializable dict keyed by kind value.

        This is the spec-driven alternative to reading the individual
        properties; prefer it when building metadata.

        Returns:
            Dict mapping each stored ``CommandKind.value`` string (for example
            "test", "lint", "format", "typecheck") to its ran status.
        """
        serialized: dict[str, bool] = {}
        for kind, ran in self.commands_ran.items():
            serialized[kind.value] = ran
        return serialized
def get_required_evidence_kinds(spec: ValidationSpec) -> set[CommandKind]:
    """Collect the command kinds a ValidationSpec requires evidence for.

    Requirements come straight from the spec's commands, which makes them
    scope-aware: a per-issue spec has no E2E commands, so no E2E evidence is
    demanded. Kinds in ``QUALITY_GATE_IGNORED_KINDS`` are left out.

    Args:
        spec: The ValidationSpec to extract requirements from.

    Returns:
        Set of CommandKind values that must have evidence.
    """
    required: set[CommandKind] = set()
    for command in spec.commands:
        if command.kind in QUALITY_GATE_IGNORED_KINDS:
            continue
        required.add(command.kind)
    return required
def check_evidence_against_spec(
    evidence: ValidationEvidence, spec: ValidationSpec
) -> tuple[bool, list[str]]:
    """Check if evidence satisfies a ValidationSpec's requirements.

    This is fully spec-driven: the required kinds and their display names are
    derived from the spec's commands, not hardcoded. It is also scope-aware: a
    per-issue spec won't require E2E evidence because per-issue specs don't
    include E2E commands.

    Missing commands are reported in the order their kinds first appear in
    ``spec.commands``, so the resulting failure message is stable between
    runs instead of following set iteration order.

    Args:
        evidence: The parsed validation evidence.
        spec: The ValidationSpec defining what's required.

    Returns:
        Tuple of (passed, missing_commands) where missing_commands lists the
        human-readable names of required commands that didn't run.
    """
    required_kinds = get_required_evidence_kinds(spec)
    missing: list[str] = []
    seen_kinds: set[CommandKind] = set()

    # Walk the spec's command list rather than the required-kind set: set
    # iteration order is not stable across processes, list order is.
    for cmd in spec.commands:
        kind = cmd.kind
        if kind not in required_kinds or kind in seen_kinds:
            continue
        # The first command of each kind supplies that kind's display name.
        seen_kinds.add(kind)
        if not evidence.commands_ran.get(kind, False):
            missing.append(cmd.name)

    return not missing, missing
@dataclass
class CommitResult:
    """Outcome of searching git history for a commit that matches an issue."""

    # True when a matching commit was found and accepted.
    exists: bool
    # Abbreviated hash of the matching commit, when one was found.
    commit_hash: str | None = None
    # Subject line of the matching commit, when available.
    message: str | None = None
@dataclass
class GateResult:
    """Outcome of a quality gate check."""

    # Overall verdict of the gate.
    passed: bool
    # Human-readable explanations for a failed gate (empty by default).
    failure_reasons: list[str] = field(default_factory=list)
    # Hash of the commit that satisfied the commit check, if any.
    commit_hash: str | None = None
    # Evidence parsed from the session log, when the evidence gate ran.
    validation_evidence: ValidationEvidence | ValidationEvidenceProtocol | None = None
    # Flag for "no progress since the previous attempt"; defaults to False.
    no_progress: bool = False
    # Resolution marker found in the log, if any.
    resolution: IssueResolution | IssueResolutionProtocol | None = None
class QualityGate:
    """Quality gate for verifying agent work meets requirements.

    Log reading is delegated to the injected ``LogProvider`` and git access to
    the injected ``CommandRunnerPort``, keeping this class focused on policy:
    resolution markers, commit checks and validation evidence.
    """

    # Patterns for detecting issue resolution markers in log text. Group 1
    # captures the rationale up to the end of the marker's line. Dict order is
    # the order in which the patterns are tried.
    RESOLUTION_PATTERNS: ClassVar[dict[str, re.Pattern[str]]] = {
        "no_change": re.compile(r"ISSUE_NO_CHANGE:\s*(.*)$", re.MULTILINE),
        "obsolete": re.compile(r"ISSUE_OBSOLETE:\s*(.*)$", re.MULTILINE),
        "already_complete": re.compile(
            r"ISSUE_ALREADY_COMPLETE:\s*(.*)$", re.MULTILINE
        ),
    }

    # Map pattern names to resolution outcomes
    PATTERN_TO_OUTCOME: ClassVar[dict[str, ResolutionOutcome]] = {
        "no_change": ResolutionOutcome.NO_CHANGE,
        "obsolete": ResolutionOutcome.OBSOLETE,
        "already_complete": ResolutionOutcome.ALREADY_COMPLETE,
    }

    # Pattern to extract issue ID from ALREADY_COMPLETE rationale
    # Matches: "bd-issue-123", "bd-mala-xyz", etc. in rationale text
    RATIONALE_ISSUE_PATTERN: ClassVar[re.Pattern[str]] = re.compile(
        r"\bbd-([a-zA-Z0-9_-]+)\b"
    )

    def __init__(
        self,
        repo_path: Path,
        log_provider: LogProvider,
        command_runner: CommandRunnerPort,
    ) -> None:
        """Initialize quality gate.

        Args:
            repo_path: Path to the repository for git operations.
            log_provider: LogProvider for reading session logs.
            command_runner: CommandRunnerPort for running git commands.
        """
        self.repo_path = repo_path
        self._log_provider = log_provider
        self._command_runner = command_runner

    # ------------------------------------------------------------------
    # Pattern helpers
    # ------------------------------------------------------------------

    def _match_resolution_pattern(self, text: str) -> IssueResolution | None:
        """Check text against all resolution patterns.

        Patterns are tried in ``RESOLUTION_PATTERNS`` order; the first pattern
        that matches anywhere in ``text`` wins.

        Args:
            text: Text content to search for patterns.

        Returns:
            IssueResolution if a pattern matches, None otherwise.
        """
        for name, pattern in self.RESOLUTION_PATTERNS.items():
            match = pattern.search(text)
            if match:
                return IssueResolution(
                    outcome=self.PATTERN_TO_OUTCOME[name],
                    rationale=match.group(1).strip(),
                )
        return None

    def _match_spec_pattern_with_kinds(
        self,
        command: str,
        evidence: ValidationEvidence,
        kind_patterns: dict[CommandKind, list[re.Pattern[str]]],
    ) -> list[CommandKind]:
        """Check command against spec-defined patterns and return matched kinds.

        A command may match multiple kinds (e.g., "ruff" matches both LINT and
        FORMAT patterns), so every matching kind is recorded and returned.

        Args:
            command: The bash command string.
            evidence: ValidationEvidence to update in place.
            kind_patterns: Mapping of CommandKind to detection patterns.

        Returns:
            List of matched CommandKinds (may be empty if no match).
        """
        matched_kinds: list[CommandKind] = []
        for kind, patterns in kind_patterns.items():
            # One matching pattern is enough to count the kind as seen.
            if any(pattern.search(command) for pattern in patterns):
                evidence.commands_ran[kind] = True
                matched_kinds.append(kind)
        return matched_kinds

    def _build_spec_patterns(
        self, spec: ValidationSpec
    ) -> dict[CommandKind, list[re.Pattern[str]]]:
        """Build pattern mapping from a ValidationSpec.

        Every kind present in the spec gets an entry, even when none of its
        commands defines a detection pattern (the list is then empty).

        Args:
            spec: The ValidationSpec defining commands and detection patterns.

        Returns:
            Mapping of CommandKind to list of detection patterns.
        """
        kind_patterns: dict[CommandKind, list[re.Pattern[str]]] = {}
        for cmd in spec.commands:
            patterns = kind_patterns.setdefault(cmd.kind, [])
            if cmd.detection_pattern is not None:
                patterns.append(cmd.detection_pattern)
        return kind_patterns

    # ------------------------------------------------------------------
    # Log access
    # ------------------------------------------------------------------

    def _iter_jsonl_entries(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntryProtocol]:
        """Iterate over parsed JSONL entries from a log file.

        Delegates to LogProvider.iter_events().

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Returns:
            Iterator of JsonlEntryProtocol objects produced by the provider.
        """
        return self._log_provider.iter_events(log_path, offset)

    def get_log_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Get the byte offset at the end of a log file.

        Delegates to LogProvider.get_end_offset().

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Byte offset to start from (default 0).

        Returns:
            The byte offset at the end of the file, or start_offset if file
            doesn't exist or can't be read.
        """
        return self._log_provider.get_end_offset(log_path, start_offset)

    # ------------------------------------------------------------------
    # Resolution markers
    # ------------------------------------------------------------------

    def parse_issue_resolution(self, log_path: Path) -> IssueResolution | None:
        """Parse JSONL log file for issue resolution markers.

        Looks for ISSUE_NO_CHANGE, ISSUE_OBSOLETE or ISSUE_ALREADY_COMPLETE
        markers with rationale, reading the log from the beginning.

        Args:
            log_path: Path to the JSONL log file from agent session.

        Returns:
            IssueResolution if a marker was found, None otherwise.
        """
        resolution, _ = self.parse_issue_resolution_from_offset(log_path, offset=0)
        return resolution

    def parse_issue_resolution_from_offset(
        self, log_path: Path, offset: int = 0
    ) -> tuple[IssueResolution | None, int]:
        """Parse JSONL log file for issue resolution markers starting at offset.

        Only parses assistant messages to prevent user prompts from triggering
        resolution markers.

        Args:
            log_path: Path to the JSONL log file from agent session.
            offset: Byte offset to start reading from (default 0 = beginning).

        Returns:
            Tuple of (IssueResolution or None, new_offset). On a match,
            new_offset points just past the matching entry; with no match it
            is the end of the log. When the log is missing or unreadable the
            caller's ``offset`` is returned unchanged, so a tracked position
            is never rewound.
        """
        if not log_path.exists():
            return None, offset

        try:
            for entry in self._iter_jsonl_entries(log_path, offset):
                for text in self._log_provider.extract_assistant_text_blocks(entry):
                    resolution = self._match_resolution_pattern(text)
                    if resolution:
                        return resolution, entry.offset + entry.line_len
            # No match found - return EOF position (matches original f.tell())
            return None, self.get_log_end_offset(log_path, offset)
        except OSError:
            return None, offset

    def extract_issue_from_rationale(self, rationale: str) -> str | None:
        """Extract issue ID from ALREADY_COMPLETE rationale.

        For duplicate issues, the agent may reference a different issue ID
        in the rationale (e.g., "Work committed in 238e17f (bd-mala-xyz: ...)").
        This extracts the first such reference so the correct commit can be
        verified.

        Args:
            rationale: The rationale text from ALREADY_COMPLETE resolution.

        Returns:
            The extracted issue ID (without bd- prefix), or None if not found.
        """
        match = self.RATIONALE_ISSUE_PATTERN.search(rationale)
        return match.group(1) if match else None

    # ------------------------------------------------------------------
    # Git state
    # ------------------------------------------------------------------

    def check_working_tree_clean(self) -> tuple[bool, str]:
        """Check if the git working tree is clean (no uncommitted changes).

        Returns:
            Tuple of (is_clean, status_output). On git failure, returns
            (False, error_message) to treat unknown state as dirty.
        """
        result = self._command_runner.run(["git", "status", "--porcelain"])
        if not result.ok:
            # Treat git failures as dirty/unknown state
            error_msg = result.stderr.strip() or "git status failed"
            return False, f"git error: {error_msg}"
        output = result.stdout.strip()
        return len(output) == 0, output

    def _has_working_tree_changes(self) -> bool:
        """Check if the working tree has uncommitted changes.

        Returns:
            True if ``git status --porcelain`` prints anything, or if git
            status fails (conservative assumption that changes may exist).
        """
        result = self._command_runner.run(["git", "status", "--porcelain"], timeout=5.0)
        if not result.ok:
            # Unknown git state must not be mistaken for "no progress".
            return True
        # Any output means there are changes
        return bool(result.stdout.strip())

    def check_commit_exists(
        self, issue_id: str, baseline_timestamp: int | None = None
    ) -> CommitResult:
        """Check if a commit with bd-<issue_id> exists in recent history.

        Searches commits from the last 30 days to accommodate long-running
        work that may span multiple days. Only the most recent matching commit
        (``-n 1``) is considered.

        Args:
            issue_id: The issue ID to search for (without bd- prefix).
            baseline_timestamp: Unix timestamp. If provided, commits older
                than this are rejected (stale commits from previous runs).

        Returns:
            CommitResult indicating whether a matching commit exists.
        """
        # NOTE(review): git treats the --grep value as a pattern and matches
        # it anywhere in the message, so "bd-12" also matches "bd-123".
        pattern = f"bd-{issue_id}"

        # The commit timestamp is only requested when it will be compared.
        format_str = "%h %ct %s" if baseline_timestamp is not None else "%h %s"

        result = self._command_runner.run(
            [
                "git",
                "log",
                f"--format={format_str}",
                "--grep",
                pattern,
                "-n",
                "1",
                "--since=30 days ago",
            ]
        )
        if not result.ok:
            return CommitResult(exists=False)

        output = result.stdout.strip()
        if not output:
            return CommitResult(exists=False)

        if baseline_timestamp is None:
            # Format: "hash message"
            commit_hash, separator, subject = output.partition(" ")
            return CommitResult(
                exists=True,
                commit_hash=commit_hash,
                message=subject if separator else None,
            )

        # Format: "hash timestamp message"
        parts = output.split(" ", 2)
        if len(parts) < 2:
            return CommitResult(exists=False)
        try:
            commit_timestamp = int(parts[1])
        except ValueError:
            return CommitResult(exists=False)

        # Reject commits created before the baseline
        if commit_timestamp < baseline_timestamp:
            return CommitResult(exists=False)

        return CommitResult(
            exists=True,
            commit_hash=parts[0],
            message=parts[2] if len(parts) > 2 else None,
        )

    # ------------------------------------------------------------------
    # Validation evidence
    # ------------------------------------------------------------------

    def parse_validation_evidence_with_spec(
        self, log_path: Path, spec: ValidationSpec, offset: int = 0
    ) -> ValidationEvidence:
        """Parse JSONL log for validation evidence using spec-defined patterns.

        Args:
            log_path: Path to the JSONL log file from agent session.
            spec: ValidationSpec supplying the detection patterns.
            offset: Byte offset to start reading from (default 0).

        Returns:
            ValidationEvidence with ``commands_ran`` set for every kind whose
            pattern matched a bash command, and ``failed_commands`` listing
            the tool names of kinds whose latest result was an error. Empty
            evidence is returned when the log file does not exist.
        """
        evidence = ValidationEvidence()
        if not log_path.exists():
            return evidence

        kind_patterns = self._build_spec_patterns(spec)
        # tool_id -> (CommandKind, display_name) pairs; one command may match
        # multiple kinds (e.g., "ruff" matches LINT and FORMAT).
        tool_id_to_info: dict[str, list[tuple[CommandKind, str]]] = {}
        # Latest (is_error, display_name) per kind, so a later retry of the
        # same kind overrides an earlier failure.
        kind_failed: dict[CommandKind, tuple[bool, str]] = {}

        for entry in self._iter_jsonl_entries(log_path, offset):
            for tool_id, command in self._log_provider.extract_bash_commands(entry):
                matched_kinds = self._match_spec_pattern_with_kinds(
                    command, evidence, kind_patterns
                )
                if matched_kinds:
                    cmd_name = extract_tool_name(command)
                    tool_id_to_info[tool_id] = [
                        (kind, cmd_name) for kind in matched_kinds
                    ]
            for tool_use_id, is_error in self._log_provider.extract_tool_results(entry):
                for kind, cmd_name in tool_id_to_info.get(tool_use_id, ()):
                    kind_failed[kind] = (is_error, cmd_name)

        # Ignored kinds (e.g., SETUP) never block the gate. dict.fromkeys
        # deduplicates while keeping order, because several kinds (LINT,
        # FORMAT) may map to the same tool name (ruff).
        evidence.failed_commands = list(
            dict.fromkeys(
                cmd_name
                for kind, (is_failed, cmd_name) in kind_failed.items()
                if is_failed and kind not in QUALITY_GATE_IGNORED_KINDS
            )
        )
        return evidence

    def check_no_progress(
        self,
        log_path: Path,
        log_offset: int,
        previous_commit_hash: str | None,
        current_commit_hash: str | None,
        spec: ValidationSpec | None = None,
        check_validation_evidence: bool = True,
    ) -> bool:
        """Check if no progress was made since the last attempt.

        No progress is detected when ALL of these are true:
        - The commit hash hasn't changed (or both are None)
        - No uncommitted changes in the working tree
        - (Optionally) No new validation evidence was found after the log offset

        Args:
            log_path: Path to the JSONL log file from agent session.
            log_offset: Byte offset marking the end of the previous attempt.
            previous_commit_hash: Commit hash from the previous attempt
                (None if no commit).
            current_commit_hash: Commit hash from this attempt (None if no commit).
            spec: Optional ValidationSpec for spec-driven evidence detection.
                If not provided, builds a default per-issue spec.
            check_validation_evidence: If True (default), also check for new
                validation evidence. Set to False for review retries where
                only commit/working-tree changes should gate progress.

        Returns:
            True if no progress was made, False if progress was detected.
        """
        # Any change of commit hash is progress, including a first commit
        # (None -> hash).
        if previous_commit_hash != current_commit_hash:
            return False

        # Uncommitted working tree changes also count as progress.
        if self._has_working_tree_changes():
            return False

        # Review retries only gate on commit/working-tree changes.
        if not check_validation_evidence:
            return True

        # Fall back to a per-issue spec for this repository so the same
        # spec-driven parsing is used as in the gate itself.
        if spec is None:
            spec = build_validation_spec(
                self.repo_path,
                scope=ValidationScope.PER_ISSUE,
            )

        # Any new validation evidence after the offset counts as progress.
        evidence = self.parse_validation_evidence_with_spec(log_path, spec, log_offset)
        return not evidence.has_any_evidence()

    # ------------------------------------------------------------------
    # Gate entry point
    # ------------------------------------------------------------------

    def _check_noop_resolution(self, resolution: IssueResolution) -> GateResult:
        """Gate a NO_CHANGE/OBSOLETE resolution: only a rationale is required."""
        if not resolution.rationale.strip():
            return GateResult(
                passed=False,
                failure_reasons=[
                    f"{resolution.outcome.value.upper()} resolution requires a rationale"
                ],
                resolution=resolution,
            )
        # No-op/obsolete with rationale passes
        # (skip working tree check - parallel agents may have uncommitted changes)
        return GateResult(passed=True, resolution=resolution)

    def _check_already_complete(
        self, issue_id: str, resolution: IssueResolution
    ) -> GateResult:
        """Gate an ALREADY_COMPLETE resolution: rationale plus an existing commit."""
        if not resolution.rationale.strip():
            return GateResult(
                passed=False,
                failure_reasons=["ALREADY_COMPLETE resolution requires a rationale"],
                resolution=resolution,
            )

        # For duplicate issues, the rationale may reference a different issue ID
        # (e.g., "Work committed in 238e17f (bd-mala-xyz: ...)").
        # Use that ID if present, otherwise fall back to the current issue.
        referenced_id = self.extract_issue_from_rationale(resolution.rationale)
        check_issue_id = referenced_id or issue_id

        # Verify commit exists WITHOUT baseline check (accepts stale commits)
        commit_result = self.check_commit_exists(
            check_issue_id, baseline_timestamp=None
        )
        if commit_result.exists:
            # Validation evidence is skipped - it was validated in the prior run.
            return GateResult(
                passed=True,
                commit_hash=commit_result.commit_hash,
                resolution=resolution,
            )

        if referenced_id and referenced_id != issue_id:
            reason = (
                f"ALREADY_COMPLETE resolution references bd-{referenced_id} "
                "but no matching commit was found"
            )
        else:
            reason = (
                f"ALREADY_COMPLETE resolution requires a commit with bd-{issue_id} "
                "but none was found"
            )
        return GateResult(
            passed=False,
            failure_reasons=[reason],
            resolution=resolution,
        )

    def check_with_resolution(
        self,
        issue_id: str,
        log_path: Path,
        baseline_timestamp: int | None = None,
        log_offset: int = 0,
        spec: ValidationSpec | None = None,
    ) -> GateResult:
        """Run quality gate check with support for no-op/obsolete resolutions.

        This method is scope-aware and handles special resolution outcomes:
        - ISSUE_NO_CHANGE: Issue already addressed, no commit needed
        - ISSUE_OBSOLETE: Issue no longer relevant, no commit needed
        - ISSUE_ALREADY_COMPLETE: Work done in previous run, verify commit exists

        For no-op/obsolete resolutions:
        - Gate 2 (commit check) is skipped
        - Gate 3 (validation evidence) is skipped
        - Requires a rationale; the working tree is NOT checked, because
          parallel agents may have uncommitted changes

        For already_complete resolutions:
        - Gate 2 (commit check) runs WITHOUT baseline timestamp (accepts
          stale commits)
        - Gate 3 (validation evidence) is skipped
        - Requires rationale and valid pre-existing commit

        Otherwise a matching commit is required, and evidence requirements are
        derived from the spec rather than hardcoded defaults, so:
        - Per-issue scope never requires E2E evidence
        - Disabled validations don't cause failures

        Args:
            issue_id: The issue ID to verify.
            log_path: Path to the JSONL log file from agent session.
            baseline_timestamp: Unix timestamp for commit freshness check.
            log_offset: Byte offset to start parsing from.
            spec: ValidationSpec for scope-aware evidence checking. Required,
                despite the ``None`` default.

        Returns:
            GateResult with pass/fail, failure reasons, and resolution if
            applicable.

        Raises:
            ValueError: If spec is not provided.
        """
        if spec is None:
            raise ValueError("spec is required for check_with_resolution")

        # Resolution markers take precedence over the normal commit flow.
        resolution, _ = self.parse_issue_resolution_from_offset(
            log_path, offset=log_offset
        )
        if resolution is not None:
            if resolution.outcome in (
                ResolutionOutcome.NO_CHANGE,
                ResolutionOutcome.OBSOLETE,
            ):
                return self._check_noop_resolution(resolution)
            if resolution.outcome == ResolutionOutcome.ALREADY_COMPLETE:
                return self._check_already_complete(issue_id, resolution)

        # Normal flow - require commit and validation evidence
        commit_result = self.check_commit_exists(issue_id, baseline_timestamp)
        if not commit_result.exists:
            if baseline_timestamp is not None:
                reason = (
                    f"No commit with bd-{issue_id} found after run baseline "
                    "(stale commits from previous runs are rejected)"
                )
            else:
                reason = f"No commit with bd-{issue_id} found in the last 30 days"
            return GateResult(passed=False, failure_reasons=[reason])

        # Gate 3: Check validation evidence (spec-driven)
        failure_reasons: list[str] = []
        evidence = self.parse_validation_evidence_with_spec(log_path, spec, log_offset)
        passed, missing = check_evidence_against_spec(evidence, spec)
        if not passed:
            failure_reasons.append(
                f"Missing validation evidence for: {', '.join(missing)}"
            )

        # A command that ran but failed also fails the gate.
        if evidence.failed_commands:
            passed = False
            failure_reasons.append(
                f"Validation command(s) failed: {', '.join(evidence.failed_commands)}"
            )

        return GateResult(
            passed=passed,
            failure_reasons=failure_reasons,
            commit_hash=commit_result.commit_hash,
            validation_evidence=evidence,
        )