Coverage for llm_dataset_engine/utils/rate_limiter.py: 100%
39 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
"""
Token bucket rate limiter for API calls.

Implements token bucket algorithm for rate limiting with clean,
testable design.
"""
8import threading
9import time
10from typing import Optional
class RateLimiter:
    """
    Token bucket rate limiter for controlling API request rates.

    The bucket holds at most ``capacity`` tokens and is refilled continuously
    at ``requests_per_minute / 60`` tokens per second. Every read or write of
    the token balance happens while holding ``self.lock``, so one instance
    can be shared between threads.

    Timekeeping uses ``time.monotonic()`` so that wall-clock adjustments
    (NTP steps, manual changes) can neither drain the bucket nor distort
    ``acquire`` timeouts.
    """

    # Longest single sleep while waiting for tokens. Keeping it short lets a
    # waiting thread notice a concurrent ``reset()`` promptly.
    _MAX_POLL_INTERVAL = 0.1

    def __init__(
        self, requests_per_minute: int, burst_size: Optional[int] = None
    ):
        """
        Initialize rate limiter with a full bucket.

        Args:
            requests_per_minute: Maximum sustained requests per minute.
            burst_size: Maximum burst size, i.e. the bucket capacity.
                ``None`` (or any falsy value such as ``0``) falls back to
                ``requests_per_minute``.
        """
        self.rpm = requests_per_minute
        self.capacity = burst_size or requests_per_minute
        self.tokens = float(self.capacity)
        # Monotonic timestamp of the most recent refill.
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

        # Refill rate in tokens per second.
        self.refill_rate = requests_per_minute / 60.0

    def acquire(self, tokens: int = 1, timeout: Optional[float] = None) -> bool:
        """
        Acquire tokens for making requests, blocking until they are available.

        Args:
            tokens: Number of tokens to acquire.
            timeout: Maximum wait time in seconds (None = wait forever).

        Returns:
            True if tokens acquired, False if the timeout expired first.

        Raises:
            ValueError: If tokens > capacity (the request could never succeed).
        """
        if tokens > self.capacity:
            raise ValueError(
                f"Requested {tokens} tokens exceeds capacity {self.capacity}"
            )

        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            with self.lock:
                self._refill()

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True

                shortfall = tokens - self.tokens

            # Sleep only as long as the missing tokens need to accumulate,
            # capped by the poll interval. The sleep happens outside the lock
            # so other threads are not blocked meanwhile.
            wait = self._MAX_POLL_INTERVAL
            if self.refill_rate > 0:
                wait = min(wait, shortfall / self.refill_rate)

            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the deadline; the loop makes one final
                # attempt right after waking up.
                wait = min(wait, remaining)

            time.sleep(wait)

    def _refill(self) -> None:
        """
        Refill tokens based on elapsed time.

        Must be called with ``self.lock`` held.
        """
        now = time.monotonic()
        elapsed = now - self.last_update

        # Credit the tokens earned since the last refill, never exceeding
        # the bucket capacity.
        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)

        self.last_update = now

    @property
    def available_tokens(self) -> float:
        """Get current available tokens (after applying any pending refill)."""
        with self.lock:
            self._refill()
            return self.tokens

    def reset(self) -> None:
        """Reset rate limiter to full capacity and restart the refill clock."""
        with self.lock:
            self.tokens = float(self.capacity)
            self.last_update = time.monotonic()