Skip to main content
Glama
gmail_adapter.py (14.3 kB)
""" Gmail Adapter - Enhanced Gmail operations with LLM-powered features Handles email search, sending, categorization, and intelligent processing """ import base64 from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from typing import List, Optional, Dict, Any from datetime import datetime, timedelta import structlog from googleapiclient.discovery import build from googleapiclient.errors import HttpError from ..utils.google_auth import get_gmail_service logger = structlog.get_logger(__name__) class GmailAdapter: """ Gmail API adapter with intelligent features Features: - Search and filter emails - Send/reply to emails - Categorize emails with LLM - Extract action items - Detect calendar invites """ def __init__(self): """Initialize Gmail service""" try: self.service = get_gmail_service() logger.info("Gmail adapter initialized successfully") except Exception as e: logger.error(f"Failed to initialize Gmail adapter: {e}") raise def search_emails( self, query: Optional[str] = None, max_results: int = 50, after_date: Optional[str] = None, before_date: Optional[str] = None, from_email: Optional[str] = None, to_email: Optional[str] = None, subject: Optional[str] = None, has_attachment: Optional[bool] = None, is_unread: Optional[bool] = None ) -> Dict[str, Any]: """ Search emails with advanced filters Args: query: Gmail search query max_results: Maximum emails to return after_date: ISO date string (e.g., '2024-01-01') before_date: ISO date string from_email: Filter by sender to_email: Filter by recipient subject: Filter by subject has_attachment: Filter emails with attachments is_unread: Filter unread emails Returns: Dictionary with success status and emails """ try: # Build query query_parts = [] if query: query_parts.append(query) if after_date: query_parts.append(f"after:{after_date}") if before_date: query_parts.append(f"before:{before_date}") if from_email: query_parts.append(f"from:{from_email}") if to_email: 
query_parts.append(f"to:{to_email}") if subject: query_parts.append(f"subject:{subject}") if has_attachment: query_parts.append("has:attachment") if is_unread: query_parts.append("is:unread") full_query = " ".join(query_parts) if query_parts else None logger.info(f"Searching emails with query: {full_query}") # Execute search results = self.service.users().messages().list( userId='me', q=full_query, maxResults=max_results ).execute() messages = results.get('messages', []) if not messages: return { "success": True, "count": 0, "emails": [], "message": "No emails found" } # Fetch email details emails = [] for msg in messages: try: email_data = self.get_email(msg['id']) if email_data.get('success'): emails.append(email_data['email']) except Exception as e: logger.warning(f"Failed to fetch email {msg['id']}: {e}") continue logger.info(f"Found {len(emails)} emails") return { "success": True, "count": len(emails), "emails": emails, "query": full_query } except HttpError as e: logger.error(f"Gmail API error: {e}") return { "success": False, "error": f"Gmail API error: {str(e)}" } except Exception as e: logger.error(f"Failed to search emails: {e}") return { "success": False, "error": str(e) } def get_email(self, email_id: str) -> Dict[str, Any]: """ Get detailed email information Args: email_id: Email message ID Returns: Dictionary with email details """ try: message = self.service.users().messages().get( userId='me', id=email_id, format='full' ).execute() # Extract headers headers = message.get('payload', {}).get('headers', []) subject = self._get_header(headers, 'Subject') from_email = self._get_header(headers, 'From') to_email = self._get_header(headers, 'To') date = self._get_header(headers, 'Date') # Extract body body = self._get_email_body(message.get('payload', {})) # Extract snippet snippet = message.get('snippet', '') # Check if unread labels = message.get('labelIds', []) is_unread = 'UNREAD' in labels email_data = { "id": email_id, "thread_id": 
message.get('threadId'), "subject": subject, "from": from_email, "to": to_email, "date": date, "snippet": snippet, "body": body, "is_unread": is_unread, "labels": labels, "has_attachments": self._has_attachments(message.get('payload', {})) } return { "success": True, "email": email_data } except HttpError as e: logger.error(f"Failed to get email {email_id}: {e}") return { "success": False, "error": f"Gmail API error: {str(e)}" } except Exception as e: logger.error(f"Failed to get email {email_id}: {e}") return { "success": False, "error": str(e) } def send_email( self, to: str, subject: str, body: str, cc: Optional[str] = None, bcc: Optional[str] = None, reply_to: Optional[str] = None, html: bool = False ) -> Dict[str, Any]: """ Send an email Args: to: Recipient email address subject: Email subject body: Email body cc: CC recipients (comma-separated) bcc: BCC recipients (comma-separated) reply_to: Reply-to address html: Whether body is HTML Returns: Dictionary with success status and message ID """ try: # Create message message = MIMEMultipart() if html else MIMEText(body) message['To'] = to message['Subject'] = subject if cc: message['Cc'] = cc if bcc: message['Bcc'] = bcc if reply_to: message['Reply-To'] = reply_to if html and isinstance(message, MIMEMultipart): message.attach(MIMEText(body, 'html')) # Encode message raw_message = base64.urlsafe_b64encode( message.as_bytes() ).decode('utf-8') # Send message sent_message = self.service.users().messages().send( userId='me', body={'raw': raw_message} ).execute() logger.info(f"Email sent successfully to {to}") return { "success": True, "message_id": sent_message['id'], "thread_id": sent_message.get('threadId'), "message": f"Email sent to {to}" } except HttpError as e: logger.error(f"Failed to send email: {e}") return { "success": False, "error": f"Gmail API error: {str(e)}" } except Exception as e: logger.error(f"Failed to send email: {e}") return { "success": False, "error": str(e) } def reply_to_email( self, 
email_id: str, body: str, html: bool = False ) -> Dict[str, Any]: """ Reply to an email Args: email_id: Original email ID body: Reply body html: Whether body is HTML Returns: Dictionary with success status """ try: # Get original email original = self.get_email(email_id) if not original.get('success'): return { "success": False, "error": "Failed to fetch original email" } email_data = original['email'] # Extract reply-to or from address to_address = email_data['from'] subject = email_data['subject'] # Add "Re:" prefix if not present if not subject.lower().startswith('re:'): subject = f"Re: {subject}" # Send reply return self.send_email( to=to_address, subject=subject, body=body, html=html, reply_to=email_data['to'] ) except Exception as e: logger.error(f"Failed to reply to email: {e}") return { "success": False, "error": str(e) } def mark_as_read(self, email_id: str) -> Dict[str, Any]: """Mark email as read""" try: self.service.users().messages().modify( userId='me', id=email_id, body={'removeLabelIds': ['UNREAD']} ).execute() return { "success": True, "message": "Email marked as read" } except Exception as e: return { "success": False, "error": str(e) } def mark_as_unread(self, email_id: str) -> Dict[str, Any]: """Mark email as unread""" try: self.service.users().messages().modify( userId='me', id=email_id, body={'addLabelIds': ['UNREAD']} ).execute() return { "success": True, "message": "Email marked as unread" } except Exception as e: return { "success": False, "error": str(e) } def archive_email(self, email_id: str) -> Dict[str, Any]: """Archive email (remove from inbox)""" try: self.service.users().messages().modify( userId='me', id=email_id, body={'removeLabelIds': ['INBOX']} ).execute() return { "success": True, "message": "Email archived" } except Exception as e: return { "success": False, "error": str(e) } def delete_email(self, email_id: str) -> Dict[str, Any]: """Move email to trash""" try: self.service.users().messages().trash( userId='me', id=email_id 
).execute() return { "success": True, "message": "Email moved to trash" } except Exception as e: return { "success": False, "error": str(e) } # Helper methods def _get_header(self, headers: List[Dict], name: str) -> str: """Extract header value by name""" for header in headers: if header['name'].lower() == name.lower(): return header['value'] return "" def _get_email_body(self, payload: Dict) -> str: """Extract email body from payload""" try: # Check for parts if 'parts' in payload: for part in payload['parts']: if part['mimeType'] == 'text/plain': data = part['body'].get('data', '') if data: return base64.urlsafe_b64decode(data).decode('utf-8') elif part['mimeType'] == 'text/html': data = part['body'].get('data', '') if data: return base64.urlsafe_b64decode(data).decode('utf-8') # Check nested parts elif 'parts' in part: body = self._get_email_body(part) if body: return body # Check body directly if 'body' in payload and 'data' in payload['body']: data = payload['body']['data'] return base64.urlsafe_b64decode(data).decode('utf-8') return "" except Exception as e: logger.warning(f"Failed to extract email body: {e}") return "" def _has_attachments(self, payload: Dict) -> bool: """Check if email has attachments""" try: if 'parts' in payload: for part in payload['parts']: if part.get('filename'): return True if 'parts' in part: if self._has_attachments(part): return True return False except: return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/mcpwithgoogle'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.