Coverage for src / crump / parquet_file.py: 94%
79 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""Parquet file reader and writer implementations using pyarrow."""
3from __future__ import annotations
5from collections.abc import Iterator
6from pathlib import Path
7from typing import Any
9try:
10 import pyarrow as pa # type: ignore[import-untyped]
11 import pyarrow.parquet as pq # type: ignore[import-untyped]
12except ImportError as e:
13 raise ImportError(
14 "pyarrow is required for Parquet file support. Install it with: pip install pyarrow"
15 ) from e
17from .tabular_file import TabularFileReader, TabularFileWriter
class ParquetFileReader(TabularFileReader):
    """Parquet file reader implementation.

    Uses pyarrow to read Parquet files and provide a consistent interface
    for reading tabular data files. The whole file is loaded into memory as
    a PyArrow Table when the context is entered; during iteration the table
    is converted to Python dictionaries one record batch at a time, so only
    a bounded number of rows exist as Python objects at any moment.
    """

    # Upper bound on the number of rows converted to Python objects at once
    # while iterating.
    _BATCH_ROWS = 65536

    def __init__(self, file_path: str | Path):
        """Initialize Parquet file reader.

        Args:
            file_path: Path to the Parquet file
        """
        super().__init__(file_path)
        # Both stay None until __enter__ loads the file.
        self._table: Any = None
        self._fieldnames: list[str] | None = None

    def __enter__(self) -> ParquetFileReader:
        """Open the Parquet file and read the schema.

        Returns:
            Self for use in with statement
        """
        # Read the entire Parquet file into a PyArrow Table
        self._table = pq.read_table(str(self.file_path))
        self._fieldnames = self._table.schema.names
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Cleanup resources.

        Drops the references to the loaded table and its column names.

        Args:
            exc_type: Exception type if an error occurred
            exc_val: Exception value if an error occurred
            exc_tb: Exception traceback if an error occurred
        """
        self._table = None
        self._fieldnames = None

    @property
    def fieldnames(self) -> list[str]:
        """Get column names from the Parquet file.

        Returns:
            List of column names

        Raises:
            RuntimeError: If called outside of context manager
        """
        if self._fieldnames is None:
            raise RuntimeError("Reader must be used within a context manager (with statement)")
        return self._fieldnames

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate through Parquet rows as dictionaries.

        Converts each row to a dictionary mapping column names to values.
        The table is sliced into record batches of at most ``_BATCH_ROWS``
        rows and each batch is converted separately, instead of turning the
        whole table into Python objects in one call.

        Yields:
            Dictionary mapping column names to values for each row

        Raises:
            RuntimeError: If called outside of context manager
        """
        if self._table is None:
            raise RuntimeError("Reader must be used within a context manager (with statement)")

        # Keep a local reference so an iteration already in progress is not
        # affected if __exit__ clears self._table.
        table = self._table
        for batch in table.to_batches(max_chunksize=self._BATCH_ROWS):
            yield from batch.to_pylist()
class ParquetFileWriter(TabularFileWriter):
    """Parquet file writer implementation.

    Uses pyarrow to write Parquet files. Accumulates rows in memory
    and writes them all at once when the context manager exits.

    Rows shorter than the header are padded with ``None`` (null) for the
    missing trailing columns; values beyond the header width are ignored.
    """

    def __init__(self, file_path: str | Path, append: bool = False):
        """Initialize Parquet file writer.

        Args:
            file_path: Path to the Parquet file
            append: If True, append to existing file. If False, overwrite.

        Note:
            Append mode for Parquet files works by reading the existing file,
            combining it with new data, and writing the result. This is less
            efficient than CSV append but maintains Parquet's columnar format.
        """
        super().__init__(file_path, append)
        self._rows: list[list[Any]] = []
        self._header: list[Any] | None = None
        self._existing_table: Any = None

    def __enter__(self) -> ParquetFileWriter:
        """Prepare for writing.

        If appending to an existing file, reads it into memory.

        Returns:
            Self for use in with statement
        """
        # If appending and file exists, read the existing data
        if self.append and self.file_path.exists():
            self._existing_table = pq.read_table(str(self.file_path))
            # When appending, we already have a header from the existing file
            # Set it so that subsequent writerow() calls are treated as data
            self._header = self._existing_table.schema.names
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Write accumulated rows to the Parquet file.

        Nothing is written when the with-block raised or when no header was
        ever provided. The accumulated state is reset in every case, even
        when writing the file fails.

        Args:
            exc_type: Exception type if an error occurred
            exc_val: Exception value if an error occurred
            exc_tb: Exception traceback if an error occurred
        """
        try:
            # Only write if no exception occurred and we have a header
            # (we write even if there are no data rows, to create an empty
            # table with schema)
            if exc_type is None and self._header is not None:
                self._write_parquet()
        finally:
            # Cleanup must also run when _write_parquet() raises, so the
            # writer never keeps stale rows around.
            self._rows = []
            self._header = None
            self._existing_table = None

    def writerow(self, row: list[Any]) -> None:
        """Accumulate a row to be written to the Parquet file.

        The first row is treated as the header (column names).
        Subsequent rows are treated as data.

        When appending, a row that equals the existing header and arrives
        before any data row has been accumulated is skipped.

        The row is copied, so the caller may reuse or mutate its list
        afterwards without affecting what gets written.

        Args:
            row: List of values to write
        """
        values = list(row)

        if self._header is None:
            # First row is the header (when creating new file)
            self._header = values
            return

        # When appending, the caller may repeat the header before the data;
        # drop it instead of storing the column names as a data row.
        repeated_header = (
            self._existing_table is not None and not self._rows and values == self._header
        )
        if not repeated_header:
            self._rows.append(values)

    def _write_parquet(self) -> None:
        """Write the accumulated rows to the Parquet file.

        Combines with existing data if appending.

        Raises:
            ValueError: If there is no header, or if appending and the header
                does not match the existing file's column names
        """
        if not self._header:
            raise ValueError("Cannot write Parquet file without header row")

        # If appending, reuse the existing table's schema for type
        # compatibility; otherwise let PyArrow infer types from the data.
        schema = None
        if self._existing_table is not None:
            # Verify column names match
            if self._existing_table.schema.names != self._header:
                raise ValueError(
                    f"Cannot append to {self.file_path}: "
                    f"column names don't match. "
                    f"Existing: {self._existing_table.schema.names}, "
                    f"New: {self._header}"
                )
            schema = self._existing_table.schema

        if self._rows:
            # Build column_name -> list_of_values. Every column receives
            # exactly one value per row: missing trailing values become None
            # so all columns end up with the same length, and values beyond
            # the header width are ignored.
            data: dict[Any, list[Any]] = {col: [] for col in self._header}
            for row in self._rows:
                row_len = len(row)
                for index, col in enumerate(self._header):
                    data[col].append(row[index] if index < row_len else None)
            new_table = pa.Table.from_pydict(data, schema=schema)
        else:
            # No data rows, create empty table with schema
            if schema is None:
                schema = pa.schema([(col, pa.string()) for col in self._header])
            new_table = pa.Table.from_pydict({col: [] for col in self._header}, schema=schema)

        # If appending, combine with existing table before writing
        if self._existing_table is not None:
            new_table = pa.concat_tables([self._existing_table, new_table])
        pq.write_table(new_table, str(self.file_path))