Skip to main content
Glama
server.py15.6 kB
#!/usr/bin/env python3 """ Kali Linux MCP Server - Defensive Security Toolkit Provides secure access to Kali Linux security tools through MCP protocol """ import asyncio import json import logging import os import re import subprocess import sys from pathlib import Path from typing import Dict, List, Optional, Any from pydantic import BaseModel, Field from fastmcp import FastMCP # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("Kali MCP Server") # Input sanitization patterns SAFE_IP_PATTERN = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[12]?[0-9]))?$') SAFE_DOMAIN_PATTERN = re.compile(r'^[a-zA-Z0-9.-]+$') SAFE_PORT_PATTERN = re.compile(r'^[0-9]{1,5}$') SAFE_PATH_PATTERN = re.compile(r'^(?!/\.\./|.*\.\./)[a-zA-Z0-9/_.-]+$') SAFE_FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+$') class ToolConfig: """Configuration for security tools""" def __init__(self): self.max_execution_time = int(os.getenv('MCP_MAX_EXEC_TIME', '300')) # 5 minutes default self.output_limit = int(os.getenv('MCP_OUTPUT_LIMIT', '10000')) # 10KB default self.allowed_networks = os.getenv('MCP_ALLOWED_NETWORKS', '').split(',') self.tools_path = Path(os.getenv('MCP_TOOLS_PATH', '/usr/bin')) config = ToolConfig() def sanitize_input(value: str, pattern: re.Pattern) -> str: """Sanitize input against pattern""" if not isinstance(value, str): raise ValueError("Input must be a string") if not pattern.match(value): raise ValueError(f"Invalid input format: {value}") return value def validate_network_target(target: str) -> str: """Validate network target is allowed""" if config.allowed_networks and config.allowed_networks[0]: for allowed in config.allowed_networks: if target.startswith(allowed.strip()): return target raise ValueError(f"Target {target} not in allowed networks") return target async def run_tool(cmd: List[str], timeout: int = None) -> Dict[str, Any]: """Execute tool command safely""" if timeout is None: timeout = config.max_execution_time try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, limit=config.output_limit ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) return { "returncode": process.returncode, "stdout": stdout.decode('utf-8', errors='ignore'), "stderr": stderr.decode('utf-8', errors='ignore'), "command": " ".join(cmd) } except asyncio.TimeoutError: try: process.terminate() await process.wait() except: pass return { "returncode": -1, "stdout": "", "stderr": f"Command timed out after {timeout} seconds", "command": " ".join(cmd) } except Exception as e: return { "returncode": -1, "stdout": "", "stderr": str(e), "command": " ".join(cmd) } # Network Scanning Tools @mcp.tool() async def nmap_scan( target: str, scan_type: str = "basic", ports: str = "top-1000" ) -> str: """ Perform network scan using nmap Args: target: IP address or hostname to scan scan_type: Type of scan (basic, stealth, service, vuln) ports: Port specification (top-1000, 1-65535, or specific ports) """ target = sanitize_input(target, SAFE_IP_PATTERN if '.' in target else SAFE_DOMAIN_PATTERN) target = validate_network_target(target) cmd = ["nmap"] # Scan type options if scan_type == "stealth": cmd.extend(["-sS", "-T4"]) elif scan_type == "service": cmd.extend(["-sV", "-T4"]) elif scan_type == "vuln": cmd.extend(["--script", "vuln", "-T4"]) else: # basic cmd.extend(["-T4"]) # Port specification if ports == "top-1000": cmd.append("--top-ports=1000") elif ports == "1-65535": cmd.extend(["-p", "1-65535"]) else: # Validate port specification if re.match(r'^[0-9,-]+$', ports): cmd.extend(["-p", ports]) cmd.append(target) result = await run_tool(cmd) return f"Nmap scan results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def gobuster_dir( target: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", extensions: str = "php,html,txt", threads: int = 10 ) -> str: """ Directory/file enumeration using gobuster Args: target: Target URL (e.g., http://example.com) wordlist: Path to wordlist file extensions: File extensions to search for threads: Number of concurrent threads """ if not target.startswith(('http://', 'https://')): raise ValueError("Target must be a valid URL with http:// or https://") wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN) extensions = re.sub(r'[^a-zA-Z0-9,]', '', extensions) cmd = [ "gobuster", "dir", "-u", target, "-w", wordlist, "-x", extensions, "-t", str(max(1, min(50, threads))) ] result = await run_tool(cmd) return f"Gobuster directory scan results:\n{result['stdout']}\n{result['stderr']}" # Web Application Testing Tools @mcp.tool() async def wpscan_scan( target: str, enumerate: str = "p,t,u", api_token: Optional[str] = None ) -> str: """ WordPress security scan using WPScan Args: target: WordPress site URL enumerate: What to enumerate (p=plugins, t=themes, u=users) api_token: WPVulnDB API token for vulnerability data """ if not target.startswith(('http://', 'https://')): raise ValueError("Target must be a valid URL") cmd = ["wpscan", "--url", target] if enumerate: enumerate = re.sub(r'[^ptu,]', '', enumerate) cmd.extend(["--enumerate", enumerate]) if api_token: api_token = re.sub(r'[^a-zA-Z0-9]', '', api_token) cmd.extend(["--api-token", api_token]) result = await run_tool(cmd, timeout=600) # WordPress scans can take longer return f"WPScan results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def sqlmap_test( target: str, parameter: Optional[str] = None, cookie: Optional[str] = None, risk: int = 1, level: int = 1 ) -> str: """ SQL injection testing using sqlmap Args: target: Target URL parameter: Specific parameter to test cookie: Session cookie if needed risk: Risk level (1-3) level: Test level (1-5) """ if not target.startswith(('http://', 'https://')): raise ValueError("Target must be a valid URL") cmd = ["sqlmap", "-u", target, "--batch"] if parameter: parameter = re.sub(r'[^a-zA-Z0-9_]', '', parameter) cmd.extend(["-p", parameter]) if cookie: cookie = re.sub(r'[^a-zA-Z0-9=;_ -]', '', cookie) cmd.extend(["--cookie", cookie]) risk = max(1, min(3, risk)) level = max(1, min(5, level)) cmd.extend(["--risk", str(risk), "--level", str(level)]) result = await run_tool(cmd, timeout=900) # SQL injection tests can be slow return f"SQLMap results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def dirb_scan( target: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", extensions: Optional[str] = None ) -> str: """ Web content scanner using DIRB Args: target: Target URL wordlist: Path to wordlist extensions: File extensions to append """ if not target.startswith(('http://', 'https://')): raise ValueError("Target must be a valid URL") wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN) cmd = ["dirb", target, wordlist] if extensions: extensions = re.sub(r'[^a-zA-Z0-9,.]', '', extensions) cmd.extend(["-X", extensions]) result = await run_tool(cmd) return f"DIRB scan results:\n{result['stdout']}\n{result['stderr']}" # Enumeration Tools @mcp.tool() async def enum4linux_scan( target: str, scan_type: str = "basic" ) -> str: """ SMB/NetBIOS enumeration using enum4linux Args: target: Target IP address scan_type: Type of enumeration (basic, users, shares, policies) """ target = sanitize_input(target, SAFE_IP_PATTERN) target = validate_network_target(target) cmd = ["enum4linux"] if scan_type == "users": cmd.append("-U") elif scan_type == "shares": cmd.append("-S") elif scan_type == "policies": cmd.append("-P") else: # basic cmd.append("-a") cmd.append(target) result = await run_tool(cmd) return f"Enum4linux results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def searchsploit_query( query: str, exact: bool = False, case_sensitive: bool = False ) -> str: """ Search for exploits using searchsploit Args: query: Search term for exploits exact: Use exact match case_sensitive: Case sensitive search """ # Sanitize query to prevent command injection query = re.sub(r'[^a-zA-Z0-9\s\-_.]', '', query) if not query.strip(): raise ValueError("Invalid search query") cmd = ["searchsploit"] if exact: cmd.append("--exact") if case_sensitive: cmd.append("--case") cmd.append(query) result = await run_tool(cmd) return f"SearchSploit results:\n{result['stdout']}\n{result['stderr']}" # Credential Testing Tools @mcp.tool() async def crackmapexec_smb( target: str, username: Optional[str] = None, password: Optional[str] = None, password_list: Optional[str] = None, hash_value: Optional[str] = None ) -> str: """ SMB credential testing using CrackMapExec Args: target: Target IP address or range username: Username to test password: Password to test password_list: Path to password list hash_value: NTLM hash for pass-the-hash """ target = sanitize_input(target, SAFE_IP_PATTERN) target = validate_network_target(target) cmd = ["crackmapexec", "smb", target] if username: username = re.sub(r'[^a-zA-Z0-9_@.-]', '', username) cmd.extend(["-u", username]) if password: password = re.sub(r'[^a-zA-Z0-9!@#$%^&*()_+={}|<>?-]', '', password) cmd.extend(["-p", password]) elif password_list: password_list = sanitize_input(password_list, SAFE_PATH_PATTERN) cmd.extend(["-p", password_list]) elif hash_value: hash_value = re.sub(r'[^a-fA-F0-9:]', '', hash_value) cmd.extend(["-H", hash_value]) result = await run_tool(cmd) return f"CrackMapExec SMB results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def john_crack( hash_file: str, wordlist: str = "/usr/share/wordlists/rockyou.txt", format_type: Optional[str] = None ) -> str: """ Password cracking using John the Ripper Args: hash_file: Path to file containing hashes wordlist: Path to wordlist file format_type: Hash format (optional, auto-detect if not specified) """ hash_file = sanitize_input(hash_file, SAFE_PATH_PATTERN) wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN) cmd = ["john", "--wordlist=" + wordlist] if format_type: format_type = re.sub(r'[^a-zA-Z0-9-]', '', format_type) cmd.extend(["--format=" + format_type]) cmd.append(hash_file) result = await run_tool(cmd, timeout=1800) # 30 minutes for cracking return f"John the Ripper results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def hashcat_crack( hash_file: str, wordlist: str, attack_mode: int = 0, hash_type: Optional[int] = None ) -> str: """ GPU-accelerated password cracking using Hashcat Args: hash_file: Path to file containing hashes wordlist: Path to wordlist file attack_mode: Attack mode (0=dictionary, 1=combinator, 3=mask) hash_type: Hash type number (auto-detect if not specified) """ hash_file = sanitize_input(hash_file, SAFE_PATH_PATTERN) wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN) cmd = ["hashcat"] if hash_type: hash_type = max(0, min(99999, hash_type)) cmd.extend(["-m", str(hash_type)]) attack_mode = max(0, min(9, attack_mode)) cmd.extend(["-a", str(attack_mode)]) cmd.extend([hash_file, wordlist]) result = await run_tool(cmd, timeout=1800) # 30 minutes for cracking return f"Hashcat results:\n{result['stdout']}\n{result['stderr']}" # Utility Tools @mcp.tool() async def netcat_connect( target: str, port: int, listen: bool = False, udp: bool = False, verbose: bool = True ) -> str: """ Network utility using netcat Args: target: Target hostname or IP port: Port number listen: Listen mode udp: Use UDP instead of TCP verbose: Verbose output """ if not listen: target = sanitize_input(target, SAFE_IP_PATTERN if '.' in target else SAFE_DOMAIN_PATTERN) target = validate_network_target(target) port = max(1, min(65535, port)) cmd = ["nc"] if verbose: cmd.append("-v") if udp: cmd.append("-u") if listen: cmd.extend(["-l", "-p", str(port)]) else: cmd.extend([target, str(port)]) # Netcat connections should be short-lived for safety result = await run_tool(cmd, timeout=30) return f"Netcat results:\n{result['stdout']}\n{result['stderr']}" @mcp.tool() async def bloodhound_py( target: str, username: str, password: str, domain: str, output_dir: str = "/tmp/bloodhound" ) -> str: """ Active Directory enumeration using bloodhound-python Args: target: Domain controller IP username: Domain username password: Domain password domain: Domain name output_dir: Output directory for results """ target = sanitize_input(target, SAFE_IP_PATTERN) target = validate_network_target(target) username = re.sub(r'[^a-zA-Z0-9_@.-]', '', username) password = re.sub(r'[^a-zA-Z0-9!@#$%^&*()_+={}|<>?-]', '', password) domain = sanitize_input(domain, SAFE_DOMAIN_PATTERN) output_dir = sanitize_input(output_dir, SAFE_PATH_PATTERN) cmd = [ "bloodhound-python", "-d", domain, "-u", username, "-p", password, "-ns", target, "-c", "All", "--outputdir", output_dir ] result = await run_tool(cmd, timeout=600) return f"BloodHound Python results:\n{result['stdout']}\n{result['stderr']}" # Main function def main(): """Main entry point""" # Get configuration from environment host = os.getenv('MCP_HOST', '127.0.0.1') port = int(os.getenv('MCP_PORT', '8000')) logger.info(f"Starting Kali MCP Server on {host}:{port}") logger.info("Available tools: nmap_scan, gobuster_dir, wpscan_scan, sqlmap_test, dirb_scan") # Use STDIO transport for Claude Desktop compatibility mcp.run(transport="stdio") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pellax/kaliMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server