Coverage for src / crump / type_detection.py: 98%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""Type detection for CSV columns.""" 

2 

3import re 

4from pathlib import Path 

5 

6from crump.tabular_file import create_reader 

7 

8 

9def detect_column_type(values: list[str]) -> str: 

10 """Detect the most appropriate data type for a column based on sample values. 

11 

12 Args: 

13 values: List of string values from the column (excluding empty strings) 

14 

15 Returns: 

16 Detected type: 'bigint', 'integer', 'float', 'date', 'datetime', 'text', or 'varchar(N)' 

17 """ 

18 if not values: 

19 return "text" 

20 

21 # Sample up to 1000 values for performance 

22 sample = values[:1000] 

23 non_empty = [v for v in sample if v.strip()] 

24 

25 if not non_empty: 

26 return "text" 

27 

28 # Check if ANY value is a bigint AND all values are numeric (integers) 

29 # This handles mixed cases where some values are small integers and some are large 

30 if any(_is_bigint(v) for v in non_empty) and all(_is_any_integer(v) for v in non_empty): 

31 return "bigint" 

32 

33 # Check if all values are integers (within INTEGER range) 

34 if all(_is_integer(v) for v in non_empty): 

35 return "integer" 

36 

37 # Check if all values are floats 

38 if all(_is_float(v) for v in non_empty): 

39 return "float" 

40 

41 # Check if all values are dates 

42 if all(_is_date(v) for v in non_empty): 

43 return "date" 

44 

45 # Check if all values are datetimes 

46 if all(_is_datetime(v) for v in non_empty): 

47 return "datetime" 

48 

49 # Check if it's a short text field (could use varchar) 

50 max_length = max(len(v) for v in non_empty) 

51 if max_length <= 255: 

52 return f"varchar({max_length})" 

53 

54 return "text" 

55 

56 

57def _is_integer(value: str) -> bool: 

58 """Check if a string represents an integer within PostgreSQL INTEGER range. 

59 

60 PostgreSQL INTEGER range: -2147483648 to 2147483647 (-2^31 to 2^31-1) 

61 """ 

62 try: 

63 int_val = int(value) 

64 # Check if value fits in PostgreSQL INTEGER range 

65 return -2147483648 <= int_val <= 2147483647 

66 except ValueError: 

67 return False 

68 

69 

70def _is_bigint(value: str) -> bool: 

71 """Check if a string represents a large integer that requires BIGINT. 

72 

73 This checks if the value is an integer but exceeds the PostgreSQL INTEGER range. 

74 PostgreSQL BIGINT range: -9223372036854775808 to 9223372036854775807 (-2^63 to 2^63-1) 

75 """ 

76 try: 

77 int_val = int(value) 

78 # Check if value exceeds INTEGER range but fits in BIGINT range 

79 return (int_val < -2147483648 or int_val > 2147483647) and ( 

80 -9223372036854775808 <= int_val <= 9223372036854775807 

81 ) 

82 except ValueError: 

83 return False 

84 

85 

86def _is_any_integer(value: str) -> bool: 

87 """Check if a string represents any integer (within INTEGER or BIGINT range). 

88 

89 This returns True for both small integers and large integers. 

90 """ 

91 try: 

92 int_val = int(value) 

93 # Check if value fits in BIGINT range 

94 return -9223372036854775808 <= int_val <= 9223372036854775807 

95 except ValueError: 

96 return False 

97 

98 

99def _is_float(value: str) -> bool: 

100 """Check if a string represents a float.""" 

101 try: 

102 float(value) 

103 return True 

104 except ValueError: 

105 return False 

106 

107 

108def _is_date(value: str) -> bool: 

109 """Check if a string represents a date (YYYY-MM-DD format).""" 

110 # Common date patterns 

111 date_patterns = [ 

112 r"^\d{4}-\d{2}-\d{2}$", # YYYY-MM-DD 

113 r"^\d{4}/\d{2}/\d{2}$", # YYYY/MM/DD 

114 r"^\d{2}-\d{2}-\d{4}$", # DD-MM-YYYY 

115 r"^\d{2}/\d{2}/\d{4}$", # DD/MM/YYYY or MM/DD/YYYY 

116 ] 

117 

118 return any(re.match(pattern, value.strip()) for pattern in date_patterns) 

119 

120 

121def _is_datetime(value: str) -> bool: 

122 """Check if a string represents a datetime.""" 

123 # Common datetime patterns 

124 datetime_patterns = [ 

125 r"^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}", # YYYY-MM-DD HH:MM:SS 

126 r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", # ISO format 

127 r"^\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2}", # MM/DD/YYYY HH:MM:SS 

128 ] 

129 

130 return any(re.match(pattern, value.strip()) for pattern in datetime_patterns) 

131 

132 

133def detect_nullable(values: list[str], total_rows: int) -> bool: 

134 """Detect if a column should be nullable based on sample values. 

135 

136 Args: 

137 values: List of non-empty string values from the column 

138 total_rows: Total number of rows in the CSV 

139 

140 Returns: 

141 True if column has any empty/null values, False otherwise 

142 """ 

143 # If we have fewer values than total rows, there are empty values 

144 return len(values) < total_rows 

145 

146 

147def analyze_tabular_file_types(file_path: Path) -> dict[str, str]: 

148 """Analyze a tabular file (CSV or Parquet) and detect data types for each column. 

149 

150 Args: 

151 file_path: Path to the tabular file 

152 

153 Returns: 

154 Dictionary mapping column names to detected types 

155 """ 

156 column_values: dict[str, list[str]] = {} 

157 

158 with create_reader(file_path) as reader: 

159 if not reader.fieldnames: 

160 return {} 

161 

162 # Initialize empty lists for each column 

163 for col in reader.fieldnames: 

164 column_values[col] = [] 

165 

166 # Collect values for each column 

167 for row in reader: 

168 for col in reader.fieldnames: 

169 if col in row and row[col]: 

170 column_values[col].append(str(row[col])) 

171 

172 # Detect type for each column 

173 return {col: detect_column_type(values) for col, values in column_values.items()} 

174 

175 

176def analyze_tabular_file_types_and_nullable(file_path: Path) -> dict[str, tuple[str, bool]]: 

177 """Analyze a tabular file (CSV or Parquet) and detect data types and nullable status for each column. 

178 

179 Args: 

180 file_path: Path to the tabular file 

181 

182 Returns: 

183 Dictionary mapping column names to (data_type, nullable) tuples 

184 """ 

185 column_values: dict[str, list[str]] = {} 

186 total_rows = 0 

187 

188 with create_reader(file_path) as reader: 

189 if not reader.fieldnames: 

190 return {} 

191 

192 # Initialize empty lists for each column 

193 for col in reader.fieldnames: 

194 column_values[col] = [] 

195 

196 # Collect values for each column and count total rows 

197 for row in reader: 

198 total_rows += 1 

199 for col in reader.fieldnames: 

200 if col in row and row[col]: 

201 val_str = str(row[col]).strip() 

202 if val_str: 

203 column_values[col].append(val_str) 

204 

205 # Detect type and nullable for each column 

206 result = {} 

207 for col, values in column_values.items(): 

208 data_type = detect_column_type(values) 

209 nullable = detect_nullable(values, total_rows) 

210 result[col] = (data_type, nullable) 

211 

212 return result 

213 

214 

215def suggest_id_column(columns: list[str], matchers: list[str] | None = None) -> str: 

216 """Suggest which column should be the ID column. 

217 

218 Args: 

219 columns: List of column names 

220 matchers: Optional list of column name patterns to match (in priority order). 

221 If None, uses default patterns: ['id', 'uuid', 'key', 'code'] 

222 

223 Returns: 

224 Name of suggested ID column 

225 """ 

226 # Use provided matchers or default ones 

227 id_candidates = ["id", "uuid", "epoch", "key", "code"] if matchers is None else matchers 

228 

229 # Check for exact matches 

230 lower_columns = {col.lower(): col for col in columns} 

231 for candidate in id_candidates: 

232 if candidate.lower() in lower_columns: 

233 return lower_columns[candidate.lower()] 

234 

235 # Check for columns ending with _id (only if using default matchers) 

236 if matchers is None: 

237 for col in columns: 

238 if col.lower().endswith("_id"): 

239 return col 

240 

241 # Default to first column 

242 return columns[0] if columns else "id"