Skip to main content
Glama

WHOIS MCP Server

by dadepo
expand_as_set.py10.7 kB
import logging from typing import Annotated, Any import httpx from mcp.server.fastmcp import Context, FastMCP from mcp.server.session import ServerSession from pydantic import Field from whois_mcp.cache import TTLCache from whois_mcp.config import ( ARIN_REST_BASE, HTTP_TIMEOUT_SECONDS, SUPPORT_ARIN, USER_AGENT, ) __all__ = ["register"] # Configure logging logger = logging.getLogger(__name__) # Initialize cache with 5-minute TTL for AS-SET results _as_set_cache: TTLCache[str, Any] = TTLCache(max_items=1000, ttl_seconds=300.0) # Tool metadata constants TOOL_NAME = "arin_expand_as_set" TOOL_DESCRIPTION = ( "Efficiently expand AS-SET objects from the ARIN IRR database into concrete ASNs with configurable depth. " "This tool is specifically for the ARIN RIR (North America region - United States, Canada, parts of Caribbean). " "Use this instead of whois_query when you need ASNs from an ARIN AS-SET. " "CRITICAL: For 'top-level', 'direct', or 'immediate' members, use max_depth=1. " "For complete expansion, use max_depth=10+. Large AS-SETs may have nested structures - " "choose depth carefully to balance completeness vs speed. " "Automatically handles recursive expansion, deduplication, and cycle detection. " "Perfect for network analysis, route filtering, and policy generation for ARIN-managed AS-SETs." ) SETNAME_DESCRIPTION = ( "AS-SET name to recursively expand into concrete ASN numbers from ARIN IRR database. " "Examples: 'AS-COMCAST', 'AS-VERIZON', 'AS-ATT'. " "The tool will automatically resolve all nested AS-SETs and return a complete " "list of individual ASNs contained within the hierarchy from ARIN IRR records." ) MAX_DEPTH_DESCRIPTION = ( "Maximum recursion depth for AS-SET expansion (1-20 levels, default: 10). " "IMPORTANT: Use depth=1 for 'top-level' or 'direct' members only. " "Use depth=2-3 for shallow analysis, depth=10 for complete expansion. " "Higher values provide more complete results but take much longer to process. " "For questions about 'immediate' or 'direct' members, always use depth=1." ) async def _get_as_set_data(setname: str) -> dict[str, Any]: """Fetch AS-SET data from ARIN's IRR database.""" # ARIN IRR AS-SET endpoint url = f"{ARIN_REST_BASE}/irr/as-set/{setname}" async with httpx.AsyncClient( timeout=HTTP_TIMEOUT_SECONDS, headers={"User-Agent": USER_AGENT, "Accept": "application/json"}, follow_redirects=True, ) as client: try: response = await client.get(url) # ARIN may return 404 for non-existent AS-SETs if response.status_code == 404: logger.debug(f"AS-SET not found (404): {setname}") return {"members": []} response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"AS-SET not found (404): {setname}") return {"members": []} else: logger.error(f"HTTP error fetching AS-SET {setname}: {str(e)}") raise except httpx.HTTPError as e: logger.error(f"Failed to fetch AS-SET {setname}: {str(e)}") raise def _extract_members(data: dict[str, Any]) -> list[str]: """Extract member list from ARIN AS-SET response.""" # ARIN's response format may vary, handle different possible structures members: list[str] = [] # Try different possible field names for members for field in ["members", "member", "as-set", "asSet"]: if field in data: member_data: str | list[str] = data[field] if isinstance(member_data, list): for item in member_data: if item: members.append(str(item).strip()) else: members.extend( [ m.strip() for m in member_data.replace(",", " ").split() if m.strip() ] ) break return members async def _expand_as_set_recursive( setname: str, seen: set[str], out_asns: set[int], depth: int = 0, max_depth: int = 10, ) -> None: """Recursively expand an AS-SET into concrete ASNs (cycle-safe, depth-limited).""" if depth >= max_depth: logger.warning( f"Maximum recursion depth ({max_depth}) reached for ARIN AS-SET: {setname}" ) return if setname in seen: logger.debug(f"Cycle detected for ARIN AS-SET: {setname}") return seen.add(setname) cache_key = f"arin:as_set:{setname}" cached_result = _as_set_cache.get(cache_key) if cached_result: logger.debug(f"Cache hit for ARIN AS-SET: {setname}") out_asns.update(cached_result) return try: data = await _get_as_set_data(setname) members = _extract_members(data) if not members: logger.warning(f"No members found for ARIN AS-SET: {setname}") return asns: set[int] = set() for member in members: member = member.strip().upper() if member.startswith("AS") and member[2:].isdigit(): # Direct ASN member asns.add(int(member[2:])) elif member.startswith("AS-"): # Nested AS-SET - recurse try: await _expand_as_set_recursive( member, seen, asns, depth + 1, max_depth ) except Exception as e: logger.warning( f"Failed to expand nested ARIN AS-SET {member}: {str(e)}" ) # Continue with other members even if one fails _as_set_cache.set(cache_key, asns) out_asns.update(asns) logger.info( f"Successfully expanded ARIN AS-SET {setname} with {len(asns)} ASNs" ) except Exception as e: logger.error(f"Failed to expand ARIN AS-SET {setname}: {str(e)}") raise async def _expand_as_set_request( setname: Annotated[str, Field(description=SETNAME_DESCRIPTION)], max_depth: Annotated[ int, Field(description=MAX_DEPTH_DESCRIPTION, ge=1, le=20), ] = 10, *, ctx: Context[ServerSession, None], ) -> dict[str, Any]: """ Expand an ARIN AS-SET into all concrete ASNs and return the result in a structured format. This tool recursively resolves AS-SET hierarchies with configurable depth limits, automatically handling cycles and deduplication. Much more efficient than multiple whois_query calls when you need complete AS-SET membership information from ARIN IRR. Args: setname: The AS-SET name to expand (e.g., 'AS-COMCAST') max_depth: Maximum recursion depth (1-20, default: 10). Use lower values (3-5) for quick analysis, higher values (15-20) for comprehensive expansion. Most AS-SETs are fully resolved within 10 levels. Returns: {"ok": true, "data": {"as_set": "...", "asns": [1234, 5678, ...], "count": N}} Performance Notes: - Depth 1: FAST - Only direct/immediate members (use for "top-level" questions) - Depth 2-3: Quick overview with one level of nesting - Depth 6-10: Balanced, handles most real-world AS-SETs completely - Depth 11-20: Comprehensive, may be very slow for large AS-SETs """ # Check if ARIN support is enabled if not SUPPORT_ARIN: error_msg = "AS-SET expansion is currently disabled (SUPPORT_ARIN=false)" logger.warning(error_msg) await ctx.error(error_msg) return { "ok": False, "error": "service_disabled", "detail": "ARIN AS-SET expansion support is disabled. Set SUPPORT_ARIN=true to enable.", } # Log the incoming request await ctx.info( f"Starting ARIN AS-SET expansion for '{setname}' with max_depth={max_depth}" ) # Check cache first for the complete result cache_key = f"arin:as_set:{setname}:depth_{max_depth}" cached_result = _as_set_cache.get(cache_key) if cached_result is not None: logger.info(f"ARIN AS-SET expansion for '{setname}' served from cache") await ctx.info( f"ARIN AS-SET expansion for '{setname}' served from cache ({len(cached_result)} ASNs)" ) return { "ok": True, "data": { "as_set": setname, "asns": sorted(cached_result), "count": len(cached_result), }, } try: seen: set[str] = set() asns: set[int] = set() await _expand_as_set_recursive(setname, seen, asns, 0, max_depth) # Check if the AS-SET was found if len(asns) == 0 and len(seen) <= 1: logger.info(f"ARIN AS-SET '{setname}' not found or contains no ASNs") await ctx.info( f"ARIN AS-SET expansion completed: not-found ('{setname}' does not exist or contains no ASNs)" ) result: dict[str, Any] = { "ok": True, "data": { "as_set": setname, "asns": [], "count": 0, "status": "not-found", }, } else: await ctx.info( f"ARIN AS-SET expansion completed: expanded (found {len(asns)} ASNs from '{setname}' at depth {max_depth})" ) result = { "ok": True, "data": { "as_set": setname, "asns": sorted(asns), "count": len(asns), "status": "expanded", }, } # Cache the complete result _as_set_cache.set(cache_key, asns) logger.info( f"ARIN AS-SET expansion for '{setname}' completed with {len(asns)} ASNs" ) return result except Exception as e: error_msg = f"ARIN AS-SET expansion for '{setname}' failed: {str(e)}" logger.error(error_msg) await ctx.error(f"ARIN AS-SET expansion failed: {error_msg}") return {"ok": False, "error": "expansion_error", "detail": str(e)} def register(mcp: FastMCP) -> None: """Register the ARIN expand_as_set tool with the MCP server.""" mcp.tool( name=TOOL_NAME, description=TOOL_DESCRIPTION, )(_expand_as_set_request)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dadepo/whois-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server