Coverage for llm_dataset_engine/api/pipeline_builder.py: 82%
77 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Pipeline Builder - Fluent API for constructing pipelines.
4Implements Builder pattern for intuitive pipeline creation.
5"""
from decimal import Decimal
from pathlib import Path
from typing import Any, List, Optional

import pandas as pd

from llm_dataset_engine.api.pipeline import Pipeline
from llm_dataset_engine.core.specifications import (
    DatasetSpec,
    DataSourceType,
    LLMProvider,
    LLMSpec,
    MergeStrategy,
    OutputSpec,
    PipelineSpecifications,
    ProcessingSpec,
    PromptSpec,
)
from llm_dataset_engine.orchestration import (
    AsyncExecutor,
    ExecutionStrategy,
    StreamingExecutor,
    SyncExecutor,
)
class PipelineBuilder:
    """
    Fluent builder for constructing pipelines.

    Provides an intuitive, chainable API for common use cases. Every
    ``from_*`` / ``with_*`` method mutates this builder and returns ``self``
    so calls can be chained; ``build()`` validates the collected
    specifications and assembles the ``Pipeline``.

    Example:
        pipeline = (
            PipelineBuilder.create()
            .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
            .with_prompt("Process: {text}")
            .with_llm(provider="openai", model="gpt-4o-mini")
            .build()
        )
    """

    def __init__(self):
        """
        Initialize an empty builder.

        Dataset, prompt, LLM and output specs start as ``None``; processing
        settings start from a default ``ProcessingSpec()`` so the individual
        ``with_*`` tuning methods can update it field by field.
        """
        self._dataset_spec: Optional[DatasetSpec] = None
        self._prompt_spec: Optional[PromptSpec] = None
        self._llm_spec: Optional[LLMSpec] = None
        self._processing_spec: ProcessingSpec = ProcessingSpec()
        self._output_spec: Optional[OutputSpec] = None
        # Only populated by from_dataframe(); file-based sources clear it.
        self._dataframe: Optional[pd.DataFrame] = None
        self._executor: Optional[ExecutionStrategy] = None

    @classmethod
    def create(cls) -> "PipelineBuilder":
        """
        Start builder chain.

        Implemented as a classmethod so subclasses of ``PipelineBuilder``
        get an instance of their own type from ``SubClass.create()``.

        Returns:
            New builder instance
        """
        return cls()

    def from_csv(
        self,
        path: str | Path,
        input_columns: List[str],
        output_columns: List[str],
        delimiter: str = ",",
        encoding: str = "utf-8",
    ) -> "PipelineBuilder":
        """
        Configure CSV data source.

        Replaces any previously configured data source, including a
        DataFrame set via ``from_dataframe``.

        Args:
            path: Path to CSV file
            input_columns: Input column names
            output_columns: Output column names
            delimiter: CSV delimiter
            encoding: File encoding

        Returns:
            Self for chaining
        """
        self._dataset_spec = DatasetSpec(
            source_type=DataSourceType.CSV,
            source_path=Path(path),
            input_columns=input_columns,
            output_columns=output_columns,
            delimiter=delimiter,
            encoding=encoding,
        )
        # Drop a DataFrame left over from an earlier from_dataframe() call so
        # build() does not hand a stale frame to a file-based pipeline.
        self._dataframe = None
        return self

    def from_excel(
        self,
        path: str | Path,
        input_columns: List[str],
        output_columns: List[str],
        sheet_name: str | int = 0,
    ) -> "PipelineBuilder":
        """
        Configure Excel data source.

        Replaces any previously configured data source, including a
        DataFrame set via ``from_dataframe``.

        Args:
            path: Path to Excel file
            input_columns: Input column names
            output_columns: Output column names
            sheet_name: Sheet name or index

        Returns:
            Self for chaining
        """
        self._dataset_spec = DatasetSpec(
            source_type=DataSourceType.EXCEL,
            source_path=Path(path),
            input_columns=input_columns,
            output_columns=output_columns,
            sheet_name=sheet_name,
        )
        # See from_csv: a file source must not carry a stale DataFrame.
        self._dataframe = None
        return self

    def from_parquet(
        self,
        path: str | Path,
        input_columns: List[str],
        output_columns: List[str],
    ) -> "PipelineBuilder":
        """
        Configure Parquet data source.

        Replaces any previously configured data source, including a
        DataFrame set via ``from_dataframe``.

        Args:
            path: Path to Parquet file
            input_columns: Input column names
            output_columns: Output column names

        Returns:
            Self for chaining
        """
        self._dataset_spec = DatasetSpec(
            source_type=DataSourceType.PARQUET,
            source_path=Path(path),
            input_columns=input_columns,
            output_columns=output_columns,
        )
        # See from_csv: a file source must not carry a stale DataFrame.
        self._dataframe = None
        return self

    def from_dataframe(
        self,
        df: pd.DataFrame,
        input_columns: List[str],
        output_columns: List[str],
    ) -> "PipelineBuilder":
        """
        Configure in-memory DataFrame source.

        The DataFrame itself is kept on the builder and passed to the
        ``Pipeline`` constructor by ``build()``; the spec records no path.

        Args:
            df: Pandas DataFrame
            input_columns: Input column names
            output_columns: Output column names

        Returns:
            Self for chaining
        """
        self._dataset_spec = DatasetSpec(
            source_type=DataSourceType.DATAFRAME,
            input_columns=input_columns,
            output_columns=output_columns,
        )
        self._dataframe = df
        return self

    def with_prompt(
        self,
        template: str,
        system_message: Optional[str] = None,
    ) -> "PipelineBuilder":
        """
        Configure prompt template.

        Args:
            template: Prompt template with {variable} placeholders
            system_message: Optional system message

        Returns:
            Self for chaining
        """
        self._prompt_spec = PromptSpec(
            template=template,
            system_message=system_message,
        )
        return self

    def with_llm(
        self,
        provider: str,
        model: str,
        api_key: Optional[str] = None,
        temperature: float = 0.0,
        max_tokens: Optional[int] = None,
        **kwargs: Any,
    ) -> "PipelineBuilder":
        """
        Configure LLM provider.

        Args:
            provider: Provider name (openai, azure_openai, anthropic);
                matched case-insensitively against ``LLMProvider`` values
            model: Model identifier
            api_key: API key (or from env)
            temperature: Sampling temperature
            max_tokens: Max output tokens
            **kwargs: Provider-specific parameters forwarded to ``LLMSpec``

        Returns:
            Self for chaining

        Raises:
            ValueError: If ``provider`` is not a valid ``LLMProvider`` value
                (raised by the enum constructor)
        """
        provider_enum = LLMProvider(provider.lower())

        self._llm_spec = LLMSpec(
            provider=provider_enum,
            model=model,
            api_key=api_key,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )
        return self

    def with_batch_size(self, size: int) -> "PipelineBuilder":
        """
        Configure batch size.

        Args:
            size: Rows per batch

        Returns:
            Self for chaining
        """
        self._processing_spec.batch_size = size
        return self

    def with_concurrency(self, threads: int) -> "PipelineBuilder":
        """
        Configure concurrent requests.

        Args:
            threads: Number of concurrent threads

        Returns:
            Self for chaining
        """
        self._processing_spec.concurrency = threads
        return self

    def with_checkpoint_interval(self, rows: int) -> "PipelineBuilder":
        """
        Configure checkpoint frequency.

        Args:
            rows: Rows between checkpoints

        Returns:
            Self for chaining
        """
        self._processing_spec.checkpoint_interval = rows
        return self

    def with_rate_limit(self, rpm: int) -> "PipelineBuilder":
        """
        Configure rate limiting.

        Args:
            rpm: Requests per minute

        Returns:
            Self for chaining
        """
        self._processing_spec.rate_limit_rpm = rpm
        return self

    def with_max_budget(self, budget: float | Decimal) -> "PipelineBuilder":
        """
        Configure maximum budget.

        Args:
            budget: Maximum budget in USD

        Returns:
            Self for chaining
        """
        # Go through str() so a float like 0.1 becomes Decimal("0.1")
        # rather than its exact binary expansion.
        self._processing_spec.max_budget = Decimal(str(budget))
        return self

    def with_output(
        self,
        path: str | Path,
        format: str = "csv",
        merge_strategy: str = "replace",
    ) -> "PipelineBuilder":
        """
        Configure output destination.

        Args:
            path: Output file path
            format: Output format (csv, excel, parquet), case-insensitive
            merge_strategy: Merge strategy (replace, append, update),
                case-insensitive

        Returns:
            Self for chaining

        Raises:
            ValueError: If ``format`` or ``merge_strategy`` is not one of
                the supported names
        """
        format_map = {
            "csv": DataSourceType.CSV,
            "excel": DataSourceType.EXCEL,
            "parquet": DataSourceType.PARQUET,
        }

        merge_map = {
            "replace": MergeStrategy.REPLACE,
            "append": MergeStrategy.APPEND,
            "update": MergeStrategy.UPDATE,
        }

        destination_type = format_map.get(format.lower())
        if destination_type is None:
            raise ValueError(
                f"Unsupported output format {format!r}; "
                f"expected one of: {', '.join(format_map)}"
            )

        strategy = merge_map.get(merge_strategy.lower())
        if strategy is None:
            raise ValueError(
                f"Unsupported merge strategy {merge_strategy!r}; "
                f"expected one of: {', '.join(merge_map)}"
            )

        self._output_spec = OutputSpec(
            destination_type=destination_type,
            destination_path=Path(path),
            merge_strategy=strategy,
        )
        return self

    def with_executor(self, executor: ExecutionStrategy) -> "PipelineBuilder":
        """
        Set custom execution strategy.

        Overrides any executor chosen earlier via ``with_async_execution``
        or ``with_streaming``.

        Args:
            executor: ExecutionStrategy instance

        Returns:
            Self for chaining
        """
        self._executor = executor
        return self

    def with_async_execution(
        self, max_concurrency: int = 10
    ) -> "PipelineBuilder":
        """
        Use async execution strategy.

        Installs an ``AsyncExecutor``, replacing any previously set executor.
        Intended for use from async frameworks (e.g. FastAPI, aiohttp).

        Args:
            max_concurrency: Maximum concurrent async tasks

        Returns:
            Self for chaining
        """
        self._executor = AsyncExecutor(max_concurrency=max_concurrency)
        return self

    def with_streaming(self, chunk_size: int = 1000) -> "PipelineBuilder":
        """
        Use streaming execution strategy.

        Installs a ``StreamingExecutor`` that processes data in chunks,
        replacing any previously set executor. Intended for large datasets.

        Args:
            chunk_size: Number of rows per chunk

        Returns:
            Self for chaining
        """
        self._executor = StreamingExecutor(chunk_size=chunk_size)
        return self

    def build(self) -> Pipeline:
        """
        Build final Pipeline.

        The output spec is optional and is passed through as ``None`` when
        ``with_output`` was never called; the executor is likewise ``None``
        unless one was configured.

        Returns:
            Configured Pipeline

        Raises:
            ValueError: If the dataset, prompt or LLM specification is missing
        """
        # Validate required specs
        if self._dataset_spec is None:
            raise ValueError("Dataset specification required")
        if self._prompt_spec is None:
            raise ValueError("Prompt specification required")
        if self._llm_spec is None:
            raise ValueError("LLM specification required")

        # Create specifications bundle
        specifications = PipelineSpecifications(
            dataset=self._dataset_spec,
            prompt=self._prompt_spec,
            llm=self._llm_spec,
            processing=self._processing_spec,
            output=self._output_spec,
        )

        # Create and return pipeline
        return Pipeline(
            specifications,
            dataframe=self._dataframe,
            executor=self._executor,
        )