# google_auth.py • 7.68 kB
"""
Enhanced Google OAuth 2.0 Authentication
Supports Gmail, Calendar, Drive, and Sheets
(Keep has no official API; Keep notes are only reachable through Drive exports)
"""
import os
import json
from pathlib import Path
from typing import Optional, List
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import structlog
logger = structlog.get_logger(__name__)
# OAuth scopes requested by default, grouped by Google service
SCOPES = [
    # --- Gmail ---
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.labels",
    # --- Calendar ---
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/calendar.events",
    # --- Drive ---
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/drive.file",
    "https://www.googleapis.com/auth/drive.metadata.readonly",
    # --- Sheets ---
    "https://www.googleapis.com/auth/spreadsheets",
    # --- Keep ---
    # No scope is listed here: the Keep API is not officially available, so
    # Keep notes exported to Drive are accessed through the Drive API instead.
]
# Default file locations. PROJECT_ROOT is the directory two levels above the
# directory that contains this module; both JSON files live directly in it.
PROJECT_ROOT = Path(__file__).parent.parent.parent
CREDENTIALS_PATH = PROJECT_ROOT.joinpath('credentials.json')  # OAuth client secrets
TOKEN_PATH = PROJECT_ROOT.joinpath('tokens.json')  # saved user token
class GoogleAuthManager:
    """
    Google OAuth 2.0 credential manager for an installed (desktop) application.

    Features:
    - In-memory caching of valid credentials
    - Automatic refresh of expired access tokens
    - Token file persisted with owner-only permissions (not encrypted)
    - Multi-scope support
    - Error handling and logging
    """

    def __init__(
        self,
        credentials_path: Optional[Path] = None,
        token_path: Optional[Path] = None,
        scopes: Optional[List[str]] = None,
        oauth_port: int = 8080,
    ):
        """
        Configure the manager; no file or network access happens here.

        Args:
            credentials_path: OAuth client secrets file. Defaults to
                CREDENTIALS_PATH.
            token_path: File used to persist the user token. Defaults to
                TOKEN_PATH.
            scopes: OAuth scopes to request. Defaults to SCOPES.
            oauth_port: Local port used for the OAuth callback server.
        """
        # Accept plain strings as well as Path objects; the methods below rely
        # on Path-only attributes such as .exists() and .unlink().
        self.credentials_path = (
            Path(credentials_path) if credentials_path else CREDENTIALS_PATH
        )
        self.token_path = Path(token_path) if token_path else TOKEN_PATH
        self.scopes = scopes or SCOPES
        self.oauth_port = oauth_port
        self._credentials: Optional[Credentials] = None
        logger.info(
            "Google Auth Manager initialized",
            scopes_count=len(self.scopes),
            credentials_path=str(self.credentials_path),
            token_path=str(self.token_path)
        )

    def get_credentials(self) -> Credentials:
        """
        Get valid Google credentials

        Resolution order: cached credentials, then the token file (refreshing
        if expired), then an interactive OAuth flow on a local callback server.

        Returns:
            Valid Credentials object
        Raises:
            FileNotFoundError: If credentials.json not found
            Exception: If authentication fails
        """
        # Return cached credentials if valid
        if self._credentials and self._credentials.valid:
            return self._credentials

        # Try to load from token file
        if self.token_path.exists():
            try:
                creds = Credentials.from_authorized_user_file(
                    str(self.token_path),
                    self.scopes
                )
                logger.info("Loaded credentials from token file")

                # Refresh if expired
                if creds.expired and creds.refresh_token:
                    logger.info("Refreshing expired credentials")
                    creds.refresh(Request())
                    self._save_token(creds)
                    logger.info("Credentials refreshed successfully")

                # Only hand back credentials that are actually usable; an
                # expired token without a refresh token must go through the
                # OAuth flow again instead of being returned as-is.
                if creds.valid:
                    self._credentials = creds
                    return creds

                logger.warning(
                    "Stored credentials are invalid and cannot be refreshed"
                )
            except Exception as e:
                # Deliberate best-effort: any load/refresh problem falls
                # through to a fresh OAuth flow below.
                logger.error(
                    "Failed to load/refresh credentials from token file",
                    error=str(e)
                )

        # Perform OAuth flow
        if not self.credentials_path.exists():
            logger.error(
                "credentials.json not found",
                path=str(self.credentials_path)
            )
            raise FileNotFoundError(
                f"credentials.json not found at {self.credentials_path}. "
                "Please download it from Google Cloud Console."
            )

        logger.info("Starting OAuth flow")
        flow = InstalledAppFlow.from_client_secrets_file(
            str(self.credentials_path),
            self.scopes
        )
        # Run local server for OAuth callback
        self._credentials = flow.run_local_server(port=self.oauth_port)

        # Save credentials
        self._save_token(self._credentials)
        logger.info("OAuth flow completed successfully")
        return self._credentials

    def _save_token(self, credentials: Credentials) -> None:
        """
        Save credentials to token file

        The file holds the refresh token and client secret, so it is created
        with owner-only permissions where the platform honours them.

        Raises:
            Exception: Re-raised after logging if the file cannot be written.
        """
        try:
            token_data = {
                'token': credentials.token,
                'refresh_token': credentials.refresh_token,
                'token_uri': credentials.token_uri,
                'client_id': credentials.client_id,
                'client_secret': credentials.client_secret,
                'scopes': credentials.scopes
            }
            # Persist the expiry so a later load can tell whether the access
            # token is still fresh. Format mirrors Credentials.to_json()
            # (naive UTC timestamp with a trailing "Z"), without microseconds.
            if credentials.expiry is not None:
                token_data['expiry'] = credentials.expiry.strftime(
                    '%Y-%m-%dT%H:%M:%SZ'
                )

            # 0o600 applies only when the file is newly created; an existing
            # file keeps whatever mode it already has.
            fd = os.open(
                str(self.token_path),
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600
            )
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(token_data, f, indent=2)
            logger.info("Credentials saved to token file")
        except Exception as e:
            logger.error(
                "Failed to save credentials",
                error=str(e)
            )
            raise

    def is_authorized(self) -> bool:
        """
        Check if user is authorized

        Returns:
            True when the token file holds credentials that are valid now or
            can be refreshed; False otherwise, including when the file is
            missing or unreadable. No network request is made.
        """
        if not self.token_path.exists():
            return False
        try:
            creds = Credentials.from_authorized_user_file(
                str(self.token_path),
                self.scopes
            )
        except Exception:
            # Unreadable or malformed token file counts as "not authorized".
            return False
        # bool() keeps the refresh-token string from leaking out as the result.
        return bool(creds.valid or (creds.expired and creds.refresh_token))

    def revoke_credentials(self) -> None:
        """
        Delete the stored token file and drop the cached credentials.

        Only local state is removed; no revocation request is sent to Google.
        """
        try:
            self.token_path.unlink()
        except FileNotFoundError:
            # Already gone (or never created) - nothing to delete.
            pass
        else:
            logger.info("Credentials revoked and deleted")
        self._credentials = None

    def force_refresh(self) -> None:
        """
        Force refresh credentials

        Refreshes the cached credentials and saves them to the token file.
        Does nothing (apart from a warning) when no credentials are cached,
        i.e. get_credentials() has not succeeded on this instance yet.
        """
        if not self._credentials:
            logger.warning("No cached credentials to refresh")
            return
        logger.info("Forcing credential refresh")
        self._credentials.refresh(Request())
        self._save_token(self._credentials)
        logger.info("Credentials force refreshed")
# Module-wide manager, created lazily on first request
_auth_manager: Optional[GoogleAuthManager] = None


def get_auth_manager() -> GoogleAuthManager:
    """Return the shared GoogleAuthManager, creating it with defaults if needed"""
    global _auth_manager
    if _auth_manager is not None:
        return _auth_manager
    _auth_manager = GoogleAuthManager()
    return _auth_manager
def get_credentials() -> Credentials:
    """Convenience wrapper: fetch credentials from the shared auth manager"""
    manager = get_auth_manager()
    return manager.get_credentials()
def is_authorized() -> bool:
    """Convenience wrapper: ask the shared auth manager whether the user is authorized"""
    manager = get_auth_manager()
    return manager.is_authorized()
# Service Builder Functions
def get_gmail_service():
    """Return a Gmail API (v1) client built with the current credentials"""
    # Imported lazily so the API client library is only needed when used.
    from googleapiclient.discovery import build

    return build('gmail', 'v1', credentials=get_credentials())
def get_calendar_service():
    """Return a Calendar API (v3) client built with the current credentials"""
    # Imported lazily so the API client library is only needed when used.
    from googleapiclient.discovery import build

    return build('calendar', 'v3', credentials=get_credentials())
def get_drive_service():
    """Return a Drive API (v3) client built with the current credentials"""
    # Imported lazily so the API client library is only needed when used.
    from googleapiclient.discovery import build

    return build('drive', 'v3', credentials=get_credentials())
def get_sheets_service():
    """Return a Sheets API (v4) client built with the current credentials"""
    # Imported lazily so the API client library is only needed when used.
    from googleapiclient.discovery import build

    return build('sheets', 'v4', credentials=get_credentials())