server.py•9.94 kB
#!/usr/bin/env python3
"""
MCP Server for Domain Availability Checking
"""
import asyncio
import json
import logging
import math
import os
import time
from collections import defaultdict, deque
from typing import Any, Dict, List, Optional, Sequence

import httpx
from mcp.server.fastmcp import FastMCP
# Configure logging
# Root logging is set to INFO for the whole process. `logging.basicConfig`
# installs a stream handler that writes to stderr by default, so log records
# do not mix with stdout, which the server uses for its "stdio" transport.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the configuration code and the tool below.
logger = logging.getLogger(__name__)
class RateLimiter:
    """Sliding-window rate limiter.

    Records a timestamp for every admitted call and admits a new call only
    while fewer than ``max_calls`` timestamps lie inside the trailing
    ``window_seconds`` window.

    Timestamps come from ``time.monotonic()`` rather than the wall clock, so
    system clock adjustments cannot stretch or shrink the window.
    """

    def __init__(self, max_calls: int = 60, window_seconds: int = 60):
        """Create a limiter admitting ``max_calls`` per ``window_seconds``."""
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        # Monotonic timestamps of admitted calls, oldest first.
        self.calls = deque()

    def _evict_expired(self, now: float) -> None:
        """Drop timestamps that are at least ``window_seconds`` old.

        Uses ``>=`` so that eviction agrees with ``get_retry_after``: once the
        reported wait has elapsed, the oldest call is guaranteed to be evicted.
        """
        while self.calls and now - self.calls[0] >= self.window_seconds:
            self.calls.popleft()

    def is_allowed(self) -> bool:
        """Admit and record a call if the window has capacity.

        Returns:
            True if the call was admitted (and recorded), False otherwise.
        """
        now = time.monotonic()
        self._evict_expired(now)
        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True
        return False

    def get_retry_after(self) -> int:
        """Return the whole seconds to wait until a call would be admitted.

        Returns 0 when the window currently has spare capacity. Otherwise the
        remaining lifetime of the oldest recorded call is rounded *up*, so
        waiting the returned number of seconds is always sufficient.
        """
        now = time.monotonic()
        self._evict_expired(now)
        if not self.calls or len(self.calls) < self.max_calls:
            return 0
        remaining = self.window_seconds - (now - self.calls[0])
        return max(0, math.ceil(remaining))
# Initialize FastMCP server
mcp = FastMCP("domain-availability")

# Global variables for API configuration, read once at import time.
# DOMAIN_API_KEY is mandatory; it is sent verbatim after the "sso-key " scheme
# in the Authorization header below.
api_key = os.getenv("DOMAIN_API_KEY")
# Base URL of the domains API. The default is the host this module has always
# used; DOMAIN_API_URL may override it so another environment can be targeted
# without editing the code. A trailing slash is removed because request paths
# are appended with a leading "/".
# NOTE(review): the "ote-" host looks like a test environment — confirm which
# environment deployments are meant to query.
api_url = (os.getenv("DOMAIN_API_URL") or "https://api.ote-godaddy.com").rstrip("/")

# Fail fast at import: without a key every request would be rejected.
if not api_key:
    logger.error("DOMAIN_API_KEY environment variable is required")
    raise ValueError("DOMAIN_API_KEY environment variable is required")

# Initialize rate limiter (60 calls per minute)
rate_limiter = RateLimiter(max_calls=60, window_seconds=60)

# Initialize HTTP client: one shared async client for all lookups, with a
# 30-second timeout and the authentication header preset.
client = httpx.AsyncClient(
    timeout=30.0,
    headers={
        "Authorization": f"sso-key {api_key}",
        "Content-Type": "application/json",
    },
)
# Values accepted for the availability endpoint's ``checkType`` query parameter.
_VALID_CHECK_TYPES = ("FAST", "FULL")


def _resolve_domains(
    domains: Optional[List[str]],
    base_name: Optional[str],
    tld_suffixes: Optional[List[str]],
) -> Optional[List[str]]:
    """Build the list of domain names to look up.

    Returns None when neither ``domains`` nor ``base_name`` + ``tld_suffixes``
    was supplied; returns a (possibly empty) list otherwise. Blank entries are
    skipped, and suffixes are accepted with or without a leading dot so that
    ``'com'`` and ``'.com'`` both yield ``'<base_name>.com'``.
    """
    if domains:
        return [name.strip() for name in domains if name and name.strip()]
    if base_name and tld_suffixes:
        stem = base_name.strip().rstrip(".")
        if not stem:
            return []
        resolved = []
        for suffix in tld_suffixes:
            tld = suffix.strip().strip(".") if suffix else ""
            if tld:
                resolved.append(f"{stem}.{tld}")
        return resolved
    return None


def _failed_lookup(domain: str, message: str) -> Dict[str, Any]:
    """Return the placeholder record used when a domain could not be checked."""
    return {
        "domain_name": domain,
        "is_available": False,
        "price_dollars": 0.00,
        "currency_code": "USD",
        "is_definitive": False,
        "registration_period_months": 12,
        "error_message": message,
    }


async def _lookup_domain(domain: str, check_type: str) -> Dict[str, Any]:
    """Query the availability endpoint for one domain and normalize the reply.

    Never raises: any transport, decoding, or conversion failure is logged and
    reported as a failed-lookup record so one bad domain cannot abort a batch.
    """
    try:
        response = await client.get(
            f"{api_url}/v1/domains/available",
            params={"domain": domain, "checkType": check_type},
        )
        if response.status_code != 200:
            return _failed_lookup(domain, f"API error: {response.status_code}")

        payload = response.json()
        # Convert price from micro-units to dollars (divide by 1,000,000)
        raw_price = payload.get("price", 0)
        price_dollars = round(raw_price / 1000000, 2) if raw_price else 0.00
        # Convert period from years to months for consistency
        period_years = payload.get("period", 1)
        return {
            "domain_name": domain,
            "is_available": payload.get("available", False),
            "price_dollars": price_dollars,
            "currency_code": payload.get("currency", "USD"),
            # Report the API's own confidence flag; fall back to True (the
            # value previously hard-coded) if the field is absent.
            "is_definitive": bool(payload.get("definitive", True)),
            "registration_period_months": period_years * 12 if period_years else 12,
        }
    except Exception as e:  # deliberate best-effort boundary per domain
        logger.warning("Availability lookup failed for %s: %s", domain, e)
        return _failed_lookup(domain, str(e))


def _format_results(result: Dict[str, Any]) -> str:
    """Render the unified result dict as the human-readable report text."""
    entries = result.get("domains", [])
    available = [entry for entry in entries if entry.get("is_available", False)]
    unavailable = [entry for entry in entries if not entry.get("is_available", False)]

    parts = ["Domain Availability Check Results:\n\n"]

    if available:
        parts.append("✅ AVAILABLE DOMAINS:\n")
        for entry in available:
            price = entry.get("price_dollars", 0)
            price_info = (
                f"${price:.2f} {entry.get('currency_code', 'USD')}"
                if price > 0.00
                else "Price not available"
            )
            # Convert months back to years for display
            period_years = entry.get("registration_period_months", 12) / 12
            period_display = (
                f"{period_years:.0f} year{'s' if period_years != 1 else ''}"
            )
            confidence = (
                "Definitive" if entry.get("is_definitive", False) else "Preliminary"
            )
            parts.append(
                f" • {entry.get('domain_name', '')} - {price_info} "
                f"for {period_display} ({confidence})\n"
            )
        parts.append("\n")

    if unavailable:
        parts.append("❌ UNAVAILABLE DOMAINS:\n")
        for entry in unavailable:
            # Surface the failure reason, if any, next to the domain.
            error_msg = (
                f" - {entry['error_message']}" if "error_message" in entry else ""
            )
            parts.append(
                f" • {entry.get('domain_name', '')} - Not available{error_msg}\n"
            )
        parts.append("\n")

    # Handle errors if any
    if result.get("errors"):
        parts.append("⚠️ ERRORS:\n")
        for error in result["errors"]:
            name = error.get("domain_name", "Unknown")
            message = error.get("error_message", "Unknown error")
            parts.append(f" • {name}: {message}\n")

    # Add raw JSON data to response
    parts.append(f"\n📋 Raw API Response:\n{json.dumps(result, indent=2)}")
    return "".join(parts)


@mcp.tool()
async def check_domain_availability(
    domains: Optional[List[str]] = None,
    checkType: str = "FAST",
    base_name: Optional[str] = None,
    tld_suffixes: Optional[List[str]] = None,
) -> str:
    """
    Check domain availability and pricing for multiple domains with different TLD suffixes.
    Rate limited: 60 calls per minute.

    Args:
        domains: List of domain names to check (e.g., ['example.com', 'example.org'])
        checkType: Optimize for time (FAST) or accuracy (FULL). Case-insensitive;
            unrecognized values fall back to FAST.
        base_name: Base domain name to check with multiple TLD suffixes (e.g., 'example')
        tld_suffixes: List of TLD suffixes to check (e.g., ['.com', '.org', '.net']).
            Used only when 'domains' is not provided; the leading dot is optional.

    Returns:
        Formatted string with domain availability results. Problems (rate limit,
        missing input, unexpected errors) are also reported as a string rather
        than raised.
    """
    try:
        # Check rate limit first
        if not rate_limiter.is_allowed():
            retry_after = rate_limiter.get_retry_after()
            return f"Rate limit exceeded. Maximum 60 calls per minute allowed. Please try again in {retry_after} seconds."

        # Determine domains to check
        domains_to_check = _resolve_domains(domains, base_name, tld_suffixes)
        if domains_to_check is None:
            return (
                "Either 'domains' or 'base_name' with 'tld_suffixes' must be provided"
            )
        if not domains_to_check:
            return "No domains to check"

        # Normalize the requested check type so it can be forwarded to the API.
        check_type = (checkType or "FAST").strip().upper()
        if check_type not in _VALID_CHECK_TYPES:
            logger.warning(
                "Unsupported checkType %r; falling back to FAST", checkType
            )
            check_type = "FAST"

        logger.info("Checking availability for domains: %s", domains_to_check)

        # The endpoint takes one domain per request, so look them up in turn.
        all_results = []
        for domain in domains_to_check:
            all_results.append(await _lookup_domain(domain, check_type))

        # Create a unified response format
        result = {"domains": all_results, "errors": []}
        return _format_results(result)
    except Exception as e:
        logger.exception("Error checking domain availability: %s", e)
        return f"Error checking domain availability: {str(e)}"
async def cleanup() -> None:
    """Cleanup resources.

    Closes the module-level ``client`` (the shared ``httpx.AsyncClient``) by
    awaiting its ``aclose()`` coroutine.
    """
    await client.aclose()
def main():
    """Entry point for the MCP server.

    Runs the FastMCP server over the "stdio" transport and, however that call
    ends (normal return or exception), attempts to close the shared HTTP
    client afterwards.
    """
    try:
        mcp.run(transport="stdio")
    finally:
        # Cleanup runs on a fresh event loop created by asyncio.run().
        # NOTE(review): if the client opened connections on the loop that
        # mcp.run() used, closing them here may fail — confirm. Either way a
        # cleanup failure must not mask the original exception or turn a
        # clean shutdown into a crash, so it is logged instead of raised.
        try:
            asyncio.run(cleanup())
        except Exception:
            logger.warning("Failed to close HTTP client cleanly", exc_info=True)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()