Coverage for src / invariant / store / disk.py: 95.65%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-20 16:05 +0000

1"""DiskStore: Filesystem-based artifact storage.""" 

2 

3from pathlib import Path 

4from typing import Any 

5 

6from invariant.cacheable import is_cacheable 

7from invariant.store.base import ArtifactStore 

8from invariant.store.codec import deserialize, serialize 

9 

10 

class DiskStore(ArtifactStore):
    """Filesystem-based artifact store.

    Stores artifacts in the local filesystem (by default under
    `.invariant/cache/`) using a three-level directory structure:
    `{op_name}/{digest[:2]}/{digest[2:]}`, where `op_name` has `:` and `/`
    replaced by `_`, for efficient filesystem performance.
    """

    def __init__(self, cache_dir: Path | str | None = None) -> None:
        """Initialize DiskStore.

        Args:
            cache_dir: Directory to store cache. Defaults to `.invariant/cache/`
                in the current working directory. The directory (and any
                missing parents) is created if it does not exist.
        """
        if cache_dir is None:
            cache_dir = Path.cwd() / ".invariant" / "cache"
        else:
            # Path() accepts str, Path and any other os.PathLike, so every
            # supported input ends up as a Path before .mkdir() is called.
            cache_dir = Path(cache_dir)

        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        super().__init__()

    def _get_path(self, op_name: str, digest: str) -> Path:
        """Get filesystem path for an operation and digest.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            Path to the artifact file.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        if len(digest) != 64:
            raise ValueError(f"Invalid digest length: {len(digest)}, expected 64")

        # Sanitize op_name so it forms a single path component
        # (replace both ":" and "/" with "_").
        safe_op_name = op_name.replace(":", "_").replace("/", "_")

        # Three-level directory structure:
        # op_name / first 2 chars of digest / remaining 62 chars of digest
        return self.cache_dir / safe_op_name / digest[:2] / digest[2:]

    def exists(self, op_name: str, digest: str) -> bool:
        """Check if an artifact exists.

        Increments `self.stats.hits` when the artifact file is present and
        `self.stats.misses` otherwise.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        found = self._get_path(op_name, digest).exists()
        if found:
            self.stats.hits += 1
        else:
            self.stats.misses += 1
        return found

    def get(self, op_name: str, digest: str) -> Any:
        """Retrieve an artifact by operation name and digest.

        Returns:
            The artifact produced by `deserialize` from the stored bytes.

        Raises:
            KeyError: If artifact does not exist.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        path = self._get_path(op_name, digest)

        # Read directly instead of checking exists() first: a separate
        # existence check can race with a concurrent removal and would then
        # surface FileNotFoundError instead of the documented KeyError.
        try:
            data = path.read_bytes()
        except (FileNotFoundError, NotADirectoryError):
            raise KeyError(
                f"Artifact with op_name '{op_name}' and digest '{digest}' not found"
            ) from None

        # Deserialize using codec
        return deserialize(data)

    def put(self, op_name: str, digest: str, artifact: Any) -> None:
        """Store an artifact with the given operation name and digest.

        The data is written to a sibling `.tmp` file and then renamed over
        the final path, so readers never observe a partially written file.
        Increments `self.stats.puts` on success.

        Raises:
            TypeError: If `artifact` is not cacheable.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        # Validate artifact is cacheable
        if not is_cacheable(artifact):
            raise TypeError(
                f"Artifact is not cacheable: {type(artifact)}. "
                f"Use is_cacheable() to check values before storing."
            )

        path = self._get_path(op_name, digest)

        # Serialize first so a codec failure leaves the filesystem untouched.
        serialized_data = serialize(artifact)

        # Create parent directory if needed
        path.parent.mkdir(parents=True, exist_ok=True)

        # Write atomically (write to temp file, then rename)
        temp_path = path.with_suffix(path.suffix + ".tmp")
        try:
            with open(temp_path, "wb") as f:
                f.write(serialized_data)
            temp_path.replace(path)
        except BaseException:
            # Do not leave a stray temp file behind if the write or the
            # rename fails; the original error is re-raised unchanged.
            temp_path.unlink(missing_ok=True)
            raise
        self.stats.puts += 1