Coverage for src / beautyspot / limiter.py: 92%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-03-11 19:10 +0900

1# src/beautyspot/limiter.py 

2 

3import time 

4import asyncio 

5import threading 

6from typing import Protocol, runtime_checkable 

7 

8 

@runtime_checkable
class LimiterProtocol(Protocol):
    """Structural interface shared by rate limiters.

    A conforming object offers a blocking ``consume`` and an awaitable
    ``consume_async``, both taking the number of tokens to spend. Because the
    protocol is runtime-checkable, ``isinstance`` can be used against it; note
    that such a check only verifies that both attributes exist, not their
    signatures or whether ``consume_async`` is actually a coroutine function.
    """

    async def consume_async(self, cost: int) -> None:
        """Spend ``cost`` tokens, yielding to the event loop while waiting."""

    def consume(self, cost: int) -> None:
        """Spend ``cost`` tokens, blocking the calling thread while waiting."""

14 

15 

class TokenBucket(LimiterProtocol):
    """
    A smooth rate limiter based on the GCRA (Generic Cell Rate Algorithm).

    Rather than counting tokens, the limiter tracks a Theoretical Arrival
    Time (TAT): the monotonic timestamp at which the next request may start.
    A request waits until the TAT (if it lies in the future) and then pushes
    the TAT forward by ``cost / rate`` seconds, so the cost of one request
    delays the *following* caller rather than itself.

    Features:
    - No burst after long idle (Strict Pacing).
    - No start-up delay for the very first request.
    - Fails fast if a task cost exceeds the TPM limit.
    - Thread-safe and Async-compatible.
    - Uses monotonic clock.
    """

    def __init__(self, tokens_per_minute: int) -> None:
        """
        Args:
            tokens_per_minute (int): Sustained throughput limit. Must be positive.

        Raises:
            ValueError: If ``tokens_per_minute`` is not positive.
        """
        if tokens_per_minute <= 0:
            raise ValueError("tokens_per_minute must be positive")

        # Rate: tokens per second
        self.rate = float(tokens_per_minute) / 60.0

        # Maximum allowed cost per task.
        # A single task consuming more than the TPM limit is physically impossible
        # to process within the rate window, so it should be forbidden.
        # NOTE: int() truncates a fractional limit (e.g. 0.5 -> 0, which would
        # make every positive cost exceed max_cost).
        self.max_cost = int(tokens_per_minute)

        # Theoretical Arrival Time (TAT). Starting it at "now" means the very
        # first request is admitted without any delay.
        self.tat = time.monotonic()
        self.lock = threading.Lock()

    def _consume_reservation(self, cost: int) -> float:
        """
        Reserve a time slot for ``cost`` tokens and advance the TAT.

        Args:
            cost (int): Number of tokens to reserve. Non-positive costs are
                treated as free and reserve nothing.

        Returns:
            float: Seconds the caller must wait before proceeding (never negative).

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost``.
        """
        if cost <= 0:
            return 0.0

        # Guard: Prevent requests that exceed the rate limit capacity entirely
        if cost > self.max_cost:
            raise ValueError(
                f"Requested cost ({cost}) exceeds the maximum limit of {self.max_cost} tokens per minute. "
                "This task cannot be processed within the defined rate limit."
            )

        increment = cost / self.rate

        with self.lock:
            now = time.monotonic()
            # A TAT in the past means the limiter has been idle; clamping it to
            # `now` discards the unused time so it cannot be spent as a burst.
            start = max(self.tat, now)
            self.tat = start + increment

        # `start >= now` by construction, so no extra clamping is needed.
        return start - now

    def consume(self, cost: int) -> None:
        """
        Acquire tokens from the bucket, blocking if necessary.

        Reserves a slot on the GCRA schedule and, if that slot lies in the
        future, sleeps (blocking the calling thread) until it arrives.

        Args:
            cost (int): Number of tokens to consume. Non-positive values
                return immediately without consuming anything.

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost`` (the truncated
                ``tokens_per_minute`` given to the constructor), i.e. the
                request is too expensive to ever be processed.
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            time.sleep(wait_time)

    async def consume_async(self, cost: int) -> None:
        """
        Acquire tokens asynchronously.

        Makes the same reservation as ``consume`` but waits with
        ``asyncio.sleep`` so the event loop is not blocked. The internal
        ``threading.Lock`` is only held during the synchronous reservation
        step, never across an ``await``.

        Note:
            The reservation is made before sleeping and is not refunded if the
            coroutine is cancelled during the wait.

        Args:
            cost (int): Number of tokens to consume. Non-positive values
                return immediately without consuming anything.

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost``.
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            await asyncio.sleep(wait_time)