Skip to main content
Glama

MCP cldkctl Server

by raffelprama
server.py (20 kB)
#!/usr/bin/env python3
"""
MCP Server for Cloudeka CLI (cldkctl) functionality.

Exposes the endpoints described by ``TOOL_PARAM_MAP`` as MCP tools, plus an
``auth`` tool that exchanges a cldkctl token for a JWT. Authentication state
is kept in module-level globals and persisted to ``CACHE_FILE``.
"""

import asyncio
import base64
import json
import os
import sys
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from importlib import metadata

import requests
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
    CallToolResult,
    Tool,
    TextContent,
    Content,
)

from . import CLDKCTL_TOKEN as PKG_CLDKCTL_TOKEN, CLDKCTL_BASE_URL as PKG_CLDKCTL_BASE_URL

# Import the tool parameter map
try:
    from .tool_param_map import TOOL_PARAM_MAP
except ImportError:
    # Fallback if the file doesn't exist
    TOOL_PARAM_MAP = {}


class SimpleNotificationOptions:
    """Simple notification options class for MCP server capabilities."""

    def __init__(self, prompts_changed=False, resources_changed=False, tools_changed=False):
        self.prompts_changed = prompts_changed
        self.resources_changed = resources_changed
        self.tools_changed = tools_changed


# Initialize the server
server = Server("cldkctl")

# Configuration
PRODUCTION_URL = "https://ai.cloudeka.id"
STAGING_URL = "https://staging.ai.cloudeka.id"
CACHE_FILE = os.path.expanduser("~/.cldkctl/mcp_cache.json")
CACHE_DIR = os.path.expanduser("~/.cldkctl")

# Seconds to wait for the backend before giving up; without a timeout
# `requests` blocks forever on a stalled connection.
_REQUEST_TIMEOUT = 30

# Prefix carried by the keys of TOOL_PARAM_MAP.
_TOOL_NAME_PREFIX = "mcp_cldkctl_"

# TOOL_PARAM_MAP parameter type -> JSON schema type. Unknown types map to "string".
_JSON_SCHEMA_TYPES = {
    "string": "string",
    "integer": "integer",
    "boolean": "boolean",
    "list": "array",
    "dict": "object",
    "object": "object",
}

# Ensure cache directory exists
os.makedirs(CACHE_DIR, exist_ok=True)

# Global state for authentication and environment
auth_cache = {
    "jwt_token": None,
    "login_payload": None,
    "expires_at": None,
    "user_info": None,
    "environment": "production",
    "base_url": PRODUCTION_URL,
}

# Environment configuration
current_base_url = PRODUCTION_URL  # Default to production
environment_name = "production"


def load_cache() -> bool:
    """Load cached authentication data.

    Reads ``CACHE_FILE`` and, when it holds an ``expires_at`` timestamp that
    is still in the future, copies it into ``auth_cache`` and updates the
    module-level base URL and environment name.

    Returns:
        True if a non-expired cache entry was loaded, False otherwise
        (missing file, missing/expired timestamp, or any read/parse error).
    """
    global auth_cache, current_base_url, environment_name
    try:
        if os.path.exists(CACHE_FILE):
            with open(CACHE_FILE, "r") as f:
                cached_data = json.load(f)
            # Check if token is still valid
            if cached_data.get("expires_at"):
                expires_at = datetime.fromisoformat(cached_data["expires_at"])
                if datetime.now() < expires_at:
                    auth_cache.update(cached_data)
                    current_base_url = auth_cache["base_url"]
                    environment_name = auth_cache["environment"]
                    print(f"Loaded cached auth data, expires at {expires_at}", file=sys.stderr)
                    return True
                print("Cached token expired", file=sys.stderr)
                return False
    except Exception as e:
        print(f"Error loading cache: {e}", file=sys.stderr)
    # No cache file, no expiry recorded, or the cache could not be read.
    return False


def save_cache():
    """Save authentication data to cache.

    Best effort: failures are logged to stderr and otherwise ignored.
    """
    try:
        with open(CACHE_FILE, "w") as f:
            json.dump(auth_cache, f, default=str)
    except Exception as e:
        print(f"Error saving cache: {e}", file=sys.stderr)


def authenticate_with_token(token: str, force_staging: bool = False) -> bool:
    """Authenticate using a cldkctl token and get JWT.

    Posts the token to ``/core/cldkctl/auth``. On success the JWT, the
    base64-encoded login payload, a 24-hour expiry, and the environment are
    stored in ``auth_cache`` and written to disk. When production fails with
    the known missing-table error or with a request error, the call is
    retried once against staging.

    Args:
        token: The cldkctl token to exchange.
        force_staging: Authenticate against staging instead of production.

    Returns:
        True if a JWT was obtained, False otherwise.
    """
    global auth_cache, current_base_url, environment_name

    # Determine which URL to use
    if force_staging:
        base_url = STAGING_URL
        env_name = "staging"
    else:
        base_url = PRODUCTION_URL
        env_name = "production"

    print(f"Authenticating with {env_name} environment: {base_url}", file=sys.stderr)
    url = f"{base_url}/core/cldkctl/auth"
    payload = {"token": token}

    try:
        response = requests.post(
            url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=_REQUEST_TIMEOUT,
        )
        print(f"Auth response status: {response.status_code}", file=sys.stderr)

        if response.status_code == 200:
            data = response.json()
            # `or {}` guards against an explicit `"data": null` in the reply.
            user_data = data.get("data") or {}
            jwt_token = user_data.get("token")
            if jwt_token:
                # Update global state
                current_base_url = base_url
                environment_name = env_name

                # Cache the authentication data
                auth_cache["jwt_token"] = jwt_token
                auth_cache["login_payload"] = base64.b64encode(json.dumps(payload).encode()).decode()
                auth_cache["expires_at"] = (datetime.now() + timedelta(hours=24)).isoformat()
                auth_cache["user_info"] = user_data
                auth_cache["environment"] = env_name
                auth_cache["base_url"] = base_url
                save_cache()

                print(f"Authentication successful with {env_name}", file=sys.stderr)
                return True
            print("No JWT token in response", file=sys.stderr)
            return False
        elif response.status_code == 400 and "pq: relation \"cldkctl_tokens\" does not exist" in response.text:
            if not force_staging:
                print("Production backend has database issue, trying staging...", file=sys.stderr)
                return authenticate_with_token(token, force_staging=True)
            print("Staging also failed with the same issue.", file=sys.stderr)
            return False
        else:
            print(f"Authentication failed: {response.status_code} - {response.text}", file=sys.stderr)
            return False
    except requests.RequestException as e:
        print(f"Authentication request error: {e}", file=sys.stderr)
        if not force_staging:
            print("Trying staging as fallback...", file=sys.stderr)
            return authenticate_with_token(token, force_staging=True)
        return False


def get_auth_headers() -> Dict[str, str]:
    """Get headers with authentication token.

    If the cached token has expired, re-authenticates first using the cached
    login payload (against the environment currently in use).

    Returns:
        A dict with ``Authorization`` and ``Content-Type`` headers.

    Raises:
        Exception: If there is no JWT, or the token expired and could not be
            refreshed.
    """
    if not auth_cache.get("jwt_token"):
        raise Exception("Not authenticated. Please authenticate first.")

    # Check for token expiration
    expires_at_str = auth_cache.get("expires_at")
    if expires_at_str and datetime.now() >= datetime.fromisoformat(expires_at_str):
        print("Token expired, attempting re-authentication", file=sys.stderr)
        if not auth_cache.get("login_payload"):
            raise Exception("Token expired and no login payload to re-authenticate.")
        login_data = json.loads(base64.b64decode(auth_cache["login_payload"]).decode())
        if not authenticate_with_token(login_data["token"], force_staging=(environment_name == "staging")):
            raise Exception("Re-authentication failed")

    return {
        "Authorization": f"Bearer {auth_cache['jwt_token']}",
        "Content-Type": "application/json",
    }


def make_authenticated_request(method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> Dict:
    """Make an authenticated request to the API.

    Args:
        method: HTTP method name.
        endpoint: Path appended to the current base URL.
        data: JSON body; ignored for GET requests.
        params: Query-string parameters.

    Returns:
        The decoded JSON response. For a successful response with an empty
        body, ``{"success": True, "status_code": ...}``. On any failure,
        ``{"error": True, "message": ..., "status_code": ...}`` — this
        function does not raise.
    """
    try:
        headers = get_auth_headers()
        # Build the URL only after the headers: get_auth_headers() may
        # re-authenticate and switch current_base_url to another environment.
        url = f"{current_base_url}{endpoint}"

        request_kwargs: Dict[str, Any] = {
            "headers": headers,
            "params": params,
            "timeout": _REQUEST_TIMEOUT,
        }
        # For GET requests, do not send a body (data)
        if method.upper() != "GET":
            request_kwargs["json"] = data

        response = requests.request(method, url, **request_kwargs)
        response.raise_for_status()
        if not response.content:
            # e.g. 204 No Content: success, but there is nothing to decode.
            return {"success": True, "status_code": response.status_code}
        return response.json()
    except Exception as e:
        print(f"Authenticated request failed: {e}", file=sys.stderr)
        # Only requests' exceptions carry `.response` (and it may be None),
        # so look it up defensively instead of assuming the attribute exists.
        status_code = getattr(getattr(e, "response", None), "status_code", None)
        # In case of an error, return a structured error message
        return {"error": True, "message": str(e), "status_code": status_code}


def _param_schema(param: Dict[str, Any]) -> Dict[str, str]:
    """Build the JSON schema entry for one TOOL_PARAM_MAP parameter."""
    param_type = param["type"]
    schema_type = _JSON_SCHEMA_TYPES.get(param_type, "string") if isinstance(param_type, str) else "string"
    return {
        "type": schema_type,
        "description": param.get("desc", ""),
    }


def get_tool_definitions() -> list[Tool]:
    """Generate tool definitions from TOOL_PARAM_MAP.

    Returns:
        The special-cased ``auth`` tool followed by one tool per
        TOOL_PARAM_MAP entry, named without the ``mcp_cldkctl_`` prefix.
    """
    # Add basic authentication tools that are handled specially
    tools = [
        Tool(
            name="auth",
            description="Authenticate with a cldkctl token to get JWT access",
            inputSchema={
                "type": "object",
                "properties": {
                    "token": {
                        "type": "string",
                        "description": "Your cldkctl token (starts with 'cldkctl_')"
                    },
                    "force_staging": {
                        "type": "boolean",
                        "description": "Force using staging environment (default: false, will auto-fallback if production fails)"
                    }
                },
                "required": ["token"]
            }
        ),
    ]

    # Generate tools from TOOL_PARAM_MAP
    for tool_name, tool_info in TOOL_PARAM_MAP.items():
        # Strip only a leading prefix; removing the marker anywhere in the
        # name would yield a tool name handle_call_tool cannot map back.
        clean_name = tool_name.removeprefix(_TOOL_NAME_PREFIX)

        properties = {}
        required = []

        for param in tool_info.get("required_params", []):
            properties[param["name"]] = _param_schema(param)
            required.append(param["name"])

        for param in tool_info.get("optional_params", []):
            properties[param["name"]] = _param_schema(param)

        tools.append(
            Tool(
                name=clean_name,
                description=f"Call the {clean_name} endpoint",
                inputSchema={
                    "type": "object",
                    "properties": properties,
                    "required": required
                }
            )
        )

    return tools


@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """List all available tools.

    Returns an empty list (after logging the traceback to stderr) if the
    tool definitions cannot be built.
    """
    try:
        tools = get_tool_definitions()
        print(f"Successfully loaded {len(tools)} tools.", file=sys.stderr)
        return tools
    except Exception as e:
        print("!!!!!!!! ERROR GETTING TOOL DEFINITIONS !!!!!!!!", file=sys.stderr)
        print(f"Exception: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc(file=sys.stderr)
        return []


def format_response(data: Any) -> str:
    """Formats API response data for display.

    A dict with a truthy ``"error"`` key is rendered as an error line;
    anything else is rendered as a fenced, indented JSON block.
    """
    if isinstance(data, dict) and data.get("error"):
        return f"❌ API Error: {data.get('message', 'Unknown error')}"
    return f"```json\n{json.dumps(data, indent=2)}\n```"


def _call_mapped_tool(tool_name: str, tool_info: Dict[str, Any], arguments: Dict[str, Any]) -> list[Content]:
    """Call the endpoint described by one TOOL_PARAM_MAP entry.

    ``:param_name`` placeholders in the endpoint are filled from
    ``arguments``; the full ``arguments`` dict is then sent as the JSON body
    (POST/PUT/PATCH) or as query parameters (any other method).
    """
    endpoint = tool_info["endpoint"]
    method = tool_info["method"]

    try:
        # Replace path parameters like :param_name with actual values
        for param_name, param_value in arguments.items():
            placeholder = f":{param_name}"
            if placeholder in endpoint:
                endpoint = endpoint.replace(placeholder, str(param_value))

        # Determine what to send as body vs query params
        if method.upper() in ("POST", "PUT", "PATCH"):
            data = make_authenticated_request(method, endpoint, data=arguments)
        else:
            data = make_authenticated_request(method, endpoint, params=arguments)

        return [TextContent(type="text", text=format_response(data))]
    except KeyError as e:
        return [TextContent(type="text", text=f"❌ Missing required parameter: {e}")]
    except Exception as e:
        return [TextContent(type="text", text=f"❌ Error executing {tool_name}: {str(e)}")]


@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> list[Content]:
    """Handle tool calls.

    Args:
        name: Tool name, with or without the ``mcp_cldkctl_`` prefix.
        arguments: Tool arguments; a missing/None value is treated as ``{}``.

    Returns:
        A single-element list of text content with either the formatted API
        response or an error message. Exceptions are reported as text rather
        than raised.
    """
    arguments = arguments or {}

    # Normalize tool name by removing mcp_cldkctl_ prefix if present
    tool_name = name.removeprefix(_TOOL_NAME_PREFIX)

    try:
        # Handle authentication tools
        if tool_name == "auth":
            token = arguments["token"]
            force_staging = arguments.get("force_staging", False)

            if not authenticate_with_token(token, force_staging):
                return [TextContent(type="text", text="❌ Authentication failed.")]

            env_info = f" ({environment_name})" if environment_name != "production" else ""
            user_info = auth_cache.get('user_info', {})
            text = (
                f"✅ Authentication successful{env_info}!\n\n"
                f"User: {user_info.get('name', 'Unknown')}\n"
                f"Role: {user_info.get('role', 'Unknown')}\n"
                f"Organization: {user_info.get('organization_id', 'None')}\n"
                f"Environment: {environment_name}\n"
                f"Base URL: {current_base_url}"
            )
            return [TextContent(type="text", text=text)]

        # Handle all other tools from TOOL_PARAM_MAP
        full_tool_name = f"{_TOOL_NAME_PREFIX}{tool_name}"
        if full_tool_name in TOOL_PARAM_MAP:
            tool_info = TOOL_PARAM_MAP[full_tool_name]

            # Special case: registry list should always send page=1 and project-id if provided
            if full_tool_name == "mcp_cldkctl_cldkctl_registry_list":
                params = {"page": "1"}
                if "project-id" in arguments:
                    params["project-id"] = arguments["project-id"]
                data = make_authenticated_request(tool_info["method"], tool_info["endpoint"], params=params)
                return [TextContent(type="text", text=format_response(data))]

            return _call_mapped_tool(tool_name, tool_info, arguments)

        # Try without the prefix as well
        if tool_name in TOOL_PARAM_MAP:
            return _call_mapped_tool(tool_name, TOOL_PARAM_MAP[tool_name], arguments)

        available_tools = [key.replace("mcp_cldkctl_", "") for key in TOOL_PARAM_MAP]
        return [TextContent(type="text", text=f"❌ Unknown tool: {tool_name}. Available tools: {', '.join(available_tools[:20])}...")]

    except Exception as e:
        print(f"Error calling tool {tool_name}: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc(file=sys.stderr)
        return [TextContent(type="text", text=f"❌ Error executing tool '{tool_name}': {str(e)}")]


async def main():
    """Main function.

    Restores cached credentials, or authenticates with a token found in the
    environment, the package-level defaults, or ``~/.cursor/mcp.json`` (in
    that order), then serves MCP over stdio.
    """
    print("Initializing server...", file=sys.stderr)

    try:
        __version__ = metadata.version("mcp-cldkctl")
    except metadata.PackageNotFoundError:
        __version__ = "0.0.0-dev"

    # Token loading logic moved here
    def load_token_from_mcp_json():
        """Return (token, base_url) from the Cursor MCP config, or (None, None)."""
        import pathlib
        config_path = os.path.expanduser("~/.cursor/mcp.json")
        if not os.path.exists(config_path):
            config_path = str(pathlib.Path.home() / ".cursor" / "mcp.json")
        try:
            with open(config_path, "r") as f:
                data = json.load(f)
            env = data.get("mcpServers", {}).get("cldkctl", {}).get("env", {})
            return env.get("CLDKCTL_TOKEN"), env.get("CLDKCTL_BASE_URL")
        except (OSError, ValueError, AttributeError):
            # Missing/unreadable file, invalid JSON, or unexpected structure:
            # this is a best-effort legacy fallback, so just report "not found".
            return None, None

    if not load_cache():
        token = os.environ.get("CLDKCTL_TOKEN")
        base_url = os.environ.get("CLDKCTL_BASE_URL")
        # Use package-level values as fallback if env not set
        if not token:
            token = PKG_CLDKCTL_TOKEN
        if not base_url:
            base_url = PKG_CLDKCTL_BASE_URL
        # If still not set, try reading mcp.json directly (legacy fallback)
        if not token:
            token, base_url = load_token_from_mcp_json()

        if token:
            print("No valid cache found, authenticating with token from environment, package, or mcp.json...", file=sys.stderr)
            # Tolerate a trailing slash in the configured base URL.
            force_staging = bool(base_url) and base_url.rstrip("/") == STAGING_URL
            authenticate_with_token(token, force_staging=force_staging)
        else:
            print("No valid cache or environment/config token found. You will be prompted for a token on first use.", file=sys.stderr)

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="cldkctl",
                server_version=__version__,
                capabilities=server.get_capabilities(
                    notification_options=SimpleNotificationOptions(
                        prompts_changed=False,
                        resources_changed=False,
                        tools_changed=False,
                    ),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Server shutting down.", file=sys.stderr)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/raffelprama/mcp-cldkctl'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.