Skip to main content
Glama
by kcsoukup
vpn_routing.py14.2 kB
""" VPN Director routing tools for Asuswrt-Merlin firmware. VPN Director is Merlin's implementation of policy-based routing for OpenVPN clients. It replaces the stock ASUS "VPN Fusion" feature with more powerful routing capabilities. Uses /jffs/openvpn/vpndirector_rulelist file with format: <enable>description>localIP>remoteIP>interface> IMPORTANT: These tools only work with Asuswrt-Merlin firmware. Stock ASUS firmware uses VPN Fusion (different file and format). """ from typing import Any from mcp.types import TextContent from config.constants import VPN_DIRECTOR_RULELIST_FILE from core.ssh_client import RouterSSHClient from utils.nvram_parser import build_vpn_director_rules, parse_vpn_director_rules from utils.validators import is_merlin_firmware, is_valid_mac, normalize_mac def _get_device_ip_by_mac(router: RouterSSHClient, mac: str) -> str | None: """ Look up device IP address by MAC address from DHCP leases. Args: router: RouterSSHClient instance mac: MAC address (normalized format) Returns: IP address string if found, None otherwise """ # Get DHCP leases output, _, code = router.execute_command( "cat /var/lib/misc/dnsmasq.leases 2>/dev/null" ) if code != 0: return None # Parse DHCP leases format: timestamp MAC IP hostname client-id for line in output.strip().split("\n"): parts = line.split() if len(parts) >= 3: lease_mac = parts[1].upper() lease_ip = parts[2] if lease_mac == mac: return lease_ip return None def _vpn_client_to_interface(vpn_client_number: int) -> str: """ Convert VPN client number (1-5) to VPN Director interface name (OVPN1-5). Args: vpn_client_number: VPN client number (1-5) Returns: Interface name (OVPN1, OVPN2, etc.) """ return f"OVPN{vpn_client_number}" def _interface_to_vpn_client(interface: str) -> int | None: """ Convert VPN Director interface name to VPN client number. Args: interface: Interface name (OVPN1-5, WGC1-2, WAN) Returns: VPN client number (1-5) for OVPN interfaces, None for others """ if interface.startswith("OVPN") and len(interface) == 5: try: return int(interface[4]) except ValueError: return None return None def _check_merlin_firmware(router: RouterSSHClient) -> TextContent | None: """ Check if router is running Merlin firmware. Returns error message if not Merlin, None if Merlin detected. """ if not is_merlin_firmware(router): error_msg = ( "ERROR: VPN routing tools require Asuswrt-Merlin firmware\n\n" "Your router is running stock ASUS firmware, which is not currently supported.\n\n" "Why: Merlin firmware uses VPN Director (vpndirector_rulelist) while stock ASUS\n" " uses VPN Fusion (vpnc_dev_policy_list) - different features entirely.\n\n" "Options:\n" " 1. Install Asuswrt-Merlin firmware (recommended): https://www.asuswrt-merlin.net/\n" " 2. Request stock ASUS firmware support via GitHub:\n" " https://github.com/kcsoukup/mcp-asus-merlin/issues\n\n" "Note: The primary user base for this MCP server uses Merlin firmware.\n" " Stock ASUS support may be added in future releases if there's demand." ) return TextContent(type="text", text=error_msg) return None def handle_add_vpn_routing_policy( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """ Add device to VPN Director routing (Asuswrt-Merlin only). Creates a VPN Director rule to route a specific device through a VPN client. Requires Asuswrt-Merlin firmware - stock ASUS firmware is not supported. Args: router: RouterSSHClient instance for executing commands arguments: Dict containing: - mac_address: Device MAC address (required) - vpn_client_number: VPN client to route through (1-5, required) - description: Optional rule description/hostname Returns: List containing TextContent with operation result """ # Check Merlin firmware first firmware_check = _check_merlin_firmware(router) if firmware_check: return [firmware_check] mac_address = arguments.get("mac_address") vpn_client_number = arguments.get("vpn_client_number") description = arguments.get("description", "") # Validate inputs if not is_valid_mac(mac_address): return [ TextContent( type="text", text=f"Error: Invalid MAC address format: {mac_address}", ) ] if not isinstance(vpn_client_number, int) or not 1 <= vpn_client_number <= 5: return [ TextContent( type="text", text=f"Error: VPN client number must be 1-5, got {vpn_client_number}", ) ] # Normalize MAC and look up IP mac_normalized = normalize_mac(mac_address) device_ip = _get_device_ip_by_mac(router, mac_normalized) if not device_ip: return [ TextContent( type="text", text=f"Error: Cannot find IP address for MAC {mac_normalized}\n" f"Device must be connected and have a DHCP lease.\n" f"Try connecting the device first, or create a DHCP reservation.", ) ] # Get current VPN Director rules from file success, content, error = router.read_file_content(VPN_DIRECTOR_RULELIST_FILE) # If file doesn't exist, start with empty rules if not success: rules = [] else: # Parse existing rules rules = parse_vpn_director_rules(content.strip()) # Check if device IP already has a rule for rule in rules: if rule["local_ip"] == device_ip: existing_vpn = _interface_to_vpn_client(rule["interface"]) return [ TextContent( type="text", text=f"Error: Device IP {device_ip} already has a VPN routing rule\n" f"Currently routing through: {rule['interface']}" + (f" (VPN Client {existing_vpn})" if existing_vpn else "") + "\nRemove existing rule first or use different device", ) ] # Add new rule interface = _vpn_client_to_interface(vpn_client_number) rule_description = description if description else f"Device-{device_ip}" new_rule = { "enable": "1", # Enabled by default "description": rule_description, "local_ip": device_ip, "remote_ip": "", # Empty = route all destinations "interface": interface, } rules.append(new_rule) # Build new rulelist new_list_str = build_vpn_director_rules(rules) # Write to VPN Director file with MD5 verification success, error = router.write_file_content(VPN_DIRECTOR_RULELIST_FILE, new_list_str) if not success: return [ TextContent(type="text", text=f"Error writing VPN Director rule: {error}") ] # Format result result = "✓ Device added to VPN Director routing:\n" result += f" MAC: {mac_normalized}\n" result += f" IP: {device_ip}\n" result += f" VPN Client: {vpn_client_number} ({interface})\n" result += f" Description: {rule_description}\n" result += " Status: Enabled\n" result += " Destination: All (routes all traffic)\n" result += "\n⚠ Note: Changes take effect immediately. Check VPN Director page in router UI." return [TextContent(type="text", text=result)] def handle_remove_vpn_routing_policy( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """ Remove device from VPN Director routing (Asuswrt-Merlin only). Removes the device's routing rule from VPN Director, returning it to normal routing. Requires Asuswrt-Merlin firmware - stock ASUS firmware is not supported. Args: router: RouterSSHClient instance for executing commands arguments: Dict containing: - mac_address: Device MAC address to remove Returns: List containing TextContent with operation result """ # Check Merlin firmware first firmware_check = _check_merlin_firmware(router) if firmware_check: return [firmware_check] mac_address = arguments.get("mac_address") # Validate MAC if not is_valid_mac(mac_address): return [ TextContent( type="text", text=f"Error: Invalid MAC address format: {mac_address}", ) ] # Normalize MAC and look up IP mac_normalized = normalize_mac(mac_address) device_ip = _get_device_ip_by_mac(router, mac_normalized) if not device_ip: return [ TextContent( type="text", text=f"Error: Cannot find IP address for MAC {mac_normalized}\n" f"Unable to remove rule without knowing device IP.\n" f"Device may be offline or rule may already be removed.", ) ] # Get current VPN Director rules from file success, content, error = router.read_file_content(VPN_DIRECTOR_RULELIST_FILE) if not success: return [ TextContent( type="text", text=f"Error: No VPN Director rules file found\n" f"File {VPN_DIRECTOR_RULELIST_FILE} does not exist or cannot be read.", ) ] # Parse rules rules = parse_vpn_director_rules(content.strip()) # Find and remove matching rule by IP removed = None new_rules = [] for rule in rules: if rule["local_ip"] == device_ip: removed = rule else: new_rules.append(rule) if not removed: return [ TextContent( type="text", text=f"Error: No VPN routing rule found for IP {device_ip} (MAC: {mac_normalized})", ) ] # Build new rulelist new_list_str = build_vpn_director_rules(new_rules) # Write to VPN Director file (or remove file if no rules left) if new_list_str: success, error = router.write_file_content( VPN_DIRECTOR_RULELIST_FILE, new_list_str ) if not success: return [ TextContent( type="text", text=f"Error removing VPN Director rule: {error}" ) ] else: # Remove file if no rules remain _, error, code = router.execute_command(f"rm -f {VPN_DIRECTOR_RULELIST_FILE}") if code != 0: return [ TextContent( type="text", text=f"Error removing VPN Director file: {error}" ) ] # Format result vpn_client = _interface_to_vpn_client(removed["interface"]) result = "✓ Device removed from VPN Director routing:\n" result += f" MAC: {mac_normalized}\n" result += f" IP: {device_ip}\n" result += f" Was routing through: {removed['interface']}" if vpn_client: result += f" (VPN Client {vpn_client})" result += "\n" if removed.get("description"): result += f" Description: {removed['description']}\n" result += "\n⚠ Note: Changes take effect immediately" return [TextContent(type="text", text=result)] def handle_list_vpn_policies( router: RouterSSHClient, _arguments: Any ) -> list[TextContent]: """ List all VPN Director routing rules (Asuswrt-Merlin only). Shows all devices configured to route through VPN clients via VPN Director. Requires Asuswrt-Merlin firmware - stock ASUS firmware is not supported. Args: router: RouterSSHClient instance for executing commands _arguments: Unused (no parameters required) Returns: List containing TextContent with formatted VPN Director rules table """ # Check Merlin firmware first firmware_check = _check_merlin_firmware(router) if firmware_check: return [firmware_check] # Get VPN Director rulelist from file success, content, error = router.read_file_content(VPN_DIRECTOR_RULELIST_FILE) # If file doesn't exist or is empty, no rules configured if not success: rules = [] else: # Parse rules rules = parse_vpn_director_rules(content.strip()) if not rules: return [ TextContent( type="text", text="No VPN Director routing rules configured\n\n" "VPN Director is Merlin's policy-based routing system.\n" "Use 'add_vpn_routing_policy' to route devices through VPN clients.", ) ] # Format as table result = f"VPN Director Routing Rules ({len(rules)} total)\n" result += "Firmware: Asuswrt-Merlin\n\n" result += "Source IP Interface VPN Client Status Description\n" result += "─" * 75 + "\n" for rule in rules: local_ip = rule.get("local_ip", "Unknown") interface = rule.get("interface", "?") status = "Enabled" if rule.get("enable") == "1" else "Disabled" description = rule.get("description", "") remote_ip = rule.get("remote_ip", "") # Try to extract VPN client number from interface vpn_client = _interface_to_vpn_client(interface) vpn_display = f"Client {vpn_client}" if vpn_client else "N/A" result += f"{local_ip:<15} {interface:<10} {vpn_display:<11} {status:<9} {description}\n" if remote_ip: result += ( f" → Dest: {remote_ip}\n" ) result += ( "\n💡 Tip: Use 'add_vpn_routing_policy' to route devices through VPN clients" ) result += "\n💡 Check VPN client status with 'get_vpn_status'" result += "\n💡 View rules in router UI: VPN → VPN Director" return [TextContent(type="text", text=result)]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kcsoukup/asus-merlin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server