Coverage for src/invariant/store/disk.py: 95.74%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 10:21 +0100

1"""DiskStore: Filesystem-based artifact storage.""" 

2 

3import threading 

4from pathlib import Path 

5from typing import Any 

6 

7from invariant.cacheable import is_cacheable 

8from invariant.store.base import ArtifactStore 

9from invariant.store.codec import deserialize, serialize 

10 

11 

class DiskStore(ArtifactStore):
    """Filesystem-based artifact store.

    Stores artifacts in the local filesystem (by default under
    `.invariant/cache/` in the current working directory) using a
    three-level directory structure:

        {cache_dir}/{safe_op_name}/{digest[:2]}/{digest[2:]}

    where `safe_op_name` is `op_name` with `:` and `/` replaced by `_`.
    Sharding on the first two digest characters keeps the number of
    entries in any single directory small.
    """

    def __init__(self, cache_dir: Path | str | None = None) -> None:
        """Initialize DiskStore.

        Args:
            cache_dir: Directory to store cache. Defaults to `.invariant/cache/`
                in the current working directory. Created (with parents) if it
                does not already exist.
        """
        if cache_dir is None:
            cache_dir = Path.cwd() / ".invariant" / "cache"
        elif isinstance(cache_dir, str):
            cache_dir = Path(cache_dir)

        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        super().__init__()

    def _get_path(self, op_name: str, digest: str) -> Path:
        """Get filesystem path for an operation and digest.

        Args:
            op_name: The name of the operation.
            digest: The SHA-256 hash (64 character hex string).

        Returns:
            Path to the artifact file.

        Raises:
            ValueError: If `digest` is not exactly 64 characters long.
        """
        if len(digest) != 64:
            raise ValueError(f"Invalid digest length: {len(digest)}, expected 64")

        # Sanitize op_name for use as a single directory name: replace both
        # ':' and '/' with '_'. NOTE: this mapping is not injective, so e.g.
        # "a:b", "a/b" and "a_b" all share one directory.
        safe_op_name = op_name.replace(":", "_").replace("/", "_")

        # Three-level layout: op_name / first 2 digest chars / remaining 62 chars.
        return self.cache_dir / safe_op_name / digest[:2] / digest[2:]

    def exists(self, op_name: str, digest: str) -> bool:
        """Check if an artifact exists.

        Also records the lookup in `self.stats`: a hit when the artifact file
        is present, a miss otherwise.
        """
        path = self._get_path(op_name, digest)
        exists = path.exists()
        if exists:
            self.stats.hits += 1
        else:
            self.stats.misses += 1
        return exists

    def get(self, op_name: str, digest: str) -> Any:
        """Retrieve an artifact by operation name and digest.

        Raises:
            KeyError: If artifact does not exist.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        path = self._get_path(op_name, digest)

        # EAFP: read directly instead of checking exists() first, so a file
        # removed between the check and the read still surfaces as KeyError.
        try:
            data = path.read_bytes()
        except FileNotFoundError:
            raise KeyError(
                f"Artifact with op_name '{op_name}' and digest '{digest}' not found"
            ) from None

        return deserialize(data)

    def put(self, op_name: str, digest: str, artifact: Any) -> None:
        """Store an artifact with the given operation name and digest.

        The serialized artifact is written to a temporary file next to the
        destination and then renamed into place with `Path.replace`, so the
        destination path never holds a partially written artifact.

        Raises:
            TypeError: If the artifact is not cacheable.
            ValueError: If `digest` is not exactly 64 characters long.
        """
        import os  # local import: only needed to build a process-unique temp name

        if not is_cacheable(artifact):
            raise TypeError(
                f"Artifact is not cacheable: {type(artifact)}. "
                "Use is_cacheable() to check values before storing."
            )

        path = self._get_path(op_name, digest)
        path.parent.mkdir(parents=True, exist_ok=True)

        serialized_data = serialize(artifact)

        # Thread idents are only unique within a process (forked workers often
        # share the same main-thread ident), so include the PID as well to keep
        # concurrent writers in different processes off the same temp file.
        temp_path = path.with_name(
            f"{path.name}.{os.getpid()}.{threading.get_ident()}.tmp"
        )
        try:
            with open(temp_path, "wb") as f:
                f.write(serialized_data)
            temp_path.replace(path)
        except BaseException:
            # Best-effort cleanup so a failed write/rename does not leave an
            # orphaned temp file; the original error is re-raised either way.
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise
        self.stats.puts += 1