Skip to main content
Glama
tools.py19.7 kB
"""MCP tools implementation for email operations.""" import json import logging import os from datetime import datetime from typing import Dict, List, Optional, Union, Any from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import Context from imap_mcp.imap_client import ImapClient from imap_mcp.resources import get_client_from_context, get_smtp_client_from_context from typing import Dict from datetime import datetime logger = logging.getLogger(__name__) # Define the path for storing tasks TASKS_FILE = os.path.join(os.path.dirname(os.path.dirname(__file__)), "tasks.json") def register_tools(mcp: FastMCP, imap_client: ImapClient) -> None: """Register MCP tools. Args: mcp: MCP server imap_client: IMAP client """ # Using decorator pattern to register tools @mcp.tool() async def draft_meeting_reply_tool(invite_details: Dict[str, Any], availability_status: bool, ctx: Context) -> Dict[str, str]: """Drafts a meeting reply (accept/decline) based on calendar invite details and availability. Args: invite_details: Dictionary containing invite details (subject, start_time, end_time, organizer, location) availability_status: Whether the user is available for the meeting (True=available/accept, False=unavailable/decline) ctx: MCP context Returns: Dictionary with reply text and additional metadata """ return await draft_meeting_reply(invite_details, availability_status, ctx) @mcp.tool() async def identify_meeting_invite_tool(folder: str, uid: int, ctx: Context) -> Dict[str, Any]: """Identifies if an email is a meeting invite and extracts relevant details. Args: folder: Email folder name uid: Email UID ctx: MCP context Returns: Dictionary with invite details if it's a meeting invite, or status information if not """ return await identify_meeting_invite(folder, uid, ctx) @mcp.tool() async def check_calendar_availability_tool(start_time: str, end_time: str, ctx: Context) -> Dict[str, Any]: """Checks calendar availability for a given time slot. Args: start_time: Meeting start time (ISO format) end_time: Meeting end time (ISO format) ctx: MCP context Returns: Dictionary with availability status and additional information """ return await check_calendar_availability(start_time, end_time, ctx) @mcp.tool() async def process_invite_email_tool(folder: str, uid: int, ctx: Context) -> Dict[str, Any]: """Processes a meeting invitation email: identifies invite, checks availability, drafts reply, saves draft. Args: folder: Email folder name uid: Email UID ctx: MCP context Returns: Dictionary with processing results and status information """ return await process_invite_email(folder, uid, ctx) @mcp.tool() async def create_task(description: str, ctx: Context, due_date: Optional[str] = None, priority: Optional[int] = None) -> str: """Creates a new task and saves it to a local file. Args: description: Task description ctx: MCP context due_date: Optional due date in ISO format priority: Optional priority (1=high, 2=medium, 3=low) Returns: Success message or error information """ # Call the internal implementation return await _create_task_impl(description, ctx, due_date, priority) @mcp.tool() async def draft_reply_tool(folder: str, uid: int, reply_body: str, ctx: Context, reply_all: bool = False, cc: Optional[List[str]] = None, body_html: Optional[str] = None) -> Dict[str, Any]: """Creates a draft reply to an email and saves it to the drafts folder. Args: folder: Email folder name uid: Email UID reply_body: Reply text content ctx: MCP context reply_all: Whether to reply to all recipients cc: Optional CC recipients body_html: Optional HTML version of the reply Returns: Dictionary with status and the UID of the created draft """ # Avoid recursion by calling the internal implementation return await _draft_reply_impl(folder, uid, reply_body, ctx, reply_all, cc, body_html) # Move email to a different folder @mcp.tool() async def move_email( folder: str, uid: int, target_folder: str, ctx: Context, ) -> str: """Move email to another folder. Args: folder: Source folder uid: Email UID target_folder: Target folder ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) try: success = client.move_email(uid, folder, target_folder) if success: return f"Email moved from {folder} to {target_folder}" else: return "Failed to move email" except Exception as e: logger.error(f"Error moving email: {e}") return f"Error: {e}" # Mark email as read @mcp.tool() async def mark_as_read( folder: str, uid: int, ctx: Context, ) -> str: """Mark email as read. Args: folder: Folder name uid: Email UID ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) try: success = client.mark_email(uid, folder, r"\Seen", True) if success: return "Email marked as read" else: return "Failed to mark email as read" except Exception as e: logger.error(f"Error marking email as read: {e}") return f"Error: {e}" # Mark email as unread @mcp.tool() async def mark_as_unread( folder: str, uid: int, ctx: Context, ) -> str: """Mark email as unread. Args: folder: Folder name uid: Email UID ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) try: success = client.mark_email(uid, folder, r"\Seen", False) if success: return "Email marked as unread" else: return "Failed to mark email as unread" except Exception as e: logger.error(f"Error marking email as unread: {e}") return f"Error: {e}" # Flag email (important/starred) @mcp.tool() async def flag_email( folder: str, uid: int, ctx: Context, flag: bool = True, ) -> str: """Flag or unflag email. Args: folder: Folder name uid: Email UID flag: True to flag, False to unflag ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) try: success = client.mark_email(uid, folder, r"\Flagged", flag) if success: return f"Email {'flagged' if flag else 'unflagged'}" else: return f"Failed to {'flag' if flag else 'unflag'} email" except Exception as e: logger.error(f"Error flagging email: {e}") return f"Error: {e}" # Delete email @mcp.tool() async def delete_email( folder: str, uid: int, ctx: Context, ) -> str: """Delete email. Args: folder: Folder name uid: Email UID ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) try: success = client.delete_email(uid, folder) if success: return "Email deleted" else: return "Failed to delete email" except Exception as e: logger.error(f"Error deleting email: {e}") return f"Error: {e}" # Search for emails @mcp.tool() async def search_emails( query: str, ctx: Context, folder: Optional[str] = None, criteria: str = "text", limit: int = 10, ) -> str: """Search for emails. Args: query: Search query folder: Folder to search in (None for all folders) criteria: Search criteria (text, from, to, subject, all, unseen, seen) limit: Maximum number of results ctx: MCP context Returns: JSON-formatted list of search results """ client = get_client_from_context(ctx) # Define search criteria search_criteria_map = { "text": ["TEXT", query], "from": ["FROM", query], "to": ["TO", query], "subject": ["SUBJECT", query], "all": "ALL", "unseen": "UNSEEN", "seen": "SEEN", "today": "today", "week": "week", "month": "month", } if criteria.lower() not in search_criteria_map: return f"Invalid search criteria: {criteria}" search_criteria = search_criteria_map[criteria.lower()] folders_to_search = [folder] if folder else client.list_folders() results = [] for current_folder in folders_to_search: try: # Search for emails uids = client.search(search_criteria, folder=current_folder) # Limit results and sort by newest first uids = sorted(uids, reverse=True)[:limit] if uids: # Fetch emails emails = client.fetch_emails(uids, folder=current_folder) # Create summaries for uid, email_obj in emails.items(): results.append({ "uid": uid, "folder": current_folder, "from": str(email_obj.from_), "to": [str(to) for to in email_obj.to], "subject": email_obj.subject, "date": email_obj.date.isoformat() if email_obj.date else None, "flags": email_obj.flags, "has_attachments": len(email_obj.attachments) > 0, }) except Exception as e: logger.warning(f"Error searching folder {current_folder}: {e}") # Sort results by date (newest first) results.sort( key=lambda x: x.get("date") or "0", reverse=True ) # Apply global limit results = results[:limit] return json.dumps(results, indent=2) # Process email interactive session @mcp.tool() async def process_email( folder: str, uid: int, action: str, ctx: Context, notes: Optional[str] = None, target_folder: Optional[str] = None, ) -> str: """Process an email with specified action. This is a higher-level tool that combines multiple actions and records the decision for learning purposes. Args: folder: Folder name uid: Email UID action: Action to take (move, read, unread, flag, unflag, delete) notes: Optional notes about the decision target_folder: Target folder for move action ctx: MCP context Returns: Success message or error message """ client = get_client_from_context(ctx) # Fetch the email first to have context for learning email_obj = client.fetch_email(uid, folder) if not email_obj: return f"Email with UID {uid} not found in folder {folder}" # Process the action result = "" try: if action.lower() == "move": if not target_folder: return "Target folder must be specified for move action" client.move_email(uid, folder, target_folder) result = f"Email moved from {folder} to {target_folder}" elif action.lower() == "read": client.mark_email(uid, folder, r"\Seen", True) result = "Email marked as read" elif action.lower() == "unread": client.mark_email(uid, folder, r"\Seen", False) result = "Email marked as unread" elif action.lower() == "flag": client.mark_email(uid, folder, r"\Flagged", True) result = "Email flagged" elif action.lower() == "unflag": client.mark_email(uid, folder, r"\Flagged", False) result = "Email unflagged" elif action.lower() == "delete": client.delete_email(uid, folder) result = "Email deleted" else: return f"Invalid action: {action}" # TODO: Record the action for learning in a separate module return result except Exception as e: logger.error(f"Error processing email: {e}") return f"Error: {e}" # Process meeting invite and generate a draft reply @mcp.tool() async def process_meeting_invite( folder: str, uid: int, ctx: Context, availability_mode: str = "random", ) -> dict: """Process a meeting invite email and create a draft reply. This tool orchestrates the full workflow: 1. Identifies if the email is a meeting invite 2. Checks calendar availability for the meeting time 3. Generates an appropriate reply (accept/decline) 4. Creates a MIME message for the reply 5. Saves the reply as a draft Args: folder: Folder containing the invite email uid: UID of the invite email ctx: MCP context availability_mode: Mode for availability check (random, always_available, always_busy, business_hours, weekdays) Returns: Dictionary with the processing result: - status: "success", "not_invite", or "error" - message: Description of the result - draft_uid: UID of the saved draft (if successful) - draft_folder: Folder where the draft was saved (if successful) - availability: Whether the time slot was available """ from imap_mcp.workflows.invite_parser import identify_meeting_invite_details from imap_mcp.workflows.calendar_mock import check_mock_availability from imap_mcp.workflows.meeting_reply import generate_meeting_reply_content from imap_mcp.smtp_client import create_reply_mime client = get_client_from_context(ctx) result = { "status": "error", "message": "An error occurred during processing", "draft_uid": None, "draft_folder": None, "availability": None } try: # Step 1: Fetch the original email logger.info(f"Fetching email UID {uid} from folder {folder}") email_obj = client.fetch_email(uid, folder) if not email_obj: result["message"] = f"Email with UID {uid} not found in folder {folder}" return result # Step 2: Identify if it's a meeting invite logger.info(f"Analyzing email for meeting invite details: {email_obj.subject}") invite_result = identify_meeting_invite_details(email_obj) if not invite_result["is_invite"]: result["status"] = "not_invite" result["message"] = "The email is not a meeting invite" return result invite_details = invite_result["details"] # Step 3: Check calendar availability logger.info(f"Checking calendar availability for meeting: {invite_details['subject']}") availability_result = check_mock_availability( invite_details.get("start_time"), invite_details.get("end_time"), availability_mode ) result["availability"] = availability_result["available"] # Step 4: Generate reply content logger.info(f"Generating {'accept' if availability_result['available'] else 'decline'} reply") reply_content = generate_meeting_reply_content(invite_details, availability_result) # Step 5: Create MIME message for reply logger.info("Creating MIME message for reply") # Create EmailAddress object for the reply sender (use the original recipient) if email_obj.to and len(email_obj.to) > 0: reply_from = email_obj.to[0] else: # Fallback to a default if no recipient in original email reply_from = EmailAddress( name="Me", address=client.config.username ) # Create the reply MIME message - using the standalone function mime_message = create_reply_mime( original_email=email_obj, reply_to=reply_from, body=reply_content["reply_body"], subject=reply_content["reply_subject"], # Don't use reply_all for meeting responses reply_all=False ) # Step 6: Save as draft logger.info("Saving reply as draft") draft_uid = client.save_draft_mime(mime_message) if draft_uid: drafts_folder = client._get_drafts_folder() result["status"] = "success" result["message"] = f"Draft reply created: {reply_content['reply_type']}" result["draft_uid"] = draft_uid result["draft_folder"] = drafts_folder logger.info(f"Draft saved successfully with UID {draft_uid} in folder {drafts_folder}") else: result["message"] = "Failed to save draft" except Exception as e: logger.error(f"Error processing meeting invite: {e}") result["message"] = f"Error: {e}" return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/non-dirty/imap-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server