Skip to main content
Glama
mcp_server.py40.1 kB
import asyncio import sys import json from pathlib import Path from typing import Any from datetime import datetime, timedelta # Add project root to path PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from mcp.server import Server from mcp.types import Tool, TextContent import structlog # Import utilities from src.utils.config_loader import get_config from src.utils.llm_manager import LLMManager from src.adapters.calendar_adapter import CalendarAdapter from src.adapters.gmail_adapter import GmailAdapter from src.agents.calendar_agent import CalendarBookingAgent from src.agents.email_agent import EmailAgent # Setup logging logger = structlog.get_logger(__name__) # Initialize server server = Server("enhanced-google-mcp-server") # Global state config = None llm_manager = None calendar_adapter = None calendar_agent = None gmail_adapter = None email_agent = None def initialize(): """Initialize configuration and LLM manager""" global config, llm_manager, calendar_adapter, calendar_agent, gmail_adapter, email_agent try: # Load configuration config = get_config() logger.info("Configuration loaded successfully") # Initialize LLM Manager llm_config = config.get_llm_config() llm_manager = LLMManager(config.yaml_config) logger.info("LLM Manager initialized successfully") # Initialize Calendar Adapter calendar_adapter = CalendarAdapter() logger.info("Calendar adapter initialized successfully") # Initialize Calendar Booking Agent calendar_agent = CalendarBookingAgent(calendar_adapter, llm_manager) logger.info("Calendar booking agent initialized successfully") # Initialize Gmail Adapter gmail_adapter = GmailAdapter() logger.info("Gmail adapter initialized successfully") # Initialize Email Agent email_agent = EmailAgent(gmail_adapter, llm_manager) logger.info("Email agent initialized successfully") return True except Exception as e: logger.error(f"Initialization failed: {e}") import traceback traceback.print_exc() return False @server.list_tools() async def list_tools() -> list[Tool]: """ List all available MCP tools This is a minimal set - see COMPLETE_IMPLEMENTATION_GUIDE.md for full tool set """ tools = [ # LLM Testing Tool Tool( name="test_llm", description=""" Test the LLM fallback system. Sends a test prompt through the LLM fallback chain (Euron → Deepseek → Gemini → Claude). Returns which provider was used and the response. Useful for verifying LLM configuration and testing provider availability. """, inputSchema={ "type": "object", "properties": { "prompt": { "type": "string", "description": "Test prompt to send to LLM" }, "force_provider": { "type": "string", "enum": ["euron", "deepseek", "gemini", "claude"], "description": "Force specific provider (optional)" } }, "required": ["prompt"] } ), # LLM Health Check Tool Tool( name="llm_health", description=""" Check health status of all LLM providers. Returns: - Provider status (healthy/degraded/failed) - Success rates - Total calls and costs - Circuit breaker states - Fallback statistics Useful for monitoring LLM system health. """, inputSchema={ "type": "object", "properties": {} } ), # LLM Smart Assistant Tool Tool( name="ai_assistant", description=""" Intelligent AI assistant powered by multi-provider LLM fallback. Use this for any task requiring AI intelligence: - Answer questions - Analyze text - Generate content - Summarize information - Make recommendations - Problem solving Automatically uses the best available LLM provider. """, inputSchema={ "type": "object", "properties": { "task": { "type": "string", "description": "What you want the AI to do" }, "context": { "type": "string", "description": "Additional context (optional)" } } } ), # Configuration Tool Tool( name="show_config", description=""" Show current MCP server configuration. Returns: - Enabled LLM providers - Google API configuration - Feature flags - System settings """, inputSchema={ "type": "object", "properties": {} } ), # Calendar: Create Event / Block Time Tool( name="create_calendar_event", description=""" Create a new Google Calendar event or block time. Use this to: - Schedule meetings - Block focus time - Create reminders - Set up interview slots Supports: - Custom start/end times - Descriptions and locations - Multiple reminders (15min, 1hr, 1day before, etc.) - Attendee invitations """, inputSchema={ "type": "object", "properties": { "title": { "type": "string", "description": "Event title (e.g., 'Interview Prep', 'Meeting with John')" }, "start_time": { "type": "string", "description": "Start time in ISO format (e.g., '2025-11-03T20:00:00') or natural language" }, "end_time": { "type": "string", "description": "End time in ISO format or natural language" }, "description": { "type": "string", "description": "Event description (optional)" }, "location": { "type": "string", "description": "Event location (optional)" }, "reminders": { "type": "array", "items": {"type": "integer"}, "description": "Reminder minutes before event (e.g., [15, 60] for 15min and 1hr reminders)" }, "timezone": { "type": "string", "description": "Timezone (default: America/Los_Angeles)", "default": "America/Los_Angeles" } }, "required": ["title", "start_time", "end_time"] } ), # Calendar: Get Events Tool( name="get_calendar_events", description=""" Get calendar events for a specific time range. Use this to: - Check today's schedule - See upcoming interviews - Find free time slots - Review past events """, inputSchema={ "type": "object", "properties": { "time_min": { "type": "string", "description": "Start time (ISO format or 'now', 'today', 'tomorrow')" }, "time_max": { "type": "string", "description": "End time (ISO format or 'end_of_day', 'end_of_week')" }, "max_results": { "type": "integer", "description": "Maximum events to return", "default": 10 } } } ), # Calendar: Update Event Tool( name="update_calendar_event", description=""" Update an existing calendar event. Use this to: - Add/update reminders - Change event time - Update description - Modify title """, inputSchema={ "type": "object", "properties": { "event_id": { "type": "string", "description": "Event ID to update" }, "title": { "type": "string", "description": "New title (optional)" }, "start_time": { "type": "string", "description": "New start time (optional)" }, "end_time": { "type": "string", "description": "New end time (optional)" }, "description": { "type": "string", "description": "New description (optional)" }, "reminders": { "type": "array", "items": {"type": "integer"}, "description": "New reminder minutes (optional)" } }, "required": ["event_id"] } ), # INTELLIGENT AGENT: Book Calendar Time Tool( name="book_calendar_time", description=""" 🤖 INTELLIGENT AGENT: Automatically book calendar time from natural language. This is an AI-powered agent that understands requests like: - "Block time tomorrow 1-2 PM for pickup" - "Schedule interview prep tonight 8-10 PM with 15min reminder" - "Add meeting next Monday at 2 PM for 1 hour" - "Book time this Friday afternoon for 90 minutes" The agent will: ✅ Understand natural language ✅ Parse relative times (tomorrow, tonight, next Monday, etc.) ✅ Detect conflicts with existing events ✅ Create the calendar event automatically ✅ Add reminders if mentioned ✅ Return event link and details This is the EASIEST way to book calendar time - just describe what you want! """, inputSchema={ "type": "object", "properties": { "request": { "type": "string", "description": "Natural language booking request (e.g., 'Block tomorrow 1-2 PM for pickup')" }, "timezone": { "type": "string", "description": "Timezone (default: America/Los_Angeles)", "default": "America/Los_Angeles" } }, "required": ["request"] } ), # INTELLIGENT AGENT: Find Free Time Tool( name="find_free_time", description=""" 🤖 INTELLIGENT AGENT: Find free time slots in your calendar. Use this to: - Find available meeting times - Suggest when to schedule something - Identify gaps in your schedule Automatically checks your calendar and suggests free slots. """, inputSchema={ "type": "object", "properties": { "date": { "type": "string", "description": "Date to search (e.g., 'tomorrow', '2025-11-05', 'next Monday')" }, "duration_minutes": { "type": "integer", "description": "Required duration in minutes (default: 60)", "default": 60 }, "between_hours": { "type": "array", "items": {"type": "integer"}, "description": "Time range as [start_hour, end_hour] (default: [9, 17] for 9AM-5PM)" } } } ), # ============ EMAIL TOOLS ============ # Email: Search Emails Tool( name="search_emails", description=""" Search emails in Gmail with advanced filtering. Supports: - Gmail query syntax - Date filtering - Sender/recipient filtering - Subject filtering - Attachment filtering - Unread filtering Returns emails with full details. """, inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Gmail search query (e.g., 'from:example@gmail.com subject:interview')" }, "max_results": { "type": "integer", "description": "Maximum emails to return (default: 50)", "default": 50 }, "after_date": { "type": "string", "description": "Filter emails after this date (ISO format: 2024-01-01)" }, "before_date": { "type": "string", "description": "Filter emails before this date (ISO format: 2024-12-31)" }, "from_email": { "type": "string", "description": "Filter by sender email" }, "to_email": { "type": "string", "description": "Filter by recipient email" }, "subject": { "type": "string", "description": "Filter by subject keywords" }, "has_attachment": { "type": "boolean", "description": "Filter emails with attachments" }, "is_unread": { "type": "boolean", "description": "Filter unread emails only" } } } ), # Email: Get Email Details Tool( name="get_email", description=""" Get detailed information about a specific email. Returns: - Full email body - Headers (from, to, subject, date) - Attachments info - Labels and read status """, inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email message ID" } }, "required": ["email_id"] } ), # Email: Send Email (Manual) Tool( name="send_email", description=""" Send an email manually (you provide all details). Use this when you want full control over email content. For AI-composed emails, use 'compose_and_send_email' instead. """, inputSchema={ "type": "object", "properties": { "to": { "type": "string", "description": "Recipient email address" }, "subject": { "type": "string", "description": "Email subject" }, "body": { "type": "string", "description": "Email body" }, "cc": { "type": "string", "description": "CC recipients (comma-separated)" }, "bcc": { "type": "string", "description": "BCC recipients (comma-separated)" }, "html": { "type": "boolean", "description": "Whether body is HTML", "default": False } }, "required": ["to", "subject", "body"] } ), # INTELLIGENT AGENT: Compose and Send Email Tool( name="compose_and_send_email", description=""" 🤖 INTELLIGENT AGENT: Compose and send email from natural language. This AI agent understands requests like: - "Send email to john@example.com thanking him for the interview" - "Write follow-up to recruiter@company.com about my application" - "Compose professional email to manager requesting time off" The agent will: ✅ Understand your intent ✅ Compose professional email body ✅ Generate appropriate subject line ✅ Send the email automatically This is the EASIEST way to send emails - just describe what you want! """, inputSchema={ "type": "object", "properties": { "request": { "type": "string", "description": "Natural language email request (e.g., 'Send thank you email to interviewer@company.com')" }, "to": { "type": "string", "description": "Recipient email (optional if mentioned in request)" }, "subject": { "type": "string", "description": "Email subject (optional, will be generated if not provided)" }, "additional_context": { "type": "string", "description": "Additional context to include in email" } }, "required": ["request"] } ), # INTELLIGENT AGENT: Draft Email (Preview Only) Tool( name="draft_email", description=""" 🤖 INTELLIGENT AGENT: Draft an email without sending (preview only). Use this to: - Preview email before sending - Get AI-composed email for review - See what the agent would write Same as 'compose_and_send_email' but doesn't actually send. """, inputSchema={ "type": "object", "properties": { "request": { "type": "string", "description": "Natural language email request" }, "to": { "type": "string", "description": "Recipient email (optional)" }, "subject": { "type": "string", "description": "Email subject (optional)" }, "additional_context": { "type": "string", "description": "Additional context" } }, "required": ["request"] } ), # Email: Reply to Email Tool( name="reply_to_email", description=""" Reply to an existing email. Automatically: - Uses correct recipient (from original sender) - Adds "Re:" to subject - Maintains thread """, inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "ID of email to reply to" }, "body": { "type": "string", "description": "Reply body" }, "html": { "type": "boolean", "description": "Whether body is HTML", "default": False } }, "required": ["email_id", "body"] } ) ] logger.info(f"Listed {len(tools)} tools") return tools @server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """ Handle tool calls from Claude Desktop """ logger.info(f"Tool called: {name}", tool=name, arguments=arguments) try: # Test LLM if name == "test_llm": prompt = arguments.get("prompt", "Hello!") force_provider = arguments.get("force_provider") response = await llm_manager.generate( prompt=prompt, force_provider=force_provider ) result = { "success": True, "provider_used": response.provider, "response": response.content, "usage": response.usage, "cost": response.cost, "latency": response.latency } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # LLM Health Check elif name == "llm_health": health = await llm_manager.health_check() result = { "success": True, "health": health } return [TextContent( type="text", text=json.dumps(result, indent=2, default=str) )] # AI Assistant elif name == "ai_assistant": task = arguments.get("task", "") context = arguments.get("context", "") system_prompt = "You are a helpful AI assistant. Provide clear, accurate, and useful responses." if context: prompt = f"Context: {context}\n\nTask: {task}" else: prompt = task response = await llm_manager.generate( prompt=prompt, system_prompt=system_prompt ) result = { "success": True, "response": response.content, "provider": response.provider, "cost": response.cost } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Show Configuration elif name == "show_config": llm_config = config.get_llm_config() enabled_providers = [ p['name'] for p in llm_config.get('providers', []) if p.get('enabled', True) ] result = { "success": True, "configuration": { "llm_providers": enabled_providers, "circuit_breaker": llm_config.get('circuit_breaker', {}), "rate_limit": llm_config.get('rate_limit', {}), "environment": config.get('environment', 'production') } } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Create Calendar Event elif name == "create_calendar_event": from dateutil import parser import pytz title = arguments.get("title") start_time_str = arguments.get("start_time") end_time_str = arguments.get("end_time") description = arguments.get("description") location = arguments.get("location") reminders = arguments.get("reminders") timezone = arguments.get("timezone", "America/Los_Angeles") # Parse times try: start_time = parser.parse(start_time_str) end_time = parser.parse(end_time_str) # Make timezone aware if not already tz = pytz.timezone(timezone) if start_time.tzinfo is None: start_time = tz.localize(start_time) if end_time.tzinfo is None: end_time = tz.localize(end_time) result = calendar_adapter.create_event( summary=title, start_time=start_time, end_time=end_time, description=description, location=location, reminders=reminders, timezone=timezone ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Failed to parse times or create event: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Get Calendar Events elif name == "get_calendar_events": from dateutil import parser import pytz time_min_str = arguments.get("time_min") time_max_str = arguments.get("time_max") max_results = arguments.get("max_results", 10) try: time_min = None time_max = None if time_min_str: if time_min_str.lower() in ['now', 'today']: time_min = datetime.now(pytz.UTC) else: time_min = parser.parse(time_min_str) if time_min.tzinfo is None: time_min = pytz.UTC.localize(time_min) if time_max_str: if time_max_str.lower() == 'end_of_day': time_max = datetime.now(pytz.UTC).replace(hour=23, minute=59, second=59) elif time_max_str.lower() == 'end_of_week': from datetime import timedelta time_max = datetime.now(pytz.UTC) + timedelta(days=7) else: time_max = parser.parse(time_max_str) if time_max.tzinfo is None: time_max = pytz.UTC.localize(time_max) result = calendar_adapter.get_events( time_min=time_min, time_max=time_max, max_results=max_results ) return [TextContent( type="text", text=json.dumps(result, indent=2, default=str) )] except Exception as e: result = { "success": False, "error": f"Failed to get events: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Update Calendar Event elif name == "update_calendar_event": from dateutil import parser import pytz event_id = arguments.get("event_id") title = arguments.get("title") start_time_str = arguments.get("start_time") end_time_str = arguments.get("end_time") description = arguments.get("description") reminders = arguments.get("reminders") try: start_time = None end_time = None if start_time_str: start_time = parser.parse(start_time_str) if start_time.tzinfo is None: start_time = pytz.UTC.localize(start_time) if end_time_str: end_time = parser.parse(end_time_str) if end_time.tzinfo is None: end_time = pytz.UTC.localize(end_time) result = calendar_adapter.update_event( event_id=event_id, summary=title, start_time=start_time, end_time=end_time, description=description, reminders=reminders ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Failed to update event: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # AGENT: Book Calendar Time elif name == "book_calendar_time": request = arguments.get("request") timezone = arguments.get("timezone", "America/Los_Angeles") try: result = await calendar_agent.book_time( request=request, timezone=timezone ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Agent failed: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # AGENT: Find Free Time elif name == "find_free_time": from dateutil import parser as date_parser import pytz date_str = arguments.get("date", "today") duration_minutes = arguments.get("duration_minutes", 60) between_hours = arguments.get("between_hours", [9, 17]) try: # Parse date now = datetime.now(pytz.timezone("America/Los_Angeles")) if date_str.lower() == "today": search_date = now elif date_str.lower() == "tomorrow": search_date = now + timedelta(days=1) else: search_date = date_parser.parse(date_str) if search_date.tzinfo is None: search_date = pytz.timezone("America/Los_Angeles").localize(search_date) result = await calendar_agent.find_free_slots( date=search_date, duration_minutes=duration_minutes, between_hours=tuple(between_hours) ) return [TextContent( type="text", text=json.dumps(result, indent=2, default=str) )] except Exception as e: result = { "success": False, "error": f"Agent failed: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # EMAIL TOOLS # Search Emails elif name == "search_emails": try: result = gmail_adapter.search_emails( query=arguments.get("query"), max_results=arguments.get("max_results", 50), after_date=arguments.get("after_date"), before_date=arguments.get("before_date"), from_email=arguments.get("from_email"), to_email=arguments.get("to_email"), subject=arguments.get("subject"), has_attachment=arguments.get("has_attachment"), is_unread=arguments.get("is_unread") ) return [TextContent( type="text", text=json.dumps(result, indent=2, default=str) )] except Exception as e: result = { "success": False, "error": f"Failed to search emails: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Get Email elif name == "get_email": try: email_id = arguments.get("email_id") result = gmail_adapter.get_email(email_id) return [TextContent( type="text", text=json.dumps(result, indent=2, default=str) )] except Exception as e: result = { "success": False, "error": f"Failed to get email: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Send Email (Manual) elif name == "send_email": try: result = gmail_adapter.send_email( to=arguments.get("to"), subject=arguments.get("subject"), body=arguments.get("body"), cc=arguments.get("cc"), bcc=arguments.get("bcc"), html=arguments.get("html", False) ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Failed to send email: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # AGENT: Compose and Send Email elif name == "compose_and_send_email": try: request = arguments.get("request") to = arguments.get("to") subject = arguments.get("subject") additional_context = arguments.get("additional_context") result = await email_agent.compose_and_send( request=request, to=to, subject=subject, additional_context=additional_context ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Email agent failed: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # AGENT: Draft Email (Preview Only) elif name == "draft_email": try: request = arguments.get("request") to = arguments.get("to") subject = arguments.get("subject") additional_context = arguments.get("additional_context") result = await email_agent.draft_email( request=request, to=to, subject=subject, additional_context=additional_context ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Email agent failed: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Reply to Email elif name == "reply_to_email": try: email_id = arguments.get("email_id") body = arguments.get("body") html = arguments.get("html", False) result = gmail_adapter.reply_to_email( email_id=email_id, body=body, html=html ) return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: result = { "success": False, "error": f"Failed to reply: {str(e)}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] # Unknown tool else: result = { "success": False, "error": f"Unknown tool: {name}" } return [TextContent( type="text", text=json.dumps(result, indent=2) )] except Exception as e: logger.error(f"Tool execution failed: {e}", tool=name, error=str(e)) result = { "success": False, "error": str(e), "tool": name } return [TextContent( type="text", text=json.dumps(result, indent=2) )] async def main(): """Main entry point for MCP server""" from mcp.server.stdio import stdio_server logger.info("=" * 70) logger.info("Enhanced MCP Server Starting") logger.info("=" * 70) # Initialize if not initialize(): logger.error("Initialization failed - exiting") sys.exit(1) logger.info("Server initialized successfully") logger.info("Waiting for Claude Desktop connection...") # Run server async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") import traceback traceback.print_exc() sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/mcpwithgoogle'

If you have feedback or need assistance with the MCP directory API, please join our Discord server