Skip to main content
Glama
mcp_analyzer.py18.2 kB
""" MCP Server Analyzer Specialized analyzer for Model Context Protocol (MCP) servers. Extracts tools, resources, prompts, and server configurations. """ import ast import json import logging import re from typing import Dict, List, Any, Optional, Set, Tuple from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) @dataclass class MCPTool: """Represents an MCP tool definition.""" name: str description: str input_schema: Dict[str, Any] required_params: List[str] = field(default_factory=list) optional_params: List[str] = field(default_factory=list) examples: List[Dict[str, Any]] = field(default_factory=list) handler_function: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { 'name': self.name, 'description': self.description, 'input_schema': self.input_schema, 'required_params': self.required_params, 'optional_params': self.optional_params, 'examples': self.examples, 'handler_function': self.handler_function } @dataclass class MCPResource: """Represents an MCP resource.""" uri: str name: str description: str mime_type: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { 'uri': self.uri, 'name': self.name, 'description': self.description, 'mime_type': self.mime_type } @dataclass class MCPServerInfo: """Complete MCP server analysis.""" server_name: str server_class: str tools: List[MCPTool] = field(default_factory=list) resources: List[MCPResource] = field(default_factory=list) prompts: List[Dict[str, Any]] = field(default_factory=list) transport: str = "stdio" # stdio, sse, http env_variables: List[Dict[str, str]] = field(default_factory=list) client_configs: Dict[str, str] = field(default_factory=dict) integrations: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { 'server_name': self.server_name, 'server_class': self.server_class, 'tools': [t.to_dict() for t in self.tools], 'resources': [r.to_dict() for r in self.resources], 'prompts': self.prompts, 'transport': self.transport, 'env_variables': self.env_variables, 'client_configs': self.client_configs, 'integrations': self.integrations } class MCPServerAnalyzer: """Analyzer for MCP server codebases.""" def __init__(self): self.mcp_patterns = { 'server_init': [ r'Server\s*\(', r'from\s+mcp\.server\s+import\s+Server', r'@app\.list_tools', r'@app\.call_tool', ], 'tool_decorator': [ r'@.*\.list_tools\(\)', r'@.*\.call_tool\(\)', ], 'transport': [ r'stdio_server\(\)', r'sse_server\(\)', r'create_sse_server\(\)', ] } def analyze(self, codebase_path: str, file_contents: Dict[str, str]) -> Optional[MCPServerInfo]: """ Analyze a codebase for MCP server patterns. Args: codebase_path: Root path of the codebase file_contents: Dictionary of file_path: content Returns: MCPServerInfo if MCP server detected, None otherwise """ logger.info("Analyzing codebase for MCP server patterns...") # Find the main server file server_file = self._find_server_file(file_contents) if not server_file: logger.info("No MCP server file detected") return None logger.info(f"Found MCP server file: {server_file}") # Parse the server file content = file_contents[server_file] try: tree = ast.parse(content) # Count nodes for debugging function_nodes = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)] logger.info(f"Parsed {server_file}: {len(function_nodes)} functions found in AST") except SyntaxError as e: logger.error(f"Failed to parse {server_file}: {e}") return None # Extract server information server_info = self._extract_server_info(tree, content, server_file) # Extract environment variables server_info.env_variables = self._extract_env_variables(file_contents) # Detect integrations server_info.integrations = self._detect_integrations(file_contents) # Generate client configs server_info.client_configs = self._generate_client_configs( codebase_path, server_info.server_name ) return server_info def _find_server_file(self, file_contents: Dict[str, str]) -> Optional[str]: """Find the main MCP server file.""" candidates = [] for file_path, content in file_contents.items(): if not file_path.endswith('.py'): continue score = 0 # Check for MCP imports if 'from mcp.server import Server' in content or 'from mcp import Server' in content: score += 10 # Check for server decorators if '@app.list_tools' in content or '@server.list_tools' in content: score += 8 # Check for transport setup if 'stdio_server' in content or 'sse_server' in content: score += 5 # Prefer files named 'server.py' or 'main.py' filename = Path(file_path).name.lower() if filename == 'server.py': score += 3 elif filename == 'main.py': score += 2 if score > 0: candidates.append((file_path, score)) if not candidates: return None # Return the file with highest score candidates.sort(key=lambda x: x[1], reverse=True) return candidates[0][0] def _extract_server_info( self, tree: ast.AST, content: str, file_path: str ) -> MCPServerInfo: """Extract server information from AST.""" server_name = "unknown" server_class = "unknown" tools = [] resources = [] prompts = [] transport = "stdio" # Find Server initialization for node in ast.walk(tree): # Server("name") if isinstance(node, ast.Call): if isinstance(node.func, ast.Name) and node.func.id == 'Server': if node.args and isinstance(node.args[0], ast.Constant): server_name = node.args[0].value # Extract tools from @app.list_tools() decorator tools = self._extract_tools_from_decorator(tree, content) # Extract resources resources = self._extract_resources(tree) # Extract prompts prompts = self._extract_prompts(tree) # Detect transport if 'stdio_server' in content: transport = 'stdio' elif 'sse_server' in content or 'create_sse_server' in content: transport = 'sse' elif 'create_http_server' in content: transport = 'http' return MCPServerInfo( server_name=server_name, server_class=server_class, tools=tools, resources=resources, prompts=prompts, transport=transport ) def _extract_tools_from_decorator(self, tree: ast.AST, content: str) -> List[MCPTool]: """Extract tool definitions from @app.list_tools() decorated functions.""" tools = [] function_count = 0 decorated_count = 0 for node in ast.walk(tree): # Check for both sync and async functions if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): continue function_count += 1 if node.decorator_list: decorated_count += 1 logger.info(f"Function '{node.name}' has {len(node.decorator_list)} decorators") for dec in node.decorator_list: logger.info(f" Decorator type: {type(dec)}, is_list_tools: {self._is_list_tools_decorator(dec)}") # Check if this is a list_tools handler has_list_tools_decorator = any( self._is_list_tools_decorator(dec) for dec in node.decorator_list ) if has_list_tools_decorator: logger.info(f"✓ Found list_tools function: {node.name}") # Extract the tool definitions from the function body tools = self._parse_tool_definitions(node, content) logger.info(f"Extracted {len(tools)} tools from {node.name}") break logger.info(f"Scanned {function_count} functions, {decorated_count} with decorators") if not tools: logger.warning("No list_tools function found") return tools def _is_list_tools_decorator(self, decorator: ast.AST) -> bool: """Check if a decorator is @app.list_tools().""" if isinstance(decorator, ast.Call): if isinstance(decorator.func, ast.Attribute): return decorator.func.attr == 'list_tools' elif isinstance(decorator, ast.Attribute): return decorator.attr == 'list_tools' return False def _parse_tool_definitions(self, node: ast.FunctionDef, content: str) -> List[MCPTool]: """Parse tool definitions from list_tools function body.""" tools = [] # Look for return statement with list of Tool objects for stmt in ast.walk(node): if isinstance(stmt, ast.Return) and stmt.value: logger.info(f"Found return statement, type: {type(stmt.value)}") if isinstance(stmt.value, ast.List): logger.info(f"Return statement is a list with {len(stmt.value.elts)} elements") # Extract each Tool() or types.Tool() call for i, elt in enumerate(stmt.value.elts): if isinstance(elt, ast.Call): # Check if it's a Tool() or types.Tool() or Type.Tool() call is_tool_call = False func_name = "unknown" if isinstance(elt.func, ast.Name): func_name = elt.func.id if func_name == 'Tool': is_tool_call = True elif isinstance(elt.func, ast.Attribute): func_name = f"{getattr(elt.func.value, 'id', '?')}.{elt.func.attr}" if elt.func.attr == 'Tool': is_tool_call = True logger.info(f" Element {i}: {func_name}, is_tool_call={is_tool_call}") if is_tool_call: tool = self._parse_tool_call(elt) if tool: logger.info(f" ✓ Parsed tool: {tool.name}") tools.append(tool) else: logger.warning(f" ✗ Failed to parse tool") return tools def _parse_tool_call(self, call: ast.Call) -> Optional[MCPTool]: """Parse a single Tool() call.""" tool_data = { 'name': None, 'description': None, 'inputSchema': {} } # Extract keyword arguments for keyword in call.keywords: if keyword.arg == 'name' and isinstance(keyword.value, ast.Constant): tool_data['name'] = keyword.value.value elif keyword.arg == 'description' and isinstance(keyword.value, ast.Constant): tool_data['description'] = keyword.value.value elif keyword.arg == 'inputSchema': # Parse the schema dict tool_data['inputSchema'] = self._parse_dict_node(keyword.value) if not tool_data['name']: return None # Parse schema schema = tool_data['inputSchema'] required_params = schema.get('required', []) properties = schema.get('properties', {}) optional_params = [p for p in properties.keys() if p not in required_params] return MCPTool( name=tool_data['name'], description=tool_data.get('description', ''), input_schema=schema, required_params=required_params, optional_params=optional_params ) def _parse_dict_node(self, node: ast.AST) -> Dict[str, Any]: """Recursively parse an ast.Dict node into a Python dict.""" if isinstance(node, ast.Dict): result = {} for key, value in zip(node.keys, node.values): if isinstance(key, ast.Constant): result[key.value] = self._parse_dict_node(value) return result elif isinstance(node, ast.List): return [self._parse_dict_node(elt) for elt in node.elts] elif isinstance(node, ast.Constant): return node.value else: return {} def _extract_resources(self, tree: ast.AST) -> List[MCPResource]: """Extract resource definitions.""" resources = [] # Similar logic to tools, look for @app.list_resources() # Implementation depends on your MCP resource patterns return resources def _extract_prompts(self, tree: ast.AST) -> List[Dict[str, Any]]: """Extract prompt definitions.""" prompts = [] # Similar logic to tools, look for @app.list_prompts() return prompts def _extract_env_variables(self, file_contents: Dict[str, str]) -> List[Dict[str, str]]: """Extract environment variables from .env.example or code.""" env_vars = [] # Check .env.example for file_path, content in file_contents.items(): if '.env.example' in file_path or '.env.template' in file_path: for line in content.split('\n'): line = line.strip() if line and not line.startswith('#') and '=' in line: key = line.split('=')[0].strip() # Extract comments as description description = "" env_vars.append({ 'name': key, 'description': description, 'required': True }) # Also scan for os.getenv() calls for file_path, content in file_contents.items(): if not file_path.endswith('.py'): continue # Find os.getenv("VAR_NAME") patterns matches = re.findall(r'os\.getenv\([\'"]([A-Z_]+)[\'"]', content) for var_name in matches: if not any(v['name'] == var_name for v in env_vars): env_vars.append({ 'name': var_name, 'description': '', 'required': False }) return env_vars def _detect_integrations(self, file_contents: Dict[str, str]) -> List[str]: """Detect external service integrations.""" integrations = [] integration_patterns = { 'RapidAPI': r'rapidapi\.com|X-RapidAPI-Key', 'OpenAI': r'openai|gpt-|OPENAI_API_KEY', 'Anthropic': r'anthropic|claude|ANTHROPIC_API_KEY', 'LiveKit': r'livekit|LIVEKIT_', 'Deepgram': r'deepgram|DEEPGRAM_API_KEY', 'Stripe': r'stripe|STRIPE_', 'AWS': r'boto3|aws|AWS_', 'Firebase': r'firebase|FIREBASE_', 'Supabase': r'supabase|SUPABASE_', } all_content = ' '.join(file_contents.values()) for service, pattern in integration_patterns.items(): if re.search(pattern, all_content, re.IGNORECASE): integrations.append(service) return integrations def _generate_client_configs(self, codebase_path: str, server_name: str) -> Dict[str, str]: """Generate example client configurations.""" configs = {} # Claude Desktop config configs['claude_desktop'] = f'''{{ "mcpServers": {{ "{server_name}": {{ "command": "python", "args": ["path/to/{Path(codebase_path).name}/server.py"], "env": {{ "API_KEY": "your_api_key_here" }} }} }} }}''' # Cursor config configs['cursor'] = f'''{{ "mcpServers": {{ "{server_name}": {{ "command": "python", "args": ["path/to/server.py"], "env": {{}} }} }} }}''' return configs def analyze_mcp_server(codebase_path: str, file_contents: Dict[str, str]) -> Optional[MCPServerInfo]: """ Convenience function to analyze an MCP server. Args: codebase_path: Root path of the codebase file_contents: Dictionary mapping file paths to their contents Returns: MCPServerInfo if MCP server detected, None otherwise """ analyzer = MCPServerAnalyzer() return analyzer.analyze(codebase_path, file_contents)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vedantparmar12/Document-Automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server