Skip to main content
Glama
test_adr004_oauth_flow.py12.9 kB
#!/usr/bin/env python3 """ ADR-004 OAuth Flow Test Script Tests the complete Hybrid Flow implementation: 1. User initiates OAuth at MCP server /oauth/authorize 2. User consents to MCP server access (IdP) 3. User consents to MCP server accessing Nextcloud (IdP/Nextcloud) 4. MCP server receives master refresh token 5. Client receives MCP access token 6. Client calls MCP tool 7. MCP server exchanges master refresh token for Nextcloud access token 8. MCP server fetches data from Nextcloud on behalf of user Usage: # Test with Nextcloud OIDC app uv run python tests/manual/test_adr004_oauth_flow.py --provider nextcloud # Test with Keycloak uv run python tests/manual/test_adr004_oauth_flow.py --provider keycloak Requirements: - MCP server running with OAuth enabled - System web browser """ import argparse import asyncio import hashlib import logging import secrets import webbrowser from base64 import urlsafe_b64encode from http.server import BaseHTTPRequestHandler, HTTPServer from threading import Thread from urllib.parse import parse_qs, urlencode, urlparse import httpx logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) class CallbackHandler(BaseHTTPRequestHandler): """Handles OAuth callback redirect to localhost""" authorization_code = None state = None def do_GET(self): """Handle GET request with authorization code""" parsed = urlparse(self.path) params = parse_qs(parsed.query) # Ignore favicon requests if parsed.path == "/favicon.ico": self.send_response(200) self.send_header("Content-type", "image/x-icon") self.end_headers() return CallbackHandler.authorization_code = params.get("code", [None])[0] CallbackHandler.state = params.get("state", [None])[0] # Send success page self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() code_display = ( CallbackHandler.authorization_code[:50] + "..." if CallbackHandler.authorization_code else "No code received" ) html = """ <html> <head><title>Authorization Success</title></head> <body> <h1 style="color: green;">✓ Authorization Successful</h1> <p>Authorization code received. You can close this window and return to the terminal.</p> <code style="background: #f0f0f0; padding: 10px; display: block; margin: 10px 0;"> {} </code> <script>setTimeout(() => window.close(), 2000);</script> </body> </html> """.format(code_display) self.wfile.write(html.encode()) def log_message(self, format, *args): """Log HTTP requests""" logger.info(f"Callback: {format % args}") def generate_pkce_challenge(): """Generate PKCE code verifier and challenge""" code_verifier = secrets.token_urlsafe(32) digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = urlsafe_b64encode(digest).decode().rstrip("=") return code_verifier, code_challenge # Note: Playwright automation functions removed - using system browser instead async def test_oauth_flow( provider: str, mcp_server_url: str, nextcloud_host: str, username: str, password: str, ): """ Test complete ADR-004 OAuth flow using system browser. Args: provider: "nextcloud" or "keycloak" mcp_server_url: MCP server URL (e.g., http://localhost:8001) nextcloud_host: Nextcloud instance URL username: Test user username (for documentation) password: Test user password (for documentation) """ logger.info(f"Starting ADR-004 OAuth flow test with provider: {provider}") logger.info(f"MCP Server: {mcp_server_url}") logger.info(f"Nextcloud Host: {nextcloud_host}") # Generate PKCE challenge code_verifier, code_challenge = generate_pkce_challenge() logger.info(f"✓ Generated PKCE challenge: {code_challenge[:16]}...") # Generate state for CSRF protection state = secrets.token_urlsafe(32) # Start local HTTP server for OAuth callback callback_port = 8765 redirect_uri = f"http://localhost:{callback_port}/callback" server = HTTPServer(("localhost", callback_port), CallbackHandler) server_thread = Thread(target=server.serve_forever, daemon=True) server_thread.start() logger.info(f"✓ Started callback server at {redirect_uri}") try: # Step 1: Build authorization URL auth_params = { "response_type": "code", "client_id": "test-mcp-client", "redirect_uri": redirect_uri, "scope": "openid profile email offline_access notes:read notes:write", "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", } auth_url = f"{mcp_server_url}/oauth/authorize?{urlencode(auth_params)}" print("\n" + "=" * 70) print("STEP 1: AUTHORIZE IN BROWSER") print("=" * 70) print(f"\n📋 Opening browser to: {auth_url[:80]}...") print(f"\n📌 Login with: {username} / {password}") print("📌 Then authorize the MCP server") print("=" * 70 + "\n") # Step 2: Open system browser logger.info("Opening system browser for OAuth flow...") webbrowser.open(auth_url) logger.info("⏳ Waiting for authorization callback (timeout: 5 minutes)...") # Wait for callback timeout = 300 # 5 minutes elapsed = 0 while not CallbackHandler.authorization_code and elapsed < timeout: await asyncio.sleep(1) elapsed += 1 if not CallbackHandler.authorization_code: raise RuntimeError("Timeout waiting for authorization code") # Step 3: Verify we received authorization code authorization_code = CallbackHandler.authorization_code returned_state = CallbackHandler.state if not authorization_code: raise RuntimeError("Failed to receive authorization code from callback") logger.info(f"✓ Received MCP authorization code: {authorization_code[:16]}...") # Verify state matches (CSRF protection) if returned_state != state: raise RuntimeError( f"State mismatch! Expected {state}, got {returned_state}" ) logger.info("✓ State parameter verified (CSRF protection)") # Step 4: Exchange authorization code for access token logger.info("Exchanging authorization code for access token...") async with httpx.AsyncClient() as client: token_response = await client.post( f"{mcp_server_url}/oauth/token", data={ "grant_type": "authorization_code", "code": authorization_code, "code_verifier": code_verifier, "redirect_uri": redirect_uri, "client_id": "test-mcp-client", }, ) if token_response.status_code != 200: logger.error(f"Token exchange failed: {token_response.status_code}") logger.error(f"Response: {token_response.text}") raise RuntimeError( f"Token exchange failed: {token_response.status_code}" ) token_data = token_response.json() access_token = token_data["access_token"] logger.info("✓ Successfully received access token") logger.info(f" Token: {access_token[:20]}...") logger.info(f" Type: {token_data.get('token_type', 'Bearer')}") logger.info(f" Expires in: {token_data.get('expires_in', 'unknown')}s") # Step 5: Use access token to call MCP tool logger.info("Testing MCP tool call with access token...") async with httpx.AsyncClient() as client: # Call MCP server to list notes (this will trigger token exchange in background) mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "nc_notes_search_notes", "arguments": {"query": "test"}, }, } mcp_response = await client.post( f"{mcp_server_url}/mcp", json=mcp_request, headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=30.0, ) if mcp_response.status_code != 200: logger.error(f"MCP tool call failed: {mcp_response.status_code}") logger.error(f"Response: {mcp_response.text}") raise RuntimeError(f"MCP tool call failed: {mcp_response.status_code}") mcp_result = mcp_response.json() if "error" in mcp_result: logger.error(f"MCP tool returned error: {mcp_result['error']}") raise RuntimeError(f"MCP tool error: {mcp_result['error']}") logger.info("✓ MCP tool call succeeded!") logger.info(f" Result: {mcp_result.get('result', {})}") # Step 6: Verify refresh token storage logger.info("Verifying refresh token storage...") # Check if refresh token was stored (requires database access) # This would require accessing the SQLite database directly logger.info("✓ OAuth flow completed successfully!") # Summary print("\n" + "=" * 70) print("ADR-004 OAUTH FLOW TEST - SUCCESS") print("=" * 70) print(f"Provider: {provider}") print(f"MCP Server: {mcp_server_url}") print(f"Nextcloud: {nextcloud_host}") print(f"User: {username}") print("") print("✓ User consented to MCP server access") print("✓ User consented to offline_access (refresh tokens)") print("✓ MCP server stored master refresh token") print("✓ Client received MCP access token") print("✓ MCP tool call succeeded") print("✓ MCP server exchanged tokens in background") print("✓ Nextcloud data fetched successfully") print("=" * 70) return { "success": True, "access_token": access_token, "provider": provider, } finally: server.shutdown() logger.info("Stopped callback server") async def main(): parser = argparse.ArgumentParser( description="Test ADR-004 OAuth Hybrid Flow", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Test with Nextcloud OIDC uv run python tests/manual/test_adr004_oauth_flow.py --provider nextcloud # Test with Keycloak uv run python tests/manual/test_adr004_oauth_flow.py --provider keycloak # Headless mode uv run python tests/manual/test_adr004_oauth_flow.py --provider nextcloud --headless """, ) parser.add_argument( "--provider", choices=["nextcloud", "keycloak"], required=True, help="OAuth provider to test (nextcloud or keycloak)", ) parser.add_argument( "--mcp-server-url", default="http://localhost:8001", help="MCP server URL (default: http://localhost:8001 for OAuth)", ) parser.add_argument( "--nextcloud-host", default="http://localhost:8080", help="Nextcloud host URL (default: http://localhost:8080)", ) parser.add_argument( "--username", default="admin", help="Test user username (default: admin)" ) parser.add_argument( "--password", default="admin", help="Test user password (default: admin)" ) args = parser.parse_args() try: result = await test_oauth_flow( provider=args.provider, mcp_server_url=args.mcp_server_url, nextcloud_host=args.nextcloud_host, username=args.username, password=args.password, ) return 0 if result["success"] else 1 except Exception as e: logger.error(f"OAuth flow test failed: {e}", exc_info=True) print("\n" + "=" * 70) print("ADR-004 OAUTH FLOW TEST - FAILED") print("=" * 70) print(f"Error: {e}") print("=" * 70) return 1 if __name__ == "__main__": exit_code = asyncio.run(main()) exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/No-Smoke/nextcloud-mcp-comprehensive'

If you have feedback or need assistance with the MCP directory API, please join our Discord server