Skip to main content
Glama
step11.py12.7 kB
import requests import json from urllib.parse import urljoin import os # Configuration MCP_SERVER_URL = "http://localhost:9000" MCP_ENDPOINT = f"{MCP_SERVER_URL}/mcp" CREDENTIALS_FILE = "client_credentials.json" REDIRECT_URI = "http://localhost:9090/callback" # 1. Attempt to list tools (unauthenticated) def post_tools_list(): payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": None } print(f"\n[1] Sending unauthenticated tools/list request to {MCP_ENDPOINT} ...") resp = requests.post(MCP_ENDPOINT, json=payload) print(f"Status: {resp.status_code}") return resp # 2. Parse WWW-Authenticate header for resource_metadata def parse_www_authenticate(resp): www_auth = resp.headers.get("WWW-Authenticate") print(f"\n[2] WWW-Authenticate header: {www_auth}") if not www_auth: raise RuntimeError("No WWW-Authenticate header found!") # Parse resource_metadata (format: Bearer realm=..., resource_metadata="...") import re match = re.search(r'resource_metadata="([^"]+)"', www_auth) if not match: raise RuntimeError("resource_metadata not found in WWW-Authenticate header!") resource_metadata_url = match.group(1) print(f"Extracted resource_metadata URL: {resource_metadata_url}") return resource_metadata_url # 3. Fetch OAuth protected resource metadata def fetch_protected_resource_metadata(url): print(f"\n[3] Fetching protected resource metadata from {url} ...") resp = requests.get(url) print(f"Status: {resp.status_code}") if resp.status_code != 200: raise RuntimeError(f"Failed to fetch protected resource metadata: {resp.text}") data = resp.json() print("Protected resource metadata:") print(json.dumps(data, indent=2)) return data # 4. Extract authorization_servers def extract_authorization_server(resource_metadata): auth_servers = resource_metadata.get("authorization_servers") if not auth_servers or not isinstance(auth_servers, list): raise RuntimeError("authorization_servers field missing or invalid!") auth_server = auth_servers[0] print(f"\n[4] Using authorization server: {auth_server}") return auth_server # 5. Fetch authorization server metadata (RFC8414) def fetch_authorization_server_metadata(auth_server_url): # Always append /.well-known/oauth-authorization-server to the discovered base URL if auth_server_url.endswith("/"): auth_server_url = auth_server_url[:-1] metadata_url = urljoin(auth_server_url + '/', ".well-known/oauth-authorization-server") print(f"\n[5] Fetching authorization server metadata from {metadata_url} ...") resp = requests.get(metadata_url) print(f"Status: {resp.status_code}") if resp.status_code != 200: raise RuntimeError(f"Failed to fetch authorization server metadata: {resp.text}") data = resp.json() print("Authorization server metadata:") print(json.dumps(data, indent=2)) # Print relevant endpoints print("\nRelevant endpoints:") for key in ["token_endpoint", "authorization_endpoint", "registration_endpoint"]: if key in data: print(f" {key}: {data[key]}") return data def print_dcr_request(authz_metadata): reg_endpoint = authz_metadata.get("registration_endpoint") if not reg_endpoint: print("\n[6] No registration_endpoint found in authorization server metadata.") return print(f"\n[6] Dynamic Client Registration (RFC 7591)") print(f"Registration endpoint: {reg_endpoint}") dcr_payload = { "client_name": "My Anonymous Client", "redirect_uris": [REDIRECT_URI], "grant_types": ["authorization_code"], "scope": "mcp:read mcp:tools mcp:prompts echo-mcp-server-audience", "token_endpoint_auth_method": "client_secret_basic" } print("Registration request payload:") print(json.dumps(dcr_payload, indent=2)) def load_client_credentials(): if os.path.exists(CREDENTIALS_FILE): with open(CREDENTIALS_FILE, "r") as f: creds = json.load(f) print(f"\n[INFO] Loaded client credentials from {CREDENTIALS_FILE}:") print(json.dumps(creds, indent=2)) return creds return None def save_client_credentials(client_id, client_secret): creds = {"client_id": client_id, "client_secret": client_secret} with open(CREDENTIALS_FILE, "w") as f: json.dump(creds, f, indent=2) print(f"\n[INFO] Saved client credentials to {CREDENTIALS_FILE}") def perform_dcr_request(authz_metadata): reg_endpoint = authz_metadata.get("registration_endpoint") if not reg_endpoint: print("\n[6] No registration_endpoint found in authorization server metadata.") return None print(f"\n[6] Dynamic Client Registration (RFC 7591)") print(f"Registration endpoint: {reg_endpoint}") dcr_payload = { "client_name": "My Anonymous Client", "redirect_uris": [REDIRECT_URI], "grant_types": ["authorization_code"], "scope": "mcp:read mcp:tools mcp:prompts echo-mcp-server-audience", "token_endpoint_auth_method": "client_secret_basic" } print("Registration request payload:") print(json.dumps(dcr_payload, indent=2)) try: resp = requests.post(reg_endpoint, json=dcr_payload, headers={"Content-Type": "application/json"}) print(f"\nRegistration response status: {resp.status_code}") print("Registration response body:") print(resp.text) if resp.status_code == 201 or resp.status_code == 200: data = resp.json() client_id = data.get("client_id") client_secret = data.get("client_secret") print(f"\n[7] Registered client_id: {client_id}") print(f"[7] Registered client_secret: {client_secret}") save_client_credentials(client_id, client_secret) return data else: print("\n[7] Registration failed.") return None except Exception as e: print(f"\n[ERROR] Registration request failed: {e}") return None def build_authorization_url(authz_metadata, client_id): authz_endpoint = authz_metadata.get("authorization_endpoint") if not authz_endpoint: print("\n[8] No authorization_endpoint found in authorization server metadata.") return None # Required parameters import secrets import urllib.parse state = secrets.token_urlsafe(16) # The resource parameter is required by MCP spec resource = MCP_SERVER_URL params = { "response_type": "code", "client_id": client_id, "redirect_uri": REDIRECT_URI, "scope": "mcp:read mcp:tools mcp:prompts echo-mcp-server-audience", "state": state, "resource": resource } url = authz_endpoint + "?" + urllib.parse.urlencode(params) print(f"\n[8] Open this URL in your browser to authorize:") print(url) return state def prompt_for_redirect_url(): print("\nAfter authorizing, you will be redirected to your callback URL.") print("Copy the full URL from your browser's address bar and paste it here.") redirect_url = input("Paste the full redirect URL: ").strip() return redirect_url def extract_code_from_redirect_url(redirect_url, expected_state=None): import urllib.parse parsed = urllib.parse.urlparse(redirect_url) query = urllib.parse.parse_qs(parsed.query) code = query.get("code", [None])[0] state = query.get("state", [None])[0] if not code: print("\n[ERROR] No code found in redirect URL.") return None print(f"\n[9] Authorization code: {code}") if expected_state: if state != expected_state: print(f"[WARNING] State mismatch! Expected: {expected_state}, Got: {state}") else: print(f"[INFO] State matches: {state}") return code def exchange_code_for_token(authz_metadata, client_id, client_secret, code): token_endpoint = authz_metadata.get("token_endpoint") if not token_endpoint: print("\n[10] No token_endpoint found in authorization server metadata.") return None print(f"\n[10] Exchanging code for token at: {token_endpoint}") data = { "grant_type": "authorization_code", "code": code, "redirect_uri": REDIRECT_URI, "client_id": client_id, "client_secret": client_secret, "resource": MCP_SERVER_URL } headers = {"Content-Type": "application/x-www-form-urlencoded"} import requests resp = requests.post(token_endpoint, data=data, headers=headers) print(f"Token endpoint response status: {resp.status_code}") print("Token endpoint response body:") print(resp.text) if resp.status_code == 200: token_data = resp.json() access_token = token_data.get("access_token") if access_token: print(f"\n[11] Access token: {access_token[:40]}... (truncated)") return access_token else: print("\n[ERROR] No access_token in response.") return None else: print("\n[ERROR] Failed to obtain access token.") return None def mcp_tools_list_with_token(access_token): print(f"\n[12] Calling MCP tools/list with access token...") payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": None } headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } resp = requests.post(MCP_ENDPOINT, json=payload, headers=headers) print(f"MCP response status: {resp.status_code}") print("MCP response body:") print(resp.text) if resp.status_code == 200: try: data = resp.json() print("\n[13] MCP tools:") print(json.dumps(data, indent=2)) except Exception as e: print(f"\n[ERROR] Failed to parse MCP response: {e}") else: print("\n[ERROR] MCP server returned error.") def main(): try: creds = load_client_credentials() if creds: print("\n[INFO] Skipping registration since credentials already exist.") client_id = creds["client_id"] client_secret = creds["client_secret"] else: resp = post_tools_list() if resp.status_code != 401: print("Expected 401 Unauthorized, got:", resp.status_code) print("Response:", resp.text) return resource_metadata_url = parse_www_authenticate(resp) resource_metadata = fetch_protected_resource_metadata(resource_metadata_url) auth_server_url = extract_authorization_server(resource_metadata) authz_metadata = fetch_authorization_server_metadata(auth_server_url) reg_data = perform_dcr_request(authz_metadata) if not reg_data: print("\n[ERROR] Registration failed, cannot continue.") return client_id = reg_data.get("client_id") client_secret = reg_data.get("client_secret") # --- OAuth flow --- # Discover endpoints if not creds: # If we just registered, we already have authz_metadata pass else: # If we loaded creds, need to rediscover endpoints resp = post_tools_list() if resp.status_code != 401: print("Expected 401 Unauthorized, got:", resp.status_code) print("Response:", resp.text) return resource_metadata_url = parse_www_authenticate(resp) resource_metadata = fetch_protected_resource_metadata(resource_metadata_url) auth_server_url = extract_authorization_server(resource_metadata) authz_metadata = fetch_authorization_server_metadata(auth_server_url) state = build_authorization_url(authz_metadata, client_id) if not state: print("\n[ERROR] Could not build authorization URL.") return redirect_url = prompt_for_redirect_url() code = extract_code_from_redirect_url(redirect_url, expected_state=state) if not code: print("\n[ERROR] No code to exchange for token.") return access_token = exchange_code_for_token(authz_metadata, client_id, client_secret, code) if not access_token: print("\n[ERROR] Could not obtain access token.") return mcp_tools_list_with_token(access_token) except Exception as e: print(f"\n[ERROR] {e}") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/christian-posta/mcp-auth-step-by-step'

If you have feedback or need assistance with the MCP directory API, please join our Discord server