Coverage for src / infra / io / log_output / run_metadata.py: 32%
213 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Run metadata tracking for mala orchestrator runs.
3Captures orchestrator configuration, issue results, and pointers to Claude logs.
4Replaces the duplicate JSONL logging with structured run metadata.
5"""
7import json
8import logging
9import os
10import uuid
11from dataclasses import dataclass, field, asdict
12from datetime import datetime, UTC
13from pathlib import Path
14from typing import Any, Literal
16from src.core.models import (
17 IssueResolution,
18 ResolutionOutcome,
19 ValidationArtifacts,
20)
21from src.infra.tools.env import get_lock_dir, get_repo_runs_dir
24def configure_debug_logging(repo_path: Path, run_id: str) -> Path | None:
25 """Configure Python logging to write debug logs to a file.
27 Creates a debug log file alongside run metadata at:
28 ~/.mala/runs/{repo}/{timestamp}_{run_id}.debug.log
30 All loggers in the 'src' namespace will write DEBUG+ messages to this file.
32 This function is best-effort: if the log directory cannot be created or
33 the log file cannot be opened (e.g., read-only filesystem, permission
34 denied), it returns None and the run continues without debug logging.
36 Set MALA_DISABLE_DEBUG_LOG=1 to disable debug logging entirely.
38 Args:
39 repo_path: Repository path for log directory.
40 run_id: Run ID (UUID) for filename.
42 Returns:
43 Path to the debug log file, or None if logging could not be configured
44 or is disabled via environment variable.
45 """
46 # Allow opt-out via environment variable
47 if os.environ.get("MALA_DISABLE_DEBUG_LOG") == "1":
48 return None
50 try:
51 runs_dir = get_repo_runs_dir(repo_path)
52 runs_dir.mkdir(parents=True, exist_ok=True)
54 timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H-%M-%S")
55 short_id = run_id[:8]
56 log_path = runs_dir / f"{timestamp}_{short_id}.debug.log"
58 # Create file handler for debug logs
59 handler = logging.FileHandler(log_path)
60 handler.setLevel(logging.DEBUG)
61 handler.setFormatter(
62 logging.Formatter(
63 "%(asctime)s %(levelname)s %(name)s: %(message)s",
64 datefmt="%H:%M:%S",
65 )
66 )
67 # Tag the handler so we can identify it later
68 handler.set_name(f"mala_debug_{run_id}")
70 # Add handler to root logger for 'src' namespace
71 src_logger = logging.getLogger("src")
72 src_logger.setLevel(logging.DEBUG)
74 # Remove any previous mala debug handlers to avoid duplicates/leaks
75 for existing in src_logger.handlers[:]:
76 if getattr(existing, "name", "").startswith("mala_debug_"):
77 existing.close()
78 src_logger.removeHandler(existing)
80 src_logger.addHandler(handler)
82 return log_path
83 except OSError:
84 # Best-effort: if we can't create the log file, continue without it
85 # This handles read-only filesystems, permission denied, disk full, etc.
86 return None
def cleanup_debug_logging(run_id: str) -> bool:
    """Detach and close the debug log handler belonging to a run.

    Looks on the 'src' logger for the first handler named
    ``mala_debug_{run_id}``, closes it and removes it so its file handle
    is not leaked.

    Args:
        run_id: Run ID (UUID) whose handler should be cleaned up.

    Returns:
        True if a matching handler was found and cleaned up, False otherwise.
    """
    logger = logging.getLogger("src")
    wanted = f"mala_debug_{run_id}"

    # Search a snapshot of the handler list; only the first match is handled.
    match = next(
        (h for h in list(logger.handlers) if getattr(h, "name", "") == wanted),
        None,
    )
    if match is None:
        return False

    match.close()
    logger.removeHandler(match)
    return True
@dataclass
class QualityGateResult:
    """Outcome of the quality gate check for one issue.

    Attributes:
        passed: Whether the gate passed.
        evidence: Mapping of evidence name to a boolean flag.
        failure_reasons: Reasons recorded for a failed gate.
    """

    passed: bool
    # Mutable defaults use factories so instances never share state.
    evidence: dict[str, bool] = field(default_factory=dict)
    failure_reasons: list[str] = field(default_factory=list)
@dataclass
class ValidationResult:
    """Observability record describing one validation execution.

    Attributes:
        passed: Whether all validations passed.
        commands_run: Names of the commands that were executed.
        commands_failed: Names of the commands that failed.
        artifacts: Validation artifacts (logs, worktree, coverage report).
        coverage_percent: Coverage percentage if measured (None if not run).
        e2e_passed: Whether E2E tests passed (None if not run).
    """

    passed: bool
    # Mutable defaults use factories so instances never share state.
    commands_run: list[str] = field(default_factory=list)
    commands_failed: list[str] = field(default_factory=list)
    artifacts: ValidationArtifacts | None = None
    coverage_percent: float | None = None
    e2e_passed: bool | None = None
@dataclass
class IssueRun:
    """Record of one agent run against a single issue."""

    # Required identification and outcome fields
    issue_id: str
    agent_id: str
    status: Literal["success", "failed", "timeout"]
    duration_seconds: float
    # Claude SDK session ID
    session_id: str | None = None
    # Path to Claude's log file
    log_path: str | None = None
    quality_gate: QualityGateResult | None = None
    error: str | None = None
    # Retry tracking (recorded even if defaulted)
    gate_attempts: int = 0
    review_attempts: int = 0
    # Validation results and resolution (from mala-e0i)
    validation: ValidationResult | None = None
    resolution: IssueResolution | None = None
    # Cerberus review session log path (verbose mode only)
    review_log_path: str | None = None
165@dataclass
166class RunConfig:
167 """Orchestrator run configuration."""
169 max_agents: int | None
170 timeout_minutes: int | None
171 max_issues: int | None
172 epic_id: str | None
173 only_ids: list[str] | None
174 braintrust_enabled: bool
175 # Retry/review config (optional for backward compatibility)
176 max_gate_retries: int | None = None
177 max_review_retries: int | None = None
178 review_enabled: bool | None = None
179 # CLI args for debugging/auditing (optional for backward compatibility)
180 cli_args: dict[str, object] | None = None
181 # Orphans-only filter (optional for backward compatibility)
182 orphans_only: bool = False
class RunMetadata:
    """Tracks metadata for a single orchestrator run.

    ``save()`` writes a JSON file named ``{timestamp}_{short_id}.json`` into
    the directory returned by ``get_repo_runs_dir(repo_path)``. It contains:
    - Run configuration
    - Per-issue results with Claude log path pointers
    - Quality gate outcomes
    - Validation results and artifacts
    - Timing and error information
    """

    def __init__(
        self,
        repo_path: Path,
        config: RunConfig,
        version: str,
    ):
        """Start tracking a new run.

        Args:
            repo_path: Repository the run operates on.
            config: Orchestrator configuration for this run.
            version: Version string stored in the metadata.
        """
        self.run_id = str(uuid.uuid4())
        self.started_at = datetime.now(UTC)
        self.completed_at: datetime | None = None
        self.repo_path = repo_path
        self.config = config
        self.version = version
        self.issues: dict[str, IssueRun] = {}
        # Run-level validation results (from mala-e0i)
        self.run_validation: ValidationResult | None = None
        # Configure debug logging for this run. Best-effort: the path is None
        # when logging is disabled or the log file could not be created.
        self.debug_log_path: Path | None = configure_debug_logging(
            repo_path, self.run_id
        )

    def record_issue(self, issue: IssueRun) -> None:
        """Record the result of an issue run, keyed by its issue ID."""
        self.issues[issue.issue_id] = issue

    def record_run_validation(self, result: ValidationResult) -> None:
        """Record run-level validation results.

        Args:
            result: The validation result for the entire run.
        """
        self.run_validation = result

    def _serialize_validation_artifacts(
        self, artifacts: ValidationArtifacts | None
    ) -> dict[str, Any] | None:
        """Serialize ValidationArtifacts to a JSON-compatible dict."""
        if artifacts is None:
            return None
        return {
            "log_dir": str(artifacts.log_dir),
            "worktree_path": str(artifacts.worktree_path)
            if artifacts.worktree_path
            else None,
            "worktree_state": artifacts.worktree_state,
            "coverage_report": str(artifacts.coverage_report)
            if artifacts.coverage_report
            else None,
            "e2e_fixture_path": str(artifacts.e2e_fixture_path)
            if artifacts.e2e_fixture_path
            else None,
        }

    def _serialize_validation_result(
        self, result: ValidationResult | None
    ) -> dict[str, Any] | None:
        """Serialize ValidationResult to a JSON-compatible dict."""
        if result is None:
            return None
        return {
            "passed": result.passed,
            "commands_run": result.commands_run,
            "commands_failed": result.commands_failed,
            "artifacts": self._serialize_validation_artifacts(result.artifacts),
            "coverage_percent": result.coverage_percent,
            "e2e_passed": result.e2e_passed,
        }

    def _serialize_issue_resolution(
        self, resolution: IssueResolution | None
    ) -> dict[str, Any] | None:
        """Serialize IssueResolution to a JSON-compatible dict."""
        if resolution is None:
            return None
        return {
            "outcome": resolution.outcome.value,
            "rationale": resolution.rationale,
        }

    def _to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "run_id": self.run_id,
            "started_at": self.started_at.isoformat(),
            "completed_at": self.completed_at.isoformat()
            if self.completed_at
            else None,
            "version": self.version,
            "repo_path": str(self.repo_path),
            "config": asdict(self.config),
            "issues": {
                issue_id: {
                    **asdict(issue),
                    # The nested entries below replace what asdict() produced
                    # with the explicit JSON-compatible serializations.
                    "quality_gate": asdict(issue.quality_gate)
                    if issue.quality_gate
                    else None,
                    "validation": self._serialize_validation_result(issue.validation),
                    "resolution": self._serialize_issue_resolution(issue.resolution),
                }
                for issue_id, issue in self.issues.items()
            },
            "run_validation": self._serialize_validation_result(self.run_validation),
            "debug_log_path": str(self.debug_log_path) if self.debug_log_path else None,
        }

    @staticmethod
    def _deserialize_validation_artifacts(
        data: dict[str, Any] | None,
    ) -> ValidationArtifacts | None:
        """Deserialize ValidationArtifacts from a dict."""
        if data is None:
            return None
        return ValidationArtifacts(
            log_dir=Path(data["log_dir"]),
            worktree_path=Path(data["worktree_path"])
            if data.get("worktree_path")
            else None,
            worktree_state=data.get("worktree_state"),
            coverage_report=Path(data["coverage_report"])
            if data.get("coverage_report")
            else None,
            e2e_fixture_path=Path(data["e2e_fixture_path"])
            if data.get("e2e_fixture_path")
            else None,
        )

    @staticmethod
    def _deserialize_validation_result(
        data: dict[str, Any] | None,
    ) -> ValidationResult | None:
        """Deserialize ValidationResult from a dict."""
        if data is None:
            return None
        return ValidationResult(
            passed=data["passed"],
            commands_run=data.get("commands_run", []),
            commands_failed=data.get("commands_failed", []),
            artifacts=RunMetadata._deserialize_validation_artifacts(
                data.get("artifacts")
            ),
            coverage_percent=data.get("coverage_percent"),
            e2e_passed=data.get("e2e_passed"),
        )

    @staticmethod
    def _deserialize_issue_resolution(
        data: dict[str, Any] | None,
    ) -> IssueResolution | None:
        """Deserialize IssueResolution from a dict."""
        if data is None:
            return None
        return IssueResolution(
            outcome=ResolutionOutcome(data["outcome"]),
            rationale=data["rationale"],
        )

    @classmethod
    def load(cls, path: Path) -> "RunMetadata":
        """Load run metadata from a JSON file.

        The instance is built without calling ``__init__``, so loading never
        generates a new run ID or reconfigures debug logging.

        Args:
            path: Path to the JSON file.

        Returns:
            Loaded RunMetadata instance.

        Raises:
            FileNotFoundError: If the file doesn't exist.
            json.JSONDecodeError: If the file is invalid JSON.
            KeyError: If a required key (e.g. ``run_id``, ``config``) is missing.
        """
        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        # Reconstruct config; optional keys fall back to their defaults so
        # files written by older versions still load.
        config_data = data["config"]
        config = RunConfig(
            max_agents=config_data.get("max_agents"),
            timeout_minutes=config_data.get("timeout_minutes"),
            max_issues=config_data.get("max_issues"),
            epic_id=config_data.get("epic_id"),
            only_ids=config_data.get("only_ids"),
            braintrust_enabled=config_data.get("braintrust_enabled", False),
            max_gate_retries=config_data.get("max_gate_retries"),
            max_review_retries=config_data.get("max_review_retries"),
            review_enabled=config_data.get("review_enabled"),
            cli_args=config_data.get("cli_args"),
            orphans_only=config_data.get("orphans_only", False),
        )

        # Create instance
        metadata = cls.__new__(cls)
        metadata.run_id = data["run_id"]
        metadata.started_at = datetime.fromisoformat(data["started_at"])
        metadata.completed_at = (
            datetime.fromisoformat(data["completed_at"])
            if data.get("completed_at")
            else None
        )
        metadata.repo_path = Path(data["repo_path"])
        metadata.config = config
        metadata.version = data["version"]

        # Reconstruct issues
        metadata.issues = {}
        for issue_id, issue_data in data.get("issues", {}).items():
            quality_gate = None
            if issue_data.get("quality_gate"):
                qg_data = issue_data["quality_gate"]
                quality_gate = QualityGateResult(
                    passed=qg_data["passed"],
                    evidence=qg_data.get("evidence", {}),
                    failure_reasons=qg_data.get("failure_reasons", []),
                )

            # Deserialize new fields
            validation = cls._deserialize_validation_result(
                issue_data.get("validation")
            )
            resolution = cls._deserialize_issue_resolution(issue_data.get("resolution"))

            issue = IssueRun(
                issue_id=issue_data["issue_id"],
                agent_id=issue_data["agent_id"],
                status=issue_data["status"],
                duration_seconds=issue_data["duration_seconds"],
                session_id=issue_data.get("session_id"),
                log_path=issue_data.get("log_path"),
                quality_gate=quality_gate,
                error=issue_data.get("error"),
                gate_attempts=issue_data.get("gate_attempts", 0),
                review_attempts=issue_data.get("review_attempts", 0),
                validation=validation,
                resolution=resolution,
                review_log_path=issue_data.get("review_log_path"),
            )
            metadata.issues[issue_id] = issue

        # Load run-level validation
        metadata.run_validation = cls._deserialize_validation_result(
            data.get("run_validation")
        )

        # Restore debug_log_path (don't reconfigure logging on load)
        debug_log_path = data.get("debug_log_path")
        metadata.debug_log_path = Path(debug_log_path) if debug_log_path else None

        return metadata

    def cleanup(self) -> None:
        """Clean up resources associated with this run.

        This method is idempotent and safe to call multiple times.
        It cleans up the debug logging handler to prevent file handle leaks.

        Should be called in a finally block to ensure cleanup happens even
        if the run crashes or is aborted before save() is called.
        """
        if self.debug_log_path is not None:
            cleanup_debug_logging(self.run_id)

    def save(self) -> Path:
        """Save run metadata to JSON file.

        Saves into the directory returned by ``get_repo_runs_dir`` using a
        timestamp-based filename for easier sorting:
        ``{timestamp}_{short-uuid}.json``.

        The document is serialized before any file is opened and is written
        to a temporary sibling file that is then moved over the target, so a
        serialization error or an interrupted write never leaves a truncated
        metadata file behind.

        Also cleans up the debug logging handler to prevent file handle leaks.

        Returns:
            Path to the saved metadata file.

        Raises:
            TypeError: If the metadata contains a value json cannot serialize.
            OSError: If the directory or file cannot be written.
        """
        self.completed_at = datetime.now(UTC)

        # Clean up debug logging handler before saving (idempotent)
        self.cleanup()

        # Use repo-specific subdirectory
        runs_dir = get_repo_runs_dir(self.repo_path)
        runs_dir.mkdir(parents=True, exist_ok=True)

        # Use timestamp + short UUID for filename
        timestamp = self.started_at.strftime("%Y-%m-%dT%H-%M-%S")
        short_id = self.run_id[:8]
        path = runs_dir / f"{timestamp}_{short_id}.json"

        # Serialize up front: a failure here must not touch the filesystem.
        payload = json.dumps(self._to_dict(), indent=2)

        tmp_path = path.with_name(f"{path.name}.tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
                f.flush()
                os.fsync(f.fileno())
            # Replace the target in one step so readers never see a partial file.
            os.replace(tmp_path, path)
        except OSError:
            # Do not leave the temporary file behind; report the original error.
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise
        return path
487# --- Running Instance Tracking ---
490@dataclass
491class RunningInstance:
492 """Information about a currently running mala instance."""
494 run_id: str
495 repo_path: Path
496 started_at: datetime
497 pid: int
498 max_agents: int | None = None
499 issues_in_progress: int = 0
def _get_marker_path(run_id: str) -> Path:
    """Build the marker file path for a run.

    Args:
        run_id: The run ID.

    Returns:
        ``run-{run_id}.marker`` inside the directory from ``get_lock_dir()``.
    """
    marker_name = f"run-{run_id}.marker"
    return get_lock_dir() / marker_name
def write_run_marker(
    run_id: str,
    repo_path: Path,
    max_agents: int | None = None,
) -> Path:
    """Write a run marker file to indicate a running instance.

    Creates a marker file in the lock directory that records the run's
    repo path, start time, and PID. Used by status command to detect
    running instances.

    The marker is written to a temporary file and then moved into place.
    A reader scanning ``run-*.marker`` therefore never sees an empty or
    half-written marker, which it would otherwise treat as corrupt and
    delete.

    Args:
        run_id: The unique run ID.
        repo_path: Path to the repository being processed.
        max_agents: Maximum number of concurrent agents (optional).

    Returns:
        Path to the created marker file.

    Raises:
        OSError: If the lock directory or marker file cannot be written.
    """
    lock_dir = get_lock_dir()
    lock_dir.mkdir(parents=True, exist_ok=True)

    marker_path = _get_marker_path(run_id)
    payload = json.dumps(
        {
            "run_id": run_id,
            "repo_path": str(repo_path.resolve()),
            "started_at": datetime.now(UTC).isoformat(),
            "pid": os.getpid(),
            "max_agents": max_agents,
        }
    )

    # The ".tmp" suffix keeps the temporary file out of the "run-*.marker" glob.
    tmp_path = marker_path.with_name(f"{marker_path.name}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, marker_path)
    except OSError:
        # Do not leave the temporary file behind; report the original error.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise

    return marker_path
def remove_run_marker(run_id: str) -> bool:
    """Remove a run marker file.

    Called when a run completes (successfully or not).

    The marker is unlinked directly instead of being checked for existence
    first, so a marker removed concurrently (for example by stale-marker
    cleanup in another process) is reported as missing instead of raising.

    Args:
        run_id: The run ID whose marker should be removed.

    Returns:
        True if the marker was removed, False if it didn't exist.
    """
    try:
        _get_marker_path(run_id).unlink()
    except FileNotFoundError:
        return False
    return True
def get_running_instances() -> list[RunningInstance]:
    """Get all currently running mala instances.

    Reads all run marker files from the lock directory and returns
    information about each running instance. Stale markers (where the
    PID is no longer running) are automatically cleaned up, as are markers
    that cannot be read or do not contain the expected fields.

    Returns:
        List of RunningInstance objects for all active runs.
    """
    lock_dir = get_lock_dir()
    if not lock_dir.exists():
        return []

    instances: list[RunningInstance] = []
    stale_markers: list[Path] = []

    for marker_path in lock_dir.glob("run-*.marker"):
        try:
            with open(marker_path, encoding="utf-8") as f:
                data = json.load(f)

            pid = data.get("pid")
            # Check if the process is still running
            if pid and not _is_process_running(pid):
                stale_markers.append(marker_path)
                continue

            instance = RunningInstance(
                run_id=data["run_id"],
                repo_path=Path(data["repo_path"]),
                started_at=datetime.fromisoformat(data["started_at"]),
                pid=pid or 0,
                max_agents=data.get("max_agents"),
            )
            instances.append(instance)
        except (
            ValueError,  # invalid JSON, undecodable bytes, bad ISO timestamp
            KeyError,  # required field missing
            TypeError,  # field has the wrong type (e.g. null path, string pid)
            AttributeError,  # top-level JSON value is not an object
            OverflowError,  # pid does not fit the platform's integer type
            OSError,  # marker could not be read
        ):
            # Corrupted or unreadable marker - treat as stale
            stale_markers.append(marker_path)

    # Clean up stale markers; removal is best-effort because another process
    # may already have deleted the same file.
    for marker in stale_markers:
        try:
            marker.unlink()
        except OSError:
            pass

    return instances
def get_running_instances_for_dir(directory: Path) -> list[RunningInstance]:
    """Return the running mala instances that belong to one directory.

    An instance is included when its resolved ``repo_path`` equals the
    resolved form of ``directory``.

    Args:
        directory: The directory to filter by.

    Returns:
        List of RunningInstance objects running in the specified directory.
    """
    target = directory.resolve()
    matches: list[RunningInstance] = []
    for candidate in get_running_instances():
        if candidate.repo_path.resolve() == target:
            matches.append(candidate)
    return matches
641def _is_process_running(pid: int) -> bool:
642 """Check if a process with the given PID is still running.
644 Args:
645 pid: The process ID to check.
647 Returns:
648 True if the process is running, False otherwise.
649 """
650 try:
651 os.kill(pid, 0)
652 return True
653 except (OSError, ProcessLookupError):
654 return False