Coverage for src / invariant / store / disk.py: 95.74%
47 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 10:21 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 10:21 +0100
1"""DiskStore: Filesystem-based artifact storage."""
3import threading
4from pathlib import Path
5from typing import Any
7from invariant.cacheable import is_cacheable
8from invariant.store.base import ArtifactStore
9from invariant.store.codec import deserialize, serialize
class DiskStore(ArtifactStore):
    """Filesystem-based artifact store.

    Stores artifacts in the local filesystem (by default under
    `.invariant/cache/`) using a three-level directory structure:
    `{op_name}/{digest[:2]}/{digest[2:]}`, where `op_name` has `:` and `/`
    replaced by `_`. Fanning out on the first two digest characters keeps
    individual directories small.
    """

    def __init__(self, cache_dir: Path | str | None = None) -> None:
        """Initialize DiskStore.

        Args:
            cache_dir: Directory to store cache. Defaults to `.invariant/cache/`
                in the current working directory. Strings (and any other
                path-like value) are converted to `Path`.
        """
        if cache_dir is None:
            cache_dir = Path.cwd() / ".invariant" / "cache"
        elif not isinstance(cache_dir, Path):
            # Accept str as well as any other os.PathLike value.
            cache_dir = Path(cache_dir)

        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        super().__init__()

    def _get_path(self, op_name: str, digest: str) -> Path:
        """Get filesystem path for an operation and digest.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            Path to the artifact file.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        if len(digest) != 64:
            raise ValueError(f"Invalid digest length: {len(digest)}, expected 64")

        # Sanitize op_name for the filesystem: `:` and `/` both become `_`
        # so the operation name always maps to a single directory component.
        safe_op_name = op_name.replace(":", "_").replace("/", "_")

        # Three-level layout: op_name / first 2 chars / remaining 62 chars.
        return self.cache_dir / safe_op_name / digest[:2] / digest[2:]

    def exists(self, op_name: str, digest: str) -> bool:
        """Check if an artifact exists, recording a hit or a miss in `self.stats`.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        found = self._get_path(op_name, digest).exists()
        if found:
            self.stats.hits += 1
        else:
            self.stats.misses += 1
        return found

    def get(self, op_name: str, digest: str) -> Any:
        """Retrieve an artifact by operation name and digest.

        Returns:
            The artifact produced by `deserialize` from the stored bytes.

        Raises:
            KeyError: If artifact does not exist.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        path = self._get_path(op_name, digest)

        # Read directly instead of checking `path.exists()` first: a separate
        # existence check can race with a concurrent removal and would then
        # leak FileNotFoundError instead of the documented KeyError.
        try:
            data = path.read_bytes()
        except (FileNotFoundError, NotADirectoryError):
            raise KeyError(
                f"Artifact with op_name '{op_name}' and digest '{digest}' not found"
            ) from None

        # Deserialize using codec
        return deserialize(data)

    def put(self, op_name: str, digest: str, artifact: Any) -> None:
        """Store an artifact with the given operation name and digest.

        The data is written to a temporary sibling file and then renamed over
        the final path, so readers never observe a partially written artifact.

        Raises:
            TypeError: If `artifact` is not cacheable.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        import os  # local import: only needed for the per-process temp name

        # Validate artifact is cacheable
        if not is_cacheable(artifact):
            raise TypeError(
                f"Artifact is not cacheable: {type(artifact)}. "
                f"Use is_cacheable() to check values before storing."
            )

        path = self._get_path(op_name, digest)

        # Serialize before touching the filesystem so a serialization failure
        # leaves no empty directories behind.
        serialized_data = serialize(artifact)

        # Create parent directory if needed
        path.parent.mkdir(parents=True, exist_ok=True)

        # Write atomically (write to temp file, then rename). The temp name
        # includes the PID as well as the thread id: thread ids alone are not
        # unique across processes, and two writers sharing one temp file
        # could publish a truncated artifact.
        temp_path = path.with_name(
            f"{path.name}.{os.getpid()}.{threading.get_ident()}.tmp"
        )
        try:
            with open(temp_path, "wb") as f:
                f.write(serialized_data)
            temp_path.replace(path)
        except BaseException:
            # Do not leave a stray temp file behind on a failed write/rename.
            temp_path.unlink(missing_ok=True)
            raise
        self.stats.puts += 1