# mcp_server.py (40.1 kB)
import asyncio
import sys
import json
from pathlib import Path
from typing import Any
from datetime import datetime, timedelta
# Add project root to path
# PROJECT_ROOT is the parent of the directory containing this file. It is put
# at the front of sys.path before the `src.*` imports below are executed.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from mcp.server import Server
from mcp.types import Tool, TextContent
import structlog
# Import utilities
from src.utils.config_loader import get_config
from src.utils.llm_manager import LLMManager
from src.adapters.calendar_adapter import CalendarAdapter
from src.adapters.gmail_adapter import GmailAdapter
from src.agents.calendar_agent import CalendarBookingAgent
from src.agents.email_agent import EmailAgent
# Setup logging
# structlog logger named after this module; used by every function in the file.
logger = structlog.get_logger(__name__)
# Initialize server
# MCP server instance; the list_tools/call_tool handlers register on it
# through its decorators.
server = Server("enhanced-google-mcp-server")
# Global state
# Every name below starts as None and is assigned by initialize(); the tool
# handlers read them as module globals.
config = None  # result of get_config()
llm_manager = None  # LLMManager built from config.yaml_config
calendar_adapter = None  # CalendarAdapter instance
calendar_agent = None  # CalendarBookingAgent(calendar_adapter, llm_manager)
gmail_adapter = None  # GmailAdapter instance
email_agent = None  # EmailAgent(gmail_adapter, llm_manager)
def initialize():
    """Build the module-level configuration, LLM manager, adapters and agents.

    Assigns the globals ``config``, ``llm_manager``, ``calendar_adapter``,
    ``calendar_agent``, ``gmail_adapter`` and ``email_agent`` in that order,
    so each agent is created after the adapter and LLM manager it receives.

    Returns:
        bool: True when every component was constructed. False if any step
        raised; the error is logged, the traceback is printed, and globals
        assigned before the failing step keep their new values.
    """
    global config, llm_manager, calendar_adapter, calendar_agent, gmail_adapter, email_agent
    try:
        # Load configuration
        config = get_config()
        logger.info("Configuration loaded successfully")

        # Initialize LLM Manager
        # The return value of get_llm_config() is not needed here (the manager
        # is built from config.yaml_config). The call is kept so any error it
        # raises still surfaces at startup.
        # NOTE(review): drop the call if it is confirmed to be side-effect free.
        config.get_llm_config()
        llm_manager = LLMManager(config.yaml_config)
        logger.info("LLM Manager initialized successfully")

        # Initialize Calendar Adapter
        calendar_adapter = CalendarAdapter()
        logger.info("Calendar adapter initialized successfully")

        # Initialize Calendar Booking Agent
        calendar_agent = CalendarBookingAgent(calendar_adapter, llm_manager)
        logger.info("Calendar booking agent initialized successfully")

        # Initialize Gmail Adapter
        gmail_adapter = GmailAdapter()
        logger.info("Gmail adapter initialized successfully")

        # Initialize Email Agent
        email_agent = EmailAgent(gmail_adapter, llm_manager)
        logger.info("Email agent initialized successfully")
        return True
    except Exception as e:
        # Startup boundary: report the failure and let the caller decide to exit.
        logger.error(f"Initialization failed: {e}")
        import traceback
        traceback.print_exc()
        return False
@server.list_tools()
async def list_tools() -> list[Tool]:
    """
    List all available MCP tools.

    Builds the tool catalogue (LLM utilities, calendar tools, calendar agents
    and email tools), logs how many tools were listed and returns them. Each
    ``Tool`` carries a JSON-Schema ``inputSchema`` describing its arguments.

    This is a minimal set - see COMPLETE_IMPLEMENTATION_GUIDE.md for full tool set.

    Returns:
        list[Tool]: one entry per tool name handled by ``call_tool``.
    """
    # NOTE: description text is kept flush-left inside the triple quotes so
    # the strings sent to the client carry no indentation whitespace.
    tools = [
        # LLM Testing Tool
        Tool(
            name="test_llm",
            description="""
Test the LLM fallback system.
Sends a test prompt through the LLM fallback chain (Euron → Deepseek → Gemini → Claude).
Returns which provider was used and the response.
Useful for verifying LLM configuration and testing provider availability.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Test prompt to send to LLM"
                    },
                    "force_provider": {
                        "type": "string",
                        "enum": ["euron", "deepseek", "gemini", "claude"],
                        "description": "Force specific provider (optional)"
                    }
                },
                "required": ["prompt"]
            }
        ),
        # LLM Health Check Tool
        Tool(
            name="llm_health",
            description="""
Check health status of all LLM providers.
Returns:
- Provider status (healthy/degraded/failed)
- Success rates
- Total calls and costs
- Circuit breaker states
- Fallback statistics
Useful for monitoring LLM system health.
""",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        # LLM Smart Assistant Tool
        Tool(
            name="ai_assistant",
            description="""
Intelligent AI assistant powered by multi-provider LLM fallback.
Use this for any task requiring AI intelligence:
- Answer questions
- Analyze text
- Generate content
- Summarize information
- Make recommendations
- Problem solving
Automatically uses the best available LLM provider.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "task": {
                        "type": "string",
                        "description": "What you want the AI to do"
                    },
                    "context": {
                        "type": "string",
                        "description": "Additional context (optional)"
                    }
                },
                # `task` is the only meaningful input; without it the handler
                # would forward an empty prompt to the LLM.
                "required": ["task"]
            }
        ),
        # Configuration Tool
        Tool(
            name="show_config",
            description="""
Show current MCP server configuration.
Returns:
- Enabled LLM providers
- Google API configuration
- Feature flags
- System settings
""",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        # Calendar: Create Event / Block Time
        Tool(
            name="create_calendar_event",
            description="""
Create a new Google Calendar event or block time.
Use this to:
- Schedule meetings
- Block focus time
- Create reminders
- Set up interview slots
Supports:
- Custom start/end times
- Descriptions and locations
- Multiple reminders (15min, 1hr, 1day before, etc.)
- Attendee invitations
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Event title (e.g., 'Interview Prep', 'Meeting with John')"
                    },
                    "start_time": {
                        "type": "string",
                        "description": "Start time in ISO format (e.g., '2025-11-03T20:00:00') or natural language"
                    },
                    "end_time": {
                        "type": "string",
                        "description": "End time in ISO format or natural language"
                    },
                    "description": {
                        "type": "string",
                        "description": "Event description (optional)"
                    },
                    "location": {
                        "type": "string",
                        "description": "Event location (optional)"
                    },
                    "reminders": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "Reminder minutes before event (e.g., [15, 60] for 15min and 1hr reminders)"
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Timezone (default: America/Los_Angeles)",
                        "default": "America/Los_Angeles"
                    }
                },
                "required": ["title", "start_time", "end_time"]
            }
        ),
        # Calendar: Get Events
        Tool(
            name="get_calendar_events",
            description="""
Get calendar events for a specific time range.
Use this to:
- Check today's schedule
- See upcoming interviews
- Find free time slots
- Review past events
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "time_min": {
                        "type": "string",
                        "description": "Start time (ISO format or 'now', 'today', 'tomorrow')"
                    },
                    "time_max": {
                        "type": "string",
                        "description": "End time (ISO format or 'end_of_day', 'end_of_week')"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum events to return",
                        "default": 10
                    }
                }
            }
        ),
        # Calendar: Update Event
        Tool(
            name="update_calendar_event",
            description="""
Update an existing calendar event.
Use this to:
- Add/update reminders
- Change event time
- Update description
- Modify title
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "event_id": {
                        "type": "string",
                        "description": "Event ID to update"
                    },
                    "title": {
                        "type": "string",
                        "description": "New title (optional)"
                    },
                    "start_time": {
                        "type": "string",
                        "description": "New start time (optional)"
                    },
                    "end_time": {
                        "type": "string",
                        "description": "New end time (optional)"
                    },
                    "description": {
                        "type": "string",
                        "description": "New description (optional)"
                    },
                    "reminders": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "New reminder minutes (optional)"
                    }
                },
                "required": ["event_id"]
            }
        ),
        # INTELLIGENT AGENT: Book Calendar Time
        Tool(
            name="book_calendar_time",
            description="""
🤖 INTELLIGENT AGENT: Automatically book calendar time from natural language.
This is an AI-powered agent that understands requests like:
- "Block time tomorrow 1-2 PM for pickup"
- "Schedule interview prep tonight 8-10 PM with 15min reminder"
- "Add meeting next Monday at 2 PM for 1 hour"
- "Book time this Friday afternoon for 90 minutes"
The agent will:
✅ Understand natural language
✅ Parse relative times (tomorrow, tonight, next Monday, etc.)
✅ Detect conflicts with existing events
✅ Create the calendar event automatically
✅ Add reminders if mentioned
✅ Return event link and details
This is the EASIEST way to book calendar time - just describe what you want!
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "request": {
                        "type": "string",
                        "description": "Natural language booking request (e.g., 'Block tomorrow 1-2 PM for pickup')"
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Timezone (default: America/Los_Angeles)",
                        "default": "America/Los_Angeles"
                    }
                },
                "required": ["request"]
            }
        ),
        # INTELLIGENT AGENT: Find Free Time
        Tool(
            name="find_free_time",
            description="""
🤖 INTELLIGENT AGENT: Find free time slots in your calendar.
Use this to:
- Find available meeting times
- Suggest when to schedule something
- Identify gaps in your schedule
Automatically checks your calendar and suggests free slots.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "date": {
                        "type": "string",
                        "description": "Date to search (e.g., 'tomorrow', '2025-11-05', 'next Monday')"
                    },
                    "duration_minutes": {
                        "type": "integer",
                        "description": "Required duration in minutes (default: 60)",
                        "default": 60
                    },
                    "between_hours": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "Time range as [start_hour, end_hour] (default: [9, 17] for 9AM-5PM)"
                    }
                }
            }
        ),
        # ============ EMAIL TOOLS ============
        # Email: Search Emails
        Tool(
            name="search_emails",
            description="""
Search emails in Gmail with advanced filtering.
Supports:
- Gmail query syntax
- Date filtering
- Sender/recipient filtering
- Subject filtering
- Attachment filtering
- Unread filtering
Returns emails with full details.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Gmail search query (e.g., 'from:example@gmail.com subject:interview')"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum emails to return (default: 50)",
                        "default": 50
                    },
                    "after_date": {
                        "type": "string",
                        "description": "Filter emails after this date (ISO format: 2024-01-01)"
                    },
                    "before_date": {
                        "type": "string",
                        "description": "Filter emails before this date (ISO format: 2024-12-31)"
                    },
                    "from_email": {
                        "type": "string",
                        "description": "Filter by sender email"
                    },
                    "to_email": {
                        "type": "string",
                        "description": "Filter by recipient email"
                    },
                    "subject": {
                        "type": "string",
                        "description": "Filter by subject keywords"
                    },
                    "has_attachment": {
                        "type": "boolean",
                        "description": "Filter emails with attachments"
                    },
                    "is_unread": {
                        "type": "boolean",
                        "description": "Filter unread emails only"
                    }
                }
            }
        ),
        # Email: Get Email Details
        Tool(
            name="get_email",
            description="""
Get detailed information about a specific email.
Returns:
- Full email body
- Headers (from, to, subject, date)
- Attachments info
- Labels and read status
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "email_id": {
                        "type": "string",
                        "description": "Email message ID"
                    }
                },
                "required": ["email_id"]
            }
        ),
        # Email: Send Email (Manual)
        Tool(
            name="send_email",
            description="""
Send an email manually (you provide all details).
Use this when you want full control over email content.
For AI-composed emails, use 'compose_and_send_email' instead.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "to": {
                        "type": "string",
                        "description": "Recipient email address"
                    },
                    "subject": {
                        "type": "string",
                        "description": "Email subject"
                    },
                    "body": {
                        "type": "string",
                        "description": "Email body"
                    },
                    "cc": {
                        "type": "string",
                        "description": "CC recipients (comma-separated)"
                    },
                    "bcc": {
                        "type": "string",
                        "description": "BCC recipients (comma-separated)"
                    },
                    "html": {
                        "type": "boolean",
                        "description": "Whether body is HTML",
                        "default": False
                    }
                },
                "required": ["to", "subject", "body"]
            }
        ),
        # INTELLIGENT AGENT: Compose and Send Email
        Tool(
            name="compose_and_send_email",
            description="""
🤖 INTELLIGENT AGENT: Compose and send email from natural language.
This AI agent understands requests like:
- "Send email to john@example.com thanking him for the interview"
- "Write follow-up to recruiter@company.com about my application"
- "Compose professional email to manager requesting time off"
The agent will:
✅ Understand your intent
✅ Compose professional email body
✅ Generate appropriate subject line
✅ Send the email automatically
This is the EASIEST way to send emails - just describe what you want!
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "request": {
                        "type": "string",
                        "description": "Natural language email request (e.g., 'Send thank you email to interviewer@company.com')"
                    },
                    "to": {
                        "type": "string",
                        "description": "Recipient email (optional if mentioned in request)"
                    },
                    "subject": {
                        "type": "string",
                        "description": "Email subject (optional, will be generated if not provided)"
                    },
                    "additional_context": {
                        "type": "string",
                        "description": "Additional context to include in email"
                    }
                },
                "required": ["request"]
            }
        ),
        # INTELLIGENT AGENT: Draft Email (Preview Only)
        Tool(
            name="draft_email",
            description="""
🤖 INTELLIGENT AGENT: Draft an email without sending (preview only).
Use this to:
- Preview email before sending
- Get AI-composed email for review
- See what the agent would write
Same as 'compose_and_send_email' but doesn't actually send.
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "request": {
                        "type": "string",
                        "description": "Natural language email request"
                    },
                    "to": {
                        "type": "string",
                        "description": "Recipient email (optional)"
                    },
                    "subject": {
                        "type": "string",
                        "description": "Email subject (optional)"
                    },
                    "additional_context": {
                        "type": "string",
                        "description": "Additional context"
                    }
                },
                "required": ["request"]
            }
        ),
        # Email: Reply to Email
        Tool(
            name="reply_to_email",
            description="""
Reply to an existing email.
Automatically:
- Uses correct recipient (from original sender)
- Adds "Re:" to subject
- Maintains thread
""",
            inputSchema={
                "type": "object",
                "properties": {
                    "email_id": {
                        "type": "string",
                        "description": "ID of email to reply to"
                    },
                    "body": {
                        "type": "string",
                        "description": "Reply body"
                    },
                    "html": {
                        "type": "boolean",
                        "description": "Whether body is HTML",
                        "default": False
                    }
                },
                "required": ["email_id", "body"]
            }
        )
    ]
    logger.info(f"Listed {len(tools)} tools")
    return tools
def _json_reply(payload, **dumps_kwargs):
    """Wrap *payload* as the single-item text reply returned by every tool.

    The payload is serialized with ``json.dumps(payload, indent=2)``; extra
    keyword arguments (e.g. ``default=str``) are forwarded to ``json.dumps``.
    """
    return [TextContent(
        type="text",
        text=json.dumps(payload, indent=2, **dumps_kwargs)
    )]


def _failure_reply(message):
    """Build the standard ``{"success": False, "error": message}`` reply."""
    return _json_reply({"success": False, "error": message})


def _ensure_aware(moment, tz):
    """Return *moment* unchanged if it has tzinfo, else localized to *tz*."""
    if moment.tzinfo is None:
        return tz.localize(moment)
    return moment


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """
    Handle tool calls from Claude Desktop.

    Dispatches on ``name`` to the matching LLM, calendar or email handler and
    returns the handler's result as JSON text.

    Args:
        name: Tool name, one of the names published by ``list_tools``.
        arguments: Tool arguments as a dict; optional keys are read with
            ``dict.get``.

    Returns:
        A one-element list holding a ``TextContent`` whose text is a JSON
        document. Exceptions raised while handling a tool are caught and
        reported as ``{"success": false, "error": ...}`` instead of being
        propagated; unknown tool names produce the same shape.
    """
    logger.info(f"Tool called: {name}", tool=name, arguments=arguments)
    try:
        # Test LLM
        if name == "test_llm":
            response = await llm_manager.generate(
                prompt=arguments.get("prompt", "Hello!"),
                force_provider=arguments.get("force_provider")
            )
            return _json_reply({
                "success": True,
                "provider_used": response.provider,
                "response": response.content,
                "usage": response.usage,
                "cost": response.cost,
                "latency": response.latency
            })

        # LLM Health Check
        elif name == "llm_health":
            health = await llm_manager.health_check()
            return _json_reply({"success": True, "health": health}, default=str)

        # AI Assistant
        elif name == "ai_assistant":
            task = arguments.get("task", "")
            context = arguments.get("context", "")
            system_prompt = "You are a helpful AI assistant. Provide clear, accurate, and useful responses."
            # Context, when given, is prepended to the task in a single prompt.
            prompt = f"Context: {context}\n\nTask: {task}" if context else task
            response = await llm_manager.generate(
                prompt=prompt,
                system_prompt=system_prompt
            )
            return _json_reply({
                "success": True,
                "response": response.content,
                "provider": response.provider,
                "cost": response.cost
            })

        # Show Configuration
        elif name == "show_config":
            llm_config = config.get_llm_config()
            # Providers without an explicit 'enabled' flag count as enabled.
            enabled_providers = [
                p['name'] for p in llm_config.get('providers', [])
                if p.get('enabled', True)
            ]
            return _json_reply({
                "success": True,
                "configuration": {
                    "llm_providers": enabled_providers,
                    "circuit_breaker": llm_config.get('circuit_breaker', {}),
                    "rate_limit": llm_config.get('rate_limit', {}),
                    "environment": config.get('environment', 'production')
                }
            })

        # Create Calendar Event
        elif name == "create_calendar_event":
            from dateutil import parser
            import pytz
            timezone = arguments.get("timezone", "America/Los_Angeles")
            try:
                # Parse times
                start_time = parser.parse(arguments.get("start_time"))
                end_time = parser.parse(arguments.get("end_time"))
                # Naive times are interpreted in the requested timezone.
                tz = pytz.timezone(timezone)
                result = calendar_adapter.create_event(
                    summary=arguments.get("title"),
                    start_time=_ensure_aware(start_time, tz),
                    end_time=_ensure_aware(end_time, tz),
                    description=arguments.get("description"),
                    location=arguments.get("location"),
                    reminders=arguments.get("reminders"),
                    timezone=timezone
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Failed to parse times or create event: {e}")

        # Get Calendar Events
        elif name == "get_calendar_events":
            from dateutil import parser
            import pytz
            time_min_str = arguments.get("time_min")
            time_max_str = arguments.get("time_max")
            max_results = arguments.get("max_results", 10)
            try:
                time_min = None
                time_max = None
                if time_min_str:
                    keyword = time_min_str.lower()
                    if keyword in ('now', 'today'):
                        time_min = datetime.now(pytz.UTC)
                    elif keyword == 'tomorrow':
                        # Advertised by the tool schema; dateutil cannot parse
                        # it, so resolve it to the start of tomorrow (UTC).
                        time_min = (datetime.now(pytz.UTC) + timedelta(days=1)).replace(
                            hour=0, minute=0, second=0, microsecond=0
                        )
                    else:
                        time_min = _ensure_aware(parser.parse(time_min_str), pytz.UTC)
                if time_max_str:
                    keyword = time_max_str.lower()
                    if keyword == 'end_of_day':
                        time_max = datetime.now(pytz.UTC).replace(hour=23, minute=59, second=59)
                    elif keyword == 'end_of_week':
                        # Uses the module-level `timedelta`. Importing it here
                        # would make the name local to the whole function and
                        # break the find_free_time branch (UnboundLocalError).
                        time_max = datetime.now(pytz.UTC) + timedelta(days=7)
                    else:
                        time_max = _ensure_aware(parser.parse(time_max_str), pytz.UTC)
                result = calendar_adapter.get_events(
                    time_min=time_min,
                    time_max=time_max,
                    max_results=max_results
                )
                return _json_reply(result, default=str)
            except Exception as e:
                return _failure_reply(f"Failed to get events: {e}")

        # Update Calendar Event
        elif name == "update_calendar_event":
            from dateutil import parser
            import pytz
            start_time_str = arguments.get("start_time")
            end_time_str = arguments.get("end_time")
            try:
                # NOTE(review): naive times are treated as UTC here, while
                # create_calendar_event defaults to America/Los_Angeles.
                # Confirm whether that difference is intended.
                start_time = None
                end_time = None
                if start_time_str:
                    start_time = _ensure_aware(parser.parse(start_time_str), pytz.UTC)
                if end_time_str:
                    end_time = _ensure_aware(parser.parse(end_time_str), pytz.UTC)
                result = calendar_adapter.update_event(
                    event_id=arguments.get("event_id"),
                    summary=arguments.get("title"),
                    start_time=start_time,
                    end_time=end_time,
                    description=arguments.get("description"),
                    reminders=arguments.get("reminders")
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Failed to update event: {e}")

        # AGENT: Book Calendar Time
        elif name == "book_calendar_time":
            request = arguments.get("request")
            timezone = arguments.get("timezone", "America/Los_Angeles")
            try:
                result = await calendar_agent.book_time(
                    request=request,
                    timezone=timezone
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Agent failed: {e}")

        # AGENT: Find Free Time
        elif name == "find_free_time":
            from dateutil import parser as date_parser
            import pytz
            date_str = arguments.get("date", "today")
            duration_minutes = arguments.get("duration_minutes", 60)
            between_hours = arguments.get("between_hours", [9, 17])
            try:
                # Parse date
                local_tz = pytz.timezone("America/Los_Angeles")
                now = datetime.now(local_tz)
                keyword = date_str.lower()
                if keyword == "today":
                    search_date = now
                elif keyword == "tomorrow":
                    search_date = now + timedelta(days=1)
                else:
                    search_date = _ensure_aware(date_parser.parse(date_str), local_tz)
                result = await calendar_agent.find_free_slots(
                    date=search_date,
                    duration_minutes=duration_minutes,
                    between_hours=tuple(between_hours)
                )
                return _json_reply(result, default=str)
            except Exception as e:
                return _failure_reply(f"Agent failed: {e}")

        # EMAIL TOOLS
        # Search Emails
        elif name == "search_emails":
            try:
                result = gmail_adapter.search_emails(
                    query=arguments.get("query"),
                    max_results=arguments.get("max_results", 50),
                    after_date=arguments.get("after_date"),
                    before_date=arguments.get("before_date"),
                    from_email=arguments.get("from_email"),
                    to_email=arguments.get("to_email"),
                    subject=arguments.get("subject"),
                    has_attachment=arguments.get("has_attachment"),
                    is_unread=arguments.get("is_unread")
                )
                return _json_reply(result, default=str)
            except Exception as e:
                return _failure_reply(f"Failed to search emails: {e}")

        # Get Email
        elif name == "get_email":
            try:
                result = gmail_adapter.get_email(arguments.get("email_id"))
                return _json_reply(result, default=str)
            except Exception as e:
                return _failure_reply(f"Failed to get email: {e}")

        # Send Email (Manual)
        elif name == "send_email":
            try:
                result = gmail_adapter.send_email(
                    to=arguments.get("to"),
                    subject=arguments.get("subject"),
                    body=arguments.get("body"),
                    cc=arguments.get("cc"),
                    bcc=arguments.get("bcc"),
                    html=arguments.get("html", False)
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Failed to send email: {e}")

        # AGENT: Compose and Send Email
        elif name == "compose_and_send_email":
            try:
                result = await email_agent.compose_and_send(
                    request=arguments.get("request"),
                    to=arguments.get("to"),
                    subject=arguments.get("subject"),
                    additional_context=arguments.get("additional_context")
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Email agent failed: {e}")

        # AGENT: Draft Email (Preview Only)
        elif name == "draft_email":
            try:
                result = await email_agent.draft_email(
                    request=arguments.get("request"),
                    to=arguments.get("to"),
                    subject=arguments.get("subject"),
                    additional_context=arguments.get("additional_context")
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Email agent failed: {e}")

        # Reply to Email
        elif name == "reply_to_email":
            try:
                result = gmail_adapter.reply_to_email(
                    email_id=arguments.get("email_id"),
                    body=arguments.get("body"),
                    html=arguments.get("html", False)
                )
                return _json_reply(result)
            except Exception as e:
                return _failure_reply(f"Failed to reply: {e}")

        # Unknown tool
        else:
            return _failure_reply(f"Unknown tool: {name}")
    except Exception as e:
        # Last-resort boundary for branches without their own handler
        # (LLM tools, show_config) and for failures outside the inner try
        # blocks; the reply also names the tool.
        logger.error(f"Tool execution failed: {e}", tool=name, error=str(e))
        return _json_reply({
            "success": False,
            "error": str(e),
            "tool": name
        })
async def main():
    """Initialize the components, then serve MCP requests over stdio.

    Exits the process with status 1 if ``initialize()`` reports failure;
    otherwise runs the server on the stdio streams until it returns.
    """
    from mcp.server.stdio import stdio_server

    banner = "=" * 70
    logger.info(banner)
    logger.info("Enhanced MCP Server Starting")
    logger.info(banner)

    # Initialize
    initialized = initialize()
    if not initialized:
        logger.error("Initialization failed - exiting")
        sys.exit(1)

    logger.info("Server initialized successfully")
    logger.info("Waiting for Claude Desktop connection...")

    # Run server
    async with stdio_server() as streams:
        incoming, outgoing = streams
        await server.run(incoming, outgoing, server.create_initialization_options())
# Script entry point: run the async main() and map failures to an exit status.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is a normal shutdown, not an error.
        logger.info("Server stopped by user")
    except Exception as exc:
        import traceback

        logger.error(f"Server error: {exc}")
        traceback.print_exc()
        sys.exit(1)