Skip to main content
Glama
tokens.py • 13.3 kB
#!/usr/bin/env python3
"""
Token estimation and metrics utilities.

Provides accurate token counting using tiktoken, usage tracking, and
budget management for response optimization.
"""

import json
import os
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

# Try to import tiktoken, fall back to basic estimation if not available.
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
    import warnings
    warnings.warn("tiktoken not available, using basic token estimation. Install with: pip install tiktoken")


@dataclass
class TokenMetrics:
    """Token usage metrics for a single operation."""

    operation: str  # name of the tool/operation that was recorded
    input_tokens: int
    output_tokens: int
    total_tokens: int
    timestamp: float = field(default_factory=time.time)
    compact_mode: bool = False  # whether the response used compact mode
    page_size: int = 0  # page size if the response was paginated, else 0
    response_compressed: bool = False  # mirrors response["_compressed"]


@dataclass
class TokenBudget:
    """Token budget configuration and tracking."""

    daily_limit: int
    operation_limit: int
    warning_threshold: float = 0.8  # Warn at 80% of limit
    current_daily_usage: int = 0
    current_operation_count: int = 0
    last_reset_time: float = field(default_factory=time.time)

    def should_warn(self, tokens: int) -> bool:
        """Check if token usage should trigger warning."""
        # Guard against a zero/negative limit (possible via env override),
        # which would otherwise raise ZeroDivisionError.
        if self.operation_limit <= 0:
            return False
        return (tokens / self.operation_limit) >= self.warning_threshold

    def is_over_limit(self, tokens: int) -> bool:
        """Check if tokens exceed operation limit."""
        return tokens > self.operation_limit

    def needs_daily_reset(self) -> bool:
        """Check if daily counter needs reset (24h cycle)."""
        hours_since_reset = (time.time() - self.last_reset_time) / 3600
        return hours_since_reset >= 24

    def reset_daily(self) -> None:
        """Reset daily usage counter and restart the 24h window."""
        self.current_daily_usage = 0
        self.last_reset_time = time.time()


class TokenEstimator:
    """Token estimation and budget management using tiktoken for accuracy."""

    def __init__(self, model: str = "gpt-4", daily_limit: int = 100000,
                 operation_limit: int = 8000):
        """
        Args:
            model: Model name used to select a tiktoken encoding.
            daily_limit: Default daily token budget (env var may override).
            operation_limit: Default per-operation budget (env var may override).
        """
        self.model = model
        self.budget = self._load_budget_config(daily_limit, operation_limit)
        self.metrics_history: List[TokenMetrics] = []
        self.max_history_size = 1000  # cap history to bound memory usage

        # Initialize tiktoken encoder when the library is present.
        if TIKTOKEN_AVAILABLE:
            try:
                self.encoder = tiktoken.encoding_for_model(model)
            except KeyError:
                # Fallback to cl100k_base (GPT-4) if model not found
                self.encoder = tiktoken.get_encoding("cl100k_base")
        else:
            self.encoder = None

        # Create metrics directory if needed
        self.metrics_dir = Path.home() / ".scribe_metrics"
        self.metrics_dir.mkdir(exist_ok=True)
        self.metrics_file = self.metrics_dir / "token_usage.json"

    def _load_budget_config(self, daily_limit: int = 100000,
                            operation_limit: int = 8000) -> TokenBudget:
        """Load token budget configuration from environment or parameters.

        Environment variables take precedence over the constructor defaults.
        """
        daily_limit_env = int(os.getenv("SCRIBE_TOKEN_DAILY_LIMIT", daily_limit))
        operation_limit_env = int(os.getenv("SCRIBE_TOKEN_OPERATION_LIMIT", operation_limit))
        warning_threshold = float(os.getenv("SCRIBE_TOKEN_WARNING_THRESHOLD", "0.8"))

        return TokenBudget(
            daily_limit=daily_limit_env,
            operation_limit=operation_limit_env,
            warning_threshold=warning_threshold,
        )

    def estimate_tokens(self, data: Union[str, Dict, List, Any]) -> int:
        """
        Estimate token count for various data types using tiktoken.

        Args:
            data: The data to estimate tokens for

        Returns:
            Estimated token count
        """
        if self.encoder is not None:
            # Use tiktoken for accurate counting
            if isinstance(data, str):
                return len(self.encoder.encode(data))
            elif isinstance(data, (dict, list)):
                # Convert to JSON string and count tokens
                json_str = json.dumps(data, ensure_ascii=False)
                return len(self.encoder.encode(json_str))
            else:
                # Convert to string and count tokens
                return len(self.encoder.encode(str(data)))
        else:
            # Fallback to basic estimation if tiktoken not available
            # Rough approximation: 1 token ~= 4 characters for English text
            if isinstance(data, str):
                return len(data) // 4
            elif isinstance(data, (dict, list)):
                return len(json.dumps(data)) // 4
            else:
                return len(str(data)) // 4

    def estimate_response_tokens(self, response: Dict[str, Any]) -> Dict[str, int]:
        """
        Estimate tokens for different parts of a response.

        Returns breakdown by section (entries, pagination, metadata, etc.).
        """
        breakdown = {}

        # Main response structure overhead
        base_overhead = self.estimate_tokens({"ok": True, "count": 0})
        breakdown["base"] = base_overhead

        # Entries section
        if "entries" in response:
            breakdown["entries"] = self.estimate_tokens(response["entries"])

        # Pagination section
        if "pagination" in response:
            breakdown["pagination"] = self.estimate_tokens(response["pagination"])

        # Other metadata (reminders, warnings, etc.)
        other_tokens = 0
        for key, value in response.items():
            if key not in ["ok", "entries", "pagination", "count"]:
                other_tokens += self.estimate_tokens({key: value})
        breakdown["metadata"] = other_tokens

        # Total across all sections
        breakdown["total"] = sum(breakdown.values())

        return breakdown

    def record_operation(self, operation: str, input_data: Any,
                         response: Dict[str, Any], compact_mode: bool = False,
                         page_size: int = 0) -> TokenMetrics:
        """
        Record token usage for an operation.

        Args:
            operation: Name of the operation (tool name)
            input_data: Input parameters/data
            response: Response data
            compact_mode: Whether compact mode was used
            page_size: Page size if paginated

        Returns:
            TokenMetrics for the operation
        """
        input_tokens = self.estimate_tokens(input_data)
        response_breakdown = self.estimate_response_tokens(response)
        output_tokens = response_breakdown["total"]
        total_tokens = input_tokens + output_tokens

        metrics = TokenMetrics(
            operation=operation,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=total_tokens,
            compact_mode=compact_mode,
            page_size=page_size,
            response_compressed=response.get("_compressed", False),
        )

        # Store metrics, evicting the oldest entry once the cap is hit.
        self.metrics_history.append(metrics)
        if len(self.metrics_history) > self.max_history_size:
            self.metrics_history.pop(0)

        # Update budget tracking (roll the daily window first if stale).
        if self.budget.needs_daily_reset():
            self.budget.reset_daily()
        self.budget.current_daily_usage += total_tokens
        self.budget.current_operation_count += 1

        # Log warning if needed
        if self.budget.should_warn(total_tokens):
            print(f"⚠️ Token Warning: {operation} used {total_tokens} tokens "
                  f"({total_tokens/self.budget.operation_limit:.1%} of limit)")

        # Log if over limit
        if self.budget.is_over_limit(total_tokens):
            print(f"🚨 Token Limit Exceeded: {operation} used {total_tokens} tokens "
                  f"(limit: {self.budget.operation_limit})")

        return metrics

    def get_optimization_suggestion(self, operation: str, tokens: int,
                                    compact_mode: bool = False) -> Optional[str]:
        """
        Get optimization suggestion based on token usage pattern.

        Returns a " | "-joined string of suggestions, or None when usage is
        low enough (< 1000 tokens) that no advice applies.
        """
        if tokens < 1000:
            return None

        suggestions = []

        # High token usage suggestions
        if tokens > 5000:
            if not compact_mode:
                savings = int(tokens * 0.7)  # 70% savings with compact mode
                suggestions.append(f"Use compact=True to save ~{savings} tokens")

            # Check if pagination would help
            if "query" in operation or "read" in operation:
                suggestions.append("Use smaller page_size to reduce tokens per request")

        # Context-specific suggestions
        if "query_entries" in operation and tokens > 3000:
            suggestions.append("Consider adding date range filters to limit results")

        if "read_recent" in operation and tokens > 2000:
            suggestions.append("Use smaller 'n' parameter or add field selection")

        return " | ".join(suggestions) if suggestions else None

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get current token usage statistics."""
        if not self.metrics_history:
            return {"message": "No operations recorded yet"}

        # Calculate stats
        total_operations = len(self.metrics_history)
        total_tokens = sum(m.total_tokens for m in self.metrics_history)
        avg_tokens = total_tokens / total_operations

        compact_operations = [m for m in self.metrics_history if m.compact_mode]
        compact_avg = (sum(m.total_tokens for m in compact_operations) /
                       len(compact_operations)) if compact_operations else 0

        full_operations = [m for m in self.metrics_history if not m.compact_mode]
        full_avg = (sum(m.total_tokens for m in full_operations) /
                    len(full_operations)) if full_operations else 0

        # Recent trend (last 10 operations)
        recent = self.metrics_history[-10:]
        recent_avg = sum(m.total_tokens for m in recent) / len(recent) if recent else 0

        stats = {
            "total_operations": total_operations,
            "total_tokens_used": total_tokens,
            "average_tokens_per_operation": int(avg_tokens),
            "compact_mode_average": int(compact_avg),
            "full_mode_average": int(full_avg),
            "recent_average": int(recent_avg),
            "compact_savings": f"{((full_avg - compact_avg) / full_avg * 100):.1f}%" if full_avg > 0 else "N/A",
            "daily_usage": self.budget.current_daily_usage,
            "daily_limit": self.budget.daily_limit,
            "operation_limit": self.budget.operation_limit,
            "model": self.model,
            "tiktoken_available": TIKTOKEN_AVAILABLE,
        }

        # Add tiktoken info if available
        if TIKTOKEN_AVAILABLE and self.encoder:
            stats["tokenizer"] = {
                "name": self.encoder.name,
                "vocab_size": self.encoder.n_vocab,
            }

        return stats

    def get_tokenizer_info(self) -> Dict[str, Any]:
        """Get information about the tokenizer being used."""
        if not TIKTOKEN_AVAILABLE:
            return {
                "available": False,
                "message": "tiktoken not available. Install with: pip install tiktoken",
            }

        if not self.encoder:
            return {
                "available": True,
                "initialized": False,
                "message": "Tokenizer failed to initialize",
            }

        return {
            "available": True,
            "initialized": True,
            "name": self.encoder.name,
            "model": self.model,
            "vocab_size": self.encoder.n_vocab,
            "max_token_value": self.encoder.max_token_value,
        }

    def save_metrics(self) -> None:
        """Save metrics history to file (best-effort; failures are logged)."""
        try:
            # asdict serializes each dataclass with its fields in declaration
            # order, matching the on-disk schema used by load_metrics().
            metrics_data = [asdict(m) for m in self.metrics_history]
            with open(self.metrics_file, 'w') as f:
                json.dump(metrics_data, f, indent=2)
        except Exception as e:
            # Deliberately best-effort: metrics I/O must not break the caller.
            print(f"Failed to save metrics: {e}")

    def load_metrics(self) -> None:
        """Load metrics history from file (best-effort; failures are logged)."""
        try:
            if self.metrics_file.exists():
                with open(self.metrics_file, 'r') as f:
                    metrics_data = json.load(f)
                self.metrics_history = [TokenMetrics(**data) for data in metrics_data]
        except Exception as e:
            # Deliberately best-effort: a corrupt file should not break startup.
            print(f"Failed to load metrics: {e}")


# Global estimator instance
# NOTE(review): instantiating at import time creates ~/.scribe_metrics as a
# side effect; kept for backward compatibility with existing importers.
token_estimator = TokenEstimator()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.