"""
Gmail Adapter - Enhanced Gmail operations with LLM-powered features
Handles email search, sending, categorization, and intelligent processing
"""
import base64
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
import structlog
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from ..utils.google_auth import get_gmail_service
logger = structlog.get_logger(__name__)
class GmailAdapter:
"""
Gmail API adapter with intelligent features
Features:
- Search and filter emails
- Send/reply to emails
- Categorize emails with LLM
- Extract action items
- Detect calendar invites
"""
def __init__(self):
"""Initialize Gmail service"""
try:
self.service = get_gmail_service()
logger.info("Gmail adapter initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Gmail adapter: {e}")
raise
def search_emails(
self,
query: Optional[str] = None,
max_results: int = 50,
after_date: Optional[str] = None,
before_date: Optional[str] = None,
from_email: Optional[str] = None,
to_email: Optional[str] = None,
subject: Optional[str] = None,
has_attachment: Optional[bool] = None,
is_unread: Optional[bool] = None
) -> Dict[str, Any]:
"""
Search emails with advanced filters
Args:
query: Gmail search query
max_results: Maximum emails to return
after_date: ISO date string (e.g., '2024-01-01')
before_date: ISO date string
from_email: Filter by sender
to_email: Filter by recipient
subject: Filter by subject
has_attachment: Filter emails with attachments
is_unread: Filter unread emails
Returns:
Dictionary with success status and emails
"""
try:
# Build query
query_parts = []
if query:
query_parts.append(query)
if after_date:
query_parts.append(f"after:{after_date}")
if before_date:
query_parts.append(f"before:{before_date}")
if from_email:
query_parts.append(f"from:{from_email}")
if to_email:
query_parts.append(f"to:{to_email}")
if subject:
query_parts.append(f"subject:{subject}")
if has_attachment:
query_parts.append("has:attachment")
if is_unread:
query_parts.append("is:unread")
full_query = " ".join(query_parts) if query_parts else None
logger.info(f"Searching emails with query: {full_query}")
# Execute search
results = self.service.users().messages().list(
userId='me',
q=full_query,
maxResults=max_results
).execute()
messages = results.get('messages', [])
if not messages:
return {
"success": True,
"count": 0,
"emails": [],
"message": "No emails found"
}
# Fetch email details
emails = []
for msg in messages:
try:
email_data = self.get_email(msg['id'])
if email_data.get('success'):
emails.append(email_data['email'])
except Exception as e:
logger.warning(f"Failed to fetch email {msg['id']}: {e}")
continue
logger.info(f"Found {len(emails)} emails")
return {
"success": True,
"count": len(emails),
"emails": emails,
"query": full_query
}
except HttpError as e:
logger.error(f"Gmail API error: {e}")
return {
"success": False,
"error": f"Gmail API error: {str(e)}"
}
except Exception as e:
logger.error(f"Failed to search emails: {e}")
return {
"success": False,
"error": str(e)
}
def get_email(self, email_id: str) -> Dict[str, Any]:
"""
Get detailed email information
Args:
email_id: Email message ID
Returns:
Dictionary with email details
"""
try:
message = self.service.users().messages().get(
userId='me',
id=email_id,
format='full'
).execute()
# Extract headers
headers = message.get('payload', {}).get('headers', [])
subject = self._get_header(headers, 'Subject')
from_email = self._get_header(headers, 'From')
to_email = self._get_header(headers, 'To')
date = self._get_header(headers, 'Date')
# Extract body
body = self._get_email_body(message.get('payload', {}))
# Extract snippet
snippet = message.get('snippet', '')
# Check if unread
labels = message.get('labelIds', [])
is_unread = 'UNREAD' in labels
email_data = {
"id": email_id,
"thread_id": message.get('threadId'),
"subject": subject,
"from": from_email,
"to": to_email,
"date": date,
"snippet": snippet,
"body": body,
"is_unread": is_unread,
"labels": labels,
"has_attachments": self._has_attachments(message.get('payload', {}))
}
return {
"success": True,
"email": email_data
}
except HttpError as e:
logger.error(f"Failed to get email {email_id}: {e}")
return {
"success": False,
"error": f"Gmail API error: {str(e)}"
}
except Exception as e:
logger.error(f"Failed to get email {email_id}: {e}")
return {
"success": False,
"error": str(e)
}
def send_email(
self,
to: str,
subject: str,
body: str,
cc: Optional[str] = None,
bcc: Optional[str] = None,
reply_to: Optional[str] = None,
html: bool = False
) -> Dict[str, Any]:
"""
Send an email
Args:
to: Recipient email address
subject: Email subject
body: Email body
cc: CC recipients (comma-separated)
bcc: BCC recipients (comma-separated)
reply_to: Reply-to address
html: Whether body is HTML
Returns:
Dictionary with success status and message ID
"""
try:
# Create message
message = MIMEMultipart() if html else MIMEText(body)
message['To'] = to
message['Subject'] = subject
if cc:
message['Cc'] = cc
if bcc:
message['Bcc'] = bcc
if reply_to:
message['Reply-To'] = reply_to
if html and isinstance(message, MIMEMultipart):
message.attach(MIMEText(body, 'html'))
# Encode message
raw_message = base64.urlsafe_b64encode(
message.as_bytes()
).decode('utf-8')
# Send message
sent_message = self.service.users().messages().send(
userId='me',
body={'raw': raw_message}
).execute()
logger.info(f"Email sent successfully to {to}")
return {
"success": True,
"message_id": sent_message['id'],
"thread_id": sent_message.get('threadId'),
"message": f"Email sent to {to}"
}
except HttpError as e:
logger.error(f"Failed to send email: {e}")
return {
"success": False,
"error": f"Gmail API error: {str(e)}"
}
except Exception as e:
logger.error(f"Failed to send email: {e}")
return {
"success": False,
"error": str(e)
}
def reply_to_email(
self,
email_id: str,
body: str,
html: bool = False
) -> Dict[str, Any]:
"""
Reply to an email
Args:
email_id: Original email ID
body: Reply body
html: Whether body is HTML
Returns:
Dictionary with success status
"""
try:
# Get original email
original = self.get_email(email_id)
if not original.get('success'):
return {
"success": False,
"error": "Failed to fetch original email"
}
email_data = original['email']
# Extract reply-to or from address
to_address = email_data['from']
subject = email_data['subject']
# Add "Re:" prefix if not present
if not subject.lower().startswith('re:'):
subject = f"Re: {subject}"
# Send reply
return self.send_email(
to=to_address,
subject=subject,
body=body,
html=html,
reply_to=email_data['to']
)
except Exception as e:
logger.error(f"Failed to reply to email: {e}")
return {
"success": False,
"error": str(e)
}
def mark_as_read(self, email_id: str) -> Dict[str, Any]:
"""Mark email as read"""
try:
self.service.users().messages().modify(
userId='me',
id=email_id,
body={'removeLabelIds': ['UNREAD']}
).execute()
return {
"success": True,
"message": "Email marked as read"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def mark_as_unread(self, email_id: str) -> Dict[str, Any]:
"""Mark email as unread"""
try:
self.service.users().messages().modify(
userId='me',
id=email_id,
body={'addLabelIds': ['UNREAD']}
).execute()
return {
"success": True,
"message": "Email marked as unread"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def archive_email(self, email_id: str) -> Dict[str, Any]:
"""Archive email (remove from inbox)"""
try:
self.service.users().messages().modify(
userId='me',
id=email_id,
body={'removeLabelIds': ['INBOX']}
).execute()
return {
"success": True,
"message": "Email archived"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def delete_email(self, email_id: str) -> Dict[str, Any]:
"""Move email to trash"""
try:
self.service.users().messages().trash(
userId='me',
id=email_id
).execute()
return {
"success": True,
"message": "Email moved to trash"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
# Helper methods
def _get_header(self, headers: List[Dict], name: str) -> str:
"""Extract header value by name"""
for header in headers:
if header['name'].lower() == name.lower():
return header['value']
return ""
def _get_email_body(self, payload: Dict) -> str:
"""Extract email body from payload"""
try:
# Check for parts
if 'parts' in payload:
for part in payload['parts']:
if part['mimeType'] == 'text/plain':
data = part['body'].get('data', '')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
elif part['mimeType'] == 'text/html':
data = part['body'].get('data', '')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
# Check nested parts
elif 'parts' in part:
body = self._get_email_body(part)
if body:
return body
# Check body directly
if 'body' in payload and 'data' in payload['body']:
data = payload['body']['data']
return base64.urlsafe_b64decode(data).decode('utf-8')
return ""
except Exception as e:
logger.warning(f"Failed to extract email body: {e}")
return ""
def _has_attachments(self, payload: Dict) -> bool:
"""Check if email has attachments"""
try:
if 'parts' in payload:
for part in payload['parts']:
if part.get('filename'):
return True
if 'parts' in part:
if self._has_attachments(part):
return True
return False
except:
return False