Coverage for jinja2_async_environment/caching/strategies.py: 20%
220 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
1"""Advanced cache strategies and eviction policies."""
3import time
4import typing as t
5from dataclasses import dataclass
6from threading import RLock
8from .typed import CacheEntry, TypedCache
10T = t.TypeVar("T")
13@dataclass
14class CacheStatistics:
15 """Extended cache statistics for monitoring and optimization."""
17 hits: int = 0
18 misses: int = 0
19 evictions: int = 0
20 memory_pressure_evictions: int = 0
21 ttl_evictions: int = 0
22 avg_access_time: float = 0.0
23 peak_size: int = 0
24 cache_efficiency: float = 0.0
27class LFUCache(TypedCache[T]):
28 """Least Frequently Used cache with advanced statistics."""
30 def __init__(self, max_size: int = 1000, default_ttl: int = 300):
31 super().__init__(max_size, default_ttl)
32 self._access_counts: dict[str, int] = {}
33 self._statistics = CacheStatistics()
35 def get(self, key: str) -> T | None:
36 with self._lock:
37 start_time = time.time()
39 if key not in self._cache:
40 self._statistics.misses += 1
41 return None
43 entry = self._cache[key]
44 current_time = time.time()
46 # Check TTL
47 if current_time - entry.timestamp > entry.ttl:
48 del self._cache[key]
49 self._access_counts.pop(key, None)
50 if key in self._access_order:
51 self._access_order.remove(key)
52 self._statistics.misses += 1
53 self._statistics.ttl_evictions += 1
54 return None
56 # Update LFU tracking
57 self._access_counts[key] = self._access_counts.get(key, 0) + 1
58 entry.access_count += 1
59 entry.last_access = current_time
61 # Update LRU order
62 if key in self._access_order:
63 self._access_order.remove(key)
64 self._access_order.append(key)
66 self._statistics.hits += 1
67 access_time = time.time() - start_time
68 self._update_avg_access_time(access_time)
70 return entry.value
72 def set(self, key: str, value: T, ttl: int | None = None) -> None:
73 with self._lock:
74 current_time = time.time()
76 # Evict if needed
77 if len(self._cache) >= self._max_size and key not in self._cache:
78 self._evict_lfu()
80 # Create entry
81 entry = CacheEntry(
82 value=value,
83 timestamp=current_time,
84 ttl=ttl or self._default_ttl,
85 access_count=1,
86 last_access=current_time,
87 )
89 self._cache[key] = entry
90 self._access_counts[key] = 1
92 # Update LRU order
93 if key in self._access_order:
94 self._access_order.remove(key)
95 self._access_order.append(key)
97 # Update statistics
98 if len(self._cache) > self._statistics.peak_size:
99 self._statistics.peak_size = len(self._cache)
101 def _evict_lfu(self) -> None:
102 """Evict least frequently used entries."""
103 if not self._cache:
104 return
106 # Find LFU key
107 lfu_key = min(self._access_counts.keys(), key=lambda k: self._access_counts[k])
109 # Remove from all structures
110 self._cache.pop(lfu_key, None)
111 self._access_counts.pop(lfu_key, None)
112 if lfu_key in self._access_order:
113 self._access_order.remove(lfu_key)
115 self._statistics.evictions += 1
116 self._statistics.memory_pressure_evictions += 1
118 def _update_avg_access_time(self, access_time: float) -> None:
119 """Update running average of access times."""
120 total_accesses = self._statistics.hits + self._statistics.misses
121 if total_accesses == 1:
122 self._statistics.avg_access_time = access_time
123 else:
124 # Exponential moving average
125 alpha = 0.1
126 self._statistics.avg_access_time = (
127 alpha * access_time + (1 - alpha) * self._statistics.avg_access_time
128 )
130 def get_extended_statistics(self) -> dict[str, t.Any]:
131 """Get extended statistics for monitoring."""
132 base_stats = self.get_statistics()
134 total_requests = self._statistics.hits + self._statistics.misses
135 self._statistics.cache_efficiency = (
136 self._statistics.hits / total_requests if total_requests > 0 else 0.0
137 )
139 return {
140 **base_stats,
141 "access_counts": dict(self._access_counts),
142 "avg_access_time_ms": self._statistics.avg_access_time * 1000,
143 "peak_size": self._statistics.peak_size,
144 "cache_efficiency": self._statistics.cache_efficiency,
145 "ttl_evictions": self._statistics.ttl_evictions,
146 "memory_pressure_evictions": self._statistics.memory_pressure_evictions,
147 }
150class AdaptiveCache(TypedCache[T]):
151 """Adaptive cache that switches between LRU and LFU based on access patterns."""
153 def __init__(self, max_size: int = 1000, default_ttl: int = 300):
154 super().__init__(max_size, default_ttl)
155 self._access_patterns: dict[str, list[float]] = {}
156 self._strategy: str = "lru" # "lru" or "lfu"
157 self._strategy_switches = 0
158 self._last_evaluation = time.time()
159 self._evaluation_interval = 300 # 5 minutes
161 def get(self, key: str) -> T | None:
162 with self._lock:
163 # Periodically evaluate and potentially switch strategy
164 if time.time() - self._last_evaluation > self._evaluation_interval:
165 self._evaluate_strategy()
167 # Track access pattern
168 current_time = time.time()
169 if key not in self._access_patterns:
170 self._access_patterns[key] = []
171 self._access_patterns[key].append(current_time)
173 # Keep only recent accesses (last hour)
174 cutoff_time = current_time - 3600
175 self._access_patterns[key] = [
176 t for t in self._access_patterns[key] if t > cutoff_time
177 ]
179 return super().get(key)
181 def _evaluate_strategy(self) -> None:
182 """Evaluate access patterns and potentially switch strategy."""
183 self._last_evaluation = time.time()
185 if not self._access_patterns:
186 return
188 # Calculate access frequency distribution
189 frequencies = []
190 for accesses in self._access_patterns.values():
191 frequencies.append(len(accesses))
193 if not frequencies:
194 return
196 # Calculate coefficient of variation
197 mean_freq = sum(frequencies) / len(frequencies)
198 if mean_freq == 0:
199 return
201 variance = sum((f - mean_freq) ** 2 for f in frequencies) / len(frequencies)
202 std_dev = variance**0.5
203 cv = std_dev / mean_freq
205 # Switch strategy based on access pattern uniformity
206 # High CV (> 0.5) suggests some items are accessed much more frequently → LFU
207 # Low CV (< 0.3) suggests relatively uniform access → LRU
208 old_strategy = self._strategy
209 if cv > 0.5 and self._strategy == "lru":
210 self._strategy = "lfu"
211 self._strategy_switches += 1
212 elif cv < 0.3 and self._strategy == "lfu":
213 self._strategy = "lru"
214 self._strategy_switches += 1
216 if old_strategy != self._strategy:
217 # Clear old tracking data when switching
218 self._access_patterns.clear()
220 def _evict_lru(self) -> None:
221 """Evict using current strategy."""
222 if self._strategy == "lfu":
223 self._evict_lfu_adaptive()
224 else:
225 super()._evict_lru()
227 def _evict_lfu_adaptive(self) -> None:
228 """Evict least frequently used entry."""
229 if not self._access_patterns:
230 super()._evict_lru()
231 return
233 # Find key with lowest access frequency
234 lfu_key = min(
235 self._access_patterns.keys(),
236 key=lambda k: len(self._access_patterns.get(k, [])),
237 )
239 if lfu_key in self._cache:
240 del self._cache[lfu_key]
241 if lfu_key in self._access_order:
242 self._access_order.remove(lfu_key)
243 self._evictions += 1
245 self._access_patterns.pop(lfu_key, None)
247 def get_strategy_info(self) -> dict[str, t.Any]:
248 """Get information about current strategy and switches."""
249 return {
250 "current_strategy": self._strategy,
251 "strategy_switches": self._strategy_switches,
252 "last_evaluation": self._last_evaluation,
253 "access_pattern_keys": len(self._access_patterns),
254 }
257class HierarchicalCache[T]:
258 """Multi-level cache with different strategies per level."""
260 def __init__(
261 self,
262 l1_size: int = 100,
263 l2_size: int = 1000,
264 l1_ttl: int = 60,
265 l2_ttl: int = 300,
266 ):
267 """Initialize hierarchical cache.
269 Args:
270 l1_size: Size of L1 (fastest) cache
271 l2_size: Size of L2 (larger) cache
272 l1_ttl: TTL for L1 cache entries
273 l2_ttl: TTL for L2 cache entries
274 """
275 self.l1_cache = TypedCache[T](max_size=l1_size, default_ttl=l1_ttl)
276 self.l2_cache = LFUCache[T](max_size=l2_size, default_ttl=l2_ttl)
277 self._lock = RLock()
279 # Statistics
280 self.l1_hits = 0
281 self.l2_hits = 0
282 self.total_misses = 0
283 self.promotions = 0 # L2 → L1 promotions
285 def get(self, key: str) -> T | None:
286 """Get value from hierarchical cache."""
287 with self._lock:
288 # Try L1 first
289 value = self.l1_cache.get(key)
290 if value is not None:
291 self.l1_hits += 1
292 return value
294 # Try L2
295 value = self.l2_cache.get(key)
296 if value is not None:
297 self.l2_hits += 1
298 # Promote frequently accessed items to L1
299 self._consider_promotion(key, value)
300 return value
302 # Miss in both levels
303 self.total_misses += 1
304 return None
306 def set(self, key: str, value: T, ttl: int | None = None) -> None:
307 """Set value in hierarchical cache."""
308 with self._lock:
309 # Always start in L2, promote to L1 based on access patterns
310 self.l2_cache.set(key, value, ttl)
312 def _consider_promotion(self, key: str, value: T) -> None:
313 """Consider promoting item from L2 to L1."""
314 # Get access count from L2
315 l2_entry = self.l2_cache._cache.get(key)
316 if l2_entry and l2_entry.access_count >= 3: # Promote after 3 accesses
317 # Promote to L1
318 self.l1_cache.set(key, value, l2_entry.ttl)
319 self.promotions += 1
321 def clear(self) -> None:
322 """Clear all cache levels."""
323 with self._lock:
324 self.l1_cache.clear()
325 self.l2_cache.clear()
326 self.l1_hits = 0
327 self.l2_hits = 0
328 self.total_misses = 0
329 self.promotions = 0
331 def get_statistics(self) -> dict[str, t.Any]:
332 """Get comprehensive statistics for all cache levels."""
333 total_requests = self.l1_hits + self.l2_hits + self.total_misses
335 return {
336 "l1_cache": self.l1_cache.get_statistics(),
337 "l2_cache": self.l2_cache.get_extended_statistics(),
338 "l1_hits": self.l1_hits,
339 "l2_hits": self.l2_hits,
340 "total_misses": self.total_misses,
341 "promotions": self.promotions,
342 "overall_hit_rate": (
343 (self.l1_hits + self.l2_hits) / total_requests
344 if total_requests > 0
345 else 0.0
346 ),
347 "l1_hit_rate": (
348 self.l1_hits / total_requests if total_requests > 0 else 0.0
349 ),
350 "size": len(self.l1_cache) + len(self.l2_cache),
351 "max_size": self.l1_cache._max_size + self.l2_cache._max_size,
352 "hits": self.l1_hits + self.l2_hits,
353 "misses": self.total_misses,
354 "evictions": self.l1_cache._evictions + self.l2_cache._evictions,
355 "hit_rate": (
356 (self.l1_hits + self.l2_hits) / total_requests
357 if total_requests > 0
358 else 0.0
359 ),
360 "fill_ratio": (
361 (len(self.l1_cache) + len(self.l2_cache))
362 / (self.l1_cache._max_size + self.l2_cache._max_size)
363 ),
364 }
366 def __len__(self) -> int:
367 """Get total number of entries across all cache levels."""
368 return len(self.l1_cache) + len(self.l2_cache)
371class CacheWarmer:
372 """Utility for warming caches with commonly used templates."""
374 def __init__(self, cache_manager: t.Any):
375 """Initialize cache warmer.
377 Args:
378 cache_manager: Cache manager to warm
379 """
380 self.cache_manager = cache_manager
381 self._warmed_keys: set[str] = set()
383 async def warm_template_cache(
384 self,
385 environment: t.Any,
386 template_names: list[str],
387 context_data: dict[str, t.Any] | None = None,
388 ) -> dict[str, bool]:
389 """Warm template cache by pre-loading common templates.
391 Args:
392 environment: AsyncEnvironment instance
393 template_names: List of template names to warm
394 context_data: Optional context data for compilation
396 Returns:
397 Dictionary mapping template names to success status
398 """
399 results = {}
400 context_data = context_data or {}
402 for template_name in template_names:
403 try:
404 # Pre-load template to warm the cache
405 template = await environment.get_template_async(template_name)
407 # Also warm compilation cache if context provided
408 if context_data:
409 await template.render_async(**context_data)
411 self._warmed_keys.add(template_name)
412 results[template_name] = True
414 except Exception:
415 results[template_name] = False
417 return results
419 def warm_package_cache(self, package_names: list[str]) -> dict[str, bool]:
420 """Warm package cache by pre-loading package specs.
422 Args:
423 package_names: List of package names to warm
425 Returns:
426 Dictionary mapping package names to success status
427 """
428 results = {}
430 for package_name in package_names:
431 try:
432 import importlib.util
434 spec = importlib.util.find_spec(package_name)
435 if spec and spec.loader:
436 # Cache the package spec
437 cache_key = f"spec:{package_name}"
438 self.cache_manager.set("package", cache_key, (spec.loader, spec))
439 self._warmed_keys.add(cache_key)
440 results[package_name] = True
441 else:
442 results[package_name] = False
444 except Exception:
445 results[package_name] = False
447 return results
449 def get_warmed_keys(self) -> set[str]:
450 """Get set of keys that have been warmed."""
451 return self._warmed_keys.copy()
453 def clear_warmed_tracking(self) -> None:
454 """Clear the tracking of warmed keys."""
455 self._warmed_keys.clear()