Skip to main content
Glama
unifi_mcp_optimized.py26.1 kB
#!/usr/bin/env python3 """ Unifi Network MCP Server - Optimized Version Provides access to Unifi network devices and clients with better performance Separates infrastructure (devices) from clients for faster queries """ import asyncio import glob import json import logging import os import subprocess import sys import tempfile from datetime import datetime, timedelta from pathlib import Path logging.basicConfig(level=logging.INFO, stream=sys.stderr) logger = logging.getLogger(__name__) import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions from mcp_config_loader import load_env_file, COMMON_ALLOWED_ENV_VARS from mcp_error_handler import MCPErrorClassifier, log_error_with_context server = Server("unifi-network") # Configuration SCRIPT_DIR = Path(__file__).parent ENV_FILE = SCRIPT_DIR / ".env" # Load .env with security hardening UNIFI_ALLOWED_VARS = COMMON_ALLOWED_ENV_VARS | { "UNIFI_HOST", "UNIFI_API_KEY", } # Only load env file at module level if not in unified mode if not os.getenv("MCP_UNIFIED_MODE"): load_env_file(ENV_FILE, allowed_vars=UNIFI_ALLOWED_VARS, strict=True) UNIFI_EXPORTER_PATH = SCRIPT_DIR / "unifi_exporter.py" UNIFI_HOST = os.getenv("UNIFI_HOST", "192.168.1.1") UNIFI_API_KEY = os.getenv("UNIFI_API_KEY", "") # Cache configuration CACHE_DIR = Path(tempfile.gettempdir()) / "unifi_mcp_cache" CACHE_DIR.mkdir(exist_ok=True) CACHE_DURATION = timedelta(minutes=5) # Cache data for 5 minutes if __name__ == "__main__": logger.info(f"Unifi host: {UNIFI_HOST}") logger.info(f"API key configured: {'Yes' if UNIFI_API_KEY else 'No'}") logger.info(f"Cache directory: {CACHE_DIR}") class UnifiMCPServer: """Unifi Network MCP Server - Class-based implementation""" def __init__(self): """Initialize configuration using existing config loading logic""" # Load environment configuration (skip if in unified mode) if not os.getenv("MCP_UNIFIED_MODE"): load_env_file(ENV_FILE, allowed_vars=UNIFI_ALLOWED_VARS, strict=True) self.unifi_exporter_path = SCRIPT_DIR / "unifi_exporter.py" self.unifi_host = os.getenv("UNIFI_HOST", "192.168.1.1") self.unifi_api_key = os.getenv("UNIFI_API_KEY", "") # Cache configuration - each instance gets its own cache self.cache_dir = Path(tempfile.gettempdir()) / f"unifi_mcp_cache_{id(self)}" self.cache_dir.mkdir(exist_ok=True) self.cache_duration = timedelta(minutes=5) logger.info(f"[UnifiMCPServer] Unifi host: {self.unifi_host}") logger.info(f"[UnifiMCPServer] API key configured: {'Yes' if self.unifi_api_key else 'No'}") logger.info(f"[UnifiMCPServer] Cache directory: {self.cache_dir}") async def list_tools(self) -> list[types.Tool]: """Return list of Tool objects this server provides (with unifi_ prefix)""" return [ types.Tool( name="unifi_get_network_devices", description="Get all Unifi network devices (switches, APs, gateways) with status and basic info. This is cached for better performance.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Devices", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="unifi_get_network_clients", description="Get all active network clients and their connections. This is cached for better performance.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Clients", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="unifi_get_network_summary", description="Get network overview: VLANs, device count, client count. Fast summary view.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Summary", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="unifi_refresh_network_data", description="Force refresh network data from Unifi controller (bypasses cache).", inputSchema={"type": "object", "properties": {}, "required": []}, title="Refresh Unifi Network Data", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=False, openWorldHint=True, ) ), ] async def handle_tool(self, tool_name: str, arguments: dict | None) -> list[types.TextContent]: """Route tool calls to appropriate handler methods""" # Strip the unifi_ prefix for routing name = tool_name.replace("unifi_", "", 1) if tool_name.startswith("unifi_") else tool_name logger.info(f"[UnifiMCPServer] Tool called: {tool_name} -> {name} with args: {arguments}") # Call the shared implementation with this instance's config return await handle_call_tool_impl( name, arguments, self.unifi_exporter_path, self.unifi_host, self.unifi_api_key, self.cache_dir, self.cache_duration ) def get_cached_data(cache_dir: Path, cache_duration: timedelta): """Get cached Unifi data if available and not expired""" cache_file = cache_dir / "unifi_data.json" if not cache_file.exists(): return None # Check if cache is still valid cache_time = datetime.fromtimestamp(cache_file.stat().st_mtime) if datetime.now() - cache_time > cache_duration: logger.info("Cache expired") return None try: with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) logger.info(f"Using cached data from {cache_time}") return data except Exception as e: logger.error(f"Error reading cache: {e}") return None def save_cached_data(data, cache_dir: Path): """Save Unifi data to cache""" cache_file = cache_dir / "unifi_data.json" try: with open(cache_file, "w", encoding="utf-8") as f: json.dump(data, f) logger.info(f"Saved data to cache: {cache_file}") except Exception as e: logger.error(f"Error saving cache: {e}") async def fetch_unifi_data(exporter_path: Path, unifi_host: str, unifi_api_key: str, cache_dir: Path): """Fetch fresh data from Unifi exporter Raises: FileNotFoundError: If exporter script not found ValueError: If API key not configured RuntimeError: If exporter fails with detailed error message """ if not exporter_path.exists(): error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type="Configuration Error", message=f"Unifi exporter script not found", remediation=f"Ensure unifi_exporter.py exists in the project directory at {exporter_path.parent}", details=f"Expected path: {exporter_path}" ) raise FileNotFoundError(error_msg) if not unifi_api_key: error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type="Configuration Error", message="UNIFI_API_KEY environment variable not set", remediation="Set UNIFI_API_KEY in your .env file. Generate an API key in Unifi Settings > Admins > API.", details="API key is required to authenticate with Unifi controller" ) raise ValueError(error_msg) with tempfile.TemporaryDirectory() as tmpdir: logger.info(f"Running Unifi exporter for {unifi_host}...") # Fix Windows console encoding issues env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" env["PYTHONUNBUFFERED"] = "1" cmd = [ "python", str(exporter_path), "--host", unifi_host, "--api-key", unifi_api_key, "--format", "json", "--output-dir", tmpdir, ] # Use Popen with communicate() for proper subprocess handling import sys process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL, # Prevent stdin blocking text=True, env=env, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, ) try: stdout, stderr = process.communicate(timeout=30) if process.returncode != 0: # Parse stderr to identify specific error types stderr_lower = stderr.lower() # Classify error based on stderr content error_classification = MCPErrorClassifier.classify_text_error(stderr) if error_classification: # Known error pattern detected error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type=error_classification["type"], message=f"Unifi exporter failed (exit code {process.returncode})", remediation=error_classification["remediation"], details=stderr.strip()[:500], # Limit stderr output hostname=unifi_host ) elif "401" in stderr or "unauthorized" in stderr_lower: # API authentication error error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type="Authentication Failed", message=f"Invalid Unifi API key for {unifi_host}", remediation="Verify UNIFI_API_KEY in .env matches the API key from Unifi Settings > Admins > API. Ensure the key has not expired.", details=stderr.strip()[:500], hostname=unifi_host ) elif "connection refused" in stderr_lower or "failed to connect" in stderr_lower: # Connection error error_msg = MCPErrorClassifier.format_connection_error( service_name="Unifi", hostname=unifi_host, port=443, # Default Unifi port additional_guidance="Ensure Unifi controller is running and accessible. Check UNIFI_HOST setting." ) error_msg += f"\n\nExporter output: {stderr.strip()[:500]}" elif "timeout" in stderr_lower or "timed out" in stderr_lower: # Timeout error error_msg = MCPErrorClassifier.format_timeout_error( service_name="Unifi", hostname=unifi_host, port=443, timeout_seconds=30 ) error_msg += f"\n\nExporter output: {stderr.strip()[:500]}" else: # Generic error with return code context error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type=f"Exporter Failed (Code {process.returncode})", message=f"Unifi exporter process failed", remediation="Check the error details below. Verify UNIFI_HOST and UNIFI_API_KEY are correct.", details=stderr.strip()[:500], hostname=unifi_host ) log_error_with_context( logger, f"Unifi exporter failed with code {process.returncode}", context={"host": unifi_host, "returncode": process.returncode, "stderr": stderr[:200]} ) raise RuntimeError(error_msg) except subprocess.TimeoutExpired: process.kill() stdout, stderr = process.communicate() error_msg = MCPErrorClassifier.format_timeout_error( service_name="Unifi", hostname=unifi_host, timeout_seconds=30 ) error_msg += "\n\nThe exporter process was killed after timeout. Check if Unifi controller is responding slowly." log_error_with_context(logger, "Unifi exporter timeout", context={"host": unifi_host, "timeout": 30}) logger.warning(f"Process timeout but checking for output files... STDERR: {stderr[:200]}") # Find the generated JSON file json_files = glob.glob(os.path.join(tmpdir, "unifi_network_*.json")) if not json_files: error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type="Export Failed", message="No output file generated by Unifi exporter", remediation="The exporter ran but produced no output. Check if Unifi controller is accessible and responding.", details=f"STDOUT: {stdout[:200]}, STDERR: {stderr[:200]}", hostname=unifi_host ) log_error_with_context( logger, "Unifi exporter generated no output", context={"host": unifi_host, "stdout": stdout[:100], "stderr": stderr[:100]} ) raise FileNotFoundError(error_msg) # Read the most recent file latest_file = sorted(json_files)[-1] logger.info(f"Reading data from {latest_file}") with open(latest_file, "r", encoding="utf-8") as f: data = json.load(f) # Save to cache save_cached_data(data, cache_dir) return data async def get_unifi_data(exporter_path: Path, unifi_host: str, unifi_api_key: str, cache_dir: Path, cache_duration: timedelta): """Get Unifi data from cache or fetch fresh""" # Try cache first data = get_cached_data(cache_dir, cache_duration) if data: return data # Fetch fresh data logger.info("Fetching fresh Unifi data...") return await fetch_unifi_data(exporter_path, unifi_host, unifi_api_key, cache_dir) @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available Unifi network tools""" return [ types.Tool( name="get_network_devices", description="Get all Unifi network devices (switches, APs, gateways) with status and basic info. This is cached for better performance.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Devices", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="get_network_clients", description="Get all active network clients and their connections. This is cached for better performance.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Clients", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="get_network_summary", description="Get network overview: VLANs, device count, client count. Fast summary view.", inputSchema={"type": "object", "properties": {}, "required": []}, title="Get Unifi Network Summary", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True, ) ), types.Tool( name="refresh_network_data", description="Force refresh network data from Unifi controller (bypasses cache).", inputSchema={"type": "object", "properties": {}, "required": []}, title="Refresh Unifi Network Data", annotations=types.ToolAnnotations( readOnlyHint=True, destructiveHint=False, idempotentHint=False, openWorldHint=True, ) ), ] async def handle_call_tool_impl( name: str, arguments: dict | None, exporter_path: Path, unifi_host: str, unifi_api_key: str, cache_dir: Path, cache_duration: timedelta ) -> list[types.TextContent]: """Core tool execution logic that can be called by both class and module-level handlers""" try: if name == "get_network_devices": data = await get_unifi_data(exporter_path, unifi_host, unifi_api_key, cache_dir, cache_duration) return format_network_devices(data) elif name == "get_network_clients": data = await get_unifi_data(exporter_path, unifi_host, unifi_api_key, cache_dir, cache_duration) return format_network_clients(data) elif name == "get_network_summary": data = await get_unifi_data(exporter_path, unifi_host, unifi_api_key, cache_dir, cache_duration) return format_network_summary(data) elif name == "refresh_network_data": logger.info("Force refreshing network data...") data = await fetch_unifi_data(exporter_path, unifi_host, unifi_api_key, cache_dir) return [ types.TextContent( type="text", text=f"✓ Network data refreshed successfully\n\nDevices: {len(data.get('devices', []))}\nClients: {len(data.get('clients', []))}\nNetworks: {len(data.get('networks', []))}", ) ] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: # Check if error is already formatted by our error handler error_text = str(e) if "✗ Unifi" in error_text or "→" in error_text: # Already formatted, return as-is log_error_with_context(logger, f"Unifi tool error in {name}", error=e, context={"tool": name}) return [types.TextContent(type="text", text=error_text)] else: # Format generic error error_msg = MCPErrorClassifier.format_error_message( service_name="Unifi", error_type="Tool Execution Error", message=f"Failed to execute tool '{name}'", remediation="Check the logs for detailed error information. Ensure Unifi controller is configured correctly.", details=str(e) ) log_error_with_context(logger, f"Error in tool {name}", error=e, context={"tool": name, "arguments": arguments}) return [types.TextContent(type="text", text=error_msg)] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: """Handle tool calls (module-level wrapper for standalone mode)""" # For standalone mode, use the global variables return await handle_call_tool_impl( name, arguments, UNIFI_EXPORTER_PATH, UNIFI_HOST, UNIFI_API_KEY, CACHE_DIR, CACHE_DURATION ) def format_network_devices(data: dict) -> list[types.TextContent]: """Format network devices output""" devices = data.get("devices", []) output = "=== NETWORK DEVICES ===\n\n" output += f"Total: {len(devices)} devices\n\n" # Group by type by_type = {} for device in devices: device_type = device.get("type", "unknown") if device_type not in by_type: by_type[device_type] = [] by_type[device_type].append(device) type_names = { "ugw": "Gateways", "usw": "Switches", "uap": "Access Points", "unknown": "Other", } for device_type, type_devices in sorted(by_type.items()): output += f"\n{type_names.get(device_type, device_type.upper())} ({len(type_devices)}):\n" for device in type_devices: name = device.get("name", "Unknown") model = device.get("model", "N/A") ip = device.get("ip", "N/A") state = device.get("state", 0) status = "✓ Online" if state == 1 else "✗ Offline" version = device.get("version", "N/A") output += f" • {name} ({model})\n" output += f" IP: {ip} | Status: {status} | Version: {version}\n" # Add client count for APs if device_type == "uap": num_sta = device.get("num_sta", 0) output += f" Connected clients: {num_sta}\n" # Add port info for switches if device_type == "usw": port_table = device.get("port_table", []) ports_up = sum(1 for p in port_table if p.get("up", False)) output += f" Ports: {ports_up}/{len(port_table)} up\n" return [types.TextContent(type="text", text=output)] def format_network_clients(data: dict) -> list[types.TextContent]: """Format network clients output""" clients = data.get("clients", []) networks = {n["_id"]: n for n in data.get("networks", [])} output = "=== NETWORK CLIENTS ===\n\n" output += f"Total: {len(clients)} active clients\n\n" # Group by VLAN/network by_network = {} for client in clients: network_id = client.get("network_id", "unknown") if network_id not in by_network: by_network[network_id] = [] by_network[network_id].append(client) for network_id, network_clients in sorted( by_network.items(), key=lambda x: len(x[1]), reverse=True ): network_name = networks.get(network_id, {}).get("name", "Unknown") vlan = networks.get(network_id, {}).get("vlan", "N/A") output += f"\n{network_name} (VLAN {vlan}) - {len(network_clients)} clients:\n" # Show first 10 clients per network for client in network_clients[:10]: hostname = client.get("hostname", client.get("name", "Unknown")) ip = client.get("ip", "N/A") mac = client.get("mac", "N/A") is_wired = client.get("is_wired", False) conn_type = "Wired" if is_wired else "Wireless" output += f" • {hostname} ({ip})\n" output += f" MAC: {mac} | {conn_type}\n" if len(network_clients) > 10: output += f" ... and {len(network_clients) - 10} more\n" return [types.TextContent(type="text", text=output)] def format_network_summary(data: dict) -> list[types.TextContent]: """Format network summary output""" networks = data.get("networks", []) devices = data.get("devices", []) clients = data.get("clients", []) output = "=== NETWORK SUMMARY ===\n\n" # Overall stats output += f"Networks/VLANs: {len(networks)}\n" output += f"Network Devices: {len(devices)}\n" output += f"Active Clients: {len(clients)}\n\n" # Device breakdown online_devices = sum(1 for d in devices if d.get("state") == 1) output += f"DEVICES:\n" output += f" Online: {online_devices}/{len(devices)}\n" # Count by type device_types = {} for d in devices: dtype = d.get("type", "unknown") device_types[dtype] = device_types.get(dtype, 0) + 1 type_names = {"ugw": "Gateways", "usw": "Switches", "uap": "Access Points"} for dtype, count in device_types.items(): output += f" {type_names.get(dtype, dtype)}: {count}\n" # Client breakdown wired = sum(1 for c in clients if c.get("is_wired", False)) output += f"\nCLIENTS:\n" output += f" Wired: {wired}\n" output += f" Wireless: {len(clients) - wired}\n" # Top networks by client count by_network = {} for client in clients: network_id = client.get("network_id", "unknown") by_network[network_id] = by_network.get(network_id, 0) + 1 output += f"\nTOP NETWORKS:\n" networks_dict = {n["_id"]: n for n in networks} for network_id, count in sorted( by_network.items(), key=lambda x: x[1], reverse=True )[:5]: name = networks_dict.get(network_id, {}).get("name", "Unknown") vlan = networks_dict.get(network_id, {}).get("vlan", "N/A") output += f" • {name} (VLAN {vlan}): {count} clients\n" return [types.TextContent(type="text", text=output)] async def main(): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="unifi-network", server_version="2.0.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bjeans/homelab-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server