# rate_limiter.py
"""
Rate Limiter Implementation
Token bucket algorithm for rate limiting API calls
"""
import asyncio
import time
from typing import Optional
from datetime import datetime
import structlog
# Module-level structlog logger, named after this module; used by RateLimiter
# for its lifecycle, throttling and wait messages.
logger = structlog.get_logger(__name__)
class RateLimiter:
    """
    Token bucket rate limiter.

    The bucket starts full with ``burst_limit`` tokens and is refilled
    continuously at ``calls_per_minute / 60`` tokens per second, never
    holding more than ``burst_limit`` tokens. Callers may therefore burst up
    to ``burst_limit`` calls, after which they are held to the
    ``calls_per_minute`` rate.

    Elapsed time is measured with ``time.monotonic()``, so ``last_update``
    is a monotonic-clock reading (only differences are meaningful), not a
    wall-clock timestamp.
    """

    def __init__(
        self,
        calls_per_minute: int = 100,
        burst_limit: Optional[int] = None
    ):
        """
        Create a limiter whose bucket starts full.

        Args:
            calls_per_minute: Maximum sustained calls allowed per minute.
            burst_limit: Maximum burst size (bucket capacity). A falsy value
                (``None`` or ``0``) falls back to ``calls_per_minute``.
        """
        self.calls_per_minute = calls_per_minute
        # `or` (not `is None`) is kept on purpose: 0 also means "use default".
        self.burst_limit = burst_limit or calls_per_minute

        # Token bucket state
        self.tokens = float(self.burst_limit)
        # Monotonic clock: immune to NTP/manual wall-clock adjustments.
        self.last_update = time.monotonic()

        # Stats
        self.total_requests = 0
        self.throttled_requests = 0

        logger.info(
            "Rate limiter initialized",
            calls_per_minute=calls_per_minute,
            burst_limit=self.burst_limit
        )

    def _refill(self) -> None:
        """Credit the bucket with tokens earned since the last update."""
        now = time.monotonic()
        # Clamp at zero so a `last_update` that lies ahead of `now` can never
        # produce a negative refill that drains the bucket.
        elapsed = max(0.0, now - self.last_update)
        self.last_update = now

        # Add tokens based on rate (calls per minute), capped at capacity.
        tokens_to_add = elapsed * (self.calls_per_minute / 60.0)
        self.tokens = min(self.burst_limit, self.tokens + tokens_to_add)

    async def acquire(self, tokens: int = 1) -> bool:
        """
        Try to take tokens from the bucket without waiting.

        Every call is counted in ``total_requests``; a refused call is also
        counted in ``throttled_requests`` and logged as a warning.

        Args:
            tokens: Number of tokens to acquire.

        Returns:
            True if the tokens were taken, False if rate limited.
        """
        self.total_requests += 1
        self._refill()

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True

        # Rate limited
        self.throttled_requests += 1
        logger.warning(
            "Rate limit exceeded",
            tokens_available=self.tokens,
            tokens_requested=tokens,
            total_requests=self.total_requests,
            throttled_requests=self.throttled_requests
        )
        return False

    async def wait_for_token(self, tokens: int = 1):
        """
        Wait (yielding to the event loop) until the tokens are acquired.

        Each attempt goes through ``acquire``, so every retry is counted in
        the request/throttle statistics.

        Args:
            tokens: Number of tokens needed.

        Raises:
            ValueError: If the request can never be satisfied, either because
                ``tokens`` exceeds ``burst_limit`` (the bucket never holds
                that many) or because the bucket is short and the refill
                rate is not positive.
        """
        # The bucket is capped at burst_limit, so a larger request would
        # otherwise retry forever.
        if tokens > self.burst_limit:
            raise ValueError(
                f"Cannot wait for {tokens} tokens: "
                f"bucket capacity is {self.burst_limit}"
            )

        while not await self.acquire(tokens):
            refill_rate = self.calls_per_minute / 60.0
            # Without a positive rate the wait time is undefined (division
            # by zero) or negative (busy loop); fail loudly instead.
            if refill_rate <= 0:
                raise ValueError(
                    "Cannot wait for tokens: "
                    f"calls_per_minute is {self.calls_per_minute}"
                )

            # Sleep just long enough for the shortfall to be refilled.
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / refill_rate
            logger.debug(
                "Waiting for rate limit",
                wait_time=f"{wait_time:.2f}s",
                tokens_needed=tokens_needed
            )
            await asyncio.sleep(wait_time)

    def get_stats(self) -> dict:
        """
        Get rate limiter statistics.

        Returns:
            A dict with the configured limits, the tokens currently in the
            bucket (as of the last refill), the request counters, and
            ``throttle_rate`` (throttled / total, or 0.0 before any request).
        """
        throttle_rate = (
            self.throttled_requests / self.total_requests
            if self.total_requests > 0
            else 0.0
        )
        return {
            "calls_per_minute": self.calls_per_minute,
            "burst_limit": self.burst_limit,
            "tokens_available": self.tokens,
            "total_requests": self.total_requests,
            "throttled_requests": self.throttled_requests,
            "throttle_rate": throttle_rate
        }

    def reset(self) -> None:
        """Refill the bucket to capacity and zero the statistics."""
        self.tokens = float(self.burst_limit)
        self.last_update = time.monotonic()
        self.total_requests = 0
        self.throttled_requests = 0
        logger.info("Rate limiter reset")