Coverage for llm_dataset_engine/core/error_handler.py: 45%
47 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
"""
Error handling system with configurable policies.

Implements Strategy pattern for different error handling approaches.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional

from llm_dataset_engine.core.specifications import ErrorPolicy
from llm_dataset_engine.utils import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)
class ErrorAction(str, Enum):
    """Actions to take on errors.

    Members subclass ``str``, so each one compares equal to its raw string
    value (for example ``ErrorAction.SKIP == "skip"``).
    """

    RETRY = "retry"  # try the failed operation again
    SKIP = "skip"  # skip the failing row
    FAIL = "fail"  # fail the pipeline
    USE_DEFAULT = "use_default"  # substitute a default value
@dataclass
class ErrorDecision:
    """Decision on how to handle an error.

    Attributes:
        action: What the caller should do next.
        default_value: Replacement value; populated when ``action`` is
            ``ErrorAction.USE_DEFAULT``.
        retry_count: Number of the next attempt; populated when ``action``
            is ``ErrorAction.RETRY`` and left at 0 otherwise.
        context: The error context the decision was made for, if any.
    """

    action: ErrorAction
    default_value: Any = None
    retry_count: int = 0
    context: dict[str, Any] | None = None
class ErrorHandler:
    """
    Handles errors according to configured policies.

    Follows Strategy pattern for pluggable error handling logic.
    """

    # Exception types treated as transient regardless of their message.
    _RETRIABLE_TYPES = (TimeoutError, ConnectionError)

    # Lower-case substrings that mark an error message as transient.
    _RETRIABLE_KEYWORDS = (
        "rate limit",
        "timeout",
        "network",
        "connection",
        "503",
        "502",
        "429",
    )

    def __init__(
        self,
        policy: ErrorPolicy = ErrorPolicy.SKIP,
        max_retries: int = 3,
        default_value_factory: Optional[Callable[[], Any]] = None,
    ):
        """
        Initialize error handler.

        Args:
            policy: Error handling policy
            max_retries: Maximum retry attempts
            default_value_factory: Function to generate default values;
                when omitted, the default value is ``None``
        """
        self.policy = policy
        self.max_retries = max_retries
        self.default_value_factory = default_value_factory or (lambda: None)

    def handle_error(
        self,
        error: Exception,
        context: dict[str, Any],
        attempt: int = 1,
    ) -> ErrorDecision:
        """
        Decide how to handle an error.

        Args:
            error: The exception that occurred
            context: Error context (row_index, stage, etc.); ``None`` is
                treated as an empty context
            attempt: Current attempt number

        Returns:
            ErrorDecision with action to take
        """
        # An error handler must not crash on a missing context.
        if context is None:
            context = {}

        row_index = context.get("row_index", "unknown")
        stage = context.get("stage", "unknown")

        # Log the error. The exception instance is passed instead of ``True``:
        # with a standard-library logger, ``True`` reads ``sys.exc_info()``,
        # which is empty when this method runs outside an ``except`` block.
        # NOTE(review): assumes ``get_logger`` returns a logger that accepts
        # an exception instance for ``exc_info`` -- confirm.
        logger.error(
            f"Error in {stage} at row {row_index}: {error}",
            exc_info=error,
        )

        # Apply policy
        if self.policy == ErrorPolicy.RETRY:
            if attempt < self.max_retries:
                logger.info(
                    f"Retrying (attempt {attempt + 1}/{self.max_retries})"
                )
                return ErrorDecision(
                    action=ErrorAction.RETRY,
                    retry_count=attempt + 1,
                    context=context,
                )

            # Retry budget exhausted: degrade to skipping the row.
            logger.warning(
                f"Max retries ({self.max_retries}) exceeded, skipping"
            )
            return ErrorDecision(
                action=ErrorAction.SKIP,
                context=context,
            )

        if self.policy == ErrorPolicy.SKIP:
            logger.info(f"Skipping row {row_index} due to error")
            return ErrorDecision(
                action=ErrorAction.SKIP,
                context=context,
            )

        if self.policy == ErrorPolicy.USE_DEFAULT:
            default = self.default_value_factory()
            logger.info(
                f"Using default value for row {row_index}: {default}"
            )
            return ErrorDecision(
                action=ErrorAction.USE_DEFAULT,
                default_value=default,
                context=context,
            )

        if self.policy == ErrorPolicy.FAIL:
            logger.error("Failing pipeline due to error")
        else:
            # Unknown policy, default to fail -- but say so in the log.
            logger.error(
                f"Unknown error policy {self.policy!r}, failing pipeline"
            )

        return ErrorDecision(
            action=ErrorAction.FAIL,
            context=context,
        )

    def should_retry(self, error: Exception) -> bool:
        """
        Determine if error should be retried.

        An error is retriable when it is a built-in ``TimeoutError`` or
        ``ConnectionError`` (including subclasses), or when its message
        contains one of the transient-failure keywords, case-insensitively.

        Args:
            error: The exception

        Returns:
            True if retriable
        """
        # Type check first: these exceptions often carry an empty message,
        # which the keyword scan below cannot recognise.
        if isinstance(error, ErrorHandler._RETRIABLE_TYPES):
            return True

        error_str = str(error).lower()
        return any(
            keyword in error_str
            for keyword in ErrorHandler._RETRIABLE_KEYWORDS
        )