Skip to main content
Glama
server.py (16.1 kB)
#!/usr/bin/env python3
"""SSH MCP Server - Main server implementation."""

import subprocess
from typing import Any, Dict, Optional, Tuple

import paramiko
from mcp.server.fastmcp import FastMCP

from .credentials import (
    get_credentials,
    clear_cached_credentials,
    get_domain_from_hostname,
    get_username_suggestion,
    keychain_get_password
)

# Create MCP server
mcp = FastMCP("SSH Server")

# SSH connection timeout
SSH_TIMEOUT = 30

# Keychain service name under which cached SSH credentials are looked up
KEYCHAIN_SERVICE = "ssh-mcp"

# NOTE: the docstrings of the @mcp.tool() functions below are kept verbatim
# because FastMCP publishes a tool's docstring as its description to clients.
# Implementation notes for maintainers therefore live in `#` comments.


def _find_cached_account(domain: str) -> Optional[str]:
    """Return the keychain account (``user@domain``) cached for *domain*.

    Runs ``security find-generic-password -s ssh-mcp`` and scans its output
    for an ``acct`` line that mentions *domain*.

    Args:
        domain: Domain string as returned by ``get_domain_from_hostname``.

    Returns:
        The account string if one containing ``@`` and *domain* is found,
        otherwise ``None``. A missing ``security`` binary or a failing
        subprocess is treated as "nothing cached" (best-effort lookup).
    """
    try:
        result = subprocess.run(
            ['security', 'find-generic-password', '-s', KEYCHAIN_SERVICE],
            capture_output=True, text=True, check=False
        )
    except (subprocess.SubprocessError, OSError):
        # Deliberate best-effort: callers fall back to the suggested username.
        return None

    if result.returncode != 0:
        return None

    for line in result.stdout.split('\n'):
        if 'acct' not in line or domain not in line:
            continue
        parts = line.split('"')
        if len(parts) < 4:
            continue
        account = parts[3]
        if '@' in account and domain in account:
            return account
    return None


def _run_command(
    ssh: paramiko.SSHClient,
    command: str,
    stdin_data: Optional[str] = None
) -> Tuple[int, str, str]:
    """Execute *command* on a connected client and collect its results.

    Args:
        ssh: An already-connected ``paramiko.SSHClient``.
        command: Command line to run on the remote host.
        stdin_data: Optional text written to the command's stdin right after
            it starts (used to feed the sudo password).

    Returns:
        ``(exit_status, stdout_text, stderr_text)`` with both streams decoded
        as UTF-8, replacing undecodable bytes.
    """
    stdin, stdout, stderr = ssh.exec_command(command)
    if stdin_data is not None:
        stdin.write(stdin_data)
        stdin.flush()

    # Drain the output streams BEFORE waiting for the exit status: paramiko
    # documents that recv_exit_status() can hang indefinitely when the remote
    # output exceeds the channel window and nothing has been read yet.
    stdout_text = stdout.read().decode('utf-8', errors='replace')
    stderr_text = stderr.read().decode('utf-8', errors='replace')
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, stdout_text, stderr_text


@mcp.tool()
def ssh_execute_ssh(hostname: str, command: str) -> Dict[str, Any]:
    """Execute command on remote Linux host via SSH"""
    # Flow: try key/agent authentication as the cached (or suggested) user,
    # fall back to password authentication via get_credentials(), run the
    # command, and return a dict with status/stdout/stderr/hostname/command.
    # All failures are reported as a dict with an "error" key, never raised.
    ssh = None
    try:
        # Create SSH client
        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification; kept for backward compatibility.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Check if we have cached credentials to get the correct username
        domain = get_domain_from_hostname(hostname)
        account = _find_cached_account(domain)
        cached_username = account.split('@')[0] if account else None

        # Use cached username if available, otherwise current user
        ssh_username = cached_username if cached_username else get_username_suggestion()

        # First try key-based authentication
        try:
            ssh.connect(
                hostname=hostname,
                username=ssh_username,
                timeout=SSH_TIMEOUT,
                look_for_keys=True,
                allow_agent=True
            )
        except paramiko.AuthenticationException:
            # Key auth failed, try password authentication
            try:
                username, password = get_credentials(hostname)
                ssh.connect(
                    hostname=hostname,
                    username=username,
                    password=password,
                    timeout=SSH_TIMEOUT,
                    look_for_keys=False,
                    allow_agent=False
                )
                # Clear password from memory
                password = None
            except RuntimeError as e:
                error_msg = str(e)
                if "cancelled" in error_msg.lower():
                    return {
                        "error": "Authentication cancelled by user",
                        "details": error_msg,
                        "troubleshooting": [
                            "User clicked Cancel in authentication dialog",
                            "Authentication dialog may have timed out"
                        ],
                        "suggested_action": f"Try again: ssh_setup_credentials('{hostname}')"
                    }
                return {
                    "error": "Credential setup failed",
                    "details": error_msg,
                    "troubleshooting": [
                        "SSH key authentication failed",
                        "Password authentication setup failed",
                        "Check if hostname is correct"
                    ]
                }

        # Execute command and collect results
        exit_status, stdout_text, stderr_text = _run_command(ssh, command)

        return {
            "status": exit_status,
            "stdout": stdout_text,
            "stderr": stderr_text,
            "hostname": hostname,
            "command": command
        }
    except paramiko.AuthenticationException:
        # Must precede the SSHException handler (it is a subclass); reported
        # the same way ssh_execute_sudo reports a rejected password.
        return {
            "error": "SSH authentication failed",
            "details": "Invalid username or password",
            "suggested_action": (
                f"Clear and re-enter credentials: ssh_clear_credentials() "
                f"then ssh_setup_credentials('{hostname}')"
            )
        }
    except paramiko.SSHException as e:
        return {
            "error": "SSH connection failed",
            "details": str(e),
            "troubleshooting": [
                "Host may be unreachable or offline",
                "SSH service may not be running on target host",
                "Firewall may be blocking SSH port (22)",
                "Host key verification may have failed"
            ],
            "suggested_action": f"Check connectivity: ping {hostname} && nc -zv {hostname} 22"
        }
    except Exception as e:
        return {
            "error": "SSH execution failed",
            "details": str(e),
            "troubleshooting": [
                "Network connectivity issues",
                "SSH service configuration problems",
                "Command execution timeout",
                "Resource constraints on target host"
            ],
            "suggested_action": "Try a simple command first: ssh_get_system_info"
        }
    finally:
        # Close the connection on every path, including early returns and
        # exceptions, so sockets are not leaked.
        if ssh is not None:
            ssh.close()


@mcp.tool()
def ssh_execute_sudo(hostname: str, command: str) -> Dict[str, Any]:
    """Execute command with sudo on remote Linux host"""
    # Flow: look up cached username/password, connect (keys first, then
    # password), make sure a password is available for sudo, then run
    # `sudo -S` feeding the password on stdin. Failures are returned as a
    # dict with an "error" key, never raised.
    ssh = None
    cached_password = None
    try:
        # Get domain and check for cached credentials
        domain = get_domain_from_hostname(hostname)
        cached_username = None
        account = _find_cached_account(domain)
        if account:
            cached_username = account.split('@')[0]
            try:
                cached_password = keychain_get_password(KEYCHAIN_SERVICE, account)
            except (subprocess.SubprocessError, OSError):
                # Best-effort: without a cached password we prompt later.
                pass

        # Use cached username if available, otherwise current user
        ssh_username = cached_username if cached_username else get_username_suggestion()

        # Create SSH client
        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification; kept for backward compatibility.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # First try key-based authentication
        try:
            ssh.connect(
                hostname=hostname,
                username=ssh_username,
                timeout=SSH_TIMEOUT,
                look_for_keys=True,
                allow_agent=True
            )
        except paramiko.AuthenticationException:
            # Key auth failed, use password auth
            if cached_password:
                username, password = cached_username, cached_password
            else:
                username, password = get_credentials(hostname)
            ssh.connect(
                hostname=hostname,
                username=username,
                password=password,
                timeout=SSH_TIMEOUT,
                look_for_keys=False,
                allow_agent=False
            )
            # Reuse the password that just authenticated for sudo instead of
            # asking get_credentials() a second time.
            cached_password = password

        # For sudo, we need a password
        if not cached_password:
            # We connected with keys but need password for sudo
            _, cached_password = get_credentials(hostname)

        # Execute sudo command with password via stdin. `-p ''` suppresses
        # the password prompt, which sudo would otherwise write to stderr
        # without a trailing newline, gluing it to the command's own output.
        exit_status, stdout_text, stderr_text = _run_command(
            ssh,
            f"sudo -S -p '' {command}",
            stdin_data=f"{cached_password}\n"
        )

        # Fallback clean-up in case a prompt was still emitted
        if stderr_text.startswith('[sudo] password for'):
            lines = stderr_text.split('\n')
            stderr_text = '\n'.join(lines[1:])

        return {
            "status": exit_status,
            "stdout": stdout_text,
            "stderr": stderr_text,
            "hostname": hostname,
            "command": f"sudo {command}"
        }
    except RuntimeError as e:
        error_msg = str(e)
        if "cancelled" in error_msg.lower():
            return {
                "error": "Authentication cancelled by user",
                "details": error_msg,
                "troubleshooting": [
                    "User clicked Cancel in authentication dialog",
                    "Password required for sudo operations"
                ],
                "suggested_action": f"Try again: ssh_setup_credentials('{hostname}')"
            }
        return {
            "error": "Credential setup failed",
            "details": error_msg
        }
    except paramiko.AuthenticationException:
        return {
            "error": "SSH authentication failed",
            "details": "Invalid username or password",
            "suggested_action": (
                f"Clear and re-enter credentials: ssh_clear_credentials() "
                f"then ssh_setup_credentials('{hostname}')"
            )
        }
    except Exception as e:
        return {
            "error": "SSH sudo execution failed",
            "details": str(e),
            "troubleshooting": [
                "User may not have sudo privileges",
                "Sudo may require different password",
                "Command may require interactive input",
                "Network or SSH connection issues"
            ]
        }
    finally:
        # Close the connection on every path so sockets are not leaked
        if ssh is not None:
            ssh.close()
        # Clear password from memory
        cached_password = None


@mcp.tool()
def ssh_setup_credentials(hostname: str) -> Dict[str, Any]:
    """Setup credentials for a Linux host using GUI prompts"""
    # Delegates to get_credentials(); any exception is mapped to an error
    # dict chosen by keywords ("cancelled", "empty") in its message.
    try:
        username, _ = get_credentials(hostname)
        return {
            "status": "success",
            "message": f"Credentials configured for {username}@{hostname}",
            "details": "Cached for 4 hours, used for password authentication and sudo operations",
            "next_steps": [
                f"Try: ssh_execute_ssh('{hostname}', 'uname -a')",
                f"Or: ssh_get_system_info('{hostname}')",
                "Note: SSH will try key authentication first, then fall back to password"
            ]
        }
    except Exception as e:
        error_msg = str(e)
        if "cancelled" in error_msg.lower():
            return {
                "error": "Authentication cancelled by user",
                "details": error_msg,
                "troubleshooting": [
                    "User clicked Cancel in authentication dialog",
                    "Authentication dialog may have timed out",
                    "System may be locked or user not present"
                ],
                "suggested_action": f"Try again: ssh_setup_credentials('{hostname}')"
            }
        if "empty" in error_msg.lower():
            return {
                "error": "Empty password not allowed",
                "details": error_msg,
                "troubleshooting": [
                    "Password field was left blank",
                    "Password is required for sudo operations",
                    "Dialog input may have failed"
                ],
                "suggested_action": "Ensure password is entered in the dialog"
            }
        return {
            "error": "Credential setup failed",
            "details": error_msg,
            "troubleshooting": [
                "macOS Keychain access may be denied",
                "System security settings may block keychain access",
                "Hostname format may be invalid"
            ],
            "suggested_action": (
                f"Check hostname format: '{hostname}' should be a valid hostname or FQDN"
            )
        }


@mcp.tool()
def ssh_clear_credentials() -> Dict[str, Any]:
    """Clear all cached SSH credentials"""
    # clear_cached_credentials() returning a truthy value is reported as
    # "success"; a falsy value as "info" (nothing was stored).
    try:
        if clear_cached_credentials():
            return {
                "status": "success",
                "message": "All cached SSH credentials cleared",
                "details": "All SSH password credentials have been removed from keychain",
                "next_steps": [
                    ("Use ssh_setup_credentials(hostname) to set up password "
                     "authentication for specific hosts"),
                    "SSH key authentication will still work if configured"
                ]
            }
        return {
            "status": "info",
            "message": "No cached SSH credentials found",
            "details": "No SSH password credentials were stored in keychain",
            "suggested_action": (
                "Use ssh_setup_credentials(hostname) to set up password authentication"
            )
        }
    except Exception as e:
        return {
            "error": "Failed to clear credentials",
            "details": str(e),
            "troubleshooting": [
                "macOS Keychain access may be restricted",
                "Keychain may be locked",
                "System security settings may prevent access"
            ],
            "suggested_action": "Check macOS Keychain Access app for any restrictions"
        }


@mcp.tool()
def ssh_get_system_info(hostname: str) -> Dict[str, Any]:
    """Get basic system information from Linux host"""
    command = (
        "uname -a && cat /etc/os-release 2>/dev/null || "
        "cat /etc/redhat-release 2>/dev/null || echo 'OS info not available'"
    )
    return ssh_execute_ssh(hostname, command)


@mcp.tool()
def ssh_get_running_processes(hostname: str) -> Dict[str, Any]:
    """Get running processes from Linux host"""
    command = "ps aux --sort=-%cpu | head -20"
    return ssh_execute_ssh(hostname, command)


@mcp.tool()
def ssh_get_disk_usage(hostname: str) -> Dict[str, Any]:
    """Get disk usage information from Linux host"""
    command = "df -h"
    return ssh_execute_ssh(hostname, command)


@mcp.tool()
def ssh_get_services(hostname: str) -> Dict[str, Any]:
    """Get systemd services status from Linux host"""
    command = "systemctl list-units --type=service --state=running --no-pager"
    return ssh_execute_ssh(hostname, command)


@mcp.tool()
def ssh_puppet_noop(hostname: str) -> Dict[str, Any]:
    """Run Puppet agent in no-op mode (dry run) with verbose output"""
    command = "puppet agent --test --noop --verbose"
    return ssh_execute_sudo(hostname, command)


# Legacy compatibility functions
@mcp.tool()
def execute_ssh(hostname: str, command: str) -> Dict[str, Any]:
    """Legacy compatibility - Execute command on remote Linux host via SSH"""
    return ssh_execute_ssh(hostname, command)


@mcp.tool()
def execute_sudo(hostname: str, command: str) -> Dict[str, Any]:
    """Legacy compatibility - Execute command with sudo on remote Linux host"""
    return ssh_execute_sudo(hostname, command)


@mcp.tool()
def cache_credentials(domain: str) -> Dict[str, Any]:
    """Legacy compatibility - Pre-cache credentials"""
    # The legacy API took a domain; a placeholder host name is synthesised
    # so the hostname-based setup function can be reused.
    hostname = f"host.{domain}"
    return ssh_setup_credentials(hostname)


@mcp.tool()
def get_running_processes(hostname: str) -> Dict[str, Any]:
    """Legacy compatibility - Get running processes from Linux host"""
    return ssh_get_running_processes(hostname)


@mcp.tool()
def get_disk_usage(hostname: str) -> Dict[str, Any]:
    """Legacy compatibility - Get disk usage information from Linux host"""
    return ssh_get_disk_usage(hostname)


@mcp.tool()
def get_services(hostname: str) -> Dict[str, Any]:
    """Legacy compatibility - Get systemd services status from Linux host"""
    return ssh_get_services(hostname)


def main():
    """Main entry point for the SSH MCP server."""
    mcp.run()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rorymcmahon/ssh-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.