Coverage for src / crump / history.py: 100%
37 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""History tracking for crump sync operations."""
3from __future__ import annotations
5import hashlib
6from datetime import UTC, datetime
7from pathlib import Path
8from typing import TYPE_CHECKING
10if TYPE_CHECKING:
11 from crump.database import DatabaseBackend
14class SyncHistoryEntry:
15 """Represents a single sync history entry."""
17 def __init__(
18 self,
19 timestamp: datetime,
20 filename: str,
21 table_name: str,
22 rows_upserted: int,
23 rows_deleted: int,
24 data_hash: str,
25 schema_changed: bool,
26 duration_seconds: float,
27 success: bool,
28 error: str | None = None,
29 ) -> None:
30 """Initialize a sync history entry.
32 Args:
33 timestamp: UTC timestamp when sync started for this file
34 filename: Name of the file being synced
35 table_name: Target table name for the sync
36 rows_upserted: Number of rows inserted or updated
37 rows_deleted: Number of rows deleted
38 data_hash: Hash of the data file
39 schema_changed: Whether schema changes were made
40 duration_seconds: Duration of the sync in seconds
41 success: Whether the sync succeeded
42 error: Error message if sync failed, None otherwise
43 """
44 self.timestamp = timestamp
45 self.filename = filename
46 self.table_name = table_name
47 self.rows_upserted = rows_upserted
48 self.rows_deleted = rows_deleted
49 self.data_hash = data_hash
50 self.schema_changed = schema_changed
51 self.duration_seconds = duration_seconds
52 self.success = success
53 self.error = error
56def _calculate_file_hash(file_path: Path) -> str:
57 """Calculate SHA256 hash of a file.
59 Args:
60 file_path: Path to the file to hash
62 Returns:
63 Hexadecimal string representation of the SHA256 hash
64 """
65 sha256_hash = hashlib.sha256()
66 with open(file_path, "rb") as f:
67 # Read in 64kb chunks to handle large files
68 for byte_block in iter(lambda: f.read(65536), b""):
69 sha256_hash.update(byte_block)
70 return sha256_hash.hexdigest()
73def _ensure_history_table_exists(backend: DatabaseBackend) -> None:
74 """Create the _crump_history table if it doesn't exist.
76 Args:
77 backend: Database backend to use
78 """
79 columns = {
80 "timestamp": backend.map_data_type("timestamp") + " NOT NULL",
81 "filename": backend.map_data_type("text") + " NOT NULL",
82 "table_name": backend.map_data_type("text") + " NOT NULL",
83 "rows_upserted": backend.map_data_type("integer") + " NOT NULL",
84 "rows_deleted": backend.map_data_type("integer") + " NOT NULL",
85 "data_hash": backend.map_data_type("text") + " NOT NULL",
86 "schema_changed": "BOOLEAN NOT NULL",
87 "duration_seconds": backend.map_data_type("float") + " NOT NULL",
88 "success": "BOOLEAN NOT NULL",
89 "error": backend.map_data_type("text"),
90 }
92 backend.create_table_if_not_exists("_crump_history", columns, primary_keys=["timestamp"])
93 backend.commit()
96def record_sync_history(
97 backend: DatabaseBackend,
98 file_path: Path,
99 table_name: str,
100 rows_upserted: int,
101 rows_deleted: int,
102 schema_changed: bool,
103 start_time: datetime,
104 end_time: datetime,
105 success: bool,
106 error: str | None = None,
107) -> None:
108 """Record a sync operation to the history table.
110 Args:
111 backend: Database backend to use
112 file_path: Path to the file that was synced
113 table_name: Target table name for the sync
114 rows_upserted: Number of rows inserted or updated
115 rows_deleted: Number of rows deleted
116 schema_changed: Whether schema changes were made
117 start_time: When the sync started (UTC)
118 end_time: When the sync ended (UTC)
119 success: Whether the sync succeeded
120 error: Error message if sync failed, None otherwise
121 """
122 # Ensure history table exists
123 _ensure_history_table_exists(backend)
125 # Calculate file hash
126 data_hash = _calculate_file_hash(file_path)
128 # Calculate duration
129 duration_seconds = (end_time - start_time).total_seconds()
131 # Create history entry
132 entry = SyncHistoryEntry(
133 timestamp=start_time,
134 filename=file_path.name,
135 table_name=table_name,
136 rows_upserted=rows_upserted,
137 rows_deleted=rows_deleted,
138 data_hash=data_hash,
139 schema_changed=schema_changed,
140 duration_seconds=duration_seconds,
141 success=success,
142 error=error,
143 )
145 # Insert into database
146 row_data = {
147 "timestamp": entry.timestamp,
148 "filename": entry.filename,
149 "table_name": entry.table_name,
150 "rows_upserted": entry.rows_upserted,
151 "rows_deleted": entry.rows_deleted,
152 "data_hash": entry.data_hash,
153 "schema_changed": entry.schema_changed,
154 "duration_seconds": entry.duration_seconds,
155 "success": entry.success,
156 "error": entry.error,
157 }
159 # Use upsert to handle potential timestamp conflicts (though unlikely)
160 backend.upsert_row("_crump_history", ["timestamp"], row_data)
161 backend.commit()
164def get_utc_now() -> datetime:
165 """Get current UTC datetime.
167 Returns:
168 Current UTC datetime with timezone info
169 """
170 return datetime.now(UTC)