Coverage for src / crump / parquet_file.py: 94%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""Parquet file reader and writer implementations using pyarrow.""" 

2 

3from __future__ import annotations 

4 

5from collections.abc import Iterator 

6from pathlib import Path 

7from typing import Any 

8 

9try: 

10 import pyarrow as pa # type: ignore[import-untyped] 

11 import pyarrow.parquet as pq # type: ignore[import-untyped] 

12except ImportError as e: 

13 raise ImportError( 

14 "pyarrow is required for Parquet file support. Install it with: pip install pyarrow" 

15 ) from e 

16 

17from .tabular_file import TabularFileReader, TabularFileWriter 

18 

19 

class ParquetFileReader(TabularFileReader):
    """Parquet file reader implementation.

    Uses pyarrow to read Parquet files and provide a consistent interface
    for reading tabular data files. Entering the context manager loads the
    entire file into memory as a PyArrow Table; iteration then converts the
    table to Python dictionaries one record batch at a time, so a dict for
    every row of the file never has to exist all at once.
    """

    def __init__(self, file_path: str | Path):
        """Initialize Parquet file reader.

        Args:
            file_path: Path to the Parquet file
        """
        super().__init__(file_path)
        # PyArrow Table loaded in __enter__; None outside the ``with`` block.
        self._table: Any = None
        # Column names from the table schema; None outside the ``with`` block.
        self._fieldnames: list[str] | None = None

    def __enter__(self) -> ParquetFileReader:
        """Read the Parquet file into memory and capture its column names.

        Returns:
            Self for use in with statement
        """
        # Read the entire Parquet file into a PyArrow Table
        self._table = pq.read_table(str(self.file_path))
        self._fieldnames = self._table.schema.names
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Release the in-memory table and column names.

        Returns None, so exceptions raised inside the ``with`` block are not
        suppressed.

        Args:
            exc_type: Exception type if an error occurred
            exc_val: Exception value if an error occurred
            exc_tb: Exception traceback if an error occurred
        """
        self._table = None
        self._fieldnames = None

    @property
    def fieldnames(self) -> list[str]:
        """Get column names from the Parquet file.

        Returns:
            List of column names

        Raises:
            RuntimeError: If called outside of context manager
        """
        if self._fieldnames is None:
            raise RuntimeError("Reader must be used within a context manager (with statement)")
        return self._fieldnames

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate through Parquet rows as dictionaries.

        Rows are yielded in table order. Each record batch is converted to
        Python dictionaries only when iteration reaches it.

        Yields:
            Dictionary mapping column names to values for each row

        Raises:
            RuntimeError: If iteration starts outside of the context manager
        """
        if self._table is None:
            raise RuntimeError("Reader must be used within a context manager (with statement)")

        # Convert batch by batch rather than calling self._table.to_pylist():
        # the whole-table call builds a dict for every row up front, keeping
        # the Arrow table and all those Python objects alive simultaneously.
        # Batches are returned in order, so row order is unchanged.
        for batch in self._table.to_batches():
            yield from batch.to_pylist()

93 

94 

class ParquetFileWriter(TabularFileWriter):
    """Parquet file writer implementation.

    Uses pyarrow to write Parquet files. Rows passed to ``writerow`` are
    buffered in memory and written in a single call when the context
    manager exits without an exception.
    """

    def __init__(self, file_path: str | Path, append: bool = False):
        """Initialize Parquet file writer.

        Args:
            file_path: Path to the Parquet file
            append: If True, append to existing file. If False, overwrite.

        Note:
            Append mode for Parquet files works by reading the existing file,
            combining it with new data, and rewriting the whole file. This is
            less efficient than CSV append but maintains Parquet's columnar
            format.
        """
        super().__init__(file_path, append)
        # Buffered data rows (copies of the sequences passed to writerow).
        self._rows: list[list[Any]] = []
        # Column names: the first row written, or the existing file's schema
        # names when appending to an existing file.
        self._header: list[Any] | None = None
        # PyArrow Table with the existing file's contents (append mode only).
        self._existing_table: Any = None

    def __enter__(self) -> ParquetFileWriter:
        """Prepare for writing.

        If appending to an existing file, reads it into memory and adopts its
        column names as the header.

        Returns:
            Self for use in with statement
        """
        if self.append and self.file_path.exists():
            self._existing_table = pq.read_table(str(self.file_path))
            # The existing file already defines the header, so subsequent
            # writerow() calls are treated as data (apart from an optional
            # repeated header row; see writerow).
            self._header = self._existing_table.schema.names
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Write buffered rows to the Parquet file and reset the writer.

        Nothing is written if the ``with`` block raised or if no header was
        ever provided. A header with no data rows still produces a file with
        that schema. Writer state is reset even if writing fails.

        Args:
            exc_type: Exception type if an error occurred
            exc_val: Exception value if an error occurred
            exc_tb: Exception traceback if an error occurred
        """
        try:
            if exc_type is None and self._header is not None:
                self._write_parquet()
        finally:
            # Reset in ``finally`` so a failed write does not leave the
            # buffered rows and the existing table referenced by this writer.
            self._rows = []
            self._header = None
            self._existing_table = None

    def writerow(self, row: list[Any]) -> None:
        """Buffer a row to be written to the Parquet file.

        The first row is treated as the header (column names); subsequent
        rows are data. When appending to an existing file, the header comes
        from that file: a first row equal to it is skipped, so callers can
        always write a header. A first row that differs from it is treated
        as data.

        The row is copied, so callers may reuse or mutate their sequence
        after the call.

        Args:
            row: Sequence of values to write

        Raises:
            ValueError: If a data row's length differs from the header's
        """
        values = list(row)

        if self._header is None:
            # First row is the header (when creating a new file)
            self._header = values
            return

        if self._existing_table is not None and not self._rows and values == self._header:
            # Appending: this header row is already present in the file.
            return

        # Reject ragged rows here rather than letting them be silently
        # truncated (too long) or fail later with an opaque length error
        # from PyArrow (too short).
        if len(values) != len(self._header):
            raise ValueError(
                f"Cannot write row to {self.file_path}: expected "
                f"{len(self._header)} values to match the header, got {len(values)}"
            )
        self._rows.append(values)

    def _write_parquet(self) -> None:
        """Write the buffered rows to the Parquet file.

        When appending, new rows are converted using the existing file's
        schema and placed after the existing rows, and the whole file is
        rewritten. Otherwise PyArrow infers column types from the data. With
        no data rows and no existing file, every column is typed as string.

        Raises:
            ValueError: If there is no header, or (when appending) the header
                differs from the existing file's column names. Errors raised
                by PyArrow while converting values propagate unchanged.
        """
        if not self._header:
            raise ValueError("Cannot write Parquet file without header row")
        header = self._header

        if self._existing_table is not None:
            # Defensive check: __enter__ copies the header from the existing
            # schema, so this only fails if that invariant is broken.
            if self._existing_table.schema.names != header:
                raise ValueError(
                    f"Cannot append to {self.file_path}: "
                    f"column names don't match. "
                    f"Existing: {self._existing_table.schema.names}, "
                    f"New: {self._header}"
                )
            # Reuse the existing schema so new data has compatible types.
            schema = self._existing_table.schema
        else:
            # No existing schema, let PyArrow infer from data
            schema = None

        if self._rows:
            # Pivot row-major buffers into column_name -> list_of_values.
            # Row lengths were validated in writerow, so strict=True only
            # guards that invariant.
            data: dict[Any, list[Any]] = {col: [] for col in header}
            for row in self._rows:
                for col, value in zip(header, row, strict=True):
                    data[col].append(value)
            # schema=None is from_pydict's default, i.e. type inference.
            new_table = pa.Table.from_pydict(data, schema=schema)
        else:
            # No data rows, create empty table with schema
            if schema is None:
                schema = pa.schema([(col, pa.string()) for col in header])
            new_table = pa.Table.from_pydict({col: [] for col in header}, schema=schema)

        if self._existing_table is not None:
            table = pa.concat_tables([self._existing_table, new_table])
        else:
            table = new_table
        pq.write_table(table, str(self.file_path))