Coverage for src / beautyspot / limiter.py: 92%
37 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-18 18:20 +0900
1# src/beautyspot/limiter.py
3import time
4import asyncio
5import threading
6from typing import Protocol, runtime_checkable
@runtime_checkable
class LimiterProtocol(Protocol):
    """Structural interface implemented by rate limiters.

    Any object that provides both a synchronous ``consume`` method and an
    asynchronous ``consume_async`` method satisfies this protocol. Because
    the protocol is decorated with ``@runtime_checkable``, ``isinstance``
    can be used to check conformance (only method presence is checked, not
    signatures).
    """

    def consume(self, cost: int) -> None:
        """Reserve ``cost`` units, waiting synchronously if the implementation requires it."""

    async def consume_async(self, cost: int) -> None:
        """Reserve ``cost`` units, awaiting if the implementation requires it."""
class Gcra(LimiterProtocol):
    """
    A smooth rate limiter based on the GCRA (Generic Cell Rate Algorithm).

    Features:
    - No burst after long idle (Strict Pacing).
    - No start-up delay for the very first request.
    - Fails fast if a task cost exceeds the TPM limit.
    - Thread-safe and Async-compatible.
    - Uses monotonic clock.

    Attributes:
        rate: Pacing rate in tokens per second (``tokens_per_minute / 60``).
        max_cost: Largest cost a single call may request
            (``int(tokens_per_minute)``).
        tat: Theoretical Arrival Time on the ``time.monotonic()`` clock; a
            request arriving at or after this instant does not have to wait.
        lock: ``threading.Lock`` guarding every read-modify-write of ``tat``.
    """

    def __init__(self, tokens_per_minute: int):
        """
        Create a limiter that paces work to ``tokens_per_minute``.

        Args:
            tokens_per_minute (int): Token budget per minute. It also caps the
                cost of any single request.

        Raises:
            ValueError: If ``tokens_per_minute`` is not positive.
        """
        if tokens_per_minute <= 0:
            raise ValueError("tokens_per_minute must be positive")

        # Rate: tokens per second
        self.rate = float(tokens_per_minute) / 60.0

        # Maximum allowed cost per task.
        # A single task consuming more than the TPM limit is physically impossible
        # to process within the rate window, so it should be forbidden.
        self.max_cost = int(tokens_per_minute)

        # Theoretical Arrival Time (TAT). Starting at "now" means the very
        # first request is admitted without delay.
        self.tat = time.monotonic()
        self.lock = threading.Lock()

    def _consume_reservation(self, cost: int) -> float:
        """
        Reserve ``cost`` tokens and return how long the caller must wait.

        The reservation is recorded immediately by advancing ``tat`` by
        ``cost / rate`` seconds. The caller is then expected to sleep for the
        returned duration before doing its work.

        Args:
            cost (int): Number of tokens to reserve. Values ``<= 0`` reserve
                nothing and return ``0.0``.

        Returns:
            float: Seconds to wait (always ``>= 0``).

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost``.
        """
        if cost <= 0:
            return 0.0

        # Guard: Prevent requests that exceed the rate limit capacity entirely
        if cost > self.max_cost:
            raise ValueError(
                f"Requested cost ({cost}) exceeds the maximum limit of {self.max_cost} tokens per minute. "
                "This task cannot be processed within the defined rate limit."
            )

        increment = cost / self.rate

        with self.lock:
            now = time.monotonic()
            # Idle time is not banked: if TAT lies in the past, the schedule
            # restarts at "now". This is what prevents a burst after idling.
            start = max(self.tat, now)
            self.tat = start + increment

        # start >= now by construction, so the wait is never negative.
        return start - now

    def consume(self, cost: int) -> None:
        """
        Reserve ``cost`` tokens, blocking the calling thread if necessary.

        The reservation is made under ``self.lock``. Then ``time.sleep`` is
        called for as long as the GCRA schedule requires before this request
        may proceed.

        Args:
            cost (int): Number of tokens to consume.

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost`` (the per-minute token
                budget), i.e. the request could never be served within the
                rate window.
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            time.sleep(wait_time)

    async def consume_async(self, cost: int) -> None:
        """
        Reserve ``cost`` tokens, awaiting (without blocking the event loop)
        if necessary.

        The reservation is made synchronously under ``self.lock``, which is
        held only briefly and never across an ``await``. Then
        ``asyncio.sleep`` is awaited for the required wait.

        Note: if the awaiting task is cancelled during the sleep, the
        reservation has already been recorded and is not refunded.

        Args:
            cost (int): Number of tokens to consume.

        Raises:
            ValueError: If ``cost`` exceeds ``max_cost`` (the per-minute token
                budget).
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            await asyncio.sleep(wait_time)