Coverage for jinja2_async_environment/caching/typed.py: 44%
124 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
1"""Type-safe cache implementation with generic support."""
3import time
4import typing as t
5from dataclasses import dataclass
6from threading import RLock
8T = t.TypeVar("T")
11@dataclass
12class CacheEntry[T]:
13 """Cache entry with value, timestamp, and TTL information."""
15 value: T
16 timestamp: float
17 ttl: int
18 access_count: int = 0
19 last_access: float = 0.0
22class TypedCache[T]:
23 """Type-safe cache with TTL, LRU eviction, and memory management.
25 This cache provides:
26 - Type safety through generics
27 - TTL-based expiration
28 - LRU eviction when cache is full
29 - Thread-safe operations
30 - Memory usage tracking
31 """
33 def __init__(self, max_size: int = 1000, default_ttl: int = 300):
34 """Initialize the typed cache.
36 Args:
37 max_size: Maximum number of entries to store
38 default_ttl: Default time-to-live in seconds
39 """
40 self._cache: dict[str, CacheEntry[T]] = {}
41 self._max_size = max_size
42 self._default_ttl = default_ttl
43 self._access_order: list[str] = []
44 self._lock = RLock()
46 # Statistics
47 self._hits = 0
48 self._misses = 0
49 self._evictions = 0
51 def get(self, key: str) -> T | None:
52 """Get value from cache with TTL and LRU tracking.
54 Args:
55 key: Cache key
57 Returns:
58 Cached value or None if not found/expired
59 """
60 with self._lock:
61 if key not in self._cache:
62 self._misses += 1
63 return None
65 entry = self._cache[key]
66 current_time = time.time()
68 # Check TTL
69 if current_time - entry.timestamp > entry.ttl:
70 del self._cache[key]
71 if key in self._access_order:
72 self._access_order.remove(key)
73 self._misses += 1
74 return None
76 # Update access tracking
77 entry.access_count += 1
78 entry.last_access = current_time
80 # Update LRU order
81 if key in self._access_order:
82 self._access_order.remove(key)
83 self._access_order.append(key)
85 self._hits += 1
86 return entry.value
88 def set(self, key: str, value: T, ttl: int | None = None) -> None:
89 """Store value in cache with automatic eviction.
91 Args:
92 key: Cache key
93 value: Value to store
94 ttl: Time-to-live in seconds (uses default if None)
95 """
96 with self._lock:
97 current_time = time.time()
99 # Check if we need to evict entries
100 if len(self._cache) >= self._max_size and key not in self._cache:
101 self._evict_lru()
103 # Create cache entry
104 entry = CacheEntry(
105 value=value,
106 timestamp=current_time,
107 ttl=ttl or self._default_ttl,
108 access_count=1,
109 last_access=current_time,
110 )
112 self._cache[key] = entry
114 # Update access order
115 if key in self._access_order:
116 self._access_order.remove(key)
117 self._access_order.append(key)
119 def delete(self, key: str) -> bool:
120 """Delete a key from the cache.
122 Args:
123 key: Cache key to delete
125 Returns:
126 True if key was deleted, False if not found
127 """
128 with self._lock:
129 if key in self._cache:
130 del self._cache[key]
131 if key in self._access_order:
132 self._access_order.remove(key)
133 return True
134 return False
136 def clear(self) -> None:
137 """Clear all entries from the cache."""
138 with self._lock:
139 self._cache.clear()
140 self._access_order.clear()
141 self._hits = 0
142 self._misses = 0
143 self._evictions = 0
145 def _evict_lru(self) -> None:
146 """Evict least recently used entries to make space."""
147 # Evict 25% of entries to avoid frequent evictions
148 evict_count = max(1, self._max_size // 4)
150 for _ in range(min(evict_count, len(self._access_order))):
151 if self._access_order:
152 lru_key = self._access_order.pop(0)
153 if lru_key in self._cache:
154 del self._cache[lru_key]
155 self._evictions += 1
157 def cleanup_expired(self) -> int:
158 """Remove all expired entries.
160 Returns:
161 Number of entries removed
162 """
163 with self._lock:
164 current_time = time.time()
165 expired_keys = []
167 for key, entry in self._cache.items():
168 if current_time - entry.timestamp > entry.ttl:
169 expired_keys.append(key)
171 for key in expired_keys:
172 del self._cache[key]
173 if key in self._access_order:
174 self._access_order.remove(key)
176 return len(expired_keys)
178 def get_statistics(self) -> dict[str, t.Any]:
179 """Get cache statistics.
181 Returns:
182 Dictionary with cache statistics
183 """
184 with self._lock:
185 total_requests = self._hits + self._misses
186 hit_rate = self._hits / total_requests if total_requests > 0 else 0.0
188 return {
189 "size": len(self._cache),
190 "max_size": self._max_size,
191 "hits": self._hits,
192 "misses": self._misses,
193 "evictions": self._evictions,
194 "hit_rate": hit_rate,
195 "fill_ratio": len(self._cache) / self._max_size,
196 }
198 def resize(self, new_max_size: int) -> None:
199 """Resize the cache maximum size.
201 Args:
202 new_max_size: New maximum size for the cache
203 """
204 with self._lock:
205 self._max_size = new_max_size
207 # If new size is smaller, evict excess entries
208 if new_max_size < len(self._cache):
209 excess = len(self._cache) - new_max_size
210 for _ in range(excess):
211 if self._access_order:
212 lru_key = self._access_order.pop(0)
213 if lru_key in self._cache:
214 del self._cache[lru_key]
215 self._evictions += 1
217 def contains(self, key: str) -> bool:
218 """Check if key exists in cache (without updating access).
220 Args:
221 key: Cache key to check
223 Returns:
224 True if key exists and not expired, False otherwise
225 """
226 with self._lock:
227 if key not in self._cache:
228 return False
230 entry = self._cache[key]
231 current_time = time.time()
233 # Check TTL without updating access
234 if current_time - entry.timestamp > entry.ttl:
235 return False
237 return True
239 def keys(self) -> list[str]:
240 """Get all valid (non-expired) keys.
242 Returns:
243 List of valid cache keys
244 """
245 with self._lock:
246 current_time = time.time()
247 valid_keys = []
249 for key, entry in self._cache.items():
250 if current_time - entry.timestamp <= entry.ttl:
251 valid_keys.append(key)
253 return valid_keys
255 def __len__(self) -> int:
256 """Get number of entries in cache."""
257 return len(self._cache)
259 def __contains__(self, key: str) -> bool:
260 """Check if key is in cache (supports 'in' operator)."""
261 return self.contains(key)