Coverage for llm_dataset_engine/utils/rate_limiter.py: 100%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Token bucket rate limiter for API calls. 

3 

4Implements token bucket algorithm for rate limiting with clean, 

5testable design. 

6""" 

7 

8import threading 

9import time 

10from typing import Optional 

11 

12 

13class RateLimiter: 

14 """ 

15 Token bucket rate limiter for controlling API request rates. 

16  

17 Thread-safe implementation following KISS principle. 

18 """ 

19 

20 def __init__( 

21 self, requests_per_minute: int, burst_size: Optional[int] = None 

22 ): 

23 """ 

24 Initialize rate limiter. 

25 

26 Args: 

27 requests_per_minute: Maximum requests per minute 

28 burst_size: Maximum burst size (default: requests_per_minute) 

29 """ 

30 self.rpm = requests_per_minute 

31 self.capacity = burst_size or requests_per_minute 

32 self.tokens = float(self.capacity) 

33 self.last_update = time.time() 

34 self.lock = threading.Lock() 

35 

36 # Calculate refill rate (tokens per second) 

37 self.refill_rate = requests_per_minute / 60.0 

38 

39 def acquire(self, tokens: int = 1, timeout: Optional[float] = None) -> bool: 

40 """ 

41 Acquire tokens for making requests. 

42 

43 Args: 

44 tokens: Number of tokens to acquire 

45 timeout: Maximum wait time in seconds (None = wait forever) 

46 

47 Returns: 

48 True if tokens acquired, False if timeout 

49 

50 Raises: 

51 ValueError: If tokens > capacity 

52 """ 

53 if tokens > self.capacity: 

54 raise ValueError( 

55 f"Requested {tokens} tokens exceeds capacity {self.capacity}" 

56 ) 

57 

58 deadline = None if timeout is None else time.time() + timeout 

59 

60 while True: 

61 with self.lock: 

62 self._refill() 

63 

64 if self.tokens >= tokens: 

65 self.tokens -= tokens 

66 return True 

67 

68 # Check timeout 

69 if deadline is not None and time.time() >= deadline: 

70 return False 

71 

72 # Sleep before retry 

73 time.sleep(0.1) 

74 

75 def _refill(self) -> None: 

76 """Refill tokens based on elapsed time.""" 

77 now = time.time() 

78 elapsed = now - self.last_update 

79 

80 # Add tokens based on refill rate 

81 tokens_to_add = elapsed * self.refill_rate 

82 self.tokens = min(self.capacity, self.tokens + tokens_to_add) 

83 

84 self.last_update = now 

85 

86 @property 

87 def available_tokens(self) -> float: 

88 """Get current available tokens.""" 

89 with self.lock: 

90 self._refill() 

91 return self.tokens 

92 

93 def reset(self) -> None: 

94 """Reset rate limiter to full capacity.""" 

95 with self.lock: 

96 self.tokens = float(self.capacity) 

97 self.last_update = time.time() 

98