Coverage for src / infra / io / log_output / run_metadata.py: 32%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Run metadata tracking for mala orchestrator runs. 

2 

3Captures orchestrator configuration, issue results, and pointers to Claude logs. 

4Replaces the duplicate JSONL logging with structured run metadata. 

5""" 

6 

7import json 

8import logging 

9import os 

10import uuid 

11from dataclasses import dataclass, field, asdict 

12from datetime import datetime, UTC 

13from pathlib import Path 

14from typing import Any, Literal 

15 

16from src.core.models import ( 

17 IssueResolution, 

18 ResolutionOutcome, 

19 ValidationArtifacts, 

20) 

21from src.infra.tools.env import get_lock_dir, get_repo_runs_dir 

22 

23 

24def configure_debug_logging(repo_path: Path, run_id: str) -> Path | None: 

25 """Configure Python logging to write debug logs to a file. 

26 

27 Creates a debug log file alongside run metadata at: 

28 ~/.mala/runs/{repo}/{timestamp}_{run_id}.debug.log 

29 

30 All loggers in the 'src' namespace will write DEBUG+ messages to this file. 

31 

32 This function is best-effort: if the log directory cannot be created or 

33 the log file cannot be opened (e.g., read-only filesystem, permission 

34 denied), it returns None and the run continues without debug logging. 

35 

36 Set MALA_DISABLE_DEBUG_LOG=1 to disable debug logging entirely. 

37 

38 Args: 

39 repo_path: Repository path for log directory. 

40 run_id: Run ID (UUID) for filename. 

41 

42 Returns: 

43 Path to the debug log file, or None if logging could not be configured 

44 or is disabled via environment variable. 

45 """ 

46 # Allow opt-out via environment variable 

47 if os.environ.get("MALA_DISABLE_DEBUG_LOG") == "1": 

48 return None 

49 

50 try: 

51 runs_dir = get_repo_runs_dir(repo_path) 

52 runs_dir.mkdir(parents=True, exist_ok=True) 

53 

54 timestamp = datetime.now(UTC).strftime("%Y-%m-%dT%H-%M-%S") 

55 short_id = run_id[:8] 

56 log_path = runs_dir / f"{timestamp}_{short_id}.debug.log" 

57 

58 # Create file handler for debug logs 

59 handler = logging.FileHandler(log_path) 

60 handler.setLevel(logging.DEBUG) 

61 handler.setFormatter( 

62 logging.Formatter( 

63 "%(asctime)s %(levelname)s %(name)s: %(message)s", 

64 datefmt="%H:%M:%S", 

65 ) 

66 ) 

67 # Tag the handler so we can identify it later 

68 handler.set_name(f"mala_debug_{run_id}") 

69 

70 # Add handler to root logger for 'src' namespace 

71 src_logger = logging.getLogger("src") 

72 src_logger.setLevel(logging.DEBUG) 

73 

74 # Remove any previous mala debug handlers to avoid duplicates/leaks 

75 for existing in src_logger.handlers[:]: 

76 if getattr(existing, "name", "").startswith("mala_debug_"): 

77 existing.close() 

78 src_logger.removeHandler(existing) 

79 

80 src_logger.addHandler(handler) 

81 

82 return log_path 

83 except OSError: 

84 # Best-effort: if we can't create the log file, continue without it 

85 # This handles read-only filesystems, permission denied, disk full, etc. 

86 return None 

87 

88 

def cleanup_debug_logging(run_id: str) -> bool:
    """Detach and close the debug log handler belonging to ``run_id``.

    The handler is looked up on the 'src' logger by its name,
    ``mala_debug_{run_id}``. Only the first matching handler is processed.
    Closing it releases the underlying file handle.

    Args:
        run_id: Run ID (UUID) whose handler should be cleaned up.

    Returns:
        True if a handler was found and cleaned up, False otherwise.
    """
    wanted = f"mala_debug_{run_id}"
    logger = logging.getLogger("src")

    # Search a snapshot of the handler list; the live list is mutated below.
    found = next(
        (h for h in list(logger.handlers) if getattr(h, "name", "") == wanted),
        None,
    )
    if found is None:
        return False

    found.close()
    logger.removeHandler(found)
    return True

111 

112 

@dataclass
class QualityGateResult:
    """Outcome of the quality gate check for a single issue.

    Attributes:
        passed: Whether the gate passed.
        evidence: Mapping of evidence name to a boolean flag; empty by default.
        failure_reasons: Reasons reported for a failure; empty by default.
    """

    passed: bool
    # Each instance gets its own fresh containers (no shared mutable default).
    evidence: dict[str, bool] = field(default_factory=dict)
    failure_reasons: list[str] = field(default_factory=list)

120 

121 

@dataclass
class ValidationResult:
    """Observability record describing one validation execution.

    Only ``passed`` is required; every other field has a default so that
    partially populated results can be constructed.
    """

    # Whether all validations passed.
    passed: bool
    # Names of the commands that were executed.
    commands_run: list[str] = field(default_factory=list)
    # Names of the commands that failed.
    commands_failed: list[str] = field(default_factory=list)
    # Validation artifacts (logs, worktree, coverage report), if any.
    artifacts: ValidationArtifacts | None = None
    # Coverage percentage if measured (None if not run).
    coverage_percent: float | None = None
    # Whether E2E tests passed (None if not run).
    e2e_passed: bool | None = None

141 

142 

@dataclass
class IssueRun:
    """Record of one agent's attempt at a single issue.

    The first four fields are required; the remainder default to ``None``
    (or ``0`` for the attempt counters).
    """

    issue_id: str
    agent_id: str
    status: Literal["success", "failed", "timeout"]
    duration_seconds: float

    # Claude SDK session ID
    session_id: str | None = None
    # Path to Claude's log file
    log_path: str | None = None
    quality_gate: QualityGateResult | None = None
    error: str | None = None

    # Retry tracking (recorded even if defaulted)
    gate_attempts: int = 0
    review_attempts: int = 0

    # Validation results and resolution (from mala-e0i)
    validation: ValidationResult | None = None
    resolution: IssueResolution | None = None

    # Cerberus review session log path (verbose mode only)
    review_log_path: str | None = None

163 

164 

165@dataclass 

166class RunConfig: 

167 """Orchestrator run configuration.""" 

168 

169 max_agents: int | None 

170 timeout_minutes: int | None 

171 max_issues: int | None 

172 epic_id: str | None 

173 only_ids: list[str] | None 

174 braintrust_enabled: bool 

175 # Retry/review config (optional for backward compatibility) 

176 max_gate_retries: int | None = None 

177 max_review_retries: int | None = None 

178 review_enabled: bool | None = None 

179 # CLI args for debugging/auditing (optional for backward compatibility) 

180 cli_args: dict[str, object] | None = None 

181 # Orphans-only filter (optional for backward compatibility) 

182 orphans_only: bool = False 

183 

184 

185class RunMetadata: 

186 """Tracks metadata for a single orchestrator run. 

187 

188 Creates a JSON file at ~/.config/mala/runs/{run_id}.json containing: 

189 - Run configuration 

190 - Per-issue results with Claude log path pointers 

191 - Quality gate outcomes 

192 - Validation results and artifacts 

193 - Timing and error information 

194 """ 

195 

196 def __init__( 

197 self, 

198 repo_path: Path, 

199 config: RunConfig, 

200 version: str, 

201 ): 

202 self.run_id = str(uuid.uuid4()) 

203 self.started_at = datetime.now(UTC) 

204 self.completed_at: datetime | None = None 

205 self.repo_path = repo_path 

206 self.config = config 

207 self.version = version 

208 self.issues: dict[str, IssueRun] = {} 

209 # Run-level validation results (from mala-e0i) 

210 self.run_validation: ValidationResult | None = None 

211 # Configure debug logging for this run (always enabled) 

212 self.debug_log_path: Path | None = configure_debug_logging( 

213 repo_path, self.run_id 

214 ) 

215 

216 def record_issue(self, issue: IssueRun) -> None: 

217 """Record the result of an issue run.""" 

218 self.issues[issue.issue_id] = issue 

219 

220 def record_run_validation(self, result: ValidationResult) -> None: 

221 """Record run-level validation results. 

222 

223 Args: 

224 result: The validation result for the entire run. 

225 """ 

226 self.run_validation = result 

227 

228 def _serialize_validation_artifacts( 

229 self, artifacts: ValidationArtifacts | None 

230 ) -> dict[str, Any] | None: 

231 """Serialize ValidationArtifacts to a JSON-compatible dict.""" 

232 if artifacts is None: 

233 return None 

234 return { 

235 "log_dir": str(artifacts.log_dir), 

236 "worktree_path": str(artifacts.worktree_path) 

237 if artifacts.worktree_path 

238 else None, 

239 "worktree_state": artifacts.worktree_state, 

240 "coverage_report": str(artifacts.coverage_report) 

241 if artifacts.coverage_report 

242 else None, 

243 "e2e_fixture_path": str(artifacts.e2e_fixture_path) 

244 if artifacts.e2e_fixture_path 

245 else None, 

246 } 

247 

248 def _serialize_validation_result( 

249 self, result: ValidationResult | None 

250 ) -> dict[str, Any] | None: 

251 """Serialize ValidationResult to a JSON-compatible dict.""" 

252 if result is None: 

253 return None 

254 return { 

255 "passed": result.passed, 

256 "commands_run": result.commands_run, 

257 "commands_failed": result.commands_failed, 

258 "artifacts": self._serialize_validation_artifacts(result.artifacts), 

259 "coverage_percent": result.coverage_percent, 

260 "e2e_passed": result.e2e_passed, 

261 } 

262 

263 def _serialize_issue_resolution( 

264 self, resolution: IssueResolution | None 

265 ) -> dict[str, Any] | None: 

266 """Serialize IssueResolution to a JSON-compatible dict.""" 

267 if resolution is None: 

268 return None 

269 return { 

270 "outcome": resolution.outcome.value, 

271 "rationale": resolution.rationale, 

272 } 

273 

274 def _to_dict(self) -> dict[str, Any]: 

275 """Convert to dictionary for JSON serialization.""" 

276 return { 

277 "run_id": self.run_id, 

278 "started_at": self.started_at.isoformat(), 

279 "completed_at": self.completed_at.isoformat() 

280 if self.completed_at 

281 else None, 

282 "version": self.version, 

283 "repo_path": str(self.repo_path), 

284 "config": asdict(self.config), 

285 "issues": { 

286 issue_id: { 

287 **asdict(issue), 

288 "quality_gate": asdict(issue.quality_gate) 

289 if issue.quality_gate 

290 else None, 

291 "validation": self._serialize_validation_result(issue.validation), 

292 "resolution": self._serialize_issue_resolution(issue.resolution), 

293 } 

294 for issue_id, issue in self.issues.items() 

295 }, 

296 "run_validation": self._serialize_validation_result(self.run_validation), 

297 "debug_log_path": str(self.debug_log_path) if self.debug_log_path else None, 

298 } 

299 

300 @staticmethod 

301 def _deserialize_validation_artifacts( 

302 data: dict[str, Any] | None, 

303 ) -> ValidationArtifacts | None: 

304 """Deserialize ValidationArtifacts from a dict.""" 

305 if data is None: 

306 return None 

307 return ValidationArtifacts( 

308 log_dir=Path(data["log_dir"]), 

309 worktree_path=Path(data["worktree_path"]) 

310 if data.get("worktree_path") 

311 else None, 

312 worktree_state=data.get("worktree_state"), 

313 coverage_report=Path(data["coverage_report"]) 

314 if data.get("coverage_report") 

315 else None, 

316 e2e_fixture_path=Path(data["e2e_fixture_path"]) 

317 if data.get("e2e_fixture_path") 

318 else None, 

319 ) 

320 

321 @staticmethod 

322 def _deserialize_validation_result( 

323 data: dict[str, Any] | None, 

324 ) -> ValidationResult | None: 

325 """Deserialize ValidationResult from a dict.""" 

326 if data is None: 

327 return None 

328 return ValidationResult( 

329 passed=data["passed"], 

330 commands_run=data.get("commands_run", []), 

331 commands_failed=data.get("commands_failed", []), 

332 artifacts=RunMetadata._deserialize_validation_artifacts( 

333 data.get("artifacts") 

334 ), 

335 coverage_percent=data.get("coverage_percent"), 

336 e2e_passed=data.get("e2e_passed"), 

337 ) 

338 

339 @staticmethod 

340 def _deserialize_issue_resolution( 

341 data: dict[str, Any] | None, 

342 ) -> IssueResolution | None: 

343 """Deserialize IssueResolution from a dict.""" 

344 if data is None: 

345 return None 

346 return IssueResolution( 

347 outcome=ResolutionOutcome(data["outcome"]), 

348 rationale=data["rationale"], 

349 ) 

350 

351 @classmethod 

352 def load(cls, path: Path) -> "RunMetadata": 

353 """Load run metadata from a JSON file. 

354 

355 Args: 

356 path: Path to the JSON file. 

357 

358 Returns: 

359 Loaded RunMetadata instance. 

360 

361 Raises: 

362 FileNotFoundError: If the file doesn't exist. 

363 json.JSONDecodeError: If the file is invalid JSON. 

364 """ 

365 with open(path) as f: 

366 data = json.load(f) 

367 

368 # Reconstruct config 

369 config_data = data["config"] 

370 config = RunConfig( 

371 max_agents=config_data.get("max_agents"), 

372 timeout_minutes=config_data.get("timeout_minutes"), 

373 max_issues=config_data.get("max_issues"), 

374 epic_id=config_data.get("epic_id"), 

375 only_ids=config_data.get("only_ids"), 

376 braintrust_enabled=config_data.get("braintrust_enabled", False), 

377 max_gate_retries=config_data.get("max_gate_retries"), 

378 max_review_retries=config_data.get("max_review_retries"), 

379 review_enabled=config_data.get("review_enabled"), 

380 cli_args=config_data.get("cli_args"), 

381 orphans_only=config_data.get("orphans_only", False), 

382 ) 

383 

384 # Create instance 

385 metadata = cls.__new__(cls) 

386 metadata.run_id = data["run_id"] 

387 metadata.started_at = datetime.fromisoformat(data["started_at"]) 

388 metadata.completed_at = ( 

389 datetime.fromisoformat(data["completed_at"]) 

390 if data.get("completed_at") 

391 else None 

392 ) 

393 metadata.repo_path = Path(data["repo_path"]) 

394 metadata.config = config 

395 metadata.version = data["version"] 

396 

397 # Reconstruct issues 

398 metadata.issues = {} 

399 for issue_id, issue_data in data.get("issues", {}).items(): 

400 quality_gate = None 

401 if issue_data.get("quality_gate"): 

402 qg_data = issue_data["quality_gate"] 

403 quality_gate = QualityGateResult( 

404 passed=qg_data["passed"], 

405 evidence=qg_data.get("evidence", {}), 

406 failure_reasons=qg_data.get("failure_reasons", []), 

407 ) 

408 

409 # Deserialize new fields 

410 validation = cls._deserialize_validation_result( 

411 issue_data.get("validation") 

412 ) 

413 resolution = cls._deserialize_issue_resolution(issue_data.get("resolution")) 

414 

415 issue = IssueRun( 

416 issue_id=issue_data["issue_id"], 

417 agent_id=issue_data["agent_id"], 

418 status=issue_data["status"], 

419 duration_seconds=issue_data["duration_seconds"], 

420 session_id=issue_data.get("session_id"), 

421 log_path=issue_data.get("log_path"), 

422 quality_gate=quality_gate, 

423 error=issue_data.get("error"), 

424 gate_attempts=issue_data.get("gate_attempts", 0), 

425 review_attempts=issue_data.get("review_attempts", 0), 

426 validation=validation, 

427 resolution=resolution, 

428 review_log_path=issue_data.get("review_log_path"), 

429 ) 

430 metadata.issues[issue_id] = issue 

431 

432 # Load run-level validation 

433 metadata.run_validation = cls._deserialize_validation_result( 

434 data.get("run_validation") 

435 ) 

436 

437 # Restore debug_log_path (don't reconfigure logging on load) 

438 debug_log_path = data.get("debug_log_path") 

439 metadata.debug_log_path = Path(debug_log_path) if debug_log_path else None 

440 

441 return metadata 

442 

443 def cleanup(self) -> None: 

444 """Clean up resources associated with this run. 

445 

446 This method is idempotent and safe to call multiple times. 

447 It cleans up the debug logging handler to prevent file handle leaks. 

448 

449 Should be called in a finally block to ensure cleanup happens even 

450 if the run crashes or is aborted before save() is called. 

451 """ 

452 if self.debug_log_path is not None: 

453 cleanup_debug_logging(self.run_id) 

454 

455 def save(self) -> Path: 

456 """Save run metadata to JSON file. 

457 

458 Saves to a repo-specific subdirectory with timestamp-based filename 

459 for easier sorting: {runs_dir}/{repo-safe-name}/{timestamp}_{short-uuid}.json 

460 

461 Also cleans up the debug logging handler to prevent file handle leaks. 

462 

463 Returns: 

464 Path to the saved metadata file. 

465 """ 

466 self.completed_at = datetime.now(UTC) 

467 

468 # Clean up debug logging handler before saving (idempotent) 

469 self.cleanup() 

470 

471 # Use repo-specific subdirectory 

472 runs_dir = get_repo_runs_dir(self.repo_path) 

473 runs_dir.mkdir(parents=True, exist_ok=True) 

474 

475 # Use timestamp + short UUID for filename 

476 timestamp = self.started_at.strftime("%Y-%m-%dT%H-%M-%S") 

477 short_id = self.run_id[:8] 

478 path = runs_dir / f"{timestamp}_{short_id}.json" 

479 

480 with open(path, "w") as f: 

481 json.dump(self._to_dict(), f, indent=2) 

482 f.flush() 

483 os.fsync(f.fileno()) 

484 return path 

485 

486 

487# --- Running Instance Tracking --- 

488 

489 

490@dataclass 

491class RunningInstance: 

492 """Information about a currently running mala instance.""" 

493 

494 run_id: str 

495 repo_path: Path 

496 started_at: datetime 

497 pid: int 

498 max_agents: int | None = None 

499 issues_in_progress: int = 0 

500 

501 

def _get_marker_path(run_id: str) -> Path:
    """Build the marker file location for ``run_id``.

    Args:
        run_id: The run ID.

    Returns:
        ``run-{run_id}.marker`` inside the directory from ``get_lock_dir()``.
    """
    marker_name = f"run-{run_id}.marker"
    return get_lock_dir() / marker_name

512 

513 

def write_run_marker(
    run_id: str,
    repo_path: Path,
    max_agents: int | None = None,
) -> Path:
    """Write a run marker file to indicate a running instance.

    Creates a marker file in the lock directory that records the run's
    repo path, start time, and PID. Used by status command to detect
    running instances.

    The marker is written to a temporary sibling file and then moved into
    place with ``os.replace``. Readers that scan for ``run-*.marker`` and
    discard unparseable files therefore never see a half-written marker.

    Args:
        run_id: The unique run ID.
        repo_path: Path to the repository being processed.
        max_agents: Maximum number of concurrent agents (optional).

    Returns:
        Path to the created marker file.
    """
    lock_dir = get_lock_dir()
    lock_dir.mkdir(parents=True, exist_ok=True)

    marker_path = _get_marker_path(run_id)
    data = {
        "run_id": run_id,
        "repo_path": str(repo_path.resolve()),
        "started_at": datetime.now(UTC).isoformat(),
        "pid": os.getpid(),
        "max_agents": max_agents,
    }

    # The ".tmp" suffix keeps the temp file out of the "run-*.marker" glob.
    tmp_path = marker_path.with_name(f"{marker_path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, marker_path)
    except BaseException:
        # Don't leave a partial temp file around; the original error wins.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise

    return marker_path

551 

552 

def remove_run_marker(run_id: str) -> bool:
    """Remove a run marker file.

    Called when a run completes (successfully or not).

    The file is unlinked directly rather than checked for existence first.
    If another process deletes the marker concurrently (for example the
    stale-marker cleanup in ``get_running_instances``), this reports False
    instead of raising ``FileNotFoundError``.

    Args:
        run_id: The run ID whose marker should be removed.

    Returns:
        True if the marker was removed, False if it didn't exist.
    """
    try:
        _get_marker_path(run_id).unlink()
    except FileNotFoundError:
        return False
    return True

569 

570 

def get_running_instances() -> list[RunningInstance]:
    """Get all currently running mala instances.

    Reads all run marker files from the lock directory and returns
    information about each running instance. Stale markers (where the
    PID is no longer running) and markers that cannot be read or parsed
    are deleted on a best-effort basis.

    Returns:
        List of RunningInstance objects for all active runs.
    """
    lock_dir = get_lock_dir()
    if not lock_dir.exists():
        return []

    instances: list[RunningInstance] = []
    stale_markers: list[Path] = []

    for marker_path in lock_dir.glob("run-*.marker"):
        try:
            with open(marker_path) as f:
                data = json.load(f)

            pid = data.get("pid")
            # Check if the process is still running
            if pid and not _is_process_running(pid):
                stale_markers.append(marker_path)
                continue

            instance = RunningInstance(
                run_id=data["run_id"],
                repo_path=Path(data["repo_path"]),
                started_at=datetime.fromisoformat(data["started_at"]),
                pid=pid or 0,
                max_agents=data.get("max_agents"),
            )
            instances.append(instance)
        except (KeyError, OSError, ValueError, TypeError, AttributeError):
            # Corrupted or unreadable marker - treat as stale.
            # ValueError covers json.JSONDecodeError and a malformed
            # started_at; TypeError/AttributeError cover wrongly typed fields
            # or a JSON root that is not an object.
            stale_markers.append(marker_path)

    # Clean up stale markers; failure to delete one is not fatal
    for marker in stale_markers:
        try:
            marker.unlink()
        except OSError:
            pass

    return instances

619 

620 

def get_running_instances_for_dir(directory: Path) -> list[RunningInstance]:
    """Return the running mala instances whose repository is ``directory``.

    Both the requested directory and each instance's ``repo_path`` are
    resolved before they are compared.

    Args:
        directory: The directory to filter by.

    Returns:
        List of RunningInstance objects running in the specified directory.
    """
    target = directory.resolve()
    matches: list[RunningInstance] = []
    for running in get_running_instances():
        if running.repo_path.resolve() == target:
            matches.append(running)
    return matches

639 

640 

641def _is_process_running(pid: int) -> bool: 

642 """Check if a process with the given PID is still running. 

643 

644 Args: 

645 pid: The process ID to check. 

646 

647 Returns: 

648 True if the process is running, False otherwise. 

649 """ 

650 try: 

651 os.kill(pid, 0) 

652 return True 

653 except (OSError, ProcessLookupError): 

654 return False