import asyncio
import inspect
import json
import logging
import re
import time
from typing import Dict, List, Optional, Tuple
from mcp.server.fastmcp import FastMCP
from meraki_mcp.services.meraki_client import MerakiClient
from meraki_mcp.settings import ApiSettings
logger = logging.getLogger(__name__)
class MerakiApiTools:
"""Dynamic tool class that can discover and execute any Meraki API endpoint"""
def __init__(self, mcp: FastMCP, meraki_client: MerakiClient, enabled: bool, settings: ApiSettings | None = None):
self.mcp = mcp
self.meraki_client = meraki_client
self.settings = settings or ApiSettings()
self._api_cache: Dict[str, List[str]] = {}
self._device_cache: Dict[str, Dict] = {}
self._response_cache: Dict[str, Dict] = {}
self._cache_ttl = int(self.settings.CACHE_TTL_SECONDS)
self._search_patterns: List[Dict] = []
self._patterns_initialized = False
self.enabled = enabled
if self.enabled:
self._register_tools()
else:
logger.info("MerakiApiTools not registered (MERAKI_API_KEY not set)")
# ----- Safety and policy helpers -----
def _is_mutating(self, method: str) -> bool:
m = method.lower()
return m.startswith("create") or m.startswith("update") or m.startswith("delete")
def _is_allowed(self, section: str, method: str) -> tuple[bool, str | None]:
# deny lists take precedence
if section in set(self.settings.DENY_SECTIONS):
return False, f"Section '{section}' is denied by policy"
if method in set(self.settings.DENY_METHODS) or f"{section}.{method}" in set(self.settings.DENY_METHODS):
return False, f"Method '{section}.{method}' is denied by policy"
# allow lists (if present) restrict surface
has_allow_sections = bool(self.settings.ALLOW_SECTIONS)
has_allow_methods = bool(self.settings.ALLOW_METHODS)
if has_allow_sections and section not in set(self.settings.ALLOW_SECTIONS):
return False, f"Section '{section}' not in ALLOW_SECTIONS"
if has_allow_methods and not (method in set(self.settings.ALLOW_METHODS) or f"{section}.{method}" in set(self.settings.ALLOW_METHODS)):
return False, f"Method '{section}.{method}' not in ALLOW_METHODS"
return True, None
def _apply_rate_guards(self, params: Dict) -> Dict:
capped = dict(params)
for key in ("perPage", "per_page"):
if key in capped:
try:
capped[key] = min(int(capped[key]), int(self.settings.MAX_PER_PAGE))
except Exception:
pass
if "timespan" in capped:
try:
capped["timespan"] = min(int(capped["timespan"]), int(self.settings.MAX_TIMESPAN))
except Exception:
pass
return capped
def _redact(self, data):
keys = set(k.lower() for k in self.settings.REDACT_KEYS)
def _walk(obj):
if isinstance(obj, dict):
out = {}
for k, v in obj.items():
if k.lower() in keys and isinstance(v, (str, int, float)):
out[k] = "***REDACTED***"
else:
out[k] = _walk(v)
return out
elif isinstance(obj, list):
return [_walk(x) for x in obj]
else:
return obj
return _walk(data)
def _generate_keywords_from_method(self, section: str, method: str) -> List[str]:
"""Generate semantic keywords from section and method names"""
keywords = []
keywords.append(section.lower())
if section == "organizations":
keywords.extend(["org", "orgs", "organization"])
elif section == "appliance":
keywords.extend(["mx", "security", "firewall"])
elif section == "switch":
keywords.extend(["ms", "switching", "port", "ports"])
elif section == "wireless":
keywords.extend([
"mr",
"wifi",
"wireless",
"access",
# Newer wireless domains
"zigbee",
"iot",
"door",
"lock",
])
elif section == "camera":
keywords.extend(["mv", "cameras", "video"])
elif section == "sensor":
keywords.extend([
"mt",
"sensors",
"environmental",
"gateway",
"gateways",
])
elif section == "cellularGateway":
keywords.extend(["mg", "cellular", "gateway", "cellular-gateway"])
elif section == "sm":
keywords.extend(["sm", "systems", "manager", "mdm", "endpoint"])
elif section == "insight":
keywords.extend(["insight", "wan", "health", "application", "app"])
elif section == "webhooks":
keywords.extend(["webhook", "webhooks", "endpoints", "templates", "logs"])
elif section == "licensing":
keywords.extend(["licensing", "subscriptions", "entitlements", "license"])
elif section == "administered":
keywords.extend(["administered", "identity", "me", "api", "keys", "user"])
elif section == "spaces":
keywords.extend(["spaces", "cisco", "integration"])
elif section == "networks":
keywords.extend(["network", "net"])
elif section == "devices":
keywords.extend(["device", "hardware"])
method_parts = re.findall(r"[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\b)", method)
method_words = [part.lower() for part in method_parts]
keywords.extend(method_words)
if "get" in method_words:
keywords.extend(["show", "list", "fetch", "retrieve"])
elif "update" in method_words:
keywords.extend(["modify", "change", "edit", "set"])
elif "create" in method_words:
keywords.extend(["add", "new", "make"])
elif "delete" in method_words:
keywords.extend(["remove", "destroy"])
if any(word in method_words for word in ["firewall", "rules"]):
keywords.extend(["security", "l3", "layer3", "policy"])
if any(word in method_words for word in ["client", "clients"]):
keywords.extend(["connected", "devices", "users"])
if any(word in method_words for word in ["port", "ports"]):
keywords.extend(["interface", "config", "configuration", "settings"])
if any(word in method_words for word in ["vpn"]):
keywords.extend(["tunnel", "connection", "site"])
if any(word in method_words for word in ["ssid"]):
keywords.extend(["network", "wifi", "wireless"])
return list(set(keywords))
def _calculate_method_weight(self, method: str) -> float:
"""Calculate priority weight for a method based on common usage patterns"""
method_lower = method.lower()
if method in ["getOrganizations", "getDevice", "getNetworkClients"]:
return 1.0
elif "organization" in method_lower and method.startswith("get"):
return 0.9
elif "network" in method_lower and method.startswith("get"):
return 0.8
elif method.startswith("get") and "device" in method_lower:
return 0.8
elif method.startswith("get"):
return 0.7
elif method.startswith("update"):
return 0.6
elif method.startswith("create"):
return 0.5
elif method.startswith("delete"):
return 0.4
else:
return 0.3
def _get_method_parameters(self, section: str, method: str) -> List[str]:
"""Get required parameters for a method using inspection"""
try:
dashboard = self.meraki_client.get_dashboard()
section_obj = getattr(dashboard, section)
method_obj = getattr(section_obj, method)
sig = inspect.signature(method_obj)
required_params = []
for param_name, param in sig.parameters.items():
if param.default == inspect.Parameter.empty and param_name != "kwargs":
required_params.append(param_name)
return required_params
except Exception:
return []
def _generate_dynamic_patterns(self) -> List[Dict]:
"""Generate semantic patterns for ALL available API endpoints"""
api_structure = self._discover_api_structure()
patterns = []
for section, methods in api_structure.items():
for method in methods:
keywords = self._generate_keywords_from_method(section, method)
weight = self._calculate_method_weight(method)
required_params = self._get_method_parameters(section, method)
method_parts = re.findall(
r"[A-Z]?[a-z]+|[A-Z]+(?=[A-Z][a-z]|\b)", method
)
description = " ".join(method_parts).lower()
pattern = {
"keywords": keywords,
"section": section,
"method": method,
"description": description,
"required_params": required_params,
"weight": weight,
}
patterns.append(pattern)
return patterns
def _initialize_search_patterns(self) -> List[Dict]:
"""Initialize semantic search patterns by generating them dynamically from API structure"""
return self._generate_dynamic_patterns()
def _calculate_semantic_score(self, query: str, pattern: Dict) -> float:
"""Calculate semantic similarity score between query and pattern"""
query_words = set(re.findall(r"\b\w+\b", query.lower()))
pattern_keywords = set(pattern["keywords"])
intersection = len(query_words.intersection(pattern_keywords))
union = len(query_words.union(pattern_keywords))
if union == 0:
return 0.0
jaccard_score = intersection / union
weighted_score = jaccard_score * pattern["weight"]
exact_matches = sum(1 for word in query_words if word in pattern_keywords)
exact_bonus = min(exact_matches * 0.1, 0.3)
return min(weighted_score + exact_bonus, 1.0)
def _ensure_patterns_initialized(self):
"""Ensure search patterns are initialized (lazy loading)"""
if not self._patterns_initialized:
logger.info(
"Initializing semantic search patterns for all API endpoints..."
)
self._search_patterns = self._generate_dynamic_patterns()
self._patterns_initialized = True
logger.info(f"Initialized {len(self._search_patterns)} semantic patterns")
def _find_best_pattern_match(self, query: str) -> Optional[Dict]:
"""Find the best matching pattern for the given query"""
self._ensure_patterns_initialized()
best_score = 0.0
best_pattern = None
for pattern in self._search_patterns:
score = self._calculate_semantic_score(query, pattern)
if score > best_score and score > 0.3:
best_score = score
best_pattern = pattern
return best_pattern
def _register_tools(self):
"""Register the dynamic tools with the MCP server"""
self.mcp.tool()(self.search_meraki_api_endpoints)
self.mcp.tool()(self.execute_meraki_api_endpoint)
self.mcp.tool()(self.get_meraki_endpoint_parameters)
def _discover_api_structure(self) -> Dict[str, List[str]]:
"""Discover all available API sections and their methods"""
if self._api_cache:
return self._api_cache
try:
dashboard = self.meraki_client.get_dashboard()
api_structure = {}
# Get all API sections
sections = [
attr
for attr in dir(dashboard)
if not attr.startswith("_")
and hasattr(getattr(dashboard, attr), "__class__")
and "api" in str(type(getattr(dashboard, attr))).lower()
]
for section in sections:
section_obj = getattr(dashboard, section)
methods = [
method
for method in dir(section_obj)
if not method.startswith("_")
and callable(getattr(section_obj, method))
]
api_structure[section] = methods
self._api_cache = api_structure
return api_structure
except Exception as e:
logger.error(f"Failed to discover API structure: {e}")
return {}
def _get_cache_key(self, section: str, method: str, **params) -> str:
"""Generate a cache key for API responses"""
sorted_params = sorted(params.items())
return f"{section}.{method}:{hash(str(sorted_params))}"
def _is_cache_valid(self, cache_entry: Dict) -> bool:
"""Check if cache entry is still valid"""
return time.time() - cache_entry.get("timestamp", 0) < self._cache_ttl
async def search_meraki_api_endpoints(self, query: str) -> str:
"""
Search and discover Meraki API endpoints using semantic similarity and natural language.
This tool uses intelligent pattern matching and semantic scoring to find the most relevant
API endpoints. It analyzes query intent and matches against known patterns for instant results.
SEMANTIC MATCHING:
- Organizations: "get organizations", "list orgs", "show my organizations"
- Device Status: "device info", "device details", "check device status"
- Port Config: "device port config", "switch port", "port configuration"
- Firewall Rules: "firewall rules", "security rules", "l3 firewall"
- Network Clients: "network clients", "connected devices", "client list"
Args:
query (str): Natural language search term. Examples:
- "get my organizations" → organizations.getOrganizations
- "device Q123 port 4 config" → switch.getDeviceSwitchPort
- "network clients" → networks.getNetworkClients
- "firewall rules" → appliance.getNetworkApplianceFirewallL3FirewallRules
Returns:
JSON string containing:
- query: The search term used
- direct_match: Best semantic match with confidence score
- matches: Fallback section matches if no direct match
- usage: Instructions for next steps
"""
# Try semantic pattern matching first
best_pattern = self._find_best_pattern_match(query)
direct_match = None
if best_pattern:
direct_match = {
"section": best_pattern["section"],
"method": best_pattern["method"],
"description": best_pattern["description"],
"required_params": best_pattern["required_params"],
"confidence": self._calculate_semantic_score(query, best_pattern),
}
matches = {}
# Fallback to traditional search if no semantic match found
if not direct_match:
api_structure = self._discover_api_structure()
query_lower = query.lower()
for section, methods in api_structure.items():
section_matches = []
# Section name matching
if any(word in section.lower() for word in query_lower.split()):
section_matches.extend(methods[:8])
# Method name matching
for method in methods:
if any(word in method.lower() for word in query_lower.split()):
if method not in section_matches:
section_matches.append(method)
if section_matches:
matches[section] = section_matches[:8]
result = {
"query": query,
"direct_match": direct_match,
"matches": matches,
"usage": "Use execute_api_endpoint with section='<section>' and method='<method>' to call an endpoint",
}
return json.dumps(result, indent=2)
async def get_meraki_endpoint_parameters(self, section: str, method: str) -> str:
"""
Discover required and optional parameters for any Meraki API endpoint.
This tool provides complete parameter documentation for API methods, including
data types, required vs optional parameters, and default values. Use this after
finding endpoints with search_api_endpoints and before calling execute_api_endpoint.
WHEN TO USE:
- Before making API calls to understand required parameters
- To validate you have all necessary data before execution
- To understand optional parameters for enhanced functionality
- To check parameter data types for proper formatting
Args:
section (str): API section name from search results. Must be exact match. Examples:
- "organizations" - for organization management endpoints
- "devices" - for device-specific operations
- "networks" - for network management
- "appliance" - for MX security appliance features
- "wireless" - for wireless access point management
- "switch" - for MS switch management
- "camera" - for MV camera management
- "sensor" - for MT sensor management
method (str): Exact method name from search results. Examples:
- "getOrganizations" - list all organizations
- "getDevice" - get device details (requires serial parameter)
- "getNetworkClients" - list network clients (requires networkId)
- "updateNetworkSettings" - modify network settings
- "getOrganizationDevices" - list organization devices
Returns:
JSON string containing complete parameter documentation:
{
"section": "devices",
"method": "getDevice",
"parameters": {
"serial": {
"required": true,
"type": "<class 'str'>",
"description": "Device serial number"
},
"optional_param": {
"required": false,
"type": "<class 'str'>",
"default": "default_value"
}
},
"usage_example": "execute_api_endpoint(section='devices', method='getDevice', serial='Q2XX-XXXX-XXXX')"
}
PARAMETER TYPES:
- str: Text strings (device serials, network IDs, names)
- int: Numbers (timespan, per_page limits)
- bool: True/False values (enabled/disabled settings)
- list: Arrays of values (multiple device serials, IP ranges)
- dict: JSON objects (configuration settings, rule definitions)
ERROR HANDLING:
If endpoint not found, returns suggestions to use search_api_endpoints first.
WORKFLOW TIP:
1. search_api_endpoints("your query") → find available methods
2. get_endpoint_parameters(section, method) → understand requirements
3. execute_api_endpoint(section, method, param1=value1, ...) → make the call
"""
try:
dashboard = self.meraki_client.get_dashboard()
section_obj = getattr(dashboard, section)
method_obj = getattr(section_obj, method)
sig = inspect.signature(method_obj)
parameters = {}
for param_name, param in sig.parameters.items():
param_info = {
"required": param.default == inspect.Parameter.empty,
"type": str(param.annotation)
if param.annotation != inspect.Parameter.empty
else "unknown",
}
if param.default != inspect.Parameter.empty:
param_info["default"] = param.default
parameters[param_name] = param_info
result = {
"section": section,
"method": method,
"parameters": parameters,
"usage_example": f"execute_api_endpoint(section='{section}', method='{method}', ...)",
}
return json.dumps(result, indent=2)
except AttributeError:
error_result = {
"error": f"Endpoint not found: {section}.{method}",
"suggestion": "Use search_api_endpoints to find available endpoints",
}
return json.dumps(error_result, indent=2)
except Exception as e:
logger.error(f"Failed to get endpoint parameters: {e}")
error_result = {"error": f"Failed to get parameters: {str(e)}"}
return json.dumps(error_result, indent=2)
async def execute_meraki_api_endpoint(
self,
section: str,
method: str,
serial: Optional[str] = None,
portId: Optional[str] = None,
networkId: Optional[str] = None,
organizationId: Optional[str] = None,
kwargs: str = "{}",
) -> str:
"""
Execute any Meraki Dashboard API endpoint with dynamic parameter handling.
This is the primary execution tool that calls the actual Meraki API. It handles
authentication, rate limiting, error handling, and response formatting automatically.
COMMON DIRECT USAGE:
1. Get device port configuration:
execute_api_endpoint(section="switch", method="getDeviceSwitchPort",
serial="Q2XX-XXXX-XXXX", portId="4")
2. Get device status:
execute_api_endpoint(section="devices", method="getDevice",
serial="Q2XX-XXXX-XXXX")
3. Get network clients with additional parameters:
execute_api_endpoint(section="networks", method="getNetworkClients",
networkId="N_12345", kwargs='{"timespan": 3600, "perPage": 50}')
Args:
section (str): API section name (e.g., "switch", "devices", "networks")
method (str): API method name (e.g., "getDeviceSwitchPort", "getDevice")
serial (str, optional): Device serial number
portId (str, optional): Port identifier (e.g., "4", "1", "2")
networkId (str, optional): Network identifier
organizationId (str, optional): Organization identifier
kwargs (str): JSON string containing any additional parameters
Examples: '{"timespan": 3600}', '{"perPage": 50, "startingAfter": "2023-01-01"}'
Returns:
JSON string containing API response or error details
"""
try:
cache_key = self._get_cache_key(
section,
method,
serial=serial,
portId=portId,
networkId=networkId,
organizationId=organizationId,
kwargs=kwargs,
)
if (not self.settings.DISABLE_RESPONSE_CACHE) and method.startswith("get") and cache_key in self._response_cache:
cache_entry = self._response_cache[cache_key]
if self._is_cache_valid(cache_entry):
logger.info(f"Cache hit for {section}.{method}")
return cache_entry["response"]
# Policy checks (allow/deny + mutation guards)
ok, reason = self._is_allowed(section, method)
if not ok:
return json.dumps({
"error": "execution blocked",
"reason": reason,
"section": section,
"method": method,
}, indent=2)
def _call_api():
dashboard = self.meraki_client.get_dashboard()
section_obj = getattr(dashboard, section)
method_obj = getattr(section_obj, method)
all_params = {
"serial": serial,
"portId": portId,
"networkId": networkId,
"organizationId": organizationId,
}
try:
if kwargs and kwargs.strip():
extra_params = json.loads(kwargs)
if isinstance(extra_params, dict):
all_params.update(extra_params)
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid additional_params JSON: {e}")
filtered_params = {
k: v for k, v in all_params.items() if v is not None and v != ""
}
# Mutation guard
if self._is_mutating(method):
if not self.settings.ALLOW_MUTATIONS:
raise PermissionError("Mutations are disabled by policy (ALLOW_MUTATIONS=false)")
if self.settings.REQUIRE_CONFIRM_FOR_MUTATIONS and not bool(filtered_params.get("confirm")):
raise PermissionError("Confirm flag required for mutations (provide 'confirm': true)")
# Guard parameter sizes
filtered_params = self._apply_rate_guards(filtered_params)
sig = inspect.signature(method_obj)
missing_params = []
for param_name, param in sig.parameters.items():
if (
param.default == inspect.Parameter.empty
and param_name != "kwargs"
):
if param_name not in filtered_params:
missing_params.append(param_name)
if missing_params:
raise ValueError(
f"Missing required parameters: {', '.join(missing_params)}"
)
return method_obj(**filtered_params)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, _call_api)
safe_result = self._redact(result)
response_json = json.dumps(safe_result, indent=2, default=str)
if (not self.settings.DISABLE_RESPONSE_CACHE) and method.startswith("get"):
self._response_cache[cache_key] = {
"response": response_json,
"timestamp": time.time(),
}
return response_json
except ValueError as ve:
error_result = {
"error": str(ve),
"section": section,
"method": method,
"provided_params": [
k
for k, v in {
"serial": serial,
"portId": portId,
"networkId": networkId,
"organizationId": organizationId,
}.items()
if v is not None and v != ""
],
"additional_params_provided": kwargs,
"suggestion": "Use get_meraki_endpoint_parameters to see all required parameters",
}
return json.dumps(error_result, indent=2)
except AttributeError:
api_structure = self._discover_api_structure()
available_sections = list(api_structure.keys())
if section not in available_sections:
error_result = {
"error": f"Section '{section}' not found",
"available_sections": available_sections[:10],
"suggestion": "Use search_meraki_api_endpoints to find the correct section",
}
else:
available_methods = api_structure.get(section, [])
error_result = {
"error": f"Method '{method}' not found in section '{section}'",
"available_methods": available_methods[:20],
"suggestion": "Use search_meraki_api_endpoints to find the correct method",
}
return json.dumps(error_result, indent=2)
except Exception as e:
logger.error(f"API call failed: {e}")
error_result = {
"error": f"API call failed: {str(e)}",
"section": section,
"method": method,
"provided_params": [
k
for k, v in {
"serial": serial,
"portId": portId,
"networkId": networkId,
"organizationId": organizationId,
}.items()
if v is not None and v != ""
],
}
if kwargs:
error_result["additional_params"] = kwargs
return json.dumps(error_result, indent=2)