Coverage for src / crump / cdf_reader.py: 76%
134 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""CDF file reading utilities for data_sync."""
3from __future__ import annotations
5from dataclasses import dataclass
6from pathlib import Path
7from typing import Any
9import numpy as np
@dataclass
class CDFVariable:
    """Represents a variable from a CDF file.

    Attributes:
        name: Variable name.
        data: The variable's values; a NumPy array or any other object.
        num_records: Number of records held by the variable.
        shape: Shape of ``data`` (empty tuple for non-array data).
        dtype: String description of the data type.
        attributes: Variable attributes keyed by attribute name.
    """

    name: str
    data: np.ndarray | Any
    num_records: int
    shape: tuple[int, ...]
    dtype: str
    attributes: dict[str, Any]

    @property
    def is_array(self) -> bool:
        """Check if this variable contains array data (2D)."""
        return isinstance(self.data, np.ndarray) and len(self.shape) == 2

    @property
    def array_size(self) -> int:
        """Get the size of array elements (second dimension), or 1 if not 2D."""
        return self.shape[1] if self.is_array else 1

    def get_column_names(self, cdf_file: Any) -> list[str]:
        """Generate column names for this variable.

        Args:
            cdf_file: The CDF file object to read label metadata from. Only
                its ``varget(name)`` method is used.

        Returns:
            ``[name]`` for non-2D variables. For 2D variables, one name per
            element of the second dimension: ``{name}_{label}`` when usable
            labels of matching length are found in the file, otherwise
            generic names from ``_generate_generic_column_names``.
        """
        if not self.is_array:
            # Simple 1D variable - use variable name
            return [self.name]

        # Try to get labels from CDF metadata
        labels = self._get_labels_from_metadata(cdf_file)
        if labels and len(labels) == self.array_size:
            return [f"{self.name}_{label}" for label in labels]

        # Fall back to generic names based on common patterns
        return self._generate_generic_column_names()

    def _get_labels_from_metadata(self, cdf_file: Any) -> list[str] | None:
        """Try to extract labels from CDF metadata.

        Candidate label variables are tried in order: the variable named by
        the ``LABL_PTR_1`` attribute (if present), then ``LBL1_{name}``,
        ``LABL_{name}``, ``{name}_LABEL``, ``{name}_label``, ``REP1_{name}``
        and ``{name}_rep``. The first candidate that yields an accepted label
        list wins.

        Args:
            cdf_file: The CDF file object; only ``varget(name)`` is called.

        Returns:
            List of labels if found, None otherwise.
        """

        def _is_useful_label(label: str) -> bool:
            """Check if a label is useful (not just a number or index)."""
            # Purely numeric labels and single-character (or empty) labels
            # carry no more information than a positional index.
            return not label.isdigit() and len(label) > 1

        def _process_labels(label_data: np.ndarray) -> list[str] | None:
            """Stringify label data and accept it only if at least half is useful."""
            labels = [str(label).strip() for label in label_data]
            if not labels:
                return None
            useful_count = sum(1 for lbl in labels if _is_useful_label(lbl))
            # Compare against twice the count instead of ``len(labels) // 2``:
            # floor division would accept e.g. 1 useful label out of 3, or a
            # single non-useful label (0 >= 0).
            if 2 * useful_count >= len(labels):
                return labels
            return None

        candidates: list[Any] = []

        # LABL_PTR_1 attribute points to a label variable
        if "LABL_PTR_1" in self.attributes:
            candidates.append(self.attributes["LABL_PTR_1"])

        # LBL1_{varname} or similar naming patterns
        candidates.extend(
            [
                f"LBL1_{self.name}",
                f"LABL_{self.name}",
                f"{self.name}_LABEL",
                f"{self.name}_label",
            ]
        )

        # REP1_{varname} for representation labels
        candidates.extend([f"REP1_{self.name}", f"{self.name}_rep"])

        for candidate in candidates:
            try:
                label_data = cdf_file.varget(candidate)
                if isinstance(label_data, np.ndarray):
                    labels = _process_labels(label_data)
                    if labels:
                        return labels
            except Exception:
                # Best-effort lookup: a missing or unreadable label variable
                # simply means we move on to the next candidate.
                continue

        return None

    def _generate_generic_column_names(self) -> list[str]:
        """Generate generic column names based on variable name and size.

        Returns:
            ``_r/_t/_n`` suffixes for 3-element variables whose name contains
            "rtn"; ``_x/_y/_z`` for 3-element variables whose name contains
            "xyz" or a vector-like pattern; ``_x/_y/_z/_w`` for 4-element
            vector-like variables; otherwise numeric suffixes ``_0 .. _{n-1}``.
        """
        var_lower = self.name.lower()

        # Names containing any of these substrings are treated as vectors
        is_vector = any(pattern in var_lower for pattern in ("vector", "vec", "mag", "field"))

        if self.array_size == 3:
            if "rtn" in var_lower:
                return [f"{self.name}_r", f"{self.name}_t", f"{self.name}_n"]
            if "xyz" in var_lower or is_vector:
                return [f"{self.name}_x", f"{self.name}_y", f"{self.name}_z"]

        if self.array_size == 4 and is_vector:
            return [
                f"{self.name}_x",
                f"{self.name}_y",
                f"{self.name}_z",
                f"{self.name}_w",
            ]

        # Default: use numeric indices
        return [f"{self.name}_{i}" for i in range(self.array_size)]
def _is_epoch_variable(var_info: Any, var_name: str, data: np.ndarray | Any) -> bool:
    """Decide whether a variable holds CDF TT2000 epoch times.

    Three checks are applied in order; the first that matches wins.

    Args:
        var_info: Variable information object; its ``Data_Type`` and
            ``Data_Type_Description`` attributes are consulted when present.
        var_name: Variable name
        data: Variable data

    Returns:
        True if ``Data_Type`` is 33, or ``Data_Type_Description`` contains
        "TIME_TT2000", or the name contains "epoch" (case-insensitive) and
        ``data`` is an int64 NumPy array. False otherwise.
    """
    # Numeric type code 33 is CDF_TIME_TT2000
    if getattr(var_info, "Data_Type", None) == 33:
        return True

    # Textual type description mentioning TT2000
    if "TIME_TT2000" in getattr(var_info, "Data_Type_Description", ""):
        return True

    # Fallback heuristic: "epoch" in the name plus int64 array data
    if "epoch" not in var_name.lower():
        return False
    if not isinstance(data, np.ndarray):
        return False
    return data.dtype == np.int64
def _convert_epoch_to_datetime(data: np.ndarray) -> np.ndarray:
    """Convert CDF EPOCH values to a datetime array, best-effort.

    Args:
        data: Array of EPOCH values

    Returns:
        The result of ``cdflib.cdfepoch.to_datetime(data)``; if cdflib cannot
        be imported or the conversion raises, ``data`` is returned unchanged.
    """
    # Import lazily so this module stays importable without cdflib
    try:
        from cdflib import cdfepoch  # type: ignore[import-untyped]
    except Exception:
        return data

    try:
        converted = cdfepoch.to_datetime(data)
    except Exception:
        # Conversion failed - hand back the raw values
        return data

    return converted  # type: ignore[no-any-return]
def read_cdf_variables(file_path: Path) -> list[CDFVariable]:
    """Load every rVariable and zVariable of a CDF file.

    Variables that raise while being read are skipped. Variables recognised
    as epoch times are passed through ``_convert_epoch_to_datetime``.

    Args:
        file_path: Path to the CDF file

    Returns:
        List of CDFVariable objects sorted by record count (descending)

    Raises:
        ImportError: If cdflib is not installed
        Exception: If the file cannot be read
    """
    try:
        import cdflib
    except ImportError as e:
        raise ImportError(
            "cdflib is required for CDF operations. Install with: pip install cdflib"
        ) from e

    collected = []

    with cdflib.CDF(str(file_path)) as cdf:
        file_info = cdf.cdf_info()

        for var_name in file_info.rVariables + file_info.zVariables:
            try:
                values = cdf.varget(var_name)

                # Variable info is optional; without it no epoch conversion happens
                try:
                    var_info = cdf.varinq(var_name)
                except Exception:
                    var_info = None

                # Epoch detection runs before the array check, as a failure in
                # it causes the variable to be skipped
                if var_info and _is_epoch_variable(var_info, var_name, values):
                    if isinstance(values, np.ndarray):
                        values = _convert_epoch_to_datetime(values)

                # Describe the data: arrays report their own shape/dtype,
                # anything else counts as a single scalar record
                if not isinstance(values, np.ndarray):
                    record_count, dims, type_name = 1, (), type(values).__name__
                else:
                    dims = values.shape
                    record_count = dims[0] if dims else 1
                    type_name = str(values.dtype)

                # Attributes are optional as well
                try:
                    var_attrs = cdf.varattsget(var_name)
                except Exception:
                    var_attrs = {}

                entry = CDFVariable(
                    name=var_name,
                    data=values,
                    num_records=record_count,
                    shape=dims,
                    dtype=type_name,
                    attributes=var_attrs,
                )
                collected.append(entry)
            except Exception:
                # Skip variables that can't be read
                continue

    # Largest record count first; ties keep file order
    return sorted(collected, key=lambda var: var.num_records, reverse=True)
def get_column_names_for_variable(variable: CDFVariable, cdf_file_path: Path) -> list[str]:
    """Resolve column names for a variable by consulting its CDF file.

    Args:
        variable: The CDFVariable to get names for
        cdf_file_path: Path to the CDF file (needed to read label metadata)

    Returns:
        The names from ``variable.get_column_names`` when the file can be
        opened with cdflib. On any failure: index-suffixed names for array
        variables, or just the variable name otherwise.
    """
    try:
        import cdflib

        with cdflib.CDF(str(cdf_file_path)) as cdf:
            names = variable.get_column_names(cdf)
    except Exception:
        # Metadata unavailable - fall back to basic column names
        if not variable.is_array:
            return [variable.name]
        return [f"{variable.name}_{idx}" for idx in range(variable.array_size)]
    return names