Skip to main content
Glama
token_exchange.py21.2 kB
"""RFC 8693 Token Exchange implementation for ADR-004 Progressive Consent. This module implements the token exchange pattern to convert Flow 1 MCP tokens (aud: "mcp-server") into ephemeral delegated Nextcloud tokens (aud: "nextcloud") for session operations. Key Properties: - On-demand generation during tool execution - Ephemeral tokens (NOT stored, discarded after use) - Limited scopes (only what tool needs) - Short-lived (5 minutes default) """ import logging import time from typing import Any, Dict, Optional, Tuple from urllib.parse import urljoin import httpx import jwt from ..config import get_settings from .refresh_token_storage import RefreshTokenStorage logger = logging.getLogger(__name__) class TokenExchangeService: """Implements RFC 8693 OAuth 2.0 Token Exchange.""" # RFC 8693 Grant Type TOKEN_EXCHANGE_GRANT = "urn:ietf:params:oauth:grant-type:token-exchange" # RFC 8693 Token Type Identifiers TOKEN_TYPE_ACCESS_TOKEN = "urn:ietf:params:oauth:token-type:access_token" TOKEN_TYPE_JWT = "urn:ietf:params:oauth:token-type:jwt" TOKEN_TYPE_ID_TOKEN = "urn:ietf:params:oauth:token-type:id_token" def __init__( self, oidc_discovery_url: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, nextcloud_host: Optional[str] = None, ): """Initialize token exchange service. Args: oidc_discovery_url: OIDC discovery endpoint URL client_id: OAuth client ID for token exchange client_secret: OAuth client secret nextcloud_host: Nextcloud instance URL """ settings = get_settings() self.oidc_discovery_url = oidc_discovery_url or settings.oidc_discovery_url self.client_id = client_id or settings.oidc_client_id self.client_secret = client_secret or settings.oidc_client_secret self.nextcloud_host = nextcloud_host or settings.nextcloud_host self._token_endpoint: Optional[str] = None self._jwks_uri: Optional[str] = None self._discovery_cache: Optional[Dict[str, Any]] = None self._discovery_cache_time: float = 0 self._discovery_cache_ttl: float = 3600 # 1 hour # Storage for Progressive Consent (refresh tokens) - only needed for delegation # NOT needed for pure RFC 8693 exchange (MCP tools) self.storage: Optional[RefreshTokenStorage] = None # Create HTTP client self.http_client = httpx.AsyncClient( timeout=30.0, follow_redirects=True, ) async def __aenter__(self): """Async context manager entry.""" if self.storage: await self.storage.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close() async def close(self): """Close HTTP client and storage.""" await self.http_client.aclose() # RefreshTokenStorage doesn't have a close method async def _ensure_storage(self): """Lazily initialize storage for Progressive Consent operations. Only needed for delegation operations that use refresh tokens. NOT needed for pure RFC 8693 exchange (MCP tools). """ if self.storage is None: self.storage = RefreshTokenStorage.from_env() await self.storage.initialize() async def _discover_endpoints(self) -> Dict[str, Any]: """Discover OIDC endpoints from discovery URL. Returns: Discovery document containing endpoint URLs """ # Check cache if ( self._discovery_cache and (time.time() - self._discovery_cache_time) < self._discovery_cache_ttl ): return self._discovery_cache if not self.oidc_discovery_url: # Fallback to Nextcloud OIDC if no discovery URL self.oidc_discovery_url = urljoin( self.nextcloud_host, # type: ignore[arg-type] "/.well-known/openid-configuration", ) try: response = await self.http_client.get(self.oidc_discovery_url) response.raise_for_status() self._discovery_cache = response.json() self._discovery_cache_time = time.time() # Cache frequently used endpoints self._token_endpoint = self._discovery_cache.get("token_endpoint") self._jwks_uri = self._discovery_cache.get("jwks_uri") return self._discovery_cache except Exception as e: logger.error(f"Failed to discover OIDC endpoints: {e}") raise async def exchange_token_for_delegation( self, flow1_token: str, requested_scopes: list[str], requested_audience: str = "nextcloud", ) -> Tuple[str, int]: """Exchange Flow 1 MCP token for delegated Nextcloud token. This implements RFC 8693 Token Exchange for on-behalf-of delegation. Args: flow1_token: The MCP session token (aud: "mcp-server") requested_scopes: Scopes needed for this operation requested_audience: Target audience (usually "nextcloud") Returns: Tuple of (delegated_token, expires_in) Raises: ValueError: If token validation fails RuntimeError: If provisioning not completed or exchange fails """ # 1. Validate Flow 1 token audience await self._validate_flow1_token(flow1_token) # 2. Extract user ID from token user_id = self._extract_user_id(flow1_token) # 3. Check user has provisioned Nextcloud access (Flow 2) if not await self._check_provisioning(user_id): raise RuntimeError( "Nextcloud access not provisioned. " "User must complete Flow 2 provisioning first." ) # 4. Get stored refresh token for user (from Flow 2) refresh_token = await self._get_user_refresh_token(user_id) if not refresh_token: raise RuntimeError( "No refresh token found. User must complete provisioning." ) # 5. Perform token exchange with IdP delegated_token, expires_in = await self._perform_token_exchange( subject_token=flow1_token, refresh_token=refresh_token, requested_scopes=requested_scopes, requested_audience=requested_audience, ) # 6. Log the exchange for audit trail logger.info( f"Token exchange completed for user {user_id}: " f"scopes={requested_scopes}, audience={requested_audience}, " f"expires_in={expires_in}s" ) return delegated_token, expires_in async def exchange_token_for_audience( self, subject_token: str, requested_audience: str = "nextcloud", requested_scopes: list[str] | None = None, ) -> Tuple[str, int]: """ Pure RFC 8693 token exchange (no refresh tokens required). This implements stateless per-request token exchange where: 1. Client token has aud: <client-id> (e.g., "nextcloud-mcp-server") 2. Exchange for token with aud: "nextcloud" (for API access) 3. NO refresh tokens or provisioning required Use case: All MCP tool calls (request-time operations). NOT for background jobs (which use refresh tokens separately). Args: subject_token: Token being exchanged (from MCP client) requested_audience: Target audience (usually "nextcloud") requested_scopes: Optional scopes (may not be supported by all IdPs) Returns: Tuple of (access_token, expires_in) Raises: ValueError: If token validation fails RuntimeError: If exchange fails """ # 1. Validate subject token (accepts both "mcp-server" and client_id) await self._validate_flow1_token(subject_token) # 2. Extract user ID for logging user_id = self._extract_user_id(subject_token) # 3. Discover token endpoint discovery = await self._discover_endpoints() token_endpoint = discovery.get("token_endpoint") if not token_endpoint: raise RuntimeError("No token endpoint found in discovery") # 4. Build pure RFC 8693 exchange request (subject_token ONLY) data = { "grant_type": self.TOKEN_EXCHANGE_GRANT, "subject_token": subject_token, "subject_token_type": self.TOKEN_TYPE_ACCESS_TOKEN, "requested_token_type": self.TOKEN_TYPE_ACCESS_TOKEN, "audience": requested_audience, } # Add scopes if provided (may not be supported by all providers) if requested_scopes: data["scope"] = " ".join(requested_scopes) # Add client credentials if self.client_id and self.client_secret: data["client_id"] = self.client_id data["client_secret"] = self.client_secret try: # Perform exchange logger.debug(f"Exchanging token for audience={requested_audience}") response = await self.http_client.post( token_endpoint, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) response.raise_for_status() result = response.json() access_token = result.get("access_token") expires_in = result.get("expires_in", 300) if not access_token: raise RuntimeError("No access token in exchange response") logger.info( f"Pure RFC 8693 token exchange successful for user {user_id}: " f"audience={requested_audience}, expires_in={expires_in}s" ) return access_token, expires_in except httpx.HTTPStatusError as e: logger.error(f"Token exchange failed: {e.response.text}") raise RuntimeError(f"Token exchange failed: {e}") except Exception as e: logger.error(f"Token exchange error: {e}") raise async def _validate_flow1_token(self, token: str): """Validate that token has correct audience for MCP server. Accepts either: - "mcp-server" (Progressive Consent legacy) - self.client_id (external IdP, e.g., "nextcloud-mcp-server") Args: token: JWT token to validate Raises: ValueError: If token is invalid or has wrong audience """ try: # Decode without verification first to check audience # In production, should verify signature against JWKS payload = jwt.decode(token, options={"verify_signature": False}) # Check audience audience = payload.get("aud", []) if isinstance(audience, str): audience = [audience] # Accept either "mcp-server" (Progressive Consent) or client_id (external IdP) valid_audiences = ["mcp-server"] if self.client_id: valid_audiences.append(self.client_id) if not any(aud in audience for aud in valid_audiences): raise ValueError( f"Invalid token audience. Expected one of {valid_audiences}, got {audience}" ) # Check expiration exp = payload.get("exp", 0) if exp < time.time(): raise ValueError("Token has expired") except jwt.DecodeError as e: raise ValueError(f"Invalid JWT token: {e}") def _extract_user_id(self, token: str) -> str: """Extract user ID from JWT token. Args: token: JWT token Returns: User ID from token """ try: payload = jwt.decode(token, options={"verify_signature": False}) # Try standard claims in order of preference user_id = ( payload.get("sub") or payload.get("preferred_username") or payload.get("email") or payload.get("name") ) if not user_id: raise ValueError("No user identifier in token") return user_id except jwt.DecodeError as e: raise ValueError(f"Failed to extract user ID: {e}") async def _check_provisioning(self, user_id: str) -> bool: """Check if user has completed Flow 2 provisioning. Args: user_id: User identifier Returns: True if provisioned, False otherwise """ await self._ensure_storage() assert self.storage is not None # _ensure_storage() ensures this token_data = await self.storage.get_refresh_token(user_id) return token_data is not None async def _get_user_refresh_token(self, user_id: str) -> Optional[str]: """Get stored refresh token for user from Flow 2 provisioning. Args: user_id: User identifier Returns: Refresh token if found, None otherwise """ await self._ensure_storage() assert self.storage is not None # _ensure_storage() ensures this token_data = await self.storage.get_refresh_token(user_id) if token_data: return token_data.get("refresh_token") return None async def _perform_token_exchange( self, subject_token: str, refresh_token: str, requested_scopes: list[str], requested_audience: str, ) -> Tuple[str, int]: """Perform RFC 8693 token exchange with IdP. Args: subject_token: The token being exchanged (Flow 1 token) refresh_token: User's stored refresh token for delegation requested_scopes: Minimal scopes for this operation requested_audience: Target audience Returns: Tuple of (access_token, expires_in) """ # Discover token endpoint discovery = await self._discover_endpoints() token_endpoint = discovery.get("token_endpoint") if not token_endpoint: raise RuntimeError("No token endpoint found in discovery") # Build token exchange request per RFC 8693 data = { # Token exchange grant type "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", # The token we're exchanging (Flow 1 MCP token) "subject_token": subject_token, "subject_token_type": self.TOKEN_TYPE_ACCESS_TOKEN, # Use refresh token as actor token (proves we have delegation rights) "actor_token": refresh_token, "actor_token_type": self.TOKEN_TYPE_ACCESS_TOKEN, # Requested token properties "requested_token_type": self.TOKEN_TYPE_ACCESS_TOKEN, "audience": requested_audience, "scope": " ".join(requested_scopes), } # Add client credentials if configured if self.client_id and self.client_secret: data["client_id"] = self.client_id data["client_secret"] = self.client_secret try: # Attempt RFC 8693 token exchange response = await self.http_client.post( token_endpoint, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if response.status_code == 400: # Token exchange might not be supported, fall back to refresh grant logger.info( "Token exchange not supported, falling back to refresh grant" ) return await self._fallback_refresh_grant( refresh_token=refresh_token, requested_scopes=requested_scopes, token_endpoint=token_endpoint, ) response.raise_for_status() result = response.json() access_token = result.get("access_token") expires_in = result.get("expires_in", 300) # Default 5 minutes if not access_token: raise RuntimeError("No access token in exchange response") return access_token, expires_in except httpx.HTTPStatusError as e: logger.error(f"Token exchange failed: {e.response.text}") raise RuntimeError(f"Token exchange failed: {e}") except Exception as e: logger.error(f"Token exchange error: {e}") raise async def _fallback_refresh_grant( self, refresh_token: str, requested_scopes: list[str], token_endpoint: str ) -> Tuple[str, int]: """Fallback to standard refresh token grant if token exchange not supported. This is less secure than token exchange but provides compatibility. Args: refresh_token: User's stored refresh token requested_scopes: Minimal scopes for this operation token_endpoint: Token endpoint URL Returns: Tuple of (access_token, expires_in) """ data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "scope": " ".join(requested_scopes), # Request minimal scopes } # Add client credentials if configured if self.client_id and self.client_secret: data["client_id"] = self.client_id data["client_secret"] = self.client_secret try: response = await self.http_client.post( token_endpoint, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) response.raise_for_status() result = response.json() access_token = result.get("access_token") expires_in = result.get("expires_in", 300) # Default 5 minutes if not access_token: raise RuntimeError("No access token in refresh response") # Log that we're using fallback logger.warning( f"Using refresh grant fallback for token exchange. " f"Scopes: {requested_scopes}" ) return access_token, expires_in except httpx.HTTPStatusError as e: logger.error(f"Refresh grant failed: {e.response.text}") raise RuntimeError(f"Refresh grant failed: {e}") except Exception as e: logger.error(f"Refresh grant error: {e}") raise # Singleton instance _token_exchange_service: Optional[TokenExchangeService] = None async def get_token_exchange_service() -> TokenExchangeService: """Get or create the singleton token exchange service. Note: Storage is initialized lazily only when needed for delegation operations. Pure RFC 8693 exchange (MCP tools) doesn't require storage. Returns: TokenExchangeService instance """ global _token_exchange_service if _token_exchange_service is None: _token_exchange_service = TokenExchangeService() # Storage is initialized lazily via _ensure_storage() when needed return _token_exchange_service async def exchange_token_for_delegation( flow1_token: str, requested_scopes: list[str], requested_audience: str = "nextcloud" ) -> Tuple[str, int]: """Convenience function to exchange tokens (Progressive Consent with refresh tokens). NOTE: This is for background jobs only. For MCP tool calls, use exchange_token_for_audience(). Args: flow1_token: The MCP session token (aud: "mcp-server") requested_scopes: Scopes needed for this operation requested_audience: Target audience (usually "nextcloud") Returns: Tuple of (delegated_token, expires_in) """ service = await get_token_exchange_service() return await service.exchange_token_for_delegation( flow1_token=flow1_token, requested_scopes=requested_scopes, requested_audience=requested_audience, ) async def exchange_token_for_audience( subject_token: str, requested_audience: str = "nextcloud", requested_scopes: list[str] | None = None, ) -> Tuple[str, int]: """Convenience function for pure RFC 8693 token exchange (no refresh tokens). Use this for ALL MCP tool calls (request-time operations). Args: subject_token: Token being exchanged (from MCP client) requested_audience: Target audience (usually "nextcloud") requested_scopes: Optional scopes (may not be supported by all IdPs) Returns: Tuple of (access_token, expires_in) """ service = await get_token_exchange_service() return await service.exchange_token_for_audience( subject_token=subject_token, requested_audience=requested_audience, requested_scopes=requested_scopes, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/No-Smoke/nextcloud-mcp-comprehensive'

If you have feedback or need assistance with the MCP directory API, please join our Discord server