Coverage for src / invariant / store / disk.py: 95.65%
46 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-20 16:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-20 16:05 +0000
1"""DiskStore: Filesystem-based artifact storage."""
3from pathlib import Path
4from typing import Any
6from invariant.cacheable import is_cacheable
7from invariant.store.base import ArtifactStore
8from invariant.store.codec import deserialize, serialize
class DiskStore(ArtifactStore):
    """Filesystem-based artifact store.

    Stores artifacts in the local filesystem (by default under
    `.invariant/cache/`) using a three-level directory structure:
    `{safe_op_name}/{digest[:2]}/{digest[2:]}`, where `safe_op_name` is the
    operation name with `:` and `/` replaced by `_`. Fanning out on the first
    two digest characters keeps individual directories small.
    """

    def __init__(self, cache_dir: Path | str | None = None) -> None:
        """Initialize DiskStore.

        Args:
            cache_dir: Directory to store cache. Defaults to `.invariant/cache/`
                in the current working directory. The directory (and any
                missing parents) is created if it does not exist.
        """
        if cache_dir is None:
            cache_dir = Path.cwd() / ".invariant" / "cache"
        else:
            # Normalise str (or any other path-like) input to a Path.
            cache_dir = Path(cache_dir)

        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        super().__init__()

    def _get_path(self, op_name: str, digest: str) -> Path:
        """Get filesystem path for an operation and digest.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            Path to the artifact file.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        if len(digest) != 64:
            raise ValueError(f"Invalid digest length: {len(digest)}, expected 64")

        # Sanitize op_name so it maps to a single directory name
        # (both ":" and "/" are replaced with "_").
        # NOTE(review): only the digest length is checked here, not that it is
        # hexadecimal; confirm callers never pass untrusted digests/op_names.
        safe_op_name = op_name.replace(":", "_").replace("/", "_")

        # Three-level directory structure:
        # op_name / first 2 chars of digest / remaining 62 chars
        return self.cache_dir / safe_op_name / digest[:2] / digest[2:]

    def exists(self, op_name: str, digest: str) -> bool:
        """Check if an artifact exists.

        Also records the lookup in `self.stats` as a hit or a miss.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            True if a file exists at the artifact's path, False otherwise.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        found = self._get_path(op_name, digest).exists()
        if found:
            self.stats.hits += 1
        else:
            self.stats.misses += 1
        return found

    def get(self, op_name: str, digest: str) -> Any:
        """Retrieve an artifact by operation name and digest.

        Unlike `exists`, this does not update the hit/miss counters.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            The artifact produced by deserializing the stored bytes.

        Raises:
            KeyError: If artifact does not exist.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        path = self._get_path(op_name, digest)

        # Read first and translate "missing" into KeyError afterwards, so a
        # file removed between a check and the read cannot leak a
        # FileNotFoundError to callers that expect KeyError.
        try:
            data = path.read_bytes()
        except (FileNotFoundError, NotADirectoryError):
            raise KeyError(
                f"Artifact with op_name '{op_name}' and digest '{digest}' not found"
            ) from None

        # Deserialize using codec
        return deserialize(data)

    def put(self, op_name: str, digest: str, artifact: Any) -> None:
        """Store an artifact with the given operation name and digest.

        The data is written to a sibling `.tmp` file and then renamed over the
        final path, so readers never observe a partially written artifact.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).
            artifact: The value to store; must satisfy `is_cacheable`.

        Raises:
            TypeError: If `artifact` is not cacheable.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        # Validate artifact is cacheable
        if not is_cacheable(artifact):
            raise TypeError(
                f"Artifact is not cacheable: {type(artifact)}. "
                f"Use is_cacheable() to check values before storing."
            )

        path = self._get_path(op_name, digest)

        # Serialize before touching the filesystem so a codec failure does
        # not leave an empty directory behind.
        serialized_data = serialize(artifact)

        # Create parent directory if needed
        path.parent.mkdir(parents=True, exist_ok=True)

        # Write atomically (write to temp file, then rename)
        temp_path = path.with_suffix(path.suffix + ".tmp")
        try:
            temp_path.write_bytes(serialized_data)
            temp_path.replace(path)
        except BaseException:
            # Do not leave a partial temp file in the cache on failure.
            temp_path.unlink(missing_ok=True)
            raise
        self.stats.puts += 1