Coverage for llm_dataset_engine/orchestration/streaming_executor.py: 39%

59 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Streaming execution strategy. 

3 

4Provides memory-efficient processing for large datasets by processing 

5data in chunks. 

6""" 

7 

8from datetime import datetime 

9from decimal import Decimal 

10from typing import Iterator, List 

11 

12import pandas as pd 

13 

14from llm_dataset_engine.core.models import ExecutionResult 

15from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

16from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy 

17from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

18from llm_dataset_engine.utils import get_logger 

19 

20logger = get_logger(__name__) 

21 

22 

class StreamingExecutor(ExecutionStrategy):
    """
    Streaming execution strategy.

    Processes data chunk by chunk and yields each processed chunk as soon
    as it is ready, rather than building one full result DataFrame.
    Intended for very large datasets (100K+ rows) that don't fit in memory.

    Intended benefits:
    - Constant memory footprint
    - Can process unlimited dataset sizes
    - Checkpoints at chunk boundaries
    - Early results available

    NOTE: ``_read_chunks`` is currently a placeholder that yields a single
    empty DataFrame, so real chunked reading is not wired up yet.
    """

    def __init__(self, chunk_size: int = 1000):
        """
        Initialize streaming executor.

        Args:
            chunk_size: Number of rows per chunk
        """
        self.chunk_size = chunk_size
        self.logger = logger

    def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> Iterator[pd.DataFrame]:
        """
        Execute stages in streaming mode.

        This is a generator: nothing runs (not even the start log line)
        until the caller begins iterating over the returned iterator.

        Args:
            stages: Pipeline stages; the first one is treated as the data
                loader, the remaining ones are applied to every chunk
            context: Execution context, updated after each chunk with the
                index of the last processed row

        Yields:
            DataFrames with processed chunks

        Raises:
            IndexError: If ``stages`` is empty (raised on first iteration).
        """
        self.logger.info(
            f"Starting streaming execution (chunk_size={self.chunk_size})"
        )

        # First stage is the data loader; everything after it transforms data.
        # The slice is loop-invariant, so compute it once.
        data_loader = stages[0]
        downstream_stages = stages[1:]

        chunk_index = 0
        total_rows_processed = 0

        for chunk in self._read_chunks(data_loader, context):
            self.logger.info(
                f"Processing chunk {chunk_index} ({len(chunk)} rows)"
            )

            result_chunk = self._process_chunk(chunk, downstream_stages, context)

            total_rows_processed += len(result_chunk)
            # Only report progress once at least one row exists; otherwise
            # the "last processed row" index would be -1 (e.g. when the
            # first chunk is empty).
            if total_rows_processed > 0:
                context.update_row(total_rows_processed - 1)

            yield result_chunk

            chunk_index += 1

        self.logger.info(
            f"Streaming execution complete: {total_rows_processed} rows, "
            f"{chunk_index} chunks"
        )

    def _read_chunks(
        self,
        data_loader: PipelineStage,
        context: ExecutionContext,
    ) -> Iterator[pd.DataFrame]:
        """
        Read data in chunks.

        Placeholder implementation: ignores both arguments and yields a
        single empty DataFrame.

        TODO: integrate with the data loader stage's chunked reading
        (e.g. via the pandas ``chunksize`` parameter).

        Args:
            data_loader: Stage that would supply the data (currently unused)
            context: Execution context (currently unused)

        Yields:
            One empty DataFrame
        """
        yield pd.DataFrame()  # Placeholder

    def _process_chunk(
        self,
        chunk: pd.DataFrame,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Process a single chunk through all stages.

        Each stage receives the output of the previous one. With no stages
        the chunk is returned unchanged.

        Args:
            chunk: Data chunk to process
            stages: Stages to apply (excluding data loader)
            context: Execution context

        Returns:
            Processed chunk
        """
        current_data = chunk

        for stage in stages:
            self.logger.debug(f"Applying stage: {stage.name}")
            current_data = stage.process(current_data, context)

        return current_data

    def supports_async(self) -> bool:
        """Streaming executor doesn't support async."""
        return False

    def supports_streaming(self) -> bool:
        """Streaming executor supports streaming."""
        return True

    @property
    def name(self) -> str:
        """Strategy name."""
        return "StreamingExecutor"

150 

151 

class StreamingResult:
    """
    Result container for streaming execution.

    Accumulates per-chunk statistics via ``add_chunk`` and turns them into
    an ``ExecutionResult`` with ``finalize`` once the stream is consumed.
    """

    def __init__(self):
        """Initialize streaming result with zeroed counters."""
        self.chunks_processed = 0
        self.total_rows = 0
        self.total_cost = Decimal("0.0")
        # Timing starts when the container is created, not at first chunk.
        self.start_time = datetime.now()
        # Set by finalize(); None until then.
        self.end_time = None

    def add_chunk(self, chunk: pd.DataFrame, cost: Decimal) -> None:
        """
        Add chunk statistics.

        Args:
            chunk: Processed chunk; only its row count is recorded
            cost: Cost attributed to this chunk, added to the running total
        """
        self.chunks_processed += 1
        self.total_rows += len(chunk)
        self.total_cost += cost

    def finalize(self) -> ExecutionResult:
        """
        Create final ExecutionResult.

        Records the end time and reports every accumulated row as
        processed (no failed or skipped rows are tracked here). Token
        counts are reported as 0 because this container does not collect
        them.

        Returns:
            ExecutionResult with an empty DataFrame as ``data``, since the
            chunks were already handed out during streaming.
        """
        # ProcessingStats and CostEstimate are not imported at module level,
        # so referencing them without this import raises NameError.
        # NOTE(review): assumed to live next to ExecutionResult in
        # core.models - confirm.
        from llm_dataset_engine.core.models import CostEstimate, ProcessingStats

        self.end_time = datetime.now()
        duration = (self.end_time - self.start_time).total_seconds()

        stats = ProcessingStats(
            total_rows=self.total_rows,
            processed_rows=self.total_rows,
            failed_rows=0,
            skipped_rows=0,
            # Guard against a zero-length run to avoid ZeroDivisionError.
            rows_per_second=self.total_rows / duration if duration > 0 else 0,
            total_duration_seconds=duration,
        )

        costs = CostEstimate(
            total_cost=self.total_cost,
            total_tokens=0,
            input_tokens=0,
            output_tokens=0,
            rows=self.total_rows,
            confidence="actual",
        )

        return ExecutionResult(
            data=pd.DataFrame(),  # Streaming doesn't return full data
            metrics=stats,
            costs=costs,
            start_time=self.start_time,
            end_time=self.end_time,
            success=True,
        )

204