#!/usr/bin/env python3
"""
Ollama + PostgreSQL MCP Server Integration Example
This script demonstrates how to use an Ollama LLM with the PostgreSQL MCP Server
to perform natural-language database queries.
Prerequisites:
1. Install Ollama: https://ollama.ai/
2. Pull a model: ollama pull llama3.1
3. Install required Python packages: pip install ollama mcp
4. Have the PostgreSQL MCP Server configured and ready
Usage:
python examples/ollama-integration.py
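python examples/ollama-integration.py --mode examples --model llama3.1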
"""
import asyncio
import json
import logging
import sys
from contextlib import AsyncExitStack
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

try:
    import ollama
    from mcp import StdioServerParameters
    from mcp.client.session import ClientSession
    from mcp.client.stdio import stdio_client
except ImportError as e:
    print(f"❌ Missing dependencies: {e}")
    print("Install with: pip install ollama mcp")
    sys.exit(1)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class QueryResult:
"""Result of a database query operation"""
success: bool
data: Any = None
    error: Optional[str] = None
    tool_used: Optional[str] = None
    query_analysis: Optional[str] = None
class OllamaMCPBridge:
"""Bridge between Ollama LLM and PostgreSQL MCP Server"""
    def __init__(self, ollama_model: str = "llama3.1", mcp_server_command: Optional[List[str]] = None):
"""
Initialize the bridge
Args:
ollama_model: Name of the Ollama model to use
mcp_server_command: Command to start the MCP server
"""
self.ollama_model = ollama_model
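        # NOTE: the default assumes the PostgreSQL MCP server is started with
        # "python main.py" from the repository root; adjust for your setup.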
        self.mcp_server_command = mcp_server_command or ["python", "main.py"]
        self.mcp_session: Optional[ClientSession] = None
        self._exit_stack: Optional[AsyncExitStack] = None
        self.available_tools: List[Dict] = []
    async def start_mcp_connection(self) -> bool:
        """Start connection to the PostgreSQL MCP Server"""
        try:
            logger.info("🔌 Connecting to PostgreSQL MCP Server...")
            # stdio_client launches the server process itself and yields the
            # read/write streams; AsyncExitStack keeps both async context
            # managers open for the lifetime of the bridge.
            self._exit_stack = AsyncExitStack()
            server_params = StdioServerParameters(
                command=self.mcp_server_command[0],
                args=self.mcp_server_command[1:],
            )
            read_stream, write_stream = await self._exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            self.mcp_session = await self._exit_stack.enter_async_context(
                ClientSession(read_stream, write_stream)
            )
            # Initialize the session
            await self.mcp_session.initialize()
            # Get available tools
            tools_response = await self.mcp_session.list_tools()
            self.available_tools = [tool.model_dump() for tool in tools_response.tools]
            logger.info(f"✅ Connected! Available tools: {[tool['name'] for tool in self.available_tools]}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to connect to MCP server: {e}")
            return False
    async def close_mcp_connection(self):
        """Close the MCP session and shut down the server process"""
        if self._exit_stack:
            await self._exit_stack.aclose()
            self._exit_stack = None
            self.mcp_session = None
        logger.info("🔌 MCP connection closed")
    def check_ollama_availability(self) -> bool:
        """Check if Ollama is available and the model is installed"""
        try:
            # Check if Ollama is running
            models = ollama.list()
            # Depending on the ollama package version, entries expose 'name' or
            # 'model', and names usually carry a tag (e.g. "llama3.1:latest"),
            # so match on the base name as well.
            model_names = [m.get('name') or m.get('model', '') for m in models['models']]
            if not any(name == self.ollama_model or name.split(':')[0] == self.ollama_model
                       for name in model_names):
                logger.warning(f"⚠️ Model '{self.ollama_model}' not found. Available models: {model_names}")
                logger.info(f"💡 To install: ollama pull {self.ollama_model}")
                return False
            logger.info(f"✅ Ollama model '{self.ollama_model}' is available")
            return True
        except Exception as e:
            logger.error(f"❌ Ollama not available: {e}")
            logger.info("💡 Make sure Ollama is installed and running: https://ollama.ai/")
            return False
def create_system_prompt(self) -> str:
"""Create system prompt for Ollama with MCP tool information"""
tools_description = "\n".join([
f"- {tool['name']}: {tool['description']}"
for tool in self.available_tools
])
return f"""You are a helpful database analyst assistant. You have access to a PostgreSQL database through the following tools:
{tools_description}
Guidelines:
1. When users ask questions about data, choose the appropriate tool to query the database
2. Always validate queries before executing them when possible
3. Provide clear, helpful explanations of the results
4. If a query might be slow or return many rows, warn the user
5. Format results in a user-friendly way
6. Suggest optimizations when you notice performance issues
Available tools and their parameters:
{json.dumps(self.available_tools, indent=2)}
When you need to use a tool, respond with a JSON object like this:
{{
"tool_call": {{
"name": "tool_name",
"arguments": {{
"parameter": "value"
}}
}},
"explanation": "Why you're using this tool and what you expect to find"
}}
If you don't need to use a tool, just respond normally.
"""
async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> QueryResult:
"""Call a tool on the MCP server"""
        try:
            if not self.mcp_session:
                raise RuntimeError("MCP session is not initialized")
            logger.info(f"🔧 Calling tool: {tool_name} with args: {arguments}")
            result = await self.mcp_session.call_tool(tool_name, arguments)
# Extract text content from the result
content = ""
for item in result.content:
if hasattr(item, 'text'):
content += item.text + "\n"
return QueryResult(
success=True,
data=content.strip(),
tool_used=tool_name
)
except Exception as e:
            logger.error(f"❌ Tool call failed: {e}")
return QueryResult(
success=False,
error=str(e),
tool_used=tool_name
)
    async def chat_with_ollama(self, user_message: str, conversation_history: Optional[List[Dict]] = None) -> str:
        """Send message to Ollama and get response"""
        try:
            messages = [{"role": "system", "content": self.create_system_prompt()}]
            # Add conversation history
            if conversation_history:
                messages.extend(conversation_history)
            # Add current user message (may be empty when we only want the model
            # to interpret a tool result already present in the history)
            if user_message:
                messages.append({"role": "user", "content": user_message})
            # ollama.chat is a blocking call, so run it in a worker thread to
            # avoid stalling the event loop
            response = await asyncio.to_thread(
                ollama.chat,
                model=self.ollama_model,
                messages=messages,
                stream=False,
            )
            return response['message']['content']
        except Exception as e:
            logger.error(f"❌ Ollama chat failed: {e}")
            return f"Error communicating with Ollama: {e}"
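    # The system prompt asks the model to emit {"tool_call": {...}, "explanation": ...}
    # when it wants to query the database; this helper pulls that JSON out of the
    # surrounding prose.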
def parse_tool_call(self, ollama_response: str) -> Optional[Dict]:
"""Parse tool call from Ollama response"""
try:
# Look for JSON in the response
start_idx = ollama_response.find('{')
end_idx = ollama_response.rfind('}') + 1
if start_idx != -1 and end_idx != 0:
json_str = ollama_response[start_idx:end_idx]
parsed = json.loads(json_str)
if 'tool_call' in parsed:
return parsed
return None
except (json.JSONDecodeError, KeyError):
return None
    async def process_user_query(self, user_message: str, conversation_history: Optional[List[Dict]] = None) -> str:
"""Process a user query end-to-end"""
try:
# Get initial response from Ollama
ollama_response = await self.chat_with_ollama(user_message, conversation_history)
# Check if Ollama wants to use a tool
tool_call = self.parse_tool_call(ollama_response)
if tool_call:
tool_name = tool_call['tool_call']['name']
tool_args = tool_call['tool_call']['arguments']
explanation = tool_call.get('explanation', '')
                print(f"🤖 {explanation}")
                print(f"🔧 Using tool: {tool_name}")
# Execute the tool call
result = await self.call_mcp_tool(tool_name, tool_args)
if result.success:
# Send the tool result back to Ollama for interpretation
tool_result_message = f"""
Tool: {tool_name}
Arguments: {json.dumps(tool_args, indent=2)}
Result:
{result.data}
Please interpret these results and provide a helpful summary for the user.
"""
# Update conversation with tool result
extended_history = (conversation_history or []).copy()
extended_history.extend([
{"role": "user", "content": user_message},
{"role": "assistant", "content": ollama_response},
{"role": "user", "content": tool_result_message}
])
final_response = await self.chat_with_ollama("", extended_history)
return final_response
else:
                    return f"❌ Tool execution failed: {result.error}"
else:
# No tool call needed, return Ollama's response directly
return ollama_response
except Exception as e:
            logger.error(f"❌ Query processing failed: {e}")
return f"Error processing your query: {e}"
async def interactive_session(model: str = "llama3.1"):
    """Run an interactive session with the user"""
    print("🚀 PostgreSQL MCP + Ollama Integration")
    print("=" * 50)
    # Initialize the bridge
    bridge = OllamaMCPBridge(ollama_model=model)
    # Check prerequisites
    if not bridge.check_ollama_availability():
        return
    if not await bridge.start_mcp_connection():
        return
    print("\n✅ Ready! You can now ask questions about your database.")
    print("💡 Try questions like:")
    print("   - 'Show me the recent orders'")
    print("   - 'What are the top selling products?'")
    print("   - 'Validate this query: SELECT * FROM users'")
    print("   - 'What tables are available?'")
    print("Type 'quit' to exit.\n")
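    # conversation_history holds alternating user/assistant turns that are
    # replayed to Ollama so follow-up questions keep their context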
conversation_history = []
try:
while True:
# Get user input
            user_input = input("👤 You: ").strip()
if user_input.lower() in ['quit', 'exit', 'bye']:
                print("👋 Goodbye!")
break
if not user_input:
continue
            print("\n🤖 Assistant: ", end="")
# Process the query
response = await bridge.process_user_query(user_input, conversation_history)
print(response)
# Update conversation history
conversation_history.extend([
{"role": "user", "content": user_input},
{"role": "assistant", "content": response}
])
# Keep conversation history manageable
if len(conversation_history) > 10: # Keep last 5 exchanges
conversation_history = conversation_history[-10:]
print()
except KeyboardInterrupt:
        print("\n👋 Session interrupted by user")
finally:
await bridge.close_mcp_connection()
async def example_queries(model: str = "llama3.1"):
    """Run some example queries to demonstrate capabilities"""
    print("🧪 Running Example Queries")
    print("=" * 30)
    bridge = OllamaMCPBridge(ollama_model=model)
    if not bridge.check_ollama_availability():
        return
    if not await bridge.start_mcp_connection():
        return
# Example queries to demonstrate different capabilities
examples = [
"What tables are available in the database?",
"Show me the structure of the orders table",
"Can you validate this query: SELECT * FROM users WHERE email LIKE '%@gmail.com'?",
"What are some recent orders with their total amounts?",
]
try:
for i, query in enumerate(examples, 1):
            print(f"\n📝 Example {i}: {query}")
print("-" * 40)
response = await bridge.process_user_query(query)
            print(f"🤖 Response: {response}")
# Small delay between queries
await asyncio.sleep(1)
finally:
await bridge.close_mcp_connection()
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="PostgreSQL MCP + Ollama Integration")
parser.add_argument(
"--mode",
choices=["interactive", "examples"],
default="interactive",
help="Mode to run: interactive session or example queries"
)
parser.add_argument(
"--model",
default="llama3.1",
help="Ollama model to use (default: llama3.1)"
)
    args = parser.parse_args()
    # Pass the chosen model through to the session (setting a class attribute
    # would be shadowed by the instance attribute set in __init__)
    try:
        if args.mode == "interactive":
            asyncio.run(interactive_session(model=args.model))
        else:
            asyncio.run(example_queries(model=args.model))
    except Exception as e:
        logger.error(f"❌ Application failed: {e}")
        sys.exit(1)
if __name__ == "__main__":
main()