Skip to main content
Glama
imap_client.py24.1 kB
"""IMAP client implementation.""" import email import logging import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Union import imapclient from imap_mcp.config import ImapConfig from imap_mcp.models import Email from imap_mcp.oauth2 import get_access_token logger = logging.getLogger(__name__) class ImapClient: """IMAP client for interacting with email servers.""" def __init__(self, config: ImapConfig, allowed_folders: Optional[List[str]] = None): """Initialize IMAP client. Args: config: IMAP configuration allowed_folders: List of allowed folders (None means all folders) """ self.config = config self.allowed_folders = set(allowed_folders) if allowed_folders else None self.client = None self.folder_cache: Dict[str, List[str]] = {} self.connected = False self.count_cache: Dict[str, Dict[str, Tuple[int, datetime]]] = {} # Cache for message counts self.current_folder = None # Store the currently selected folder self.folder_message_counts = {} # Cache for folder message counts def connect(self) -> None: """Connect to IMAP server. Raises: ConnectionError: If connection fails """ try: self.client = imapclient.IMAPClient( self.config.host, port=self.config.port, ssl=self.config.use_ssl, ) # Use OAuth2 for Gmail if configured if self.config.requires_oauth2: logger.info(f"Using OAuth2 authentication for {self.config.host}") # Get fresh access token if not self.config.oauth2: raise ValueError("OAuth2 configuration is required for Gmail") access_token, _ = get_access_token(self.config.oauth2) # Authenticate with XOAUTH2 # Use the oauth_login method which properly formats the XOAUTH2 string self.client.oauth2_login(self.config.username, access_token) else: # Standard password authentication if not self.config.password: raise ValueError("Password is required for authentication") self.client.login(self.config.username, self.config.password) self.connected = True logger.info(f"Connected to IMAP server {self.config.host}") except Exception as e: self.connected = False logger.error(f"Failed to connect to IMAP server: {e}") raise ConnectionError(f"Failed to connect to IMAP server: {e}") def disconnect(self) -> None: """Disconnect from IMAP server.""" if self.client: try: self.client.logout() except Exception as e: logger.warning(f"Error during IMAP logout: {e}") finally: self.client = None self.connected = False logger.info("Disconnected from IMAP server") def ensure_connected(self) -> None: """Ensure that we are connected to the IMAP server. Raises: ConnectionError: If connection fails """ if not self.connected: self.connect() def get_capabilities(self) -> List[str]: """Get IMAP server capabilities. Returns: List of server capabilities Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() raw_capabilities = self.client.capabilities() # Convert byte strings to regular strings and normalize case capabilities = [] for cap in raw_capabilities: if isinstance(cap, bytes): cap = cap.decode('utf-8') capabilities.append(cap.upper()) return capabilities def list_folders(self, refresh: bool = False) -> List[str]: """List available folders. Args: refresh: Force refresh folder list cache Returns: List of folder names Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() # Check cache first if not refresh and self.folder_cache: return list(self.folder_cache.keys()) # Get folders from server folders = [] for flags, delimiter, name in self.client.list_folders(): if isinstance(name, bytes): # Convert bytes to string if necessary name = name.decode("utf-8") # Filter folders if allowed_folders is set if self.allowed_folders is not None and name not in self.allowed_folders: continue folders.append(name) self.folder_cache[name] = flags logger.debug(f"Listed {len(folders)} folders") return folders def _is_folder_allowed(self, folder: str) -> bool: """Check if a folder is allowed. Args: folder: Folder to check Returns: True if folder is allowed, False otherwise """ # If no allowed_folders specified, all folders are allowed if self.allowed_folders is None: return True # If allowed_folders is specified, check if folder is in it return folder in self.allowed_folders def select_folder(self, folder: str, readonly: bool = False) -> Dict: """Select folder on IMAP server. Args: folder: Folder to select readonly: If True, select folder in read-only mode Returns: Dictionary with folder information Raises: ValueError: If folder is not allowed ConnectionError: If connection error occurs """ # Make sure the folder is allowed if not self._is_folder_allowed(folder): raise ValueError(f"Folder '{folder}' is not allowed") self.ensure_connected() try: result = self.client.select_folder(folder, readonly=readonly) self.current_folder = folder logger.debug(f"Selected folder '{folder}'") return result except imapclient.IMAPClient.Error as e: logger.error(f"Error selecting folder {folder}: {e}") raise ConnectionError(f"Failed to select folder {folder}: {e}") def search( self, criteria: Union[str, List, Tuple], folder: str = "INBOX", charset: Optional[str] = None, ) -> List[int]: """Search for messages. Args: criteria: Search criteria folder: Folder to search in charset: Character set for search criteria Returns: List of message UIDs Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder, readonly=True) if isinstance(criteria, str): # Predefined criteria strings criteria_map = { "all": "ALL", "unseen": "UNSEEN", "seen": "SEEN", "answered": "ANSWERED", "unanswered": "UNANSWERED", "deleted": "DELETED", "undeleted": "UNDELETED", "flagged": "FLAGGED", "unflagged": "UNFLAGGED", "recent": "RECENT", "today": ["SINCE", datetime.now().date()], "yesterday": [ "SINCE", (datetime.now() - timedelta(days=1)).date(), "BEFORE", datetime.now().date(), ], "week": ["SINCE", (datetime.now() - timedelta(days=7)).date()], "month": ["SINCE", (datetime.now() - timedelta(days=30)).date()], } if criteria.lower() in criteria_map: criteria = criteria_map[criteria.lower()] results = self.client.search(criteria, charset=charset) logger.debug(f"Search returned {len(results)} results") return list(results) def fetch_email(self, uid: int, folder: str = "INBOX") -> Optional[Email]: """Fetch a single email by UID. Args: uid: Email UID folder: Folder to fetch from Returns: Email object or None if not found Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder, readonly=True) # Fetch message data with BODY.PEEK[] to get all parts including headers # Using BODY.PEEK[] instead of RFC822 to avoid setting the \Seen flag result = self.client.fetch([uid], ["BODY.PEEK[]", "FLAGS"]) if not result or uid not in result: logger.warning(f"Message with UID {uid} not found in folder {folder}") return None # Parse message message_data = result[uid] raw_message = message_data[b"BODY[]"] flags = message_data[b"FLAGS"] # Convert flags to strings str_flags = [ f.decode("utf-8") if isinstance(f, bytes) else f for f in flags ] # Parse email message = email.message_from_bytes(raw_message) email_obj = Email.from_message(message, uid=uid, folder=folder) email_obj.flags = str_flags return email_obj def fetch_emails( self, uids: List[int], folder: str = "INBOX", limit: Optional[int] = None, ) -> Dict[int, Email]: """Fetch multiple emails by UIDs. Args: uids: List of email UIDs folder: Folder to fetch from limit: Maximum number of emails to fetch Returns: Dictionary mapping UIDs to Email objects Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder, readonly=True) # Apply limit if specified if limit is not None and limit > 0: uids = uids[:limit] # Fetch message data if not uids: return {} # Use BODY.PEEK[] to get full message including all parts and headers result = self.client.fetch(uids, ["BODY.PEEK[]", "FLAGS"]) # Parse emails emails = {} for uid, message_data in result.items(): raw_message = message_data[b"BODY[]"] flags = message_data[b"FLAGS"] # Convert flags to strings str_flags = [ f.decode("utf-8") if isinstance(f, bytes) else f for f in flags ] # Parse email message = email.message_from_bytes(raw_message) email_obj = Email.from_message(message, uid=uid, folder=folder) email_obj.flags = str_flags emails[uid] = email_obj return emails def fetch_thread(self, uid: int, folder: str = "INBOX") -> List[Email]: """Fetch all emails in a thread. This method retrieves the initial email identified by the UID, and then searches for all related emails that belong to the same thread using Message-ID, In-Reply-To, References headers, and Subject matching as a fallback. Args: uid: UID of any email in the thread folder: Folder to fetch from Returns: List of Email objects in the thread, sorted chronologically Raises: ConnectionError: If not connected and connection fails ValueError: If the initial email cannot be found """ self.ensure_connected() self.select_folder(folder, readonly=True) # Fetch the initial email initial_email = self.fetch_email(uid, folder) if not initial_email: raise ValueError(f"Initial email with UID {uid} not found in folder {folder}") # Get thread identifiers from the initial email message_id = initial_email.headers.get("Message-ID", "") subject = initial_email.subject # Strip "Re:", "Fwd:", etc. from the subject for better matching clean_subject = re.sub(r"^(?:Re|Fwd|Fw|FWD|RE|FW):\s*", "", subject, flags=re.IGNORECASE) # Set to store all UIDs that belong to the thread thread_uids = {uid} # Search for emails with this Message-ID in the References or In-Reply-To headers if message_id: # Look for emails that reference this message ID references_query = f'HEADER References "{message_id}"' try: references_results = self.search(references_query, folder) thread_uids.update(references_results) except Exception as e: logger.warning(f"Error searching for References: {e}") # Look for direct replies to this message inreplyto_query = f'HEADER In-Reply-To "{message_id}"' try: inreplyto_results = self.search(inreplyto_query, folder) thread_uids.update(inreplyto_results) except Exception as e: logger.warning(f"Error searching for In-Reply-To: {e}") # If the initial email has References or In-Reply-To, fetch those messages too initial_references = initial_email.headers.get("References", "") initial_inreplyto = initial_email.headers.get("In-Reply-To", "") # Extract all message IDs from the References header if initial_references: for ref_id in re.findall(r'<[^>]+>', initial_references): query = f'HEADER Message-ID "{ref_id}"' try: results = self.search(query, folder) thread_uids.update(results) except Exception as e: logger.warning(f"Error searching for Referenced message {ref_id}: {e}") # Look for the message that this is a reply to if initial_inreplyto: query = f'HEADER Message-ID "{initial_inreplyto}"' try: results = self.search(query, folder) thread_uids.update(results) except Exception as e: logger.warning(f"Error searching for In-Reply-To message: {e}") # If we still have only the initial email or a small thread, try subject-based matching if len(thread_uids) <= 2 and clean_subject: # Look for emails with the same or related subject (Re: Subject) # This is a fallback for email clients that don't properly use References/In-Reply-To subject_query = f'SUBJECT "{clean_subject}"' try: subject_results = self.search(subject_query, folder) # Filter out emails that are unlikely to be part of the thread # For example, avoid including all emails with a common subject like "Hello" if len(subject_results) < 20: # Set a reasonable limit thread_uids.update(subject_results) else: # If there are too many results, try a more strict approach # Look for exact subject match or common Re: pattern strict_matches = [] strict_subjects = [ clean_subject, f"Re: {clean_subject}", f"RE: {clean_subject}", f"Fwd: {clean_subject}", f"FWD: {clean_subject}", f"Fw: {clean_subject}", f"FW: {clean_subject}" ] # Fetch subjects for all candidate emails candidate_emails = self.fetch_emails(subject_results, folder) for candidate_uid, candidate_email in candidate_emails.items(): if candidate_email.subject in strict_subjects: strict_matches.append(candidate_uid) thread_uids.update(strict_matches) except Exception as e: logger.warning(f"Error searching by subject: {e}") # Fetch all discovered thread emails thread_emails = self.fetch_emails(list(thread_uids), folder) # Sort emails by date (chronologically) sorted_emails = sorted( thread_emails.values(), key=lambda e: e.date if e.date else datetime.min ) return sorted_emails def mark_email( self, uid: int, folder: str, flag: str, value: bool = True, ) -> bool: """Mark email with flag. Args: uid: Email UID folder: Folder containing the email flag: Flag to set or remove value: True to set, False to remove Returns: True if successful Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) try: if value: self.client.add_flags([uid], flag) logger.debug(f"Added flag {flag} to message {uid}") else: self.client.remove_flags([uid], flag) logger.debug(f"Removed flag {flag} from message {uid}") return True except Exception as e: logger.error(f"Failed to mark email: {e}") return False def move_email(self, uid: int, source_folder: str, target_folder: str) -> bool: """Move email to another folder. Args: uid: Email UID source_folder: Source folder target_folder: Target folder Returns: True if successful Raises: ConnectionError: If not connected and connection fails ValueError: If folder is not allowed """ self.ensure_connected() # Check if folders are allowed if self.allowed_folders is not None: if source_folder not in self.allowed_folders: raise ValueError(f"Source folder '{source_folder}' is not allowed") if target_folder not in self.allowed_folders: raise ValueError(f"Target folder '{target_folder}' is not allowed") # Select source folder self.select_folder(source_folder) try: # Move email (copy + delete) self.client.copy([uid], target_folder) self.client.add_flags([uid], r"\Deleted") self.client.expunge() logger.debug(f"Moved message {uid} from {source_folder} to {target_folder}") return True except Exception as e: logger.error(f"Failed to move email: {e}") return False def delete_email(self, uid: int, folder: str) -> bool: """Delete email. Args: uid: Email UID folder: Folder containing the email Returns: True if successful Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() self.select_folder(folder) try: self.client.add_flags([uid], r"\Deleted") self.client.expunge() logger.debug(f"Deleted message {uid} from {folder}") return True except Exception as e: logger.error(f"Failed to delete email: {e}") return False def _get_drafts_folder(self) -> str: """Get the drafts folder name for the current server. Returns: The name of the drafts folder, or "INBOX" as fallback """ self.ensure_connected() folders = self.list_folders(refresh=True) # Check for Gmail's special folders structure if self.config.host and "gmail" in self.config.host.lower(): gmail_drafts = [f for f in folders if f.lower().endswith("/drafts")] if gmail_drafts: logger.debug(f"Using Gmail drafts folder: {gmail_drafts[0]}") return gmail_drafts[0] # Look for standard drafts folder names (case-insensitive) drafts_folder_names = ["Drafts", "Draft", "Brouillons", "Borradores", "Entwürfe"] for folder in folders: if folder.lower() in [name.lower() for name in drafts_folder_names]: logger.debug(f"Using drafts folder: {folder}") return folder # Fallback to INBOX if no drafts folder found logger.warning("No drafts folder found, using INBOX as fallback") return "INBOX" def save_draft_mime(self, message) -> Optional[int]: """Save a MIME message as a draft. Args: message: email.message.Message object to save as draft Returns: UID of the saved draft if available, None otherwise Raises: ConnectionError: If not connected and connection fails """ self.ensure_connected() # Get the drafts folder drafts_folder = self._get_drafts_folder() try: # Convert message to bytes if it's not already if hasattr(message, "as_bytes"): message_bytes = message.as_bytes() else: message_bytes = message.as_string().encode("utf-8") # Save the draft with Draft flag response = self.client.append( drafts_folder, message_bytes, flags=(r"\Draft",) ) # Try to extract the UID from the response uid = None if isinstance(response, bytes) and b"APPENDUID" in response: # Parse the APPENDUID response (format: [APPENDUID <uidvalidity> <uid>]) try: # Use a more robust parsing approach match = re.search(rb'APPENDUID\s+\d+\s+(\d+)', response) if match: uid = int(match.group(1)) logger.debug(f"Draft saved with UID: {uid}") except (IndexError, ValueError) as e: logger.warning(f"Could not parse UID from response: {e}") if uid is None: logger.warning(f"Could not extract UID from append response: {response}") return uid except Exception as e: logger.error(f"Failed to save draft: {e}") return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/non-dirty/imap-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server