Coverage for llm_dataset_engine/orchestration/streaming_executor.py: 39%
59 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Streaming execution strategy.
4Provides memory-efficient processing for large datasets by processing
5data in chunks.
6"""
8from datetime import datetime
9from decimal import Decimal
10from typing import Iterator, List
12import pandas as pd
14from llm_dataset_engine.core.models import ExecutionResult
15from llm_dataset_engine.orchestration.execution_context import ExecutionContext
16from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy
17from llm_dataset_engine.stages.pipeline_stage import PipelineStage
18from llm_dataset_engine.utils import get_logger
20logger = get_logger(__name__)
class StreamingExecutor(ExecutionStrategy):
    """
    Streaming execution strategy.

    Processes data chunk by chunk so that only one chunk at a time flows
    through the stages. Intended for very large datasets (100K+ rows) that
    don't fit in memory.

    Benefits:
    - Working set bounded by the chunk being processed, not the dataset
    - Results are available as soon as each chunk is processed
    - Progress is reported to the context at chunk boundaries
      (via ``context.update_row``)

    Note:
        ``_read_chunks`` is currently a placeholder that yields a single
        empty DataFrame; real chunked reading is not implemented yet.
    """

    def __init__(self, chunk_size: int = 1000):
        """
        Initialize streaming executor.

        Args:
            chunk_size: Number of rows per chunk. Currently only reported in
                logs; the placeholder reader does not use it yet.
        """
        self.chunk_size = chunk_size
        self.logger = logger

    def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> Iterator[pd.DataFrame]:
        """
        Execute stages in streaming mode.

        This is a generator: nothing runs (not even the start-up log line)
        until the caller begins iterating.

        Args:
            stages: Pipeline stages. ``stages[0]`` is treated as the data
                loader; the remaining stages are applied to every chunk in
                order.
            context: Execution context. ``context.update_row`` is called after
                each chunk once at least one row has been produced.

        Yields:
            DataFrames with processed chunks

        Raises:
            ValueError: If ``stages`` is empty (there is no data loader).
        """
        if not stages:
            raise ValueError(
                "StreamingExecutor requires at least one stage (the data loader)"
            )

        self.logger.info(
            f"Starting streaming execution (chunk_size={self.chunk_size})"
        )

        data_loader = stages[0]
        # Loop-invariant: the stages applied to every chunk.
        processing_stages = stages[1:]

        chunk_index = 0
        total_rows_processed = 0

        for chunk in self._read_chunks(data_loader, context):
            self.logger.info(
                f"Processing chunk {chunk_index} ({len(chunk)} rows)"
            )

            result_chunk = self._process_chunk(chunk, processing_stages, context)

            # NOTE(review): this counts *output* rows; if a stage drops rows the
            # reported index will lag the source position — confirm intent.
            total_rows_processed += len(result_chunk)
            # Never report row index -1 when nothing has been produced yet
            # (e.g. the empty placeholder chunk from _read_chunks).
            if total_rows_processed > 0:
                context.update_row(total_rows_processed - 1)

            yield result_chunk

            chunk_index += 1

        self.logger.info(
            f"Streaming execution complete: {total_rows_processed} rows, "
            f"{chunk_index} chunks"
        )

    def _read_chunks(
        self,
        data_loader: PipelineStage,
        context: ExecutionContext,
    ) -> Iterator[pd.DataFrame]:
        """
        Read data in chunks.

        Placeholder implementation: ``data_loader`` and ``context`` are not
        used yet, and exactly one empty DataFrame is yielded. The intended
        implementation would integrate with the data loader's chunked reading
        (e.g. pandas ``chunksize``) — TODO.
        """
        # Make the unimplemented path visible instead of silently streaming
        # nothing.
        self.logger.warning(
            "StreamingExecutor._read_chunks is a placeholder; "
            "no data will be read from the data loader"
        )
        yield pd.DataFrame()

    def _process_chunk(
        self,
        chunk: pd.DataFrame,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Process a single chunk through all stages.

        Each stage's output becomes the next stage's input.

        Args:
            chunk: Data chunk to process
            stages: Stages to apply (excluding data loader)
            context: Execution context, passed to every stage

        Returns:
            Output of the last stage, or ``chunk`` unchanged if ``stages`` is
            empty
        """
        current_data = chunk

        for stage in stages:
            self.logger.debug(f"Applying stage: {stage.name}")
            current_data = stage.process(current_data, context)

        return current_data

    def supports_async(self) -> bool:
        """Streaming executor doesn't support async."""
        return False

    def supports_streaming(self) -> bool:
        """Streaming executor supports streaming."""
        return True

    @property
    def name(self) -> str:
        """Strategy name."""
        return "StreamingExecutor"
class StreamingResult:
    """
    Result container for streaming execution.

    Accumulates per-chunk statistics via ``add_chunk`` while the stream is
    consumed, then builds an ``ExecutionResult`` via ``finalize``.
    """

    def __init__(self):
        """Initialize counters and record the start time."""
        self.chunks_processed = 0
        self.total_rows = 0
        self.total_cost = Decimal("0.0")
        self.start_time = datetime.now()
        # Set by finalize().
        self.end_time = None

    def add_chunk(self, chunk: pd.DataFrame, cost: Decimal) -> None:
        """
        Add chunk statistics.

        Args:
            chunk: A processed chunk; only its row count is recorded.
            cost: Cost attributed to this chunk, added to ``total_cost``.
        """
        self.chunks_processed += 1
        self.total_rows += len(chunk)
        self.total_cost += cost

    def finalize(self) -> "ExecutionResult":
        """
        Stamp the end time and create the final ExecutionResult.

        Every accumulated row is reported as processed (failed/skipped are
        always 0), token counts are reported as 0, and ``data`` is an empty
        DataFrame because streaming does not retain the full output.

        Returns:
            ExecutionResult with ``success=True``.
        """
        # ProcessingStats and CostEstimate were previously used without being
        # imported, so this method always raised NameError. They are imported
        # locally to keep the fix contained.
        # NOTE(review): assumes both live in core.models next to
        # ExecutionResult — confirm.
        from llm_dataset_engine.core.models import CostEstimate, ProcessingStats

        self.end_time = datetime.now()
        duration = (self.end_time - self.start_time).total_seconds()

        stats = ProcessingStats(
            total_rows=self.total_rows,
            processed_rows=self.total_rows,
            failed_rows=0,
            skipped_rows=0,
            # Guard against a zero-length interval (e.g. finalize right after
            # construction).
            rows_per_second=self.total_rows / duration if duration > 0 else 0.0,
            total_duration_seconds=duration,
        )

        costs = CostEstimate(
            total_cost=self.total_cost,
            total_tokens=0,
            input_tokens=0,
            output_tokens=0,
            rows=self.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=pd.DataFrame(),  # Streaming doesn't return full data
            metrics=stats,
            costs=costs,
            start_time=self.start_time,
            end_time=self.end_time,
            success=True,
        )