Skip to main content
Glama
by kcsoukup
system_info.py26.7 kB
"""System information and management tool handlers for ASUS router MCP server.""" import logging from typing import Any from mcp.types import TextContent from core.ssh_client import RouterSSHClient from utils.nvram_parser import parse_dhcp_reservation_list logger = logging.getLogger("asus-merlin-mcp") def handle_get_router_info( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Get router system information (uptime, memory, CPU, firmware version).""" output, error, code = router.execute_command( "echo '=== Uptime ==='; uptime; " "echo '=== Memory ==='; free; " "echo '=== Firmware ==='; nvram get firmver; nvram get buildno" ) return [TextContent(type="text", text=output if code == 0 else f"Error: {error}")] def handle_get_connected_devices( router: RouterSSHClient, _arguments: Any ) -> list[TextContent]: """List all devices connected to the router (via DHCP).""" output, error, code = router.execute_command( "cat /var/lib/misc/dnsmasq.leases 2>/dev/null || arp -a" ) return [TextContent(type="text", text=output if code == 0 else f"Error: {error}")] def handle_get_all_network_devices( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Get comprehensive list of all network devices (DHCP + static + ARP) with detailed info.""" filter_type = arguments.get("filter_type", "all") # Get DHCP leases dhcp_output, _, dhcp_code = router.execute_command( "cat /var/lib/misc/dnsmasq.leases 2>/dev/null" ) # Get ARP table arp_output, _, arp_code = router.execute_command("cat /proc/net/arp") # Get DHCP reservations dhcp_res_output, _, _ = router.execute_command( "nvram get dhcp_staticlist 2>/dev/null" ) # Get hostnames from /etc/hosts hosts_output, _, _ = router.execute_command("cat /etc/hosts") # Parse data devices = {} # Key: MAC address, Value: device info dict # Parse DHCP leases if dhcp_code == 0: for line in dhcp_output.strip().split("\n"): if not line: continue parts = line.split() if len(parts) >= 4: # Format: timestamp mac ip hostname client-id mac = parts[1].upper() ip = parts[2] hostname = parts[3] if parts[3] != "*" else "" devices[mac] = { "ip": ip, "mac": mac, "hostname": hostname, "type": "DHCP", "status": "Active", } # Parse DHCP reservations reservations = parse_dhcp_reservation_list(dhcp_res_output.strip()) for res in reservations: mac = res["mac"].upper() if mac in devices: devices[mac]["type"] = "DHCP Reservation" else: devices[mac] = { "ip": res["ip"], "mac": mac, "hostname": res["hostname"], "type": "DHCP Reservation", "status": "Configured", } # Parse ARP table and merge if arp_code == 0: for line in arp_output.strip().split("\n")[1:]: # Skip header if not line or "incomplete" in line.lower(): continue parts = line.split() if len(parts) >= 4: ip = parts[0] mac = parts[3].upper() if mac in devices: # Update existing device with ARP status devices[mac]["status"] = "Active" else: # New device found only in ARP (static IP) devices[mac] = { "ip": ip, "mac": mac, "hostname": "", "type": "Static/ARP Only", "status": "Active", } # Enhance hostnames from /etc/hosts if hosts_output: for line in hosts_output.strip().split("\n"): if not line or line.startswith("#"): continue parts = line.split() if len(parts) >= 2: ip = parts[0] hostname = parts[1] # Find device by IP and update hostname if better for mac, dev in devices.items(): if dev["ip"] == ip and ( not dev["hostname"] or dev["hostname"] == "*" ): dev["hostname"] = hostname # Apply filter filtered_devices = devices.values() if filter_type == "dhcp": filtered_devices = [d for d in devices.values() if d["type"] == "DHCP"] elif filter_type == "static": filtered_devices = [ d for d in devices.values() if d["type"] == "Static/ARP Only" ] elif filter_type == "reservation": filtered_devices = [ d for d in devices.values() if d["type"] == "DHCP Reservation" ] # Sort by IP address sorted_devices = sorted( filtered_devices, key=lambda x: tuple(map(int, x["ip"].split("."))) ) # Format output filter_label = f" ({filter_type.upper()})" if filter_type != "all" else "" result = f"Network Devices Report{filter_label}\n" result += "=" * 70 + "\n\n" # Count device types dhcp_count = sum(1 for d in devices.values() if d["type"] == "DHCP") dhcp_res_count = sum(1 for d in devices.values() if d["type"] == "DHCP Reservation") static_count = sum(1 for d in devices.values() if d["type"] == "Static/ARP Only") active_count = sum(1 for d in devices.values() if d["status"] == "Active") if filter_type == "all": result += f"Total Devices: {len(devices)}\n" result += f"├─ DHCP Leases: {dhcp_count}\n" result += f"├─ DHCP Reservations: {dhcp_res_count}\n" result += f"├─ Static/ARP Only: {static_count}\n" result += f"└─ Currently Active: {active_count}\n\n" else: result += ( f"Showing: {len(sorted_devices)} devices (filtered by {filter_type})\n" ) result += f"Total Network Devices: {len(devices)}\n\n" # Device table result += f"{'IP Address':<16} {'MAC Address':<18} {'Hostname':<25} {'Type':<18}\n" result += "-" * 77 + "\n" for dev in sorted_devices: hostname = dev["hostname"] if dev["hostname"] else "(unknown)" result += f"{dev['ip']:<16} {dev['mac']:<18} {hostname:<25} {dev['type']:<18}\n" result += "\n" + "=" * 70 + "\n" result += "Legend:\n" result += " • DHCP = Dynamic IP from DHCP server\n" result += " • DHCP Reservation = Static assignment via DHCP\n" result += " • Static/ARP Only = Manually configured static IP\n" return [TextContent(type="text", text=result)] def handle_get_wifi_status( router: RouterSSHClient, _arguments: Any ) -> list[TextContent]: """Get WiFi status for all radios (2.4GHz, 5GHz, etc.).""" # Get interface names dynamically for compatibility across router models wl0_if, _, _ = router.execute_command("nvram get wl0_ifname") wl1_if, _, _ = router.execute_command("nvram get wl1_ifname") wl0_if = wl0_if.strip() wl1_if = wl1_if.strip() # Get WiFi status for both radios result = "WiFi Status Report\n" result += "=" * 70 + "\n\n" # 2.4GHz Radio (wl0) if wl0_if: ssid_output, _, _ = router.execute_command("nvram get wl0_ssid") status_output, _, status_code = router.execute_command( f"wl -i {wl0_if} status 2>/dev/null" ) result += "2.4GHz Radio (wl0)\n" result += "-" * 70 + "\n" result += f"Interface: {wl0_if}\n" result += f"SSID: {ssid_output.strip()}\n" if status_code == 0 and status_output: result += status_output + "\n" else: result += "Status: Unable to retrieve detailed status\n" else: result += "2.4GHz Radio: Not configured\n" result += "\n" # 5GHz Radio (wl1) if wl1_if: ssid_output, _, _ = router.execute_command("nvram get wl1_ssid") status_output, _, status_code = router.execute_command( f"wl -i {wl1_if} status 2>/dev/null" ) result += "5GHz Radio (wl1)\n" result += "-" * 70 + "\n" result += f"Interface: {wl1_if}\n" result += f"SSID: {ssid_output.strip()}\n" if status_code == 0 and status_output: result += status_output + "\n" else: result += "Status: Unable to retrieve detailed status\n" else: result += "5GHz Radio: Not configured\n" result += "\n" + "=" * 70 + "\n" return [TextContent(type="text", text=result)] def handle_restart_service( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Restart a specific router service (e.g., wireless, vpnclient1, httpd).""" service = arguments.get("service_name") output, error, code = router.execute_command(f"service restart_{service}") result = f"Service '{service}' restart command executed.\n{output}" if error: result += f"\nErrors: {error}" return [TextContent(type="text", text=result)] def handle_reboot_router(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """Reboot the router. WARNING: This will disconnect all clients.""" if not arguments.get("confirm"): return [ TextContent( type="text", text="Reboot not confirmed. Set 'confirm' to true." ) ] output, error, code = router.execute_command("service reboot") return [ TextContent( type="text", text="Router reboot initiated. Connection will be lost.", ) ] def handle_get_nvram_variable( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Get the value of a specific NVRAM variable.""" var = arguments.get("variable_name") output, error, code = router.execute_command(f"nvram get {var}") return [ TextContent( type="text", text=output.strip() if code == 0 else f"Error: {error}" ) ] def handle_set_nvram_variable( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Set a NVRAM variable value. WARNING: Incorrect values can break router configuration.""" var = arguments.get("variable_name") val = arguments.get("value") commit = arguments.get("commit", False) cmd = f"nvram set {var}='{val}'" if commit: cmd += " && nvram commit" output, error, code = router.execute_command(cmd) result = f"NVRAM variable '{var}' set to '{val}'" if commit: result += " and committed to permanent storage" if error: result += f"\nErrors: {error}" return [TextContent(type="text", text=result)] def handle_execute_command( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """Execute a custom command on the router via SSH.""" cmd = arguments.get("command") output, error, code = router.execute_command(cmd) result = f"Command: {cmd}\n\nOutput:\n{output}" if error: result += f"\n\nErrors:\n{error}" result += f"\n\nExit code: {code}" return [TextContent(type="text", text=result)] def handle_read_file(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """Read contents of a file on the router.""" path = arguments.get("file_path") max_lines = arguments.get("max_lines", 100) output, error, code = router.execute_command(f"head -n {max_lines} {path}") return [TextContent(type="text", text=output if code == 0 else f"Error: {error}")] def handle_upload_file(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """Upload a file to the router via SCP.""" local = arguments.get("local_path") remote = arguments.get("remote_path") # Try SFTP first success, message = router.upload_file(local, remote) # If SFTP fails, try shell-based fallback if not success and "SFTP" in message: logger.info("SFTP unavailable, falling back to shell-based upload") success, message = router.upload_file_shell(local, remote) if success: result = f"✓ File uploaded successfully: {local} -> {remote}\n" result += "Note: Used shell commands (SFTP not available on router)\n" result += f"Details: {message}" else: result = f"✗ File upload failed: {local} -> {remote}\n" result += f"Error: {message}" elif success: result = f"✓ File uploaded successfully: {local} -> {remote}\n" result += "Method: SFTP\n" result += f"Details: {message}" else: result = f"✗ File upload failed: {local} -> {remote}\n" result += f"Error: {message}" return [TextContent(type="text", text=result)] def handle_download_file(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """Download a file from the router via SCP.""" remote = arguments.get("remote_path") local = arguments.get("local_path") # Try SFTP first success, message = router.download_file(remote, local) # If SFTP fails, try shell-based fallback if not success and "SFTP" in message: logger.info("SFTP unavailable, falling back to shell-based download") success, message = router.download_file_shell(remote, local) if success: result = f"✓ File downloaded successfully: {remote} -> {local}\n" result += "Note: Used shell commands (SFTP not available on router)\n" result += f"Details: {message}" else: result = f"✗ File download failed: {remote} -> {local}\n" result += f"Error: {message}" elif success: result = f"✓ File downloaded successfully: {remote} -> {local}\n" result += "Method: SFTP\n" result += f"Details: {message}" else: result = f"✗ File download failed: {remote} -> {local}\n" result += f"Error: {message}" return [TextContent(type="text", text=result)] def handle_get_vpn_status( router: RouterSSHClient, _arguments: Any ) -> list[TextContent]: """Get status of VPN clients and servers.""" output, error, code = router.execute_command( "nvram get vpn_client1_state; nvram get vpn_client2_state; ps | grep vpn" ) return [TextContent(type="text", text=output if code == 0 else f"Error: {error}")] def handle_get_aiprotection_status( router: RouterSSHClient, _arguments: Any ) -> list[TextContent]: """ Get AiProtection (Trend Micro) security status. Returns status of: - AiProtection enabled/disabled - Malicious Sites Blocking - Two-Way IPS (Intrusion Prevention System) - Infected Device Prevention and Blocking """ # Get all AiProtection NVRAM variables output, error, code = router.execute_command( "nvram get TM_EULA; " "nvram get wrs_protect_enable; " "nvram get wrs_mals_enable; " "nvram get wrs_vp_enable; " "nvram get wrs_cc_enable; " "nvram get wrs_mals_t; " "nvram get wrs_vp_t; " "nvram get wrs_cc_t; " "nvram get bwdpi_sig_ver" ) if code != 0: return [ TextContent(type="text", text=f"Error reading AiProtection status: {error}") ] # Parse the output lines = output.strip().split("\n") if len(lines) < 9: return [TextContent(type="text", text="Error: Unexpected NVRAM output format")] tm_eula = lines[0].strip() wrs_protect_enable = lines[1].strip() wrs_mals_enable = lines[2].strip() wrs_vp_enable = lines[3].strip() wrs_cc_enable = lines[4].strip() wrs_mals_t = lines[5].strip() wrs_vp_t = lines[6].strip() wrs_cc_t = lines[7].strip() sig_ver = lines[8].strip() # Format the output result = "AiProtection Status\n" result += "=" * 70 + "\n\n" # Overall status if tm_eula == "1": result += "✓ Trend Micro EULA: Accepted\n" else: result += "✗ Trend Micro EULA: Not Accepted\n" if wrs_protect_enable == "1": result += "✓ AiProtection: ENABLED\n" else: result += "✗ AiProtection: DISABLED\n" result += "\n" + "-" * 70 + "\n" result += "Protection Modules\n" result += "-" * 70 + "\n\n" # Malicious Sites Blocking status = "ON" if wrs_mals_enable == "1" else "OFF" result += f"1. Malicious Sites Blocking: {status}\n" if wrs_mals_t and wrs_mals_t != "0": import time timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(wrs_mals_t))) result += f" Last update: {timestamp}\n" # Two-Way IPS status = "ON" if wrs_vp_enable == "1" else "OFF" result += f"\n2. Two-Way IPS (Intrusion Prevention): {status}\n" if wrs_vp_t and wrs_vp_t != "0": import time timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(wrs_vp_t))) result += f" Last update: {timestamp}\n" # Infected Device Prevention status = "ON" if wrs_cc_enable == "1" else "OFF" result += f"\n3. Infected Device Prevention and Blocking: {status}\n" if wrs_cc_t and wrs_cc_t != "0": import time timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(wrs_cc_t))) result += f" Last update: {timestamp}\n" # Signature version result += "\n" + "-" * 70 + "\n" result += "Threat Database\n" result += "-" * 70 + "\n" result += f"Signature Version: {sig_ver}\n" result += ( "\nNote: Protection event counts are available through the router's web UI.\n" ) result += "Event logs may be stored in /tmp/bwdpi/ or /jffs/ directories.\n" return [TextContent(type="text", text=result)] def handle_get_system_log(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """ Get system log entries from the router. Args: router: RouterSSHClient instance for executing commands arguments: Dict containing: - lines: Number of lines to retrieve (default: 100, max: 1000) - filter: Optional grep filter pattern Returns: List containing TextContent with log entries and configuration """ lines = arguments.get("lines", 100) filter_pattern = arguments.get("filter", "") # Validate lines parameter if lines < 1: lines = 100 elif lines > 1000: lines = 1000 # Get log configuration settings config_output, _, config_code = router.execute_command( "nvram get message_loglevel; " "nvram get log_level; " "nvram get log_ipaddr; " "nvram get log_port" ) # Build log retrieval command if filter_pattern: cmd = f"tail -n {lines} /tmp/syslog.log | grep '{filter_pattern}'" else: cmd = f"tail -n {lines} /tmp/syslog.log" output, error, code = router.execute_command(cmd) if code != 0: return [TextContent(type="text", text=f"Error reading system log: {error}")] # Build result with configuration header result = "System Log Configuration\n" result += "=" * 70 + "\n\n" # Parse log configuration if config_code == 0: config_lines = config_output.strip().split("\n") if len(config_lines) >= 4: message_loglevel = config_lines[0].strip() log_level = config_lines[1].strip() log_ipaddr = config_lines[2].strip() log_port = config_lines[3].strip() # Map message_loglevel to name loglevel_map = { "0": "emergency", "1": "alert", "2": "critical", "3": "error", "4": "warning", "5": "notice", "6": "info", "7": "debug", } loglevel_name = loglevel_map.get(message_loglevel, message_loglevel) # Map log_level to name urgency_map = { "0": "emergency", "1": "alert", "2": "critical", "3": "error", "4": "warning", "5": "notice", "6": "info", "7": "debug", "8": "all", } urgency_name = urgency_map.get(log_level, log_level) result += f"Message Log Level: {loglevel_name} ({message_loglevel})\n" result += f"Urgency Level: {urgency_name} ({log_level})\n" # Remote log server if log_ipaddr and log_ipaddr != "": result += f"\nRemote Log Server: {log_ipaddr}:{log_port}\n" else: result += "\nRemote Log Server: Not configured\n" result += "\n" + "=" * 70 + "\n" result += f"Log Entries (last {lines} lines" if filter_pattern: result += f", filtered by '{filter_pattern}'" result += ")\n" result += "=" * 70 + "\n\n" result += output return [TextContent(type="text", text=result)] def handle_set_system_log_config( router: RouterSSHClient, arguments: Any ) -> list[TextContent]: """ Configure system log settings. Args: router: RouterSSHClient instance for executing commands arguments: Dict containing (all optional): - message_loglevel: Message log level (0-7 or name: emergency/alert/critical/error/warning/notice/info/debug) - log_level: Urgency level (0-8 or name: same as above plus 'all') - log_ipaddr: Remote syslog server IP (empty string to disable) - log_port: Remote syslog server port (default: 514) Returns: List containing TextContent with configuration result """ # Level name to number mapping level_map = { "emergency": "0", "alert": "1", "critical": "2", "error": "3", "warning": "4", "notice": "5", "info": "6", "debug": "7", "all": "8", } changes = [] nvram_commands = [] # Process message_loglevel if "message_loglevel" in arguments: value = arguments["message_loglevel"] # Convert name to number if needed if isinstance(value, str) and value.lower() in level_map: value = level_map[value.lower()] # Validate range (0-7) try: num_value = int(value) if 0 <= num_value <= 7: nvram_commands.append(f"nvram set message_loglevel={num_value}") changes.append(f"Message Log Level: {value}") else: return [ TextContent( type="text", text=f"Error: message_loglevel must be 0-7, got {value}", ) ] except ValueError: return [ TextContent( type="text", text=f"Error: Invalid message_loglevel value: {value}", ) ] # Process log_level (urgency) if "log_level" in arguments: value = arguments["log_level"] # Convert name to number if needed if isinstance(value, str) and value.lower() in level_map: value = level_map[value.lower()] # Validate range (0-8) try: num_value = int(value) if 0 <= num_value <= 8: nvram_commands.append(f"nvram set log_level={num_value}") changes.append(f"Urgency Level: {value}") else: return [ TextContent( type="text", text=f"Error: log_level must be 0-8, got {value}", ) ] except ValueError: return [ TextContent( type="text", text=f"Error: Invalid log_level value: {value}" ) ] # Process log_ipaddr if "log_ipaddr" in arguments: value = arguments["log_ipaddr"] nvram_commands.append(f"nvram set log_ipaddr='{value}'") if value: changes.append(f"Remote Log Server IP: {value}") else: changes.append("Remote Log Server: Disabled") # Process log_port if "log_port" in arguments: value = arguments["log_port"] try: port = int(value) if 1 <= port <= 65535: nvram_commands.append(f"nvram set log_port={port}") changes.append(f"Remote Log Server Port: {port}") else: return [ TextContent( type="text", text=f"Error: log_port must be 1-65535, got {value}", ) ] except ValueError: return [ TextContent(type="text", text=f"Error: Invalid log_port value: {value}") ] # If no changes, return error if not changes: return [ TextContent( type="text", text="Error: No configuration changes specified. Provide at least one of: message_loglevel, log_level, log_ipaddr, log_port", ) ] # Execute NVRAM commands cmd = "; ".join(nvram_commands) output, error, code = router.execute_command(cmd) if code != 0: return [ TextContent(type="text", text=f"Error setting log configuration: {error}") ] # Restart syslog to apply changes restart_output, restart_error, restart_code = router.execute_command( "service restart_logger" ) result = "✓ System Log Configuration Updated\n" result += "=" * 70 + "\n\n" result += "Changes applied:\n" for change in changes: result += f" • {change}\n" if restart_code == 0: result += "\n✓ Logger service restarted (changes applied)" else: result += f"\n⚠ Logger service restart failed: {restart_error}" result += "\nNote: You may need to manually restart the logger service" return [TextContent(type="text", text=result)] def handle_list_processes(router: RouterSSHClient, arguments: Any) -> list[TextContent]: """List running processes on the router.""" filter_name = arguments.get("filter", "") cmd = "ps" if not filter_name else f"ps | grep {filter_name}" output, error, code = router.execute_command(cmd) return [TextContent(type="text", text=output if code == 0 else f"Error: {error}")]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kcsoukup/asus-merlin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server