Skip to main content
Glama

WHOIS MCP Server

by dadepo
expand_as_set.py9.2 kB
import logging from typing import Annotated, Any import aiohttp from mcp.server.fastmcp import Context, FastMCP from mcp.server.session import ServerSession from pydantic import Field from whois_mcp.cache import TTLCache from whois_mcp.config import HTTP_TIMEOUT_SECONDS, RIPE_REST_BASE, USER_AGENT __all__ = ["register"] # Configure logging logger = logging.getLogger(__name__) # Initialize cache with 5-minute TTL for AS-SET results _as_set_cache: TTLCache[str, Any] = TTLCache(max_items=1000, ttl_seconds=300.0) # Tool metadata constants TOOL_NAME = "ripe_expand_as_set" TOOL_DESCRIPTION = ( "Efficiently expand AS-SET objects from the RIPE NCC database into concrete ASNs with configurable depth. " "This tool is specifically for the RIPE RIR (Europe/Middle East/Central Asia region). " "Use this instead of whois_query when you need ASNs from a RIPE AS-SET. " "CRITICAL: For 'top-level', 'direct', or 'immediate' members, use max_depth=1. " "For complete expansion, use max_depth=10+. Large AS-SETs like 'AS-RETN' have hundreds " "of nested AS-SETs - choose depth carefully to balance completeness vs speed. " "Automatically handles recursive expansion, deduplication, and cycle detection. " "Perfect for network analysis, route filtering, and policy generation for RIPE-managed AS-SETs." ) SETNAME_DESCRIPTION = ( "AS-SET name to recursively expand into concrete ASN numbers from RIPE database. " "Examples: 'AS-CLOUDFLARE', 'AS-GOOGLE', 'AS-RETN'. " "The tool will automatically resolve all nested AS-SETs and return a complete " "list of individual ASNs contained within the hierarchy from RIPE NCC records." ) MAX_DEPTH_DESCRIPTION = ( "Maximum recursion depth for AS-SET expansion (1-20 levels, default: 10). " "IMPORTANT: Use depth=1 for 'top-level' or 'direct' members only. " "Use depth=2-3 for shallow analysis, depth=10 for complete expansion. " "Higher values provide more complete results but take much longer to process. " "For questions about 'immediate' or 'direct' members, always use depth=1." ) async def _get_json(url: str) -> dict[str, Any]: """Fetch JSON data from the specified URL.""" try: async with aiohttp.ClientSession( headers={"User-Agent": USER_AGENT}, timeout=aiohttp.ClientTimeout(total=HTTP_TIMEOUT_SECONDS), ) as session: async with session.get(url) as response: response.raise_for_status() logger.debug(f"Successfully fetched JSON from {url}") return await response.json() except aiohttp.ClientResponseError as e: if e.status == 404: # Return empty structure for 404s (AS-SET not found) logger.debug(f"AS-SET not found (404): {url}") return {"objects": {"object": []}} else: logger.error(f"HTTP error fetching JSON from {url}: {str(e)}") raise except aiohttp.ClientError as e: logger.error(f"Failed to fetch JSON from {url}: {str(e)}") raise def _attrs(obj: dict[str, Any], name: str) -> list[str]: """Extract attribute values by name from an object.""" return [ attr.get("value", "").strip() for attr in obj.get("attributes", {}).get("attribute", []) if attr.get("name") == name and attr.get("value", "").strip() ] async def _expand_as_set_recursive( setname: str, seen: set[str], out_asns: set[int], depth: int = 0, max_depth: int = 10, ) -> None: """Recursively expand an AS-SET into concrete ASNs (cycle-safe, depth-limited).""" if depth >= max_depth: logger.warning( f"Maximum recursion depth ({max_depth}) reached for AS-SET: {setname}" ) return if setname in seen: logger.debug(f"Cycle detected for AS-SET: {setname}") return seen.add(setname) cache_key = f"as_set:{setname}" cached_result = _as_set_cache.get(cache_key) if cached_result: logger.debug(f"Cache hit for AS-SET: {setname}") out_asns.update(cached_result) return try: url = f"{RIPE_REST_BASE}/ripe/as-set/{setname}.json" data = await _get_json(url) objects = data.get("objects", {}).get("object", []) if not objects: logger.warning(f"No objects found for AS-SET: {setname}") return obj = objects[0] asns: set[int] = set() for member in _attrs(obj, "members"): if member.upper().startswith("AS") and member[2:].isdigit(): asns.add(int(member[2:])) elif member.upper().startswith("AS-"): try: await _expand_as_set_recursive( member, seen, asns, depth + 1, max_depth ) except Exception as e: logger.warning(f"Failed to expand nested AS-SET {member}: {str(e)}") # Continue with other members even if one fails _as_set_cache.set(cache_key, asns) out_asns.update(asns) logger.info(f"Successfully expanded AS-SET {setname} with {len(asns)} ASNs") except Exception as e: logger.error(f"Failed to expand AS-SET {setname}: {str(e)}") raise async def _expand_as_set_request( setname: Annotated[str, Field(description=SETNAME_DESCRIPTION)], max_depth: Annotated[ int, Field(description=MAX_DEPTH_DESCRIPTION, ge=1, le=20), ] = 10, *, ctx: Context[ServerSession, None], ) -> dict[str, Any]: """ Expand an AS-SET into all concrete ASNs and return the result in a structured format. This tool recursively resolves AS-SET hierarchies with configurable depth limits, automatically handling cycles and deduplication. Much more efficient than multiple whois_query calls when you need complete AS-SET membership information. Args: setname: The AS-SET name to expand (e.g., 'AS-RETN') max_depth: Maximum recursion depth (1-20, default: 10). Use lower values (3-5) for quick analysis, higher values (15-20) for comprehensive expansion. Most AS-SETs are fully resolved within 10 levels. Returns: {"ok": true, "data": {"as_set": "...", "asns": [1234, 5678, ...], "count": N}} Performance Notes: - Depth 1: FAST - Only direct/immediate members (use for "top-level" questions) - Depth 2-3: Quick overview with one level of nesting - Depth 6-10: Balanced, handles most real-world AS-SETs completely - Depth 11-20: Comprehensive, may be very slow for large AS-SETs like AS-RETN """ # Log the incoming request await ctx.info( f"Starting AS-SET expansion for '{setname}' with max_depth={max_depth}" ) # Check cache first cache_key = f"as_set:{setname}" cached_result = _as_set_cache.get(cache_key) if cached_result is not None: logger.info(f"AS-SET expansion for '{setname}' served from cache") await ctx.info( f"AS-SET expansion for '{setname}' served from cache ({len(cached_result)} ASNs)" ) return { "ok": True, "data": { "as_set": setname, "asns": sorted(cached_result), "count": len(cached_result), }, } try: seen: set[str] = set() asns: set[int] = set() await _expand_as_set_recursive(setname, seen, asns, 0, max_depth) # Check if the AS-SET was found (if seen contains only the original setname, it wasn't found) if len(asns) == 0 and len(seen) <= 1: logger.info(f"AS-SET '{setname}' not found or contains no ASNs") await ctx.info( f"AS-SET expansion completed: not-found ('{setname}' does not exist or contains no ASNs)" ) result: dict[str, Any] = { "ok": True, "data": { "as_set": setname, "asns": [], "count": 0, "status": "not-found", }, } else: await ctx.info( f"AS-SET expansion completed: expanded (found {len(asns)} ASNs from '{setname}' at depth {max_depth})" ) result = { "ok": True, "data": { "as_set": setname, "asns": sorted(asns), "count": len(asns), "status": "expanded", }, } logger.info(f"AS-SET expansion for '{setname}' completed with {len(asns)} ASNs") return result except Exception as e: error_msg = f"AS-SET expansion for '{setname}' failed: {str(e)}" logger.error(error_msg) await ctx.error(f"AS-SET expansion failed: {error_msg}") return {"ok": False, "error": "expansion_error", "detail": str(e)} def register(mcp: FastMCP) -> None: """Register the expand_as_set tool with the MCP server.""" mcp.tool( name=TOOL_NAME, description=TOOL_DESCRIPTION, )(_expand_as_set_request)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dadepo/whois-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server