Coverage for llm_dataset_engine/api/health_check.py: 24%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Health check API for pipeline monitoring. 

3 

4Provides status information for operational monitoring. 

5""" 

6 

7from datetime import datetime 

8from typing import Any, Dict 

9 

10from llm_dataset_engine.api.pipeline import Pipeline 

11from llm_dataset_engine.utils import get_logger 

12 

13logger = get_logger(__name__) 

14 

15 

class HealthCheck:
    """
    Health check API for monitoring pipeline status.

    Provides information about pipeline health and readiness.

    ``check()`` runs three independent configuration probes (LLM provider,
    data source, checkpoint storage) and folds their results into a single
    status dictionary. ``get_readiness()`` reports the outcome of the
    pipeline's own ``validate()`` call.
    """

    def __init__(self, pipeline: "Pipeline"):
        """
        Initialize health check.

        Args:
            pipeline: Pipeline instance to monitor
        """
        self.pipeline = pipeline
        # Time of the most recent check() call; None until check() has run.
        self.last_check: datetime | None = None
        # Result of the most recent check() call; empty until check() has run.
        self.last_status: Dict[str, Any] = {}

    def check(self) -> Dict[str, Any]:
        """
        Perform health check.

        Runs every probe, records each result under ``status["checks"]`` and
        remembers the outcome in ``last_check`` / ``last_status``.

        A failing LLM-provider or data-source probe is recorded with status
        ``"error"`` and marks the overall status ``"unhealthy"``. A failing
        checkpoint-storage probe is recorded with status ``"warning"`` and
        leaves the overall status untouched.

        Returns:
            Health status dictionary with the keys ``status``, ``timestamp``
            (ISO-8601 string of a naive ``datetime.now()``), ``pipeline_id``
            and ``checks``.
        """
        self.last_check = datetime.now()

        status: Dict[str, Any] = {
            "status": "healthy",
            "timestamp": self.last_check.isoformat(),
            "pipeline_id": str(self.pipeline.id),
            "checks": {},
        }

        # (name, probe, critical): the order fixes the key order in "checks".
        probes = (
            ("llm_provider", self._check_llm_provider, True),
            ("data_source", self._check_data_source, True),
            ("checkpoint_storage", self._check_checkpoint_storage, False),
        )

        for name, probe, critical in probes:
            try:
                result = probe()
            except Exception as e:
                # Deliberately broad: a health check must report a broken
                # probe in its result instead of raising to the caller.
                result = {
                    "status": "error" if critical else "warning",
                    "error": str(e),
                }
                if critical:
                    status["status"] = "unhealthy"
            status["checks"][name] = result

        # Store last status
        self.last_status = status

        return status

    def _check_llm_provider(self) -> Dict[str, Any]:
        """Report the configured LLM provider and model."""
        llm_spec = self.pipeline.specifications.llm
        return {
            "status": "ok",
            "provider": llm_spec.provider.value,
            "model": llm_spec.model,
        }

    def _check_data_source(self) -> Dict[str, Any]:
        """Report the data source type and whether its path exists."""
        dataset_spec = self.pipeline.specifications.dataset

        # With no (truthy) source path there is nothing to look up on disk,
        # so the source counts as existing.
        source_exists = True
        if dataset_spec.source_path:
            source_exists = dataset_spec.source_path.exists()

        return {
            "status": "ok" if source_exists else "warning",
            "source_type": dataset_spec.source_type.value,
            "exists": source_exists,
        }

    def _check_checkpoint_storage(self) -> Dict[str, Any]:
        """Report the checkpoint directory and whether it exists."""
        checkpoint_dir = self.pipeline.specifications.processing.checkpoint_dir
        # A missing directory is still reported as "ok"; only "exists"
        # reflects it.
        return {
            "status": "ok",
            "directory": str(checkpoint_dir),
            "exists": checkpoint_dir.exists(),
        }

    def is_healthy(self) -> bool:
        """
        Check if pipeline is healthy.

        Runs a fresh ``check()`` on every call, so ``last_check`` and
        ``last_status`` are updated as a side effect.

        Returns:
            True if healthy
        """
        return self.check()["status"] == "healthy"

    def get_readiness(self) -> Dict[str, Any]:
        """
        Get readiness status.

        Calls ``self.pipeline.validate()`` and reports its ``is_valid``,
        ``errors`` and ``warnings`` attributes.

        Returns:
            Readiness information with the keys ``ready``, ``errors``,
            ``warnings`` and ``timestamp`` (ISO-8601 string of a naive
            ``datetime.now()``).
        """
        validation = self.pipeline.validate()

        return {
            "ready": validation.is_valid,
            "errors": validation.errors,
            "warnings": validation.warnings,
            "timestamp": datetime.now().isoformat(),
        }

131