Skip to main content
Glama

WHOIS MCP Server

by dadepo
contact_card.py12.6 kB
"""Contact card tool for retrieving contact information from ARIN database.""" from __future__ import annotations import logging from typing import Annotated, Any import httpx from mcp.server.fastmcp import FastMCP from pydantic import Field from ...cache import TTLCache from ...config import ARIN_REST_BASE, HTTP_TIMEOUT_SECONDS, SUPPORT_ARIN, USER_AGENT __all__ = ["register"] logger = logging.getLogger(__name__) # Initialize cache with 10-minute TTL for contact results _contact_cache: TTLCache[str, Any] = TTLCache(max_items=500, ttl_seconds=600.0) # Tool metadata constants TOOL_NAME = "arin_contact_card" TOOL_DESCRIPTION = ( "PREFERRED TOOL for retrieving contact information (abuse, NOC, admin, tech) for IP addresses, ASNs, or organizations from the ARIN database. " "This tool is specifically for the ARIN RIR (North America region - United States, Canada, parts of Caribbean). " "Use this when you need to CONTACT someone about: abuse reports, security incidents, network issues, or administrative matters. " "Keywords: 'contact', 'abuse', 'who should I contact', 'report', 'incident', 'NOC', 'technical support', 'admin'. " "Automatically resolves organization details and extracts POC (Point of Contact) information including abuse mailboxes, " "technical contacts, administrative contacts, and phone numbers from ARIN database. Perfect for incident response, " "network troubleshooting, and compliance reporting for ARIN-managed resources. Returns structured contact data with " "clear categorization of contact types and purposes." ) IP_DESCRIPTION = ( "IP address to look up contact information for in ARIN database (IPv4 or IPv6)" ) ASN_DESCRIPTION = "ASN number to look up contact information for in ARIN database (without 'AS' prefix)" ORG_DESCRIPTION = "Organization handle/key to look up contact information for directly" async def _get_json(url: str) -> dict[str, Any]: """Fetch JSON data from the specified URL.""" try: async with httpx.AsyncClient( timeout=HTTP_TIMEOUT_SECONDS, headers={"User-Agent": USER_AGENT, "Accept": "application/json"}, follow_redirects=True, ) as client: response = await client.get(url) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Resource not found (404): {url}") return {} else: logger.error(f"HTTP error fetching JSON from {url}: {str(e)}") raise except httpx.HTTPError as e: logger.error(f"HTTP error fetching JSON from {url}: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error fetching JSON from {url}: {str(e)}") raise async def _get_poc_details(poc_handle: str) -> dict[str, Any] | None: """Fetch POC (Point of Contact) details from ARIN.""" try: url = f"{ARIN_REST_BASE}/poc/{poc_handle}" poc_data = await _get_json(url) if not poc_data: return None # Extract POC information from ARIN's response format poc = poc_data.get("poc", {}) # Extract emails emails = [] email_data = poc.get("emails", {}) if isinstance(email_data.get("email"), list): emails = [ email.get("$", "") for email in email_data["email"] if email.get("$") ] elif isinstance(email_data.get("email"), dict): email_val = email_data["email"].get("$", "") if email_val: emails = [email_val] # Extract phones phones = [] phone_data = poc.get("phones", {}) if isinstance(phone_data.get("phone"), list): phones = [ phone.get("$", "") for phone in phone_data["phone"] if phone.get("$") ] elif isinstance(phone_data.get("phone"), dict): phone_val = phone_data["phone"].get("$", "") if phone_val: phones = [phone_val] return { "handle": poc_handle, "name": poc.get("companyName", {}).get("$", "") or poc.get("contactName", {}).get("$", ""), "emails": emails, "phones": phones, "type": poc.get("contactType", {}).get("$", ""), } except Exception as e: logger.warning(f"Failed to fetch POC details for {poc_handle}: {str(e)}") return None async def _contact_card_request( ip: Annotated[str | None, Field(description=IP_DESCRIPTION)] = None, asn: Annotated[int | None, Field(description=ASN_DESCRIPTION)] = None, org: Annotated[str | None, Field(description=ORG_DESCRIPTION)] = None, ) -> dict[str, Any]: """ Retrieve comprehensive contact information for IP addresses, ASNs, or organizations from ARIN. This tool automatically resolves organization details and extracts contact information including abuse mailboxes, NOC contacts, phone numbers, and administrative details. Perfect for incident response, network troubleshooting, and compliance reporting. Args: ip: IP address (IPv4 or IPv6) to look up contact information for asn: ASN number (without 'AS' prefix) to look up contact information for org: Organization handle/key to look up contact information for directly Returns: {"ok": true, "data": {"org": "...", "abuse": {...}, "contacts": [...], ...}} Note: Provide exactly one of ip, asn, or org parameters. """ # Check if ARIN support is enabled if not SUPPORT_ARIN: error_msg = "Contact card lookup is currently disabled (SUPPORT_ARIN=false)" logger.warning(error_msg) return { "ok": False, "error": "service_disabled", "detail": "ARIN contact card support is disabled. Set SUPPORT_ARIN=true to enable.", } # Validate input parameters provided_params = sum(1 for param in [ip, asn, org] if param is not None) if provided_params != 1: logger.error( "ARIN contact card request failed: Provide exactly one of ip, asn, or org" ) return { "ok": False, "error": "bad_request", "detail": "Provide exactly one of: ip, asn, or org", } # Create cache key if ip: cache_key = f"arin:contact_ip:{ip}" query_type = "ip" query_value = ip elif asn is not None: cache_key = f"arin:contact_asn:{asn}" query_type = "asn" query_value = str(asn) else: cache_key = f"arin:contact_org:{org}" query_type = "org" query_value = org # Log the incoming request logger.info(f"Starting ARIN contact card lookup for {query_type}='{query_value}'") # Check cache first cached_result = _contact_cache.get(cache_key) if cached_result is not None: logger.info( f"ARIN contact card for {query_type}='{query_value}' served from cache" ) # Already logged above return cached_result try: # Fetch data from ARIN based on query type if ip: logger.info(f"Looking up ARIN data for IP: {ip}") url = f"{ARIN_REST_BASE}/ip/{ip}" elif asn is not None: logger.info(f"Looking up ARIN data for ASN: {asn}") url = f"{ARIN_REST_BASE}/asn/AS{asn}" else: # org logger.info(f"Looking up ARIN data for organization: {org}") url = f"{ARIN_REST_BASE}/org/{org}" data = await _get_json(url) if not data: result = { "ok": False, "error": "not_found", "detail": f"No records found for {query_type}='{query_value}'", } _contact_cache.set(cache_key, result) return result # Extract organization information org_info = {} org_handle = None if "net" in data: # IP lookup response net_data = data["net"] org_ref = net_data.get("orgRef", {}) org_handle = org_ref.get("@handle", "") org_info = { "key": org_handle, "name": org_ref.get("@name", ""), "country": "US", # ARIN is primarily US-based } elif "asn" in data: # ASN lookup response asn_data = data["asn"] org_ref = asn_data.get("orgRef", {}) org_handle = org_ref.get("@handle", "") org_info = { "key": org_handle, "name": org_ref.get("@name", ""), "country": "US", } elif "org" in data: # Organization lookup response org_data = data["org"] org_handle = org_data.get("handle", {}).get("$", "") org_info = { "key": org_handle, "name": org_data.get("name", {}).get("$", ""), "country": "US", } # Get POC links poc_links: list[dict[str, Any]] = [] poc_links_data: dict[str, list[Any]] = {} if "net" in data: poc_links_data = data["net"].get("pocLinks", {}) elif "asn" in data: poc_links_data = data["asn"].get("pocLinks", {}) elif "org" in data: poc_links_data = data["org"].get("pocLinks", {}) if poc_links_data and "pocLinkRef" in poc_links_data: poc_refs = poc_links_data["pocLinkRef"] for poc_ref in poc_refs: poc_handle = poc_ref.get("@handle", "") poc_function = poc_ref.get("@function", "") if poc_handle: poc_links.append( { "handle": poc_handle, "function": poc_function, } ) # Fetch detailed POC information abuse_contact = None admin_contacts: list[dict[str, Any]] = [] tech_contacts: list[dict[str, Any]] = [] noc_contacts: list[dict[str, Any]] = [] for poc_link in poc_links: poc_details = await _get_poc_details(poc_link["handle"]) if poc_details: function = poc_link["function"].lower() if "abuse" in function: abuse_contact = poc_details elif "admin" in function or "administrative" in function: admin_contacts.append(poc_details) elif "tech" in function or "technical" in function: tech_contacts.append(poc_details) elif "noc" in function: noc_contacts.append(poc_details) # Build result result: dict[str, Any] = { "ok": True, "data": { "query": { "type": query_type, "value": query_value, }, "organization": org_info, "abuse": abuse_contact, "admin_contacts": admin_contacts, "tech_contacts": tech_contacts, "noc_contacts": noc_contacts, }, } # Cache the result _contact_cache.set(cache_key, result) # Log successful completion via MCP context org_name = result["data"]["organization"]["name"] abuse_available = "available" if result["data"]["abuse"] else "not available" admin_count = len(result["data"]["admin_contacts"]) tech_count = len(result["data"]["tech_contacts"]) noc_count = len(result["data"]["noc_contacts"]) logger.info( f"ARIN contact card completed: found '{org_name}' (abuse: {abuse_available}, " f"admin: {admin_count}, tech: {tech_count}, noc: {noc_count} contacts)" ) logger.info( f"ARIN contact card lookup for {query_type}='{query_value}' completed successfully" ) return result except Exception as e: error_msg = f"ARIN contact card lookup for {query_type}='{query_value}' failed: {str(e)}" logger.error(error_msg) # Error already logged above return {"ok": False, "error": "lookup_error", "detail": str(e)} def register(mcp: FastMCP) -> None: """Register the ARIN contact_card tool with the MCP server.""" mcp.tool( name=TOOL_NAME, description=TOOL_DESCRIPTION, )(_contact_card_request)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dadepo/whois-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server