Skip to main content
Glama
ollama-integration.py • 13.6 kB
#!/usr/bin/env python3
"""
Ollama + PostgreSQL MCP Server Integration Example

This script demonstrates how to use Ollama LLM with the PostgreSQL MCP Server
to perform natural language database queries.

Prerequisites:
1. Install Ollama: https://ollama.ai/
2. Pull a model: ollama pull llama3.1
3. Install required Python packages: pip install ollama mcp
4. Have the PostgreSQL MCP Server configured and ready

Usage:
    python examples/ollama-integration.py
    python examples/ollama-integration.py --mode examples --model llama3.1
"""

import asyncio
import json
import logging
import sys
import subprocess
from contextlib import AsyncExitStack
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime

try:
    import ollama
    from mcp.client.session import ClientSession
    from mcp.client.stdio import StdioServerParameters, stdio_client
except ImportError as e:
    print(f"❌ Missing dependencies: {e}")
    print("Install with: pip install ollama mcp")
    sys.exit(1)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model used when the caller does not pick one explicitly.
DEFAULT_OLLAMA_MODEL = "llama3.1"


@dataclass
class QueryResult:
    """Result of a database query operation."""

    success: bool  # True when the tool call completed without error
    data: Any = None  # Text payload extracted from the tool result
    error: Optional[str] = None  # Error description when success is False
    tool_used: Optional[str] = None  # Name of the MCP tool that was invoked
    query_analysis: Optional[str] = None  # Not populated by this script


def _tool_to_dict(tool: Any) -> Dict[str, Any]:
    """Convert an MCP tool model into a plain dict.

    Prefers ``model_dump`` and falls back to the legacy ``dict()`` method
    when the object does not provide it.
    """
    dump = getattr(tool, "model_dump", None)
    if callable(dump):
        # mode="json" keeps the result serializable by json.dumps.
        return dump(mode="json")
    return tool.dict()


def _model_name(model: Any) -> str:
    """Return the name of one entry from ``ollama.list()``.

    Entries may be plain dicts or attribute-style objects, and the name may be
    stored under ``model`` or ``name`` depending on the client version.
    """
    for key in ("model", "name"):
        value = model.get(key) if isinstance(model, dict) else getattr(model, key, None)
        if value:
            return str(value)
    return ""


def _model_installed(requested: str, installed: List[str]) -> bool:
    """Tell whether ``requested`` matches one of the ``installed`` model names.

    A request without an explicit tag also matches the ``:latest`` tag, so
    ``llama3.1`` is satisfied by an installed ``llama3.1:latest``.
    """
    if requested in installed:
        return True
    if ":" not in requested:
        return f"{requested}:latest" in installed
    return False


def _is_valid_tool_call(candidate: Any) -> bool:
    """Check that ``candidate`` has the shape of a usable tool call.

    A usable tool call is a dict with a non-empty string ``name`` and an
    ``arguments`` value that is either missing, ``None`` or a dict.
    """
    if not isinstance(candidate, dict):
        return False
    name = candidate.get("name")
    if not isinstance(name, str) or not name:
        return False
    arguments = candidate.get("arguments")
    return arguments is None or isinstance(arguments, dict)


class OllamaMCPBridge:
    """Bridge between Ollama LLM and PostgreSQL MCP Server"""

    def __init__(self, ollama_model: str = DEFAULT_OLLAMA_MODEL,
                 mcp_server_command: Optional[List[str]] = None):
        """
        Initialize the bridge

        Args:
            ollama_model: Name of the Ollama model to use
            mcp_server_command: Command to start the MCP server, as an argv
                list. Defaults to ``["python", "main.py"]``.
        """
        self.ollama_model = ollama_model
        self.mcp_server_command = mcp_server_command or ["python", "main.py"]
        self.mcp_session: Optional[ClientSession] = None
        self.available_tools: List[Dict] = []
        # Owns the stdio transport and the client session so both are torn
        # down together, in reverse order, by close_mcp_connection().
        self._exit_stack: Optional[AsyncExitStack] = None

    async def start_mcp_connection(self) -> bool:
        """Start connection to the PostgreSQL MCP Server.

        Launches the server command over stdio, initializes a client session
        and caches the list of tools the server exposes.

        Returns:
            True when the session is ready, False when any step failed (the
            failure is logged and partially opened resources are released).
        """
        logger.info("🔗 Connecting to PostgreSQL MCP Server...")

        exit_stack = AsyncExitStack()
        self._exit_stack = exit_stack
        try:
            command, *args = self.mcp_server_command
            server_params = StdioServerParameters(command=command, args=args)

            # stdio_client and ClientSession are async context managers; the
            # exit stack keeps them open beyond this method.
            read_stream, write_stream = await exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await exit_stack.enter_async_context(
                ClientSession(read_stream, write_stream)
            )

            # Initialize the session
            await session.initialize()

            # Get available tools
            tools_response = await session.list_tools()
            self.available_tools = [_tool_to_dict(tool) for tool in tools_response.tools]
        except Exception as e:
            logger.error(f"❌ Failed to connect to MCP server: {e}")
            await self.close_mcp_connection()
            return False

        self.mcp_session = session
        logger.info(f"✅ Connected! Available tools: {[tool['name'] for tool in self.available_tools]}")
        return True

    async def close_mcp_connection(self):
        """Close the MCP connection and release the server transport.

        Safe to call when no connection was ever opened, and safe to call
        more than once.
        """
        exit_stack, self._exit_stack = self._exit_stack, None
        had_session = self.mcp_session is not None
        self.mcp_session = None

        if exit_stack is None:
            return

        try:
            await exit_stack.aclose()
        except Exception as e:
            # Shutdown is best-effort: report the problem but do not mask the
            # error (if any) that led to the shutdown.
            logger.warning(f"Error while closing MCP connection: {e}")

        if had_session:
            logger.info("🔌 MCP connection closed")

    def check_ollama_availability(self) -> bool:
        """Check if Ollama is available and the model is installed.

        Returns:
            True when the Ollama service answers and the configured model
            (with or without an explicit tag) is installed, False otherwise.
        """
        try:
            # Check if Ollama is running
            models = ollama.list()
            model_names = [_model_name(model) for model in models['models']]
        except Exception as e:
            logger.error(f"❌ Ollama not available: {e}")
            logger.info("💡 Make sure Ollama is installed and running: https://ollama.ai/")
            return False

        if not _model_installed(self.ollama_model, model_names):
            logger.warning(f"⚠️ Model '{self.ollama_model}' not found. Available models: {model_names}")
            logger.info(f"💡 To install: ollama pull {self.ollama_model}")
            return False

        logger.info(f"✅ Ollama model '{self.ollama_model}' is available")
        return True

    def create_system_prompt(self) -> str:
        """Create system prompt for Ollama with MCP tool information.

        Returns:
            A prompt that lists the cached tools, dumps their full definitions
            as JSON and describes the JSON envelope the model must emit to
            request a tool call.
        """
        tools_description = "\n".join([
            f"- {tool['name']}: {tool.get('description') or 'No description'}"
            for tool in self.available_tools
        ])

        return f"""You are a helpful database analyst assistant. You have access to a PostgreSQL database through the following tools:

{tools_description}

Guidelines:
1. When users ask questions about data, choose the appropriate tool to query the database
2. Always validate queries before executing them when possible
3. Provide clear, helpful explanations of the results
4. If a query might be slow or return many rows, warn the user
5. Format results in a user-friendly way
6. Suggest optimizations when you notice performance issues

Available tools and their parameters:
{json.dumps(self.available_tools, indent=2)}

When you need to use a tool, respond with a JSON object like this:
{{
    "tool_call": {{
        "name": "tool_name",
        "arguments": {{
            "parameter": "value"
        }}
    }},
    "explanation": "Why you're using this tool and what you expect to find"
}}

If you don't need to use a tool, just respond normally.
"""

    async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> QueryResult:
        """Call a tool on the MCP server.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Arguments passed to the tool.

        Returns:
            A QueryResult whose ``data`` is the concatenated text content of
            the tool result. ``success`` is False when there is no session,
            the call raised, or the server flagged the result as an error.
        """
        if self.mcp_session is None:
            return QueryResult(
                success=False,
                error="MCP session is not connected",
                tool_used=tool_name
            )

        try:
            logger.info(f"🔧 Calling tool: {tool_name} with args: {arguments}")
            result = await self.mcp_session.call_tool(tool_name, arguments)
        except Exception as e:
            logger.error(f"❌ Tool call failed: {e}")
            return QueryResult(
                success=False,
                error=str(e),
                tool_used=tool_name
            )

        # Extract text content from the result; non-text items are skipped.
        content = "\n".join(
            item.text for item in result.content if hasattr(item, 'text')
        ).strip()

        # Tool-level failures are reported in-band through the isError flag
        # rather than by raising, so they must be checked explicitly.
        if getattr(result, "isError", False):
            error_text = content or "Tool reported an error"
            logger.error(f"❌ Tool call failed: {error_text}")
            return QueryResult(
                success=False,
                error=error_text,
                tool_used=tool_name
            )

        return QueryResult(
            success=True,
            data=content,
            tool_used=tool_name
        )

    async def chat_with_ollama(self, user_message: str,
                               conversation_history: Optional[List[Dict]] = None) -> str:
        """Send message to Ollama and get response.

        Args:
            user_message: The message appended as the final user turn.
            conversation_history: Earlier ``{"role", "content"}`` messages
                inserted between the system prompt and ``user_message``.

        Returns:
            The model's reply text, or an error description when the call
            failed (the failure is also logged).
        """
        try:
            messages = [{"role": "system", "content": self.create_system_prompt()}]

            # Add conversation history
            if conversation_history:
                messages.extend(conversation_history)

            # Add current user message
            messages.append({"role": "user", "content": user_message})

            # NOTE: ollama.chat is a synchronous call; it blocks the event
            # loop while the model generates its answer.
            response = ollama.chat(
                model=self.ollama_model,
                messages=messages,
                stream=False
            )

            return response['message']['content']

        except Exception as e:
            logger.error(f"❌ Ollama chat failed: {e}")
            return f"Error communicating with Ollama: {e}"

    def parse_tool_call(self, ollama_response: str) -> Optional[Dict]:
        """Parse tool call from Ollama response.

        Scans the response for the first JSON object that carries a
        well-formed ``tool_call`` entry, tolerating surrounding prose (even
        prose that itself contains braces).

        Args:
            ollama_response: Raw text returned by the model.

        Returns:
            The decoded JSON object (including ``tool_call`` and any
            ``explanation``), or None when no usable tool call is present.
        """
        if not ollama_response:
            return None

        decoder = json.JSONDecoder()
        position = ollama_response.find('{')
        while position != -1:
            try:
                parsed, end = decoder.raw_decode(ollama_response, position)
            except json.JSONDecodeError:
                # Not valid JSON from this brace; try the next one.
                position = ollama_response.find('{', position + 1)
                continue

            if isinstance(parsed, dict) and _is_valid_tool_call(parsed.get('tool_call')):
                return parsed

            # Skip past the object just decoded so its nested objects are not
            # mistaken for top-level tool calls.
            position = ollama_response.find('{', end)

        return None

    async def process_user_query(self, user_message: str,
                                 conversation_history: Optional[List[Dict]] = None) -> str:
        """Process a user query end-to-end.

        Asks the model, executes the tool call it requests (if any) and asks
        the model again to interpret the tool output.

        Args:
            user_message: The user's question.
            conversation_history: Earlier messages of the conversation; the
                list is not modified.

        Returns:
            The final text to show to the user. Failures are returned as
            text rather than raised.
        """
        try:
            # Get initial response from Ollama
            ollama_response = await self.chat_with_ollama(user_message, conversation_history)

            # Check if Ollama wants to use a tool
            tool_call = self.parse_tool_call(ollama_response)
            if not tool_call:
                # No tool call needed, return Ollama's response directly
                return ollama_response

            tool_name = tool_call['tool_call']['name']
            tool_args = tool_call['tool_call'].get('arguments') or {}
            explanation = tool_call.get('explanation', '')

            print(f"🤖 {explanation}")
            print(f"🔧 Using tool: {tool_name}")

            # Execute the tool call
            result = await self.call_mcp_tool(tool_name, tool_args)
            if not result.success:
                return f"❌ Tool execution failed: {result.error}"

            # Send the tool result back to Ollama for interpretation
            tool_result_message = f"""
Tool: {tool_name}
Arguments: {json.dumps(tool_args, indent=2)}
Result: {result.data}

Please interpret these results and provide a helpful summary for the user.
"""

            # Replay the exchange so far, then send the tool output as the
            # current user turn (rather than a trailing empty message).
            extended_history = list(conversation_history or [])
            extended_history.extend([
                {"role": "user", "content": user_message},
                {"role": "assistant", "content": ollama_response},
            ])

            return await self.chat_with_ollama(tool_result_message, extended_history)

        except Exception as e:
            logger.error(f"❌ Query processing failed: {e}")
            return f"Error processing your query: {e}"


async def interactive_session(ollama_model: str = DEFAULT_OLLAMA_MODEL):
    """Run an interactive session with the user.

    Args:
        ollama_model: Name of the Ollama model to use.
    """
    print("🚀 PostgreSQL MCP + Ollama Integration")
    print("=" * 50)

    # Initialize the bridge
    bridge = OllamaMCPBridge(ollama_model)

    # Check prerequisites
    if not bridge.check_ollama_availability():
        return

    if not await bridge.start_mcp_connection():
        return

    print("\n✅ Ready! You can now ask questions about your database.")
    print("💡 Try questions like:")
    print("   - 'Show me the recent orders'")
    print("   - 'What are the top selling products?'")
    print("   - 'Validate this query: SELECT * FROM users'")
    print("   - 'What tables are available?'")
    print("Type 'quit' to exit.\n")

    conversation_history = []

    try:
        while True:
            # Get user input
            user_input = input("👤 You: ").strip()

            if user_input.lower() in ['quit', 'exit', 'bye']:
                print("👋 Goodbye!")
                break

            if not user_input:
                continue

            print("\n🤖 Assistant: ", end="")

            # Process the query
            response = await bridge.process_user_query(user_input, conversation_history)
            print(response)

            # Update conversation history
            conversation_history.extend([
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": response}
            ])

            # Keep conversation history manageable
            if len(conversation_history) > 10:  # Keep last 5 exchanges
                conversation_history = conversation_history[-10:]

            print()

    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed (e.g. Ctrl-D); treat it like an interrupt.
        print("\n👋 Session interrupted by user")

    finally:
        await bridge.close_mcp_connection()


async def example_queries(ollama_model: str = DEFAULT_OLLAMA_MODEL):
    """Run some example queries to demonstrate capabilities.

    Args:
        ollama_model: Name of the Ollama model to use.
    """
    print("🧪 Running Example Queries")
    print("=" * 30)

    bridge = OllamaMCPBridge(ollama_model)

    if not bridge.check_ollama_availability():
        return

    if not await bridge.start_mcp_connection():
        return

    # Example queries to demonstrate different capabilities
    examples = [
        "What tables are available in the database?",
        "Show me the structure of the orders table",
        "Can you validate this query: SELECT * FROM users WHERE email LIKE '%@gmail.com'?",
        "What are some recent orders with their total amounts?",
    ]

    try:
        for i, query in enumerate(examples, 1):
            print(f"\n📝 Example {i}: {query}")
            print("-" * 40)

            response = await bridge.process_user_query(query)
            print(f"🤖 Response: {response}")

            # Small delay between queries
            await asyncio.sleep(1)

    finally:
        await bridge.close_mcp_connection()


def main():
    """Main entry point: parse the command line and run the chosen mode."""
    import argparse

    parser = argparse.ArgumentParser(description="PostgreSQL MCP + Ollama Integration")
    parser.add_argument(
        "--mode",
        choices=["interactive", "examples"],
        default="interactive",
        help="Mode to run: interactive session or example queries"
    )
    parser.add_argument(
        "--model",
        default=DEFAULT_OLLAMA_MODEL,
        help="Ollama model to use (default: llama3.1)"
    )

    args = parser.parse_args()

    # The model is handed to the session explicitly: the bridge stores it per
    # instance, so assigning it on the class would be overwritten in __init__.
    runner = interactive_session if args.mode == "interactive" else example_queries

    try:
        asyncio.run(runner(args.model))
    except Exception as e:
        logger.error(f"❌ Application failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/abdou-ghonim/mcp-postgres'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.