Coverage for llm_dataset_engine/adapters/data_io.py: 34%
131 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Data I/O adapters for reading and writing tabular data.
4Provides unified interface for multiple data formats following the
5Adapter pattern.
6"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Iterator, Optional, Union

import pandas as pd
import polars as pl

from llm_dataset_engine.core.models import WriteConfirmation
from llm_dataset_engine.core.specifications import DataSourceType
class DataReader(ABC):
    """
    Common contract implemented by every tabular data source.

    New formats are supported by adding another subclass (Open/Closed
    principle); existing readers never need to change.
    """

    @abstractmethod
    def read(self) -> pd.DataFrame:
        """
        Load the complete dataset in a single call.

        Returns:
            DataFrame holding all rows of the source
        """
        ...

    @abstractmethod
    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Stream the dataset piece by piece to limit memory use.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            DataFrame chunks
        """
        ...
class CSVReader(DataReader):
    """CSV file reader implementation."""

    def __init__(
        self,
        file_path: Path,
        delimiter: str = ",",
        encoding: str = "utf-8",
    ):
        """
        Initialize CSV reader.

        Args:
            file_path: Path to CSV file
            delimiter: Column delimiter
            encoding: File encoding
        """
        self.file_path = file_path
        self.delimiter = delimiter
        self.encoding = encoding

    def read(self) -> pd.DataFrame:
        """
        Read the entire CSV file.

        Returns:
            DataFrame with all rows of the file
        """
        return pd.read_csv(
            self.file_path,
            delimiter=self.delimiter,
            encoding=self.encoding,
        )

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Read the CSV file in chunks.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            DataFrame chunks of at most ``chunk_size`` rows
        """
        # With ``chunksize`` pandas returns a TextFileReader that keeps the
        # file open. Using it as a context manager guarantees the handle is
        # closed even when the caller stops iterating early or an error
        # interrupts the loop.
        with pd.read_csv(
            self.file_path,
            delimiter=self.delimiter,
            encoding=self.encoding,
            chunksize=chunk_size,
        ) as reader:
            yield from reader
class ExcelReader(DataReader):
    """Reader for a single sheet of an Excel workbook."""

    def __init__(self, file_path: Path, sheet_name: Union[str, int] = 0):
        """
        Store the workbook location and the sheet to load.

        Args:
            file_path: Path to Excel file
            sheet_name: Sheet name or index
        """
        self.sheet_name = sheet_name
        self.file_path = file_path

    def read(self) -> pd.DataFrame:
        """Load the configured sheet as one DataFrame."""
        sheet = pd.read_excel(self.file_path, sheet_name=self.sheet_name)
        return sheet

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Yield the sheet in consecutive row slices.

        The whole sheet is loaded through ``read()`` first and then
        sliced, so this does not reduce peak memory use.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            DataFrame slices of at most ``chunk_size`` rows
        """
        frame = self.read()
        row_count = len(frame)
        for start in range(0, row_count, chunk_size):
            stop = start + chunk_size
            yield frame.iloc[start:stop]
class ParquetReader(DataReader):
    """Reader for Parquet files."""

    def __init__(self, file_path: Path):
        """
        Store the location of the Parquet file.

        Args:
            file_path: Path to Parquet file
        """
        self.file_path = file_path

    def read(self) -> pd.DataFrame:
        """Load the whole Parquet file into a pandas DataFrame."""
        frame = pd.read_parquet(self.file_path)
        return frame

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Yield the Parquet file in row batches via a Polars lazy scan.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            pandas DataFrame batches of at most ``chunk_size`` rows
        """
        # A lazy scan lets each batch be collected on its own
        lazy_frame = pl.scan_parquet(self.file_path)

        # The row count is needed up front to know where the slices end
        row_total = lazy_frame.select(pl.len()).collect().item()

        for offset in range(0, row_total, chunk_size):
            batch = lazy_frame.slice(offset, chunk_size)
            yield batch.collect().to_pandas()
class DataFrameReader(DataReader):
    """Reader that serves an already-loaded DataFrame."""

    def __init__(self, dataframe: pd.DataFrame):
        """
        Keep a private copy of the supplied DataFrame.

        Args:
            dataframe: Pandas DataFrame
        """
        # Copy so later changes made by the caller do not affect the reader
        snapshot = dataframe.copy()
        self.dataframe = snapshot

    def read(self) -> pd.DataFrame:
        """Return a fresh copy of the stored DataFrame."""
        duplicate = self.dataframe.copy()
        return duplicate

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Yield copies of consecutive row slices.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            Copied DataFrame slices of at most ``chunk_size`` rows
        """
        row_count = len(self.dataframe)
        for start in range(0, row_count, chunk_size):
            window = self.dataframe.iloc[start : start + chunk_size]
            yield window.copy()
class DataWriter(ABC):
    """
    Common contract implemented by every data sink.

    Single Responsibility: a writer only persists data.
    """

    @abstractmethod
    def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
        """
        Persist the data at the destination.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with details
        """
        ...

    @abstractmethod
    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Persist the data atomically, rolling back on failure.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with details
        """
        ...
class CSVWriter(DataWriter):
    """CSV file writer implementation."""

    def __init__(self, delimiter: str = ",", encoding: str = "utf-8"):
        """
        Initialize CSV writer.

        Args:
            delimiter: Column delimiter
            encoding: File encoding
        """
        self.delimiter = delimiter
        self.encoding = encoding

    def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
        """
        Write the DataFrame to a CSV file without the index column.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_csv(
            path,
            sep=self.delimiter,
            encoding=self.encoding,
            index=False,
        )

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to CSV via a temporary sibling file.

        The data is written next to the destination and then moved over
        it, so a failed write does not leave a partial file at ``path``.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written

        Raises:
            Exception: Whatever the write or rename raised, after the
                temporary file has been removed
        """
        # Keep the real extension on the temp file: to_csv infers
        # compression from it by default, so "x.csv.gz" must still end in
        # ".gz". Including the full stem also stops targets that share a
        # stem from fighting over one temp file.
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_csv(
                temp_path,
                sep=self.delimiter,
                encoding=self.encoding,
                index=False,
            )

            # Same directory, so the replace is a rename rather than a copy
            temp_path.replace(path)
        except Exception:
            # Cleanup on failure, then re-raise with the original traceback
            if temp_path.exists():
                temp_path.unlink()
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
class ExcelWriter(DataWriter):
    """Excel file writer implementation."""

    def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
        """
        Write the DataFrame to an Excel file without the index column.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_excel(path, index=False)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to Excel via a temporary sibling file.

        The data is written next to the destination and then moved over
        it, so a failed write does not leave a partial file at ``path``.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written

        Raises:
            Exception: Whatever the write or rename raised, after the
                temporary file has been removed
        """
        # pandas picks the Excel engine from the file extension, so the
        # temp file must keep the target's suffix; a bare ".tmp" name is
        # rejected before anything is written.
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_excel(temp_path, index=False)
            # Same directory, so the replace is a rename rather than a copy
            temp_path.replace(path)
        except Exception:
            # Cleanup on failure, then re-raise with the original traceback
            if temp_path.exists():
                temp_path.unlink()
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
class ParquetWriter(DataWriter):
    """Parquet file writer implementation."""

    def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
        """
        Write the DataFrame to a Parquet file without the index.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_parquet(path, index=False)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to Parquet via a temporary sibling file.

        The data is written next to the destination and then moved over
        it, so a failed write does not leave a partial file at ``path``.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written

        Raises:
            Exception: Whatever the write or rename raised, after the
                temporary file has been removed
        """
        # Build the temp name from the full file name so that targets which
        # share a stem (e.g. "data.parquet" and "data.csv") do not use the
        # same temporary file.
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_parquet(temp_path, index=False)
            # Same directory, so the replace is a rename rather than a copy
            temp_path.replace(path)
        except Exception:
            # Cleanup on failure, then re-raise with the original traceback
            if temp_path.exists():
                temp_path.unlink()
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
def create_data_reader(
    source_type: DataSourceType,
    source_path: Optional[Path] = None,
    dataframe: Optional[pd.DataFrame] = None,
    **kwargs: Any,
) -> DataReader:
    """
    Factory function to create the appropriate data reader.

    Args:
        source_type: Type of data source
        source_path: Path to file (for file sources)
        dataframe: DataFrame (for DataFrame source)
        **kwargs: Reader-specific options. ``delimiter`` and ``encoding``
            are used for CSV, ``sheet_name`` for Excel; any other key is
            ignored.

    Returns:
        Configured DataReader

    Raises:
        ValueError: If source type not supported or parameters invalid
    """
    if source_type == DataSourceType.CSV:
        if not source_path:
            raise ValueError("source_path required for CSV")
        return CSVReader(
            source_path,
            delimiter=kwargs.get("delimiter", ","),
            encoding=kwargs.get("encoding", "utf-8"),
        )

    if source_type == DataSourceType.EXCEL:
        if not source_path:
            raise ValueError("source_path required for Excel")
        return ExcelReader(
            source_path, sheet_name=kwargs.get("sheet_name", 0)
        )

    if source_type == DataSourceType.PARQUET:
        if not source_path:
            raise ValueError("source_path required for Parquet")
        return ParquetReader(source_path)

    if source_type == DataSourceType.DATAFRAME:
        # Compare with None explicitly: the truth value of a DataFrame is
        # ambiguous and raises.
        if dataframe is None:
            raise ValueError("dataframe required for DataFrame source")
        return DataFrameReader(dataframe)

    raise ValueError(f"Unsupported source type: {source_type}")
def create_data_writer(destination_type: DataSourceType) -> DataWriter:
    """
    Build the writer that matches a destination type.

    Args:
        destination_type: Type of destination

    Returns:
        A default-configured DataWriter for CSV, Excel or Parquet

    Raises:
        ValueError: If destination type not supported
    """
    if destination_type == DataSourceType.CSV:
        return CSVWriter()

    if destination_type == DataSourceType.EXCEL:
        return ExcelWriter()

    if destination_type == DataSourceType.PARQUET:
        return ParquetWriter()

    # Anything else has no writer
    raise ValueError(f"Unsupported destination: {destination_type}")