Skip to main content
Glama
rate_limiter.py3.65 kB
"""
Rate Limiter Implementation

Token bucket algorithm for rate limiting API calls
"""

import asyncio
import time
from typing import Optional
from datetime import datetime

import structlog

logger = structlog.get_logger(__name__)


class RateLimiter:
    """
    Token bucket rate limiter

    Allows burst traffic up to burst_limit, then enforces
    calls_per_minute rate limit.

    The bucket starts full (burst_limit tokens) and is refilled lazily:
    every call to acquire() adds the tokens earned since the previous
    call, capped at burst_limit.
    """

    def __init__(
        self,
        calls_per_minute: int = 100,
        burst_limit: Optional[int] = None
    ) -> None:
        """
        Args:
            calls_per_minute: Maximum calls allowed per minute
            burst_limit: Maximum burst size (defaults to calls_per_minute)
        """
        self.calls_per_minute = calls_per_minute
        # `or` means a falsy burst_limit (None or 0) falls back to
        # calls_per_minute.
        self.burst_limit = burst_limit or calls_per_minute

        # Token bucket
        self.tokens = float(self.burst_limit)
        self.last_update = time.time()

        # Stats
        self.total_requests = 0
        self.throttled_requests = 0

        logger.info(
            "Rate limiter initialized",
            calls_per_minute=calls_per_minute,
            burst_limit=self.burst_limit
        )

    async def acquire(self, tokens: int = 1) -> bool:
        """
        Acquire tokens from bucket without waiting

        Every call is counted in total_requests; a refused call is also
        counted in throttled_requests and logged as a warning.

        Args:
            tokens: Number of tokens to acquire

        Returns:
            True if acquired, False if rate limited
        """
        self.total_requests += 1

        # Refill tokens based on time elapsed
        now = time.time()
        # time.time() follows the system clock, which can be stepped
        # backwards. Clamp at zero so such a step never yields a negative
        # refill that would silently remove tokens from the bucket.
        elapsed = max(0.0, now - self.last_update)
        self.last_update = now

        # Add tokens based on rate (calls per minute), never above the cap
        tokens_to_add = elapsed * (self.calls_per_minute / 60.0)
        self.tokens = min(self.burst_limit, self.tokens + tokens_to_add)

        # Check if enough tokens available
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True

        # Rate limited
        self.throttled_requests += 1
        logger.warning(
            "Rate limit exceeded",
            tokens_available=self.tokens,
            tokens_requested=tokens,
            total_requests=self.total_requests,
            throttled_requests=self.throttled_requests
        )
        return False

    async def wait_for_token(self, tokens: int = 1) -> None:
        """
        Wait until tokens are available, then take them

        Suspends the calling coroutine with asyncio.sleep() between
        attempts. Every attempt goes through acquire(), so each retry is
        reflected in the request statistics.

        Args:
            tokens: Number of tokens needed

        Raises:
            ValueError: If tokens is larger than burst_limit. The bucket
                is capped at burst_limit, so such a request could never
                be satisfied and the wait would never end.
        """
        if tokens > self.burst_limit:
            raise ValueError(
                f"Cannot wait for {tokens} tokens: "
                f"burst_limit is {self.burst_limit}"
            )

        while not await self.acquire(tokens):
            # Calculate wait time: the shortfall divided by the refill
            # rate in tokens per second
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / (self.calls_per_minute / 60.0)

            logger.debug(
                "Waiting for rate limit",
                wait_time=f"{wait_time:.2f}s",
                tokens_needed=tokens_needed
            )

            await asyncio.sleep(wait_time)

    def get_stats(self) -> dict:
        """
        Get rate limiter statistics

        Returns:
            Dict with the configured limits, the current token count, the
            request counters, and throttle_rate (throttled / total
            requests, 0.0 when no request has been made yet)
        """
        throttle_rate = (
            self.throttled_requests / self.total_requests
            if self.total_requests > 0
            else 0.0
        )

        return {
            "calls_per_minute": self.calls_per_minute,
            "burst_limit": self.burst_limit,
            "tokens_available": self.tokens,
            "total_requests": self.total_requests,
            "throttled_requests": self.throttled_requests,
            "throttle_rate": throttle_rate
        }

    def reset(self) -> None:
        """Reset rate limiter: refill the bucket and zero the counters"""
        self.tokens = float(self.burst_limit)
        self.last_update = time.time()
        self.total_requests = 0
        self.throttled_requests = 0

        logger.info("Rate limiter reset")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/mcpwithgoogle'

If you have feedback or need assistance with the MCP directory API, please join our Discord server