Coverage for jinja2_async_environment / caching / strategies.py: 66%
241 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 21:26 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 21:26 -0800
1"""Advanced cache strategies and eviction policies."""
3import time
4import typing as t
5from dataclasses import dataclass
6from threading import RLock
8from .typed import CacheEntry, TypedCache
10T = t.TypeVar("T")
13@dataclass
14class CacheStatistics:
15 """Extended cache statistics for monitoring and optimization."""
17 hits: int = 0
18 misses: int = 0
19 evictions: int = 0
20 memory_pressure_evictions: int = 0
21 ttl_evictions: int = 0
22 avg_access_time: float = 0.0
23 peak_size: int = 0
24 cache_efficiency: float = 0.0
27class LFUCache(TypedCache[T]):
28 """Least Frequently Used cache with advanced statistics."""
30 def __init__(self, max_size: int = 1000, default_ttl: int = 300):
31 super().__init__(max_size, default_ttl)
32 self._access_counts: dict[str, int] = {}
33 self._statistics = CacheStatistics()
35 def get(self, key: str) -> T | None:
36 with self._lock:
37 start_time = time.time()
39 if key not in self._cache:
40 self._statistics.misses += 1
41 return None
43 entry = self._cache[key]
44 current_time = time.time()
46 # Check TTL
47 if current_time - entry.timestamp > entry.ttl:
48 del self._cache[key]
49 self._access_counts.pop(key, None)
50 if key in self._access_order:
51 self._access_order.remove(key)
52 self._statistics.misses += 1
53 self._statistics.ttl_evictions += 1
54 return None
56 # Update LFU tracking
57 self._access_counts[key] = self._access_counts.get(key, 0) + 1
58 entry.access_count += 1
59 entry.last_access = current_time
61 # Update LRU order
62 if key in self._access_order:
63 self._access_order.remove(key)
64 self._access_order.append(key)
66 self._statistics.hits += 1
67 access_time = time.time() - start_time
68 self._update_avg_access_time(access_time)
70 return entry.value
72 def set(self, key: str, value: T, ttl: int | None = None) -> None:
73 with self._lock:
74 current_time = time.time()
76 # Evict if needed
77 if len(self._cache) >= self._max_size and key not in self._cache:
78 self._evict_lfu()
80 # Create entry
81 entry = CacheEntry(
82 value=value,
83 timestamp=current_time,
84 ttl=ttl or self._default_ttl,
85 access_count=1,
86 last_access=current_time,
87 )
89 self._cache[key] = entry
90 self._access_counts[key] = 1
92 # Update LRU order
93 if key in self._access_order:
94 self._access_order.remove(key)
95 self._access_order.append(key)
97 # Update statistics
98 if len(self._cache) > self._statistics.peak_size:
99 self._statistics.peak_size = len(self._cache)
101 def _evict_lfu(self) -> None:
102 """Evict least frequently used entries."""
103 if not self._cache:
104 return
106 # Find LFU key
107 lfu_key = min(self._access_counts.keys(), key=lambda k: self._access_counts[k])
109 # Remove from all structures
110 self._cache.pop(lfu_key, None)
111 self._access_counts.pop(lfu_key, None)
112 if lfu_key in self._access_order:
113 self._access_order.remove(lfu_key)
115 self._statistics.evictions += 1
116 self._statistics.memory_pressure_evictions += 1
118 def _update_avg_access_time(self, access_time: float) -> None:
119 """Update running average of access times."""
120 total_accesses = self._statistics.hits + self._statistics.misses
121 if total_accesses == 1:
122 self._statistics.avg_access_time = access_time
123 else:
124 # Exponential moving average
125 alpha = 0.1
126 self._statistics.avg_access_time = (
127 alpha * access_time + (1 - alpha) * self._statistics.avg_access_time
128 )
130 def get_extended_statistics(self) -> dict[str, t.Any]:
131 """Get extended statistics for monitoring."""
132 base_stats = self.get_statistics()
134 total_requests = self._statistics.hits + self._statistics.misses
135 self._statistics.cache_efficiency = (
136 self._statistics.hits / total_requests if total_requests > 0 else 0.0
137 )
139 return base_stats | {
140 "access_counts": self._access_counts.copy(),
141 "avg_access_time_ms": self._statistics.avg_access_time * 1000,
142 "peak_size": self._statistics.peak_size,
143 "cache_efficiency": self._statistics.cache_efficiency,
144 "ttl_evictions": self._statistics.ttl_evictions,
145 "memory_pressure_evictions": self._statistics.memory_pressure_evictions,
146 }
149class AdaptiveCache(TypedCache[T]):
150 """Adaptive cache that switches between LRU and LFU based on access patterns."""
152 def __init__(self, max_size: int = 1000, default_ttl: int = 300):
153 super().__init__(max_size, default_ttl)
154 self._access_patterns: dict[str, list[float]] = {}
155 self._strategy: str = "lru" # "lru" or "lfu"
156 self._strategy_switches = 0
157 self._last_evaluation = time.time()
158 self._evaluation_interval = 300 # 5 minutes
160 def get(self, key: str) -> T | None:
161 with self._lock:
162 # Periodically evaluate and potentially switch strategy
163 if time.time() - self._last_evaluation > self._evaluation_interval:
164 self._evaluate_strategy()
166 # Track access pattern
167 current_time = time.time()
168 if key not in self._access_patterns:
169 self._access_patterns[key] = []
170 self._access_patterns[key].append(current_time)
172 # Keep only recent accesses (last hour)
173 cutoff_time = current_time - 3600
174 self._access_patterns[key] = [
175 t for t in self._access_patterns[key] if t > cutoff_time
176 ]
178 return super().get(key)
180 def _evaluate_strategy(self) -> None:
181 """Evaluate access patterns and potentially switch strategy."""
182 self._last_evaluation = time.time()
184 if not self._access_patterns:
185 return
187 # Calculate access frequency distribution
188 frequencies = [len(accesses) for accesses in self._access_patterns.values()]
190 if not frequencies:
191 return
193 # Calculate coefficient of variation
194 mean_freq = sum(frequencies) / len(frequencies)
195 if mean_freq == 0:
196 return
198 variance = sum((f - mean_freq) ** 2 for f in frequencies) / len(frequencies)
199 std_dev = variance**0.5
200 cv = std_dev / mean_freq
202 # Switch strategy based on access pattern uniformity
203 # High CV (> 0.5) suggests some items are accessed much more frequently → LFU
204 # Low CV (< 0.3) suggests relatively uniform access → LRU
205 old_strategy = self._strategy
206 if cv > 0.5 and self._strategy == "lru":
207 self._strategy = "lfu"
208 self._strategy_switches += 1
209 elif cv < 0.3 and self._strategy == "lfu":
210 self._strategy = "lru"
211 self._strategy_switches += 1
213 if old_strategy != self._strategy:
214 # Clear old tracking data when switching
215 self._access_patterns.clear()
217 def _evict_lru(self) -> None:
218 """Evict using current strategy."""
219 if self._strategy == "lfu":
220 self._evict_lfu_adaptive()
221 else:
222 super()._evict_lru()
224 def _evict_lfu_adaptive(self) -> None:
225 """Evict least frequently used entry."""
226 if not self._access_patterns:
227 super()._evict_lru()
228 return
230 # Find key with lowest access frequency
231 lfu_key = min(
232 self._access_patterns.keys(),
233 key=lambda k: len(self._access_patterns.get(k, [])),
234 )
236 if lfu_key in self._cache:
237 del self._cache[lfu_key]
238 if lfu_key in self._access_order:
239 self._access_order.remove(lfu_key)
240 self._evictions += 1
242 self._access_patterns.pop(lfu_key, None)
244 def get_strategy_info(self) -> dict[str, t.Any]:
245 """Get information about current strategy and switches."""
246 return {
247 "current_strategy": self._strategy,
248 "strategy_switches": self._strategy_switches,
249 "last_evaluation": self._last_evaluation,
250 "access_pattern_keys": len(self._access_patterns),
251 }
254class HierarchicalCache[T]:
255 """Multi-level cache with different strategies per level."""
257 def __init__(
258 self,
259 l1_size: int = 100,
260 l2_size: int = 1000,
261 l1_ttl: int = 60,
262 l2_ttl: int = 300,
263 ):
264 """Initialize hierarchical cache.
266 Args:
267 l1_size: Size of L1 (fastest) cache
268 l2_size: Size of L2 (larger) cache
269 l1_ttl: TTL for L1 cache entries
270 l2_ttl: TTL for L2 cache entries
271 """
272 self.l1_cache = TypedCache[T](max_size=l1_size, default_ttl=l1_ttl)
273 self.l2_cache = LFUCache[T](max_size=l2_size, default_ttl=l2_ttl)
274 self._lock = RLock()
276 # Statistics
277 self.l1_hits = 0
278 self.l2_hits = 0
279 self.total_misses = 0
280 self.promotions = 0 # L2 → L1 promotions
282 @property
283 def _default_ttl(self) -> int:
284 """Get the default TTL for this cache."""
285 return self.l2_cache._default_ttl
287 @_default_ttl.setter
288 def _default_ttl(self, value: int) -> None:
289 """Set the default TTL for this cache."""
290 self.l1_cache._default_ttl = value // 5 # L1 TTL is 1/5 of L2
291 self.l2_cache._default_ttl = value
293 def get(self, key: str) -> T | None:
294 """Get value from hierarchical cache."""
295 with self._lock:
296 # Try L1 first
297 value = self.l1_cache.get(key)
298 if value is not None:
299 self.l1_hits += 1
300 return value
302 # Try L2
303 value = self.l2_cache.get(key)
304 if value is not None:
305 self.l2_hits += 1
306 # Promote frequently accessed items to L1
307 self._consider_promotion(key, value)
308 return value
310 # Miss in both levels
311 self.total_misses += 1
312 return None
314 def set(self, key: str, value: T, ttl: int | None = None) -> None:
315 """Set value in hierarchical cache."""
316 with self._lock:
317 # Always start in L2, promote to L1 based on access patterns
318 self.l2_cache.set(key, value, ttl)
320 def _consider_promotion(self, key: str, value: T) -> None:
321 """Consider promoting item from L2 to L1."""
322 # Get access count from L2
323 l2_entry = self.l2_cache._cache.get(key)
324 if l2_entry and l2_entry.access_count >= 3: # Promote after 3 accesses
325 # Promote to L1
326 self.l1_cache.set(key, value, l2_entry.ttl)
327 self.promotions += 1
329 def clear(self) -> None:
330 """Clear all cache levels."""
331 with self._lock:
332 self.l1_cache.clear()
333 self.l2_cache.clear()
334 self.l1_hits = 0
335 self.l2_hits = 0
336 self.total_misses = 0
337 self.promotions = 0
339 def cleanup_expired(self) -> int:
340 """Clean up expired entries from both cache levels.
342 Returns:
343 Number of expired entries removed
344 """
345 with self._lock:
346 l1_expired = self.l1_cache.cleanup_expired()
347 l2_expired = self.l2_cache.cleanup_expired()
348 return l1_expired + l2_expired
350 def resize(self, new_max_size: int) -> None:
351 """Resize the cache maximum size.
353 For hierarchical cache, we split the size between L1 and L2.
355 Args:
356 new_max_size: New maximum size for the cache
357 """
358 with self._lock:
359 # Split size between L1 and L2 (80% to L2, 20% to L1)
360 l1_size = max(1, new_max_size // 5)
361 l2_size = new_max_size - l1_size
362 self.l1_cache.resize(l1_size)
363 self.l2_cache.resize(l2_size)
365 def delete(self, key: str) -> bool:
366 """Delete a key from the cache.
368 Args:
369 key: Cache key to delete
371 Returns:
372 True if key was deleted, False if not found
373 """
374 with self._lock:
375 # Try to delete from both caches
376 l1_deleted = self.l1_cache.delete(key)
377 l2_deleted = self.l2_cache.delete(key)
378 return l1_deleted or l2_deleted
380 def get_statistics(self) -> dict[str, t.Any]:
381 """Get comprehensive statistics for all cache levels."""
382 total_requests = self.l1_hits + self.l2_hits + self.total_misses
384 return {
385 "l1_cache": self.l1_cache.get_statistics(),
386 "l2_cache": self.l2_cache.get_extended_statistics(),
387 "l1_hits": self.l1_hits,
388 "l2_hits": self.l2_hits,
389 "total_misses": self.total_misses,
390 "promotions": self.promotions,
391 "overall_hit_rate": (
392 (self.l1_hits + self.l2_hits) / total_requests
393 if total_requests > 0
394 else 0.0
395 ),
396 "l1_hit_rate": (
397 self.l1_hits / total_requests if total_requests > 0 else 0.0
398 ),
399 "size": len(self.l1_cache) + len(self.l2_cache),
400 "max_size": self.l1_cache._max_size + self.l2_cache._max_size,
401 "hits": self.l1_hits + self.l2_hits,
402 "misses": self.total_misses,
403 "evictions": self.l1_cache._evictions + self.l2_cache._evictions,
404 "hit_rate": (
405 (self.l1_hits + self.l2_hits) / total_requests
406 if total_requests > 0
407 else 0.0
408 ),
409 "fill_ratio": (
410 (len(self.l1_cache) + len(self.l2_cache))
411 / (self.l1_cache._max_size + self.l2_cache._max_size)
412 ),
413 }
415 def __len__(self) -> int:
416 """Get total number of entries across all cache levels."""
417 return len(self.l1_cache) + len(self.l2_cache)
420class CacheWarmer:
421 """Utility for warming caches with commonly used templates."""
423 def __init__(self, cache_manager: t.Any):
424 """Initialize cache warmer.
426 Args:
427 cache_manager: Cache manager to warm
428 """
429 self.cache_manager = cache_manager
430 self._warmed_keys: set[str] = set()
432 async def warm_template_cache(
433 self,
434 environment: t.Any,
435 template_names: list[str],
436 context_data: dict[str, t.Any] | None = None,
437 ) -> dict[str, bool]:
438 """Warm template cache by pre-loading common templates.
440 Args:
441 environment: AsyncEnvironment instance
442 template_names: List of template names to warm
443 context_data: Optional context data for compilation
445 Returns:
446 Dictionary mapping template names to success status
447 """
448 results = {}
449 context_data = context_data or {}
451 for template_name in template_names:
452 try:
453 # Pre-load template to warm the cache
454 template = await environment.get_template_async(template_name)
456 # Also warm compilation cache if context provided
457 if context_data:
458 await template.render_async(**context_data)
460 self._warmed_keys.add(template_name)
461 results[template_name] = True
463 except Exception:
464 results[template_name] = False
466 return results
468 def warm_package_cache(self, package_names: list[str]) -> dict[str, bool]:
469 """Warm package cache by pre-loading package specs.
471 Args:
472 package_names: List of package names to warm
474 Returns:
475 Dictionary mapping package names to success status
476 """
477 results = {}
479 for package_name in package_names:
480 try:
481 import importlib.util
483 spec = importlib.util.find_spec(package_name)
484 if spec and spec.loader:
485 # Cache the package spec
486 cache_key = f"spec:{package_name}"
487 self.cache_manager.set("package", cache_key, (spec.loader, spec))
488 self._warmed_keys.add(cache_key)
489 results[package_name] = True
490 else:
491 results[package_name] = False
493 except Exception:
494 results[package_name] = False
496 return results
498 def get_warmed_keys(self) -> set[str]:
499 """Get set of keys that have been warmed."""
500 return self._warmed_keys.copy()
502 def clear_warmed_tracking(self) -> None:
503 """Clear the tracking of warmed keys."""
504 self._warmed_keys.clear()