Skip to main content
Glama

WHOIS MCP Server

by dadepo
validate_route_object.py (8.07 kB)
"""RIPE route-object validation tool for the WHOIS MCP server.

Exposes a single MCP tool that checks whether the RIPE database holds a
``route``/``route6`` object for a given prefix whose ``origin`` attribute
matches a given ASN. Successful lookups (including "not-found") are cached
in a module-level TTL cache.
"""

import ipaddress
import logging
from typing import Annotated, Any

import httpx
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from pydantic import Field

from whois_mcp.cache import TTLCache
from whois_mcp.config import HTTP_TIMEOUT_SECONDS, RIPE_REST_BASE, USER_AGENT

__all__ = ["register"]

# Configure logging
logger = logging.getLogger(__name__)

# Initialize cache with 5-minute TTL for route validation results
_route_cache: TTLCache[str, Any] = TTLCache(max_items=1000, ttl_seconds=300.0)

# Tool metadata constants
TOOL_NAME = "ripe_validate_route_object"
TOOL_DESCRIPTION = (
    "PREFERRED TOOL for validating route object registration in the RIPE NCC database. "
    "This tool is specifically for the RIPE RIR (Europe/Middle East/Central Asia region). "
    "Use this when you need to CHECK, VERIFY, or VALIDATE if a route object exists for a prefix-ASN pair in RIPE. "
    "Keywords: 'route validation', 'check route', 'verify route', 'route exists', "
    "'BGP security', 'route filtering', 'IRR coverage', 'RPKI validation'. "
    "Automatically handles IPv4/IPv6 detection and returns simple exists/not-found status. "
    "Much faster and more accurate than parsing raw WHOIS data for route validation in RIPE database."
)

PREFIX_DESCRIPTION = (
    "IP prefix to CHECK/VALIDATE for route object registration in RIPE database. Use CIDR notation like "
    "'192.0.2.0/24' for IPv4 or '2001:db8::/32' for IPv6. Use this when you need to "
    "VERIFY if a prefix has a registered route object in the RIPE IRR database."
)

ORIGIN_ASN_DESCRIPTION = (
    "Origin ASN number to VALIDATE/CHECK for route coverage (without 'AS' prefix). "
    "For example, use 64496 to VERIFY if AS64496 has a route object registered for the "
    "specified prefix. This VALIDATES proper BGP route registration and IRR coverage."
)


async def _search_route(prefix: str) -> tuple[dict[str, Any], str]:
    """Search the RIPE REST API for route objects matching ``prefix``.

    Args:
        prefix: Prefix text sent as the ``query-string`` parameter. A ``:``
            anywhere in it selects the ``route6`` object type, otherwise
            ``route`` is used.

    Returns:
        A ``(json_body, route_type)`` tuple: the decoded JSON response and
        the ``type-filter`` value that was used for the query.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        httpx.HTTPError: For other transport-level failures raised by httpx.
    """
    route_type = "route6" if ":" in prefix else "route"

    async with httpx.AsyncClient(
        timeout=HTTP_TIMEOUT_SECONDS,
        headers={"User-Agent": USER_AGENT},
        follow_redirects=True,
    ) as client:
        # Let httpx encode the query parameters instead of interpolating the
        # prefix into the URL: characters such as '%', '&' or '#' (which can
        # survive ipaddress validation inside an IPv6 scope id) would
        # otherwise corrupt or extend the query string.
        response = await client.get(
            f"{RIPE_REST_BASE}/search.json",
            params={"query-string": prefix, "type-filter": route_type},
        )
        response.raise_for_status()
        return response.json(), route_type


def _collect_matches(
    search_data: dict[str, Any], origin_asn: int, route_type: str
) -> list[dict[str, str]]:
    """Return the route objects in ``search_data`` whose origin is ``origin_asn``.

    Objects without a ``route``/``route6`` attribute, or whose ``origin``
    attribute cannot be parsed as an AS number, are skipped.

    NOTE(review): only the origin ASN is compared; the returned object's
    route value is not compared against the requested prefix, so any object
    the search returns for the query counts as a match. Confirm this is the
    intended "coverage" semantics.
    """
    matches: list[dict[str, str]] = []
    objects = search_data.get("objects", {}).get("object", [])

    for obj in objects:
        attrs = obj.get("attributes", {}).get("attribute", [])

        # First route/route6 attribute and first origin attribute win.
        route = next(
            (a["value"] for a in attrs if a.get("name") in ("route", "route6")),
            None,
        )
        origin = next((a["value"] for a in attrs if a.get("name") == "origin"), "")

        if not route:
            continue

        # The origin is stored as e.g. "AS64496"; strip the prefix to compare
        # numerically. A missing or malformed origin just disqualifies the object.
        try:
            origin_num = int(origin.upper().replace("AS", ""))
        except (AttributeError, ValueError):
            continue

        if origin_num == origin_asn:
            matches.append(
                {
                    "route": route,
                    "origin": origin,
                    # The "source" key carries the object type that was queried.
                    "source": route_type,
                }
            )

    return matches


def _build_result(
    prefix: str, origin_asn: int, matches: list[dict[str, str]]
) -> dict[str, Any]:
    """Build the success payload; state is "exists" iff ``matches`` is non-empty."""
    return {
        "ok": True,
        "data": {
            "state": "exists" if matches else "not-found",
            "matches": matches,
            "prefix": prefix,
            "origin_asn": origin_asn,
        },
    }


async def _validate_route_object_request(
    prefix: Annotated[str, Field(description=PREFIX_DESCRIPTION)],
    origin_asn: Annotated[int, Field(description=ORIGIN_ASN_DESCRIPTION)],
    ctx: Context[ServerSession, None],
) -> dict[str, Any]:
    """
    Validate IRR route/route6 coverage for a specific prefix-ASN combination.

    This tool is optimized for validation workflows where you need to verify if
    a specific route object exists for a known prefix-ASN pair. Much more
    efficient than whois_query when you just need to check coverage status
    rather than examine full route object details.

    Use Cases:
    - BGP route filtering validation ("Is 192.0.2.0/24 AS64496 properly registered?")
    - RPKI validation workflows
    - Network security audits
    - Automated route policy verification

    Processing order: the cache is consulted first (keyed on the raw prefix
    text and ASN), then the prefix is syntax-checked with ``ipaddress``, then
    the RIPE search is performed. Only successful outcomes are cached; an
    HTTP 404 from the search is treated as a successful "not-found".

    Returns:
        On success:
        {"ok": true, "data": {"state": "exists|not-found", "matches": [...], "prefix": "...", "origin_asn": N}}
        On failure:
        {"ok": false, "error": "invalid_prefix|http_error|validation_error", "detail": "..."}
    """
    # Log the incoming request
    await ctx.info(
        f"Starting route validation for prefix '{prefix}' origin AS{origin_asn}"
    )

    # Create cache key from prefix and ASN
    cache_key = f"route:{prefix}|{origin_asn}"

    # Check cache first
    cached_result = _route_cache.get(cache_key)
    if cached_result is not None:
        cache_msg = f"Route validation for '{prefix}' AS{origin_asn} served from cache"
        logger.info(cache_msg)
        await ctx.info(cache_msg)
        return cached_result

    # Validate prefix format; strict=False tolerates host bits being set.
    try:
        ipaddress.ip_network(prefix, strict=False)
    except ValueError as e:
        error_msg = f"Invalid prefix format '{prefix}': {str(e)}"
        logger.error(error_msg)
        await ctx.error(f"Route validation failed: {error_msg}")
        return {"ok": False, "error": "invalid_prefix", "detail": str(e)}

    try:
        # Search for route objects and keep those originated by the target ASN
        search_data, route_type = await _search_route(prefix)
        matches = _collect_matches(search_data, origin_asn, route_type)
        result = _build_result(prefix, origin_asn, matches)

        # Cache the result
        _route_cache.set(cache_key, result)

        match_count = len(matches)
        logger.info(
            "Route validation for '%s' AS%s completed: %s matches found",
            prefix,
            origin_asn,
            match_count,
        )

        # Log successful completion via MCP context
        state = result["data"]["state"]
        await ctx.info(
            f"Route validation completed: {state} ({match_count} matches found for '{prefix}' AS{origin_asn})"
        )

        return result

    except httpx.HTTPError as e:
        # HTTPStatusError is a subclass of HTTPError, so one handler covers
        # both. A 404 means "no such object" rather than a failure.
        if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 404:
            logger.info("No route objects found for '%s' (404 response)", prefix)
            await ctx.info(
                f"Route validation completed: not-found (no route objects exist for '{prefix}')"
            )
            not_found = _build_result(prefix, origin_asn, [])
            # Cache the not-found result
            _route_cache.set(cache_key, not_found)
            return not_found

        error_msg = f"HTTP error during route search for '{prefix}': {str(e)}"
        logger.error(error_msg)
        await ctx.error(f"Route validation failed: {error_msg}")
        return {"ok": False, "error": "http_error", "detail": str(e)}
    except Exception as e:
        # Tool boundary: report unexpected failures (e.g. a malformed response
        # body) to the caller as a structured error instead of raising.
        error_msg = f"Route validation for '{prefix}' AS{origin_asn} failed: {str(e)}"
        logger.error(error_msg)
        await ctx.error(f"Route validation failed: {error_msg}")
        return {"ok": False, "error": "validation_error", "detail": str(e)}


def register(mcp: FastMCP) -> None:
    """Register the validate_route_object tool with the MCP server."""
    mcp.tool(
        name=TOOL_NAME,
        description=TOOL_DESCRIPTION,
    )(_validate_route_object_request)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dadepo/whois-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.