Coverage for src / crump / history.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""History tracking for crump sync operations.""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6from datetime import UTC, datetime 

7from pathlib import Path 

8from typing import TYPE_CHECKING 

9 

10if TYPE_CHECKING: 

11 from crump.database import DatabaseBackend 

12 

13 

14class SyncHistoryEntry: 

15 """Represents a single sync history entry.""" 

16 

17 def __init__( 

18 self, 

19 timestamp: datetime, 

20 filename: str, 

21 table_name: str, 

22 rows_upserted: int, 

23 rows_deleted: int, 

24 data_hash: str, 

25 schema_changed: bool, 

26 duration_seconds: float, 

27 success: bool, 

28 error: str | None = None, 

29 ) -> None: 

30 """Initialize a sync history entry. 

31 

32 Args: 

33 timestamp: UTC timestamp when sync started for this file 

34 filename: Name of the file being synced 

35 table_name: Target table name for the sync 

36 rows_upserted: Number of rows inserted or updated 

37 rows_deleted: Number of rows deleted 

38 data_hash: Hash of the data file 

39 schema_changed: Whether schema changes were made 

40 duration_seconds: Duration of the sync in seconds 

41 success: Whether the sync succeeded 

42 error: Error message if sync failed, None otherwise 

43 """ 

44 self.timestamp = timestamp 

45 self.filename = filename 

46 self.table_name = table_name 

47 self.rows_upserted = rows_upserted 

48 self.rows_deleted = rows_deleted 

49 self.data_hash = data_hash 

50 self.schema_changed = schema_changed 

51 self.duration_seconds = duration_seconds 

52 self.success = success 

53 self.error = error 

54 

55 

56def _calculate_file_hash(file_path: Path) -> str: 

57 """Calculate SHA256 hash of a file. 

58 

59 Args: 

60 file_path: Path to the file to hash 

61 

62 Returns: 

63 Hexadecimal string representation of the SHA256 hash 

64 """ 

65 sha256_hash = hashlib.sha256() 

66 with open(file_path, "rb") as f: 

67 # Read in 64kb chunks to handle large files 

68 for byte_block in iter(lambda: f.read(65536), b""): 

69 sha256_hash.update(byte_block) 

70 return sha256_hash.hexdigest() 

71 

72 

def _ensure_history_table_exists(backend: DatabaseBackend) -> None:
    """Make sure the ``_crump_history`` table is available.

    Builds the column definitions from the backend's own type mapping, asks
    the backend to create the table if it is missing (with ``timestamp`` as
    the primary key), and commits.

    Args:
        backend: Database backend to use
    """

    def required(kind: str) -> str:
        # Backend-specific type name for a column that may not be NULL.
        return backend.map_data_type(kind) + " NOT NULL"

    history_columns = {
        "timestamp": required("timestamp"),
        "filename": required("text"),
        "table_name": required("text"),
        "rows_upserted": required("integer"),
        "rows_deleted": required("integer"),
        "data_hash": required("text"),
        # Boolean columns use a literal type rather than the backend mapping.
        "schema_changed": "BOOLEAN NOT NULL",
        "duration_seconds": required("float"),
        "success": "BOOLEAN NOT NULL",
        # The error column is the only one without a NOT NULL constraint.
        "error": backend.map_data_type("text"),
    }

    backend.create_table_if_not_exists(
        "_crump_history",
        history_columns,
        primary_keys=["timestamp"],
    )
    backend.commit()

94 

95 

def record_sync_history(
    backend: DatabaseBackend,
    file_path: Path,
    table_name: str,
    rows_upserted: int,
    rows_deleted: int,
    schema_changed: bool,
    start_time: datetime,
    end_time: datetime,
    success: bool,
    error: str | None = None,
) -> None:
    """Write one sync operation into the history table.

    Ensures the history table exists, hashes the synced file, derives the
    duration from the start and end times, and stores the resulting entry
    keyed on its start timestamp before committing.

    Args:
        backend: Database backend to use
        file_path: Path to the file that was synced
        table_name: Target table name for the sync
        rows_upserted: Number of rows inserted or updated
        rows_deleted: Number of rows deleted
        schema_changed: Whether schema changes were made
        start_time: When the sync started (UTC)
        end_time: When the sync ended (UTC)
        success: Whether the sync succeeded
        error: Error message if sync failed, None otherwise
    """
    # The table is created (if needed) before the file is read.
    _ensure_history_table_exists(backend)

    file_digest = _calculate_file_hash(file_path)
    elapsed_seconds = (end_time - start_time).total_seconds()

    entry = SyncHistoryEntry(
        timestamp=start_time,
        filename=file_path.name,
        table_name=table_name,
        rows_upserted=rows_upserted,
        rows_deleted=rows_deleted,
        data_hash=file_digest,
        schema_changed=schema_changed,
        duration_seconds=elapsed_seconds,
        success=success,
        error=error,
    )

    # Column names match the entry's attribute names one-to-one.
    column_order = (
        "timestamp",
        "filename",
        "table_name",
        "rows_upserted",
        "rows_deleted",
        "data_hash",
        "schema_changed",
        "duration_seconds",
        "success",
        "error",
    )
    record = {column: getattr(entry, column) for column in column_order}

    # Use upsert to handle potential timestamp conflicts (though unlikely)
    backend.upsert_row("_crump_history", ["timestamp"], record)
    backend.commit()

162 

163 

164def get_utc_now() -> datetime: 

165 """Get current UTC datetime. 

166 

167 Returns: 

168 Current UTC datetime with timezone info 

169 """ 

170 return datetime.now(UTC)