Coverage for src / crump / type_detection.py: 98%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""Type detection for CSV columns."""
3import re
4from pathlib import Path
6from crump.tabular_file import create_reader
9def detect_column_type(values: list[str]) -> str:
10 """Detect the most appropriate data type for a column based on sample values.
12 Args:
13 values: List of string values from the column (excluding empty strings)
15 Returns:
16 Detected type: 'bigint', 'integer', 'float', 'date', 'datetime', 'text', or 'varchar(N)'
17 """
18 if not values:
19 return "text"
21 # Sample up to 1000 values for performance
22 sample = values[:1000]
23 non_empty = [v for v in sample if v.strip()]
25 if not non_empty:
26 return "text"
28 # Check if ANY value is a bigint AND all values are numeric (integers)
29 # This handles mixed cases where some values are small integers and some are large
30 if any(_is_bigint(v) for v in non_empty) and all(_is_any_integer(v) for v in non_empty):
31 return "bigint"
33 # Check if all values are integers (within INTEGER range)
34 if all(_is_integer(v) for v in non_empty):
35 return "integer"
37 # Check if all values are floats
38 if all(_is_float(v) for v in non_empty):
39 return "float"
41 # Check if all values are dates
42 if all(_is_date(v) for v in non_empty):
43 return "date"
45 # Check if all values are datetimes
46 if all(_is_datetime(v) for v in non_empty):
47 return "datetime"
49 # Check if it's a short text field (could use varchar)
50 max_length = max(len(v) for v in non_empty)
51 if max_length <= 255:
52 return f"varchar({max_length})"
54 return "text"
57def _is_integer(value: str) -> bool:
58 """Check if a string represents an integer within PostgreSQL INTEGER range.
60 PostgreSQL INTEGER range: -2147483648 to 2147483647 (-2^31 to 2^31-1)
61 """
62 try:
63 int_val = int(value)
64 # Check if value fits in PostgreSQL INTEGER range
65 return -2147483648 <= int_val <= 2147483647
66 except ValueError:
67 return False
70def _is_bigint(value: str) -> bool:
71 """Check if a string represents a large integer that requires BIGINT.
73 This checks if the value is an integer but exceeds the PostgreSQL INTEGER range.
74 PostgreSQL BIGINT range: -9223372036854775808 to 9223372036854775807 (-2^63 to 2^63-1)
75 """
76 try:
77 int_val = int(value)
78 # Check if value exceeds INTEGER range but fits in BIGINT range
79 return (int_val < -2147483648 or int_val > 2147483647) and (
80 -9223372036854775808 <= int_val <= 9223372036854775807
81 )
82 except ValueError:
83 return False
86def _is_any_integer(value: str) -> bool:
87 """Check if a string represents any integer (within INTEGER or BIGINT range).
89 This returns True for both small integers and large integers.
90 """
91 try:
92 int_val = int(value)
93 # Check if value fits in BIGINT range
94 return -9223372036854775808 <= int_val <= 9223372036854775807
95 except ValueError:
96 return False
99def _is_float(value: str) -> bool:
100 """Check if a string represents a float."""
101 try:
102 float(value)
103 return True
104 except ValueError:
105 return False
108def _is_date(value: str) -> bool:
109 """Check if a string represents a date (YYYY-MM-DD format)."""
110 # Common date patterns
111 date_patterns = [
112 r"^\d{4}-\d{2}-\d{2}$", # YYYY-MM-DD
113 r"^\d{4}/\d{2}/\d{2}$", # YYYY/MM/DD
114 r"^\d{2}-\d{2}-\d{4}$", # DD-MM-YYYY
115 r"^\d{2}/\d{2}/\d{4}$", # DD/MM/YYYY or MM/DD/YYYY
116 ]
118 return any(re.match(pattern, value.strip()) for pattern in date_patterns)
121def _is_datetime(value: str) -> bool:
122 """Check if a string represents a datetime."""
123 # Common datetime patterns
124 datetime_patterns = [
125 r"^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}", # YYYY-MM-DD HH:MM:SS
126 r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", # ISO format
127 r"^\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2}", # MM/DD/YYYY HH:MM:SS
128 ]
130 return any(re.match(pattern, value.strip()) for pattern in datetime_patterns)
133def detect_nullable(values: list[str], total_rows: int) -> bool:
134 """Detect if a column should be nullable based on sample values.
136 Args:
137 values: List of non-empty string values from the column
138 total_rows: Total number of rows in the CSV
140 Returns:
141 True if column has any empty/null values, False otherwise
142 """
143 # If we have fewer values than total rows, there are empty values
144 return len(values) < total_rows
147def analyze_tabular_file_types(file_path: Path) -> dict[str, str]:
148 """Analyze a tabular file (CSV or Parquet) and detect data types for each column.
150 Args:
151 file_path: Path to the tabular file
153 Returns:
154 Dictionary mapping column names to detected types
155 """
156 column_values: dict[str, list[str]] = {}
158 with create_reader(file_path) as reader:
159 if not reader.fieldnames:
160 return {}
162 # Initialize empty lists for each column
163 for col in reader.fieldnames:
164 column_values[col] = []
166 # Collect values for each column
167 for row in reader:
168 for col in reader.fieldnames:
169 if col in row and row[col]:
170 column_values[col].append(str(row[col]))
172 # Detect type for each column
173 return {col: detect_column_type(values) for col, values in column_values.items()}
176def analyze_tabular_file_types_and_nullable(file_path: Path) -> dict[str, tuple[str, bool]]:
177 """Analyze a tabular file (CSV or Parquet) and detect data types and nullable status for each column.
179 Args:
180 file_path: Path to the tabular file
182 Returns:
183 Dictionary mapping column names to (data_type, nullable) tuples
184 """
185 column_values: dict[str, list[str]] = {}
186 total_rows = 0
188 with create_reader(file_path) as reader:
189 if not reader.fieldnames:
190 return {}
192 # Initialize empty lists for each column
193 for col in reader.fieldnames:
194 column_values[col] = []
196 # Collect values for each column and count total rows
197 for row in reader:
198 total_rows += 1
199 for col in reader.fieldnames:
200 if col in row and row[col]:
201 val_str = str(row[col]).strip()
202 if val_str:
203 column_values[col].append(val_str)
205 # Detect type and nullable for each column
206 result = {}
207 for col, values in column_values.items():
208 data_type = detect_column_type(values)
209 nullable = detect_nullable(values, total_rows)
210 result[col] = (data_type, nullable)
212 return result
215def suggest_id_column(columns: list[str], matchers: list[str] | None = None) -> str:
216 """Suggest which column should be the ID column.
218 Args:
219 columns: List of column names
220 matchers: Optional list of column name patterns to match (in priority order).
221 If None, uses default patterns: ['id', 'uuid', 'key', 'code']
223 Returns:
224 Name of suggested ID column
225 """
226 # Use provided matchers or default ones
227 id_candidates = ["id", "uuid", "epoch", "key", "code"] if matchers is None else matchers
229 # Check for exact matches
230 lower_columns = {col.lower(): col for col in columns}
231 for candidate in id_candidates:
232 if candidate.lower() in lower_columns:
233 return lower_columns[candidate.lower()]
235 # Check for columns ending with _id (only if using default matchers)
236 if matchers is None:
237 for col in columns:
238 if col.lower().endswith("_id"):
239 return col
241 # Default to first column
242 return columns[0] if columns else "id"