Coverage for src / crump / cdf_reader.py: 76%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""CDF file reading utilities for data_sync.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from pathlib import Path 

7from typing import Any 

8 

9import numpy as np 

10 

11 

@dataclass
class CDFVariable:
    """Represents a variable from a CDF file.

    Attributes:
        name: Variable name as stored in the CDF file.
        data: Variable contents; a numpy array or any other value.
        num_records: Number of records held by the variable.
        shape: Shape of ``data`` (empty tuple for non-array data).
        dtype: Textual description of the data type.
        attributes: Variable-level attributes (e.g. ``LABL_PTR_1``).
    """

    name: str
    data: np.ndarray | Any
    num_records: int
    shape: tuple[int, ...]
    dtype: str
    attributes: dict[str, Any]

    @property
    def is_array(self) -> bool:
        """Check if this variable contains array data (2D)."""
        return isinstance(self.data, np.ndarray) and len(self.shape) == 2

    @property
    def array_size(self) -> int:
        """Get the number of elements per record (1 unless the variable is 2D)."""
        return self.shape[1] if self.is_array else 1

    def get_column_names(self, cdf_file: Any) -> list[str]:
        """Generate column names for this variable.

        Args:
            cdf_file: The CDF file object to read label metadata from; only
                its ``varget(name)`` method is used.

        Returns:
            ``[name]`` for non-2D variables. For 2D variables,
            ``{name}_{label}`` per element when metadata supplies exactly
            ``array_size`` labels, otherwise generic names.
        """
        if not self.is_array:
            # Simple 1D variable - use variable name
            return [self.name]

        # Labels are only trusted when there is exactly one per element.
        labels = self._get_labels_from_metadata(cdf_file)
        if labels and len(labels) == self.array_size:
            return [f"{self.name}_{label}" for label in labels]

        # Fall back to generic names based on common patterns
        return self._generate_generic_column_names()

    def _get_labels_from_metadata(self, cdf_file: Any) -> list[str] | None:
        """Try to extract labels from CDF metadata.

        Candidate label variables are tried in this order: the variable named
        by the ``LABL_PTR_1`` attribute (if present), then ``LBL1_{name}``,
        ``LABL_{name}``, ``{name}_LABEL``, ``{name}_label``, ``REP1_{name}``
        and ``{name}_rep``. The first candidate that yields an accepted,
        non-empty label list wins.

        Args:
            cdf_file: The CDF file object (must provide ``varget(name)``)

        Returns:
            List of labels if found, None otherwise
        """

        def _to_text(label: Any) -> str:
            """Render one label element as stripped text."""
            # str() on a bytes value would produce "b'...'", so decode instead.
            if isinstance(label, bytes):
                return label.decode("utf-8", errors="replace").strip()
            return str(label).strip()

        def _is_useful_label(label: str) -> bool:
            """Check if a label is useful (not just a number or index)."""
            # Purely numeric labels and labels of length <= 1 look like indices.
            # NOTE(review): this also rejects single-letter component labels
            # such as "r", "t", "n" - confirm that is intended for REP1_ lookups.
            return not label.isdigit() and len(label) > 1

        def _process_labels(label_data: np.ndarray) -> list[str] | None:
            """Process and validate label data."""
            if label_data.ndim == 0:
                # A 0-d array carries no per-element labels.
                return None
            # Flatten so label arrays with extra dimensions (e.g. shape (1, N))
            # yield one label per element instead of one stringified row.
            labels = [_to_text(label) for label in label_data.ravel()]
            useful_count = sum(1 for lbl in labels if _is_useful_label(lbl))
            # Accept when the useful labels number at least len(labels) // 2.
            if useful_count >= len(labels) // 2:
                return labels
            return None

        candidates: list[Any] = []
        if "LABL_PTR_1" in self.attributes:
            # LABL_PTR_1 points to a label variable
            candidates.append(self.attributes["LABL_PTR_1"])
        candidates.extend(
            [
                f"LBL1_{self.name}",
                f"LABL_{self.name}",
                f"{self.name}_LABEL",
                f"{self.name}_label",
                # Representation label variables
                f"REP1_{self.name}",
                f"{self.name}_rep",
            ]
        )

        for candidate in candidates:
            try:
                label_data = cdf_file.varget(candidate)
                if isinstance(label_data, np.ndarray):
                    labels = _process_labels(label_data)
                    if labels:
                        return labels
            except Exception:
                # Best-effort lookup: a candidate that is missing or unreadable
                # simply means "try the next one".
                continue

        return None

    def _generate_generic_column_names(self) -> list[str]:
        """Generate generic column names based on variable name and size.

        Returns:
            ``_r/_t/_n`` or ``_x/_y/_z`` suffixes for 3-element variables whose
            name hints at a coordinate system, ``_x/_y/_z/_w`` for 4-element
            vector-like names, otherwise numeric suffixes ``_0 .. _{n-1}``.
        """
        var_lower = self.name.lower()

        # Substrings that mark the variable as vector-like
        is_vector = any(pattern in var_lower for pattern in ("vector", "vec", "mag", "field"))

        if self.array_size == 3:
            if "rtn" in var_lower:
                return [f"{self.name}_r", f"{self.name}_t", f"{self.name}_n"]
            if "xyz" in var_lower or is_vector:
                return [f"{self.name}_x", f"{self.name}_y", f"{self.name}_z"]

        if self.array_size == 4 and is_vector:
            return [
                f"{self.name}_x",
                f"{self.name}_y",
                f"{self.name}_z",
                f"{self.name}_w",
            ]

        # Default: use numeric indices
        return [f"{self.name}_{i}" for i in range(self.array_size)]

153 

154 

def _is_epoch_variable(var_info: Any, var_name: str, data: np.ndarray | Any) -> bool:
    """Check if a variable is a CDF EPOCH time variable.

    Three checks are applied in order; the first that matches wins:

    1. ``var_info.Data_Type`` equals 33 (CDF_TIME_TT2000).
    2. ``var_info.Data_Type_Description`` is a string containing
       ``"TIME_TT2000"``.
    3. The variable name contains ``"epoch"`` (case-insensitive) and ``data``
       is an int64 numpy array.

    Args:
        var_info: Variable information from CDF; may be None or lack either
            attribute, in which case only the name/dtype fallback applies
        var_name: Variable name
        data: Variable data

    Returns:
        True if this is an EPOCH variable, False otherwise
    """
    tt2000_type_code = 33  # CDF_TIME_TT2000

    if getattr(var_info, "Data_Type", None) == tt2000_type_code:
        return True

    # The description may be present but None or non-text; a substring test on
    # such a value would raise, so only inspect real strings.
    description = getattr(var_info, "Data_Type_Description", None)
    if isinstance(description, str) and "TIME_TT2000" in description:
        return True

    # Fallback: check if variable name contains "epoch" and data is int64
    return "epoch" in var_name.lower() and isinstance(data, np.ndarray) and data.dtype == np.int64

179 

180 

def _convert_epoch_to_datetime(data: np.ndarray) -> np.ndarray:
    """Convert CDF EPOCH values to datetime values via cdflib.

    The conversion is best-effort: if cdflib cannot be imported or the
    conversion raises for any reason, the input is handed back untouched.

    Args:
        data: Array of EPOCH values (int64 nanoseconds since J2000)

    Returns:
        The result of ``cdfepoch.to_datetime(data)`` on success, otherwise
        ``data`` itself
    """
    try:
        from cdflib import cdfepoch  # type: ignore[import-untyped]

        converted = cdfepoch.to_datetime(data)
    except Exception:
        # Import or conversion failed: keep the raw values
        return data

    return converted  # type: ignore[no-any-return]

202 

203 

def read_cdf_variables(file_path: Path) -> list[CDFVariable]:
    """Read all variables from a CDF file.

    Every rVariable and zVariable listed by ``cdf_info()`` is loaded with
    ``varget``. Variables recognised as EPOCH are converted to datetimes.
    A variable whose processing raises is silently left out of the result.

    Args:
        file_path: Path to the CDF file

    Returns:
        List of CDFVariable objects sorted by record count (descending)

    Raises:
        ImportError: If cdflib is not installed
        Exception: Whatever opening the file or reading its info raises
    """
    try:
        import cdflib
    except ImportError as e:
        raise ImportError(
            "cdflib is required for CDF operations. Install with: pip install cdflib"
        ) from e

    collected = []

    with cdflib.CDF(str(file_path)) as cdf:
        summary = cdf.cdf_info()

        for name in summary.rVariables + summary.zVariables:
            try:
                values = cdf.varget(name)

                # Variable info is optional; it is only used for EPOCH detection
                try:
                    inquiry = cdf.varinq(name)
                except Exception:
                    inquiry = None

                # NOTE(review): with no variable info the name-based EPOCH
                # fallback is never consulted - confirm this is intended.
                if inquiry and _is_epoch_variable(inquiry, name, values):
                    if isinstance(values, np.ndarray):
                        values = _convert_epoch_to_datetime(values)

                # Describe the (possibly converted) data
                if isinstance(values, np.ndarray):
                    dims = values.shape
                    record_count = dims[0] if dims else 1
                    type_name = str(values.dtype)
                else:
                    dims = ()
                    record_count = 1
                    type_name = type(values).__name__

                # Attributes are optional as well
                try:
                    attrs = cdf.varattsget(name)
                except Exception:
                    attrs = {}

                entry = CDFVariable(
                    name=name,
                    data=values,
                    num_records=record_count,
                    shape=dims,
                    dtype=type_name,
                    attributes=attrs,
                )
                collected.append(entry)
            except Exception:
                # Skip variables that can't be read
                continue

    # Largest record counts first
    collected.sort(key=lambda item: item.num_records, reverse=True)

    return collected

282 

283 

def get_column_names_for_variable(variable: CDFVariable, cdf_file_path: Path) -> list[str]:
    """Get column names for a CDF variable.

    The file is opened with cdflib so the variable can consult label metadata.
    If anything in that step raises (cdflib missing, unreadable file, lookup
    error), basic names are produced without touching the file.

    Args:
        variable: The CDFVariable to get names for
        cdf_file_path: Path to the CDF file (needed to read label metadata)

    Returns:
        List of column names
    """
    try:
        import cdflib

        with cdflib.CDF(str(cdf_file_path)) as cdf:
            return variable.get_column_names(cdf)
    except Exception:
        # Metadata unavailable: fall through to the basic naming below
        pass

    if not variable.is_array:
        return [variable.name]

    base = variable.name
    return [f"{base}_{index}" for index in range(variable.array_size)]

303 return [variable.name]