Coverage for llm_dataset_engine/stages/streaming_loader_stage.py: 0%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Streaming data loader for memory-efficient processing of large files. 

3 

4Implements streaming pattern for datasets that don't fit in memory. 

5""" 

6 

7from decimal import Decimal 

8from typing import Any, Iterator 

9 

10import pandas as pd 

11 

12from llm_dataset_engine.adapters.data_io import create_data_reader 

13from llm_dataset_engine.core.models import CostEstimate, ValidationResult 

14from llm_dataset_engine.core.specifications import DatasetSpec 

15from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

16 

17 

class StreamingDataLoaderStage(PipelineStage[DatasetSpec, Iterator[pd.DataFrame]]):
    """
    Load data in chunks for memory-efficient processing.

    Use this for very large datasets (100K+ rows) that don't fit in memory.
    The stage builds a reader from the dataset specification and hands back
    the reader's chunked iterator instead of a single DataFrame.
    """

    def __init__(self, chunk_size: int = 1000):
        """
        Initialize streaming data loader.

        Args:
            chunk_size: Number of rows per chunk; must be at least 1.

        Raises:
            ValueError: If ``chunk_size`` is less than 1.
        """
        # Fail at construction time: a non-positive chunk size can never
        # yield a usable chunk, and rejecting it here points at the caller
        # that supplied it rather than at the reader much later.
        if chunk_size < 1:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size!r}"
            )

        super().__init__("StreamingDataLoader")
        self.chunk_size = chunk_size

    def process(
        self, spec: DatasetSpec, context: Any
    ) -> Iterator[pd.DataFrame]:
        """
        Load data as iterator of chunks.

        Args:
            spec: Dataset specification; its source type, path, delimiter,
                encoding and sheet name are forwarded to the reader factory.
            context: Execution context; not used by this stage.

        Returns:
            The result of ``reader.read_chunked(self.chunk_size)``
            (annotated as an iterator of DataFrames).
        """
        # Create appropriate reader
        reader = create_data_reader(
            source_type=spec.source_type,
            source_path=spec.source_path,
            delimiter=spec.delimiter,
            encoding=spec.encoding,
            sheet_name=spec.sheet_name,
        )

        # NOTE(review): self.logger is presumably set up by PipelineStage.
        self.logger.info(
            f"Streaming data in chunks of {self.chunk_size} rows"
        )

        # Return chunked iterator
        return reader.read_chunked(self.chunk_size)

    def validate_input(self, spec: DatasetSpec) -> ValidationResult:
        """
        Validate dataset specification.

        Records an error when a source path is set but does not exist, and
        when no input columns are specified.

        Args:
            spec: Dataset specification to check.

        Returns:
            A ValidationResult created as valid, with any errors added.
        """
        result = ValidationResult(is_valid=True)

        # Check file exists for file sources
        if spec.source_path and not spec.source_path.exists():
            result.add_error(f"Source file not found: {spec.source_path}")

        # Check columns specified
        if not spec.input_columns:
            result.add_error("No input columns specified")

        return result

    def estimate_cost(self, spec: DatasetSpec) -> CostEstimate:
        """
        Streaming has no direct LLM cost.

        Args:
            spec: Dataset specification; not inspected.

        Returns:
            A CostEstimate with zero cost, zero tokens and zero rows.
        """
        return CostEstimate(
            total_cost=Decimal("0.0"),
            total_tokens=0,
            input_tokens=0,
            output_tokens=0,
            rows=0,  # Unknown until streaming starts
        )

78