Coverage for llm_dataset_engine/adapters/data_io.py: 34%

131 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Data I/O adapters for reading and writing tabular data. 

3 

4Provides unified interface for multiple data formats following the 

5Adapter pattern. 

6""" 

7 

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Iterator, Optional, Union

import pandas as pd
import polars as pl

from llm_dataset_engine.core.models import WriteConfirmation
from llm_dataset_engine.core.specifications import DataSourceType

17 

18 

class DataReader(ABC):
    """
    Common interface implemented by every tabular data reader.

    Support for a new format is added by writing another subclass
    (Open/Closed principle); this base class does not need to change.
    """

    @abstractmethod
    def read(self) -> pd.DataFrame:
        """
        Load the complete dataset in a single call.

        Returns:
            DataFrame containing all rows of the source
        """
        ...

    @abstractmethod
    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Iterate over the dataset piece by piece to limit memory use.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            Consecutive DataFrame chunks
        """
        ...

49 

50 

class CSVReader(DataReader):
    """CSV file reader implementation backed by ``pandas.read_csv``."""

    def __init__(
        self,
        file_path: Path,
        delimiter: str = ",",
        encoding: str = "utf-8",
    ):
        """
        Initialize CSV reader.

        Args:
            file_path: Path to CSV file
            delimiter: Column delimiter
            encoding: File encoding
        """
        self.file_path = file_path
        self.delimiter = delimiter
        self.encoding = encoding

    def read(self) -> pd.DataFrame:
        """
        Read the entire CSV file.

        Returns:
            DataFrame with all rows of the file
        """
        return pd.read_csv(
            self.file_path,
            delimiter=self.delimiter,
            encoding=self.encoding,
        )

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Read the CSV file in chunks.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            DataFrame chunks of at most ``chunk_size`` rows
        """
        # With ``chunksize`` pandas returns a reader object that keeps the
        # file open. Using it as a context manager closes the file even when
        # the consumer stops iterating early or the generator is closed.
        with pd.read_csv(
            self.file_path,
            delimiter=self.delimiter,
            encoding=self.encoding,
            chunksize=chunk_size,
        ) as chunk_reader:
            yield from chunk_reader

89 

90 

class ExcelReader(DataReader):
    """Reader for Excel workbooks, backed by ``pandas.read_excel``."""

    def __init__(
        self, file_path: Path, sheet_name: Union[str, int] = 0
    ):
        """
        Store the workbook location and the sheet to load.

        Args:
            file_path: Path to Excel file
            sheet_name: Sheet name or index
        """
        self.sheet_name = sheet_name
        self.file_path = file_path

    def read(self) -> pd.DataFrame:
        """Load the configured sheet as a single DataFrame."""
        frame = pd.read_excel(self.file_path, sheet_name=self.sheet_name)
        return frame

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Yield the sheet in consecutive row slices.

        Note: there is no native chunked Excel reading here, so the whole
        sheet is loaded first and then sliced.
        """
        frame = self.read()
        row_count = len(frame)
        for start in range(0, row_count, chunk_size):
            stop = start + chunk_size
            yield frame.iloc[start:stop]

121 

122 

class ParquetReader(DataReader):
    """Reader for Parquet files."""

    def __init__(self, file_path: Path):
        """
        Store the location of the Parquet file.

        Args:
            file_path: Path to Parquet file
        """
        self.file_path = file_path

    def read(self) -> pd.DataFrame:
        """Load the whole Parquet file with pandas."""
        frame = pd.read_parquet(self.file_path)
        return frame

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """
        Yield the file in row windows using a Polars lazy scan.

        Each window is collected separately and converted to pandas.
        """
        lazy_frame = pl.scan_parquet(self.file_path)

        # Count rows up front so the window offsets can be generated.
        row_count = lazy_frame.select(pl.len()).collect().item()

        for offset in range(0, row_count, chunk_size):
            window = lazy_frame.slice(offset, chunk_size)
            yield window.collect().to_pandas()

152 

153 

class DataFrameReader(DataReader):
    """Reader that serves an already-loaded, in-memory DataFrame."""

    def __init__(self, dataframe: pd.DataFrame):
        """
        Keep a private copy of the supplied DataFrame.

        Args:
            dataframe: Pandas DataFrame
        """
        snapshot = dataframe.copy()
        self.dataframe = snapshot

    def read(self) -> pd.DataFrame:
        """Return a fresh copy of the stored DataFrame."""
        duplicate = self.dataframe.copy()
        return duplicate

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """Yield copies of consecutive row slices of the stored DataFrame."""
        row_count = len(self.dataframe)
        for start in range(0, row_count, chunk_size):
            stop = start + chunk_size
            yield self.dataframe.iloc[start:stop].copy()

174 

175 

class DataWriter(ABC):
    """
    Common interface implemented by every tabular data writer.

    Single Responsibility: a writer is concerned with persisting data only.
    """

    @abstractmethod
    def write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Persist a DataFrame at the given destination.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation describing the write
        """
        ...

    @abstractmethod
    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Persist a DataFrame atomically, rolling back on failure.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation describing the write
        """
        ...

214 

215 

class CSVWriter(DataWriter):
    """CSV file writer implementation backed by ``DataFrame.to_csv``."""

    def __init__(self, delimiter: str = ",", encoding: str = "utf-8"):
        """
        Initialize CSV writer.

        Args:
            delimiter: Column delimiter
            encoding: File encoding
        """
        self.delimiter = delimiter
        self.encoding = encoding

    def write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to a CSV file (without the index column).

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_csv(
            path,
            sep=self.delimiter,
            encoding=self.encoding,
            index=False,
        )

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to CSV via a temporary file and a rename.

        The data is first written to a sibling temporary file, which then
        replaces the destination. If anything fails, the temporary file is
        removed and the original exception propagates unchanged.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        path = Path(path)
        # Build the temp name from the full file name instead of swapping the
        # suffix: "out.csv" -> "out.tmp.csv". This keeps temp files of
        # different formats apart and guarantees the temp path differs from
        # the destination even when the destination itself ends in ".tmp".
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_csv(
                temp_path,
                sep=self.delimiter,
                encoding=self.encoding,
                index=False,
            )
            # Same-directory rename that overwrites the destination.
            temp_path.replace(path)
        except BaseException:
            # Clean up on any failure (including interruption) and re-raise
            # with the original traceback intact.
            temp_path.unlink(missing_ok=True)
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

275 

276 

class ExcelWriter(DataWriter):
    """Excel file writer implementation backed by ``DataFrame.to_excel``."""

    def write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to an Excel file (without the index column).

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_excel(path, index=False)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to Excel via a temporary file and a rename.

        The data is first written to a sibling temporary file, which then
        replaces the destination. If anything fails, the temporary file is
        removed and the original exception propagates unchanged.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        path = Path(path)
        # Keep the real extension on the temp file ("out.xlsx" ->
        # "out.tmp.xlsx"). pandas can select the Excel engine from the file
        # extension, and ".tmp" is not an Excel extension. It also ensures
        # the temp path never equals the destination.
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_excel(temp_path, index=False)
            # Same-directory rename that overwrites the destination.
            temp_path.replace(path)
        except BaseException:
            # Clean up on any failure (including interruption) and re-raise
            # with the original traceback intact.
            temp_path.unlink(missing_ok=True)
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

311 

312 

class ParquetWriter(DataWriter):
    """Parquet file writer implementation backed by ``DataFrame.to_parquet``."""

    def write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to a Parquet file (without the index).

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        data.to_parquet(path, index=False)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

    def atomic_write(
        self, data: pd.DataFrame, path: Path
    ) -> WriteConfirmation:
        """
        Write the DataFrame to Parquet via a temporary file and a rename.

        The data is first written to a sibling temporary file, which then
        replaces the destination. If anything fails, the temporary file is
        removed and the original exception propagates unchanged.

        Args:
            data: DataFrame to write
            path: Destination path

        Returns:
            WriteConfirmation with the path and number of rows written
        """
        path = Path(path)
        # Build the temp name from the full file name instead of swapping the
        # suffix: "out.parquet" -> "out.tmp.parquet". This keeps temp files of
        # different formats apart and guarantees the temp path differs from
        # the destination even when the destination itself ends in ".tmp".
        temp_path = path.with_name(f"{path.stem}.tmp{path.suffix}")

        try:
            data.to_parquet(temp_path, index=False)
            # Same-directory rename that overwrites the destination.
            temp_path.replace(path)
        except BaseException:
            # Clean up on any failure (including interruption) and re-raise
            # with the original traceback intact.
            temp_path.unlink(missing_ok=True)
            raise

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )

347 

348 

def create_data_reader(
    source_type: DataSourceType,
    source_path: Optional[Path] = None,
    dataframe: Optional[pd.DataFrame] = None,
    **kwargs: Any,
) -> DataReader:
    """
    Factory function to create the appropriate data reader.

    Args:
        source_type: Type of data source
        source_path: Path to file (required for CSV, Excel and Parquet)
        dataframe: DataFrame (required for the DataFrame source)
        **kwargs: Optional reader-specific parameters. Recognized keys are
            ``delimiter`` and ``encoding`` (CSV) and ``sheet_name`` (Excel);
            any other keys are ignored.

    Returns:
        Configured DataReader

    Raises:
        ValueError: If the source type is not supported, or the path or
            DataFrame required by the chosen source type is missing
    """
    if source_type == DataSourceType.CSV:
        if not source_path:
            raise ValueError("source_path required for CSV")
        return CSVReader(
            source_path,
            delimiter=kwargs.get("delimiter", ","),
            encoding=kwargs.get("encoding", "utf-8"),
        )
    elif source_type == DataSourceType.EXCEL:
        if not source_path:
            raise ValueError("source_path required for Excel")
        return ExcelReader(
            source_path, sheet_name=kwargs.get("sheet_name", 0)
        )
    elif source_type == DataSourceType.PARQUET:
        if not source_path:
            raise ValueError("source_path required for Parquet")
        return ParquetReader(source_path)
    elif source_type == DataSourceType.DATAFRAME:
        # Compare against None explicitly: the truth value of a DataFrame
        # is ambiguous and raises in pandas.
        if dataframe is None:
            raise ValueError("dataframe required for DataFrame source")
        return DataFrameReader(dataframe)
    else:
        raise ValueError(f"Unsupported source type: {source_type}")

394 

395 

def create_data_writer(destination_type: DataSourceType) -> DataWriter:
    """
    Build the writer that matches a destination type.

    Args:
        destination_type: Type of destination

    Returns:
        Configured DataWriter

    Raises:
        ValueError: If destination type not supported
    """
    # Ordered (type, writer class) pairs, checked with ``==`` in sequence.
    writer_classes = (
        (DataSourceType.CSV, CSVWriter),
        (DataSourceType.EXCEL, ExcelWriter),
        (DataSourceType.PARQUET, ParquetWriter),
    )

    for supported_type, writer_cls in writer_classes:
        if destination_type == supported_type:
            return writer_cls()

    raise ValueError(f"Unsupported destination: {destination_type}")

417