Skip to main content
Glama
client.py•9.22 kB
"""
MCP Registry API Client.

Connects to the official MCP Registry to discover, search, and fetch
metadata for community MCP servers.

Official Registry: https://registry.modelcontextprotocol.io
API Docs: https://registry.modelcontextprotocol.io/docs
"""

import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
from loguru import logger


class MCPRegistryClient:
    """
    Client for interacting with the MCP Registry API.

    Provides methods to:
    - List available servers
    - Search by keywords or categories
    - Get detailed server metadata
    - Cache responses locally

    Example:
        >>> client = MCPRegistryClient()
        >>> servers = client.list_servers(search="database")
        >>> postgres = client.get_server("io.modelcontextprotocol/server-postgres")
    """

    BASE_URL = "https://registry.modelcontextprotocol.io/v0"
    CACHE_DURATION = timedelta(hours=24)  # Cache for 24 hours

    def __init__(self, cache_dir: Optional[Path] = None):
        """
        Initialize registry client.

        Args:
            cache_dir: Directory for caching registry data.
                       Defaults to ~/.aistack-mcp/cache
        """
        self.cache_dir = cache_dir or Path.home() / ".aistack-mcp" / "cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "registry_cache.json"
        self._cache = self._load_cache()

    def list_servers(
        self,
        search: Optional[str] = None,
        category: Optional[str] = None,
        runtime: Optional[str] = None,
        limit: int = 100,
        use_cache: bool = True,
    ) -> List[Dict]:
        """
        List servers from registry.

        Args:
            search: Search query (searches name, description, tags)
            category: Filter by category (database, productivity, devops, etc.)
            runtime: Filter by runtime (node, python, docker)
            limit: Maximum number of results
            use_cache: Use cached results if available

        Returns:
            List of server metadata dictionaries. Falls back to expired
            cached data on network errors; returns [] when nothing is
            available at all.

        Example:
            >>> client.list_servers(search="database", limit=10)
            [
                {
                    "id": "io.modelcontextprotocol/server-postgres",
                    "name": "PostgreSQL MCP Server",
                    "description": "Connect to PostgreSQL databases",
                    "runtime": "node",
                    ...
                }
            ]
        """
        cache_key = f"list_{search}_{category}_{runtime}_{limit}"

        if use_cache and self._is_cache_valid(cache_key):
            logger.info(f"Using cached results for: {cache_key}")
            return self._cache[cache_key]["data"]

        # Build query parameters, omitting unset filters so the registry
        # applies no filter rather than matching the literal string "None".
        params: Dict[str, object] = {"limit": limit}
        if search:
            params["search"] = search
        if category:
            params["category"] = category
        if runtime:
            params["runtime"] = runtime

        try:
            logger.info(f"Fetching servers from registry: {params}")
            response = requests.get(
                f"{self.BASE_URL}/servers",
                params=params,
                timeout=10,
            )
            response.raise_for_status()

            data = response.json()
            servers = data.get("servers", [])

            # Cache the results with a timestamp for expiry checks.
            self._cache[cache_key] = {
                "data": servers,
                "timestamp": datetime.now().isoformat(),
            }
            self._save_cache()

            return servers

        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch servers: {e}")
            # Try to return cached data even if expired — stale results
            # beat no results when the network is down.
            if cache_key in self._cache:
                logger.warning("Using expired cache due to network error")
                return self._cache[cache_key]["data"]
            return []

    def get_server(self, server_id: str, use_cache: bool = True) -> Optional[Dict]:
        """
        Get detailed metadata for a specific server.

        Args:
            server_id: Unique server identifier
                       (e.g., "io.modelcontextprotocol/server-postgres")
            use_cache: Use cached result if available

        Returns:
            Server metadata dictionary or None if not found

        Example:
            >>> server = client.get_server("io.modelcontextprotocol/server-postgres")
            >>> print(server["packages"]["npm"])
            "@modelcontextprotocol/server-postgres"
        """
        cache_key = f"server_{server_id}"

        if use_cache and self._is_cache_valid(cache_key):
            logger.info(f"Using cached server: {server_id}")
            return self._cache[cache_key]["data"]

        try:
            logger.info(f"Fetching server details: {server_id}")
            # Server IDs contain "/" (namespace/name); percent-encode so the
            # ID travels as a single URL path segment instead of two.
            response = requests.get(
                f"{self.BASE_URL}/servers/{quote(server_id, safe='')}",
                timeout=10,
            )
            response.raise_for_status()

            server = response.json()

            # Cache the result with a timestamp for expiry checks.
            self._cache[cache_key] = {
                "data": server,
                "timestamp": datetime.now().isoformat(),
            }
            self._save_cache()

            return server

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                logger.warning(f"Server not found: {server_id}")
            else:
                logger.error(f"Failed to fetch server: {e}")
            return None

        except requests.exceptions.RequestException as e:
            logger.error(f"Network error: {e}")
            # Try cached data, even if expired, before giving up.
            if cache_key in self._cache:
                logger.warning("Using expired cache due to network error")
                return self._cache[cache_key]["data"]
            return None

    def search_by_category(self, category: str, limit: int = 50) -> List[Dict]:
        """
        Search servers by category.

        Categories:
        - database: Database connections (PostgreSQL, MySQL, MongoDB, etc.)
        - productivity: Productivity tools (Notion, Slack, Linear, etc.)
        - devops: DevOps tools (GitHub, GitLab, Jenkins, etc.)
        - communication: Communication tools (Slack, Discord, Teams, etc.)
        - storage: Cloud storage (AWS S3, Google Cloud, etc.)
        - api: API integrations (REST, GraphQL, etc.)

        Args:
            category: Category name
            limit: Maximum number of results

        Returns:
            List of servers in category
        """
        return self.list_servers(category=category, limit=limit)

    def get_popular(self, limit: int = 20) -> List[Dict]:
        """
        Get servers in the registry's default listing order.

        NOTE(review): no client-side sorting is performed; whether results
        are popularity-ranked depends entirely on the registry's default
        ordering — confirm against the API docs.

        Args:
            limit: Number of servers to return

        Returns:
            List of servers
        """
        return self.list_servers(limit=limit)

    def search_by_runtime(self, runtime: str, limit: int = 50) -> List[Dict]:
        """
        Search servers by runtime.

        Runtimes:
        - node: Node.js servers (npm packages)
        - python: Python servers (PyPI packages)
        - docker: Docker containers
        - binary: Compiled binaries

        Args:
            runtime: Runtime type
            limit: Maximum number of results

        Returns:
            List of servers for runtime
        """
        return self.list_servers(runtime=runtime, limit=limit)

    def clear_cache(self):
        """Clear all cached registry data (in memory and on disk)."""
        self._cache = {}
        if self.cache_file.exists():
            self.cache_file.unlink()
        logger.info("Cache cleared")

    def _load_cache(self) -> Dict:
        """Load cache from disk; an unreadable cache file starts fresh."""
        if self.cache_file.exists():
            try:
                return json.loads(self.cache_file.read_text())
            except json.JSONDecodeError:
                logger.warning("Cache file corrupted, starting fresh")
                return {}
        return {}

    def _save_cache(self):
        """Save cache to disk; failures are logged, never raised."""
        try:
            self.cache_file.write_text(json.dumps(self._cache, indent=2))
        except Exception as e:
            logger.error(f"Failed to save cache: {e}")

    def _is_cache_valid(self, key: str) -> bool:
        """Check if cached entry exists and is younger than CACHE_DURATION."""
        if key not in self._cache:
            return False

        timestamp_str = self._cache[key].get("timestamp")
        if not timestamp_str:
            return False

        try:
            timestamp = datetime.fromisoformat(timestamp_str)
            age = datetime.now() - timestamp
            return age < self.CACHE_DURATION
        except ValueError:
            # Malformed timestamp — treat the entry as expired.
            return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mjdevaccount/AIStack-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.