Coverage for llm_dataset_engine/stages/streaming_loader_stage.py: 0%
24 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Streaming data loader for memory-efficient processing of large files.
4Implements streaming pattern for datasets that don't fit in memory.
5"""
7from decimal import Decimal
8from typing import Any, Iterator
10import pandas as pd
12from llm_dataset_engine.adapters.data_io import create_data_reader
13from llm_dataset_engine.core.models import CostEstimate, ValidationResult
14from llm_dataset_engine.core.specifications import DatasetSpec
15from llm_dataset_engine.stages.pipeline_stage import PipelineStage
class StreamingDataLoaderStage(PipelineStage[DatasetSpec, Iterator[pd.DataFrame]]):
    """
    Load data in chunks for memory-efficient processing.

    Use this for very large datasets (100K+ rows) that don't fit in memory.
    Rather than returning one DataFrame, ``process`` returns the iterator of
    DataFrame chunks produced by the data reader's ``read_chunked``.
    """

    def __init__(self, chunk_size: int = 1000):
        """
        Initialize streaming data loader.

        Args:
            chunk_size: Number of rows per chunk. Must be a positive integer.

        Raises:
            ValueError: If ``chunk_size`` is less than 1.
        """
        # Fail fast at construction time instead of handing a zero/negative
        # chunk size to the reader's read_chunked() later in process().
        if chunk_size < 1:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size!r}"
            )
        super().__init__("StreamingDataLoader")
        self.chunk_size = chunk_size

    def process(
        self, spec: DatasetSpec, context: Any
    ) -> Iterator[pd.DataFrame]:
        """
        Load data as iterator of chunks.

        Args:
            spec: Dataset specification. Its source type, path, delimiter,
                encoding and sheet name are forwarded to the reader factory.
            context: Pipeline context. Not used by this stage.

        Returns:
            The iterator returned by the reader's ``read_chunked``, called
            with this stage's ``chunk_size``.
        """
        # Create the reader that matches the spec's source type.
        reader = create_data_reader(
            source_type=spec.source_type,
            source_path=spec.source_path,
            delimiter=spec.delimiter,
            encoding=spec.encoding,
            sheet_name=spec.sheet_name,
        )

        # self.logger is presumably set up by PipelineStage.__init__ (not
        # visible here).
        self.logger.info(
            f"Streaming data in chunks of {self.chunk_size} rows"
        )

        # Hand back the chunked iterator instead of a materialized frame.
        return reader.read_chunked(self.chunk_size)

    def validate_input(self, spec: DatasetSpec) -> ValidationResult:
        """
        Validate dataset specification.

        Checks that the source file exists (when a source path is set) and
        that at least one input column is specified. Problems are recorded
        on the returned result via ``add_error`` rather than raised.

        Args:
            spec: Dataset specification to validate.

        Returns:
            A ValidationResult that starts out valid and collects any errors.
        """
        result = ValidationResult(is_valid=True)

        # Check the file exists for file sources (only when a path is set).
        if spec.source_path and not spec.source_path.exists():
            result.add_error(f"Source file not found: {spec.source_path}")

        # Check that input columns are specified.
        if not spec.input_columns:
            result.add_error("No input columns specified")

        return result

    def estimate_cost(self, spec: DatasetSpec) -> CostEstimate:
        """
        Streaming has no direct LLM cost.

        Returns a zero-valued estimate. ``rows`` is reported as 0 because the
        row count is not known until the stream is consumed.
        """
        return CostEstimate(
            total_cost=Decimal("0.0"),
            total_tokens=0,
            input_tokens=0,
            output_tokens=0,
            rows=0,  # Unknown until streaming starts
        )