"""Core MCP server implementation for PostgreSQL"""
import asyncio
import logging
from typing import Any, Sequence
import json
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
TextContent,
Tool
)
from .database import PostgreSQLManager, DatabaseError
from .tools import DatabaseTools
from .config import DatabaseConfig, ServerConfig, load_config
logger = logging.getLogger(__name__)
class PostgreSQLMCPServer:
"""PostgreSQL MCP Server"""
def __init__(self):
self.db_config: DatabaseConfig = None
self.server_config: ServerConfig = None
self.db_manager: PostgreSQLManager = None
self.tools: DatabaseTools = None
self.server: Server = None
async def initialize(self) -> None:
"""Initialize the MCP server"""
try:
# Load configuration
self.db_config, self.server_config = load_config()
logger.info(f"Loaded configuration for {self.server_config.name}")
# Set up logging
logging.basicConfig(
level=getattr(logging, self.server_config.log_level),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Initialize database manager
self.db_manager = PostgreSQLManager(self.db_config)
await self.db_manager.initialize()
# Initialize tools
self.tools = DatabaseTools(self.db_manager, self.server_config)
# Create MCP server
self.server = Server(self.server_config.name)
# Register handlers
self._register_handlers()
logger.info("PostgreSQL MCP Server initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize MCP server: {e}")
raise
def _register_handlers(self) -> None:
"""Register MCP server handlers"""
@self.server.list_tools()
async def handle_list_tools() -> ListToolsResult:
"""Handle list tools request"""
tools = self.tools.get_tools()
logger.debug(f"Returning {len(tools)} tools")
return ListToolsResult(tools=tools)
@self.server.call_tool()
async def handle_call_tool(request: CallToolRequest) -> CallToolResult:
"""Handle tool call request"""
logger.info(f"Tool call: {request.params.name}")
try:
# Handle the tool call
content = await self.tools.handle_tool_call(
request.params.name,
request.params.arguments or {}
)
return CallToolResult(content=content)
except Exception as e:
logger.error(f"Error handling tool call {request.params.name}: {e}")
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")]
)
async def run_stdio(self) -> None:
"""Run the server using stdio transport"""
try:
logger.info("Starting PostgreSQL MCP Server with stdio transport")
# Initialize if not already done
if not self.server:
await self.initialize()
# Run the server
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.server_config.name,
server_version=self.server_config.version,
capabilities=self.server.get_capabilities()
)
)
except Exception as e:
logger.error(f"Server error: {e}")
raise
finally:
await self.cleanup()
async def cleanup(self) -> None:
"""Clean up resources"""
if self.db_manager:
await self.db_manager.close()
logger.info("Database connections closed")
async def main() -> None:
"""Main entry point for the MCP server"""
server = PostgreSQLMCPServer()
try:
await server.run_stdio()
except KeyboardInterrupt:
logger.info("Server interrupted by user")
except Exception as e:
logger.error(f"Server failed: {e}")
raise
finally:
await server.cleanup()
if __name__ == "__main__":
asyncio.run(main())