Coverage for llm_dataset_engine/api/health_check.py: 24%
41 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Health check API for pipeline monitoring.
4Provides status information for operational monitoring.
5"""
7from datetime import datetime
8from typing import Any, Dict
10from llm_dataset_engine.api.pipeline import Pipeline
11from llm_dataset_engine.utils import get_logger
13logger = get_logger(__name__)
class HealthCheck:
    """
    Health check API for monitoring pipeline status.

    Inspects a pipeline's configuration (LLM provider, data source and
    checkpoint storage) and reports the outcome as plain dictionaries.
    The most recent report is kept on the instance in ``last_status``.
    """

    def __init__(self, pipeline: "Pipeline"):
        """
        Initialize health check.

        Args:
            pipeline: Pipeline instance to monitor
        """
        self.pipeline = pipeline
        # Both fields stay empty until the first call to check().
        self.last_check: datetime | None = None
        self.last_status: Dict[str, Any] = {}

    def check(self) -> Dict[str, Any]:
        """
        Perform health check.

        Runs the LLM provider, data source and checkpoint storage probes
        and records the result in ``last_check`` / ``last_status``.

        Returns:
            Health status dictionary with the keys ``status``
            ("healthy" or "unhealthy"), ``timestamp`` (ISO format),
            ``pipeline_id`` and ``checks`` (one entry per probe).
        """
        self.last_check = datetime.now()

        status: Dict[str, Any] = {
            "status": "healthy",
            "timestamp": self.last_check.isoformat(),
            "pipeline_id": str(self.pipeline.id),
            "checks": {},
        }

        checks = status["checks"]
        checks["llm_provider"] = self._check_llm_provider()
        checks["data_source"] = self._check_data_source()
        checks["checkpoint_storage"] = self._check_checkpoint_storage()

        # Only a probe that reports "error" makes the pipeline unhealthy;
        # "warning" results are informational.
        if any(result["status"] == "error" for result in checks.values()):
            status["status"] = "unhealthy"

        # Store last status
        self.last_status = status

        return status

    def _check_llm_provider(self) -> Dict[str, Any]:
        """Report the configured LLM provider and model, or the error hit."""
        # A health probe must never raise, so any failure is reported.
        try:
            llm_spec = self.pipeline.specifications.llm
            return {
                "status": "ok",
                "provider": llm_spec.provider.value,
                "model": llm_spec.model,
            }
        except Exception as exc:
            return {"status": "error", "error": str(exc)}

    def _check_data_source(self) -> Dict[str, Any]:
        """Report the data source type and whether its path exists."""
        try:
            dataset_spec = self.pipeline.specifications.dataset
            # A source without a path is treated as present.
            source_exists = True
            if dataset_spec.source_path:
                source_exists = dataset_spec.source_path.exists()

            return {
                "status": "ok" if source_exists else "warning",
                "source_type": dataset_spec.source_type.value,
                "exists": source_exists,
            }
        except Exception as exc:
            return {"status": "error", "error": str(exc)}

    def _check_checkpoint_storage(self) -> Dict[str, Any]:
        """Report the checkpoint directory and whether it exists."""
        try:
            checkpoint_dir = (
                self.pipeline.specifications.processing.checkpoint_dir
            )
            return {
                "status": "ok",
                "directory": str(checkpoint_dir),
                "exists": checkpoint_dir.exists(),
            }
        except Exception as exc:
            # Checkpoint problems are reported as a warning only, so they
            # never flip the overall status to "unhealthy".
            return {"status": "warning", "error": str(exc)}

    def is_healthy(self) -> bool:
        """
        Check if pipeline is healthy.

        Runs a fresh check() on every call, so ``last_check`` and
        ``last_status`` are updated as a side effect.

        Returns:
            True if healthy
        """
        return self.check()["status"] == "healthy"

    def get_readiness(self) -> Dict[str, Any]:
        """
        Get readiness status.

        Returns:
            Readiness information built from ``pipeline.validate()``:
            ``ready``, ``errors``, ``warnings`` and an ISO ``timestamp``.
        """
        validation = self.pipeline.validate()

        return {
            "ready": validation.is_valid,
            "errors": validation.errors,
            "warnings": validation.warnings,
            "timestamp": datetime.now().isoformat(),
        }