server.py•15.6 kB
#!/usr/bin/env python3
"""
Kali Linux MCP Server - Defensive Security Toolkit
Provides secure access to Kali Linux security tools through MCP protocol
"""
import asyncio
import json
import logging
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field
from fastmcp import FastMCP
# Configure logging: root logger at INFO, plus a module-level logger for this file
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize MCP server; the tool functions below register themselves via @mcp.tool()
mcp = FastMCP("Kali MCP Server")
# Input sanitization patterns (applied to caller-supplied values before they are
# placed on a tool's command line)
# IPv4 dotted quad with each octet 0-255 and an optional /0-/32 CIDR suffix
SAFE_IP_PATTERN = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/(?:3[0-2]|[12]?[0-9]))?$')
# One or more letters, digits, dots or hyphens; label structure is not enforced
# and a leading hyphen is accepted by this pattern
SAFE_DOMAIN_PATTERN = re.compile(r'^[a-zA-Z0-9.-]+$')
# 1 to 5 decimal digits; the numeric range (e.g. <= 65535) is not checked here
SAFE_PORT_PATTERN = re.compile(r'^[0-9]{1,5}$')
# Letters, digits, '/', '_', '.', '-' only; the lookahead rejects any "../" sequence.
# NOTE(review): a trailing ".." without a following slash still matches - confirm intent
SAFE_PATH_PATTERN = re.compile(r'^(?!/\.\./|.*\.\./)[a-zA-Z0-9/_.-]+$')
# Bare file name: letters, digits, '_', '.', '-' (no path separators)
SAFE_FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+$')
class ToolConfig:
    """Runtime configuration for the security tools, read from the environment.

    Attributes:
        max_execution_time: Default per-command timeout in seconds
            (``MCP_MAX_EXEC_TIME``, default 300).
        output_limit: Output size limit (``MCP_OUTPUT_LIMIT``, default 10000).
        allowed_networks: Allow-list entries parsed from the comma-separated
            ``MCP_ALLOWED_NETWORKS``; an empty list means no allow-list is set.
        tools_path: Directory taken from ``MCP_TOOLS_PATH`` (default ``/usr/bin``).
    """

    def __init__(self):
        self.max_execution_time = int(os.getenv('MCP_MAX_EXEC_TIME', '300'))  # 5 minutes default
        self.output_limit = int(os.getenv('MCP_OUTPUT_LIMIT', '10000'))  # 10KB default
        # Strip whitespace and drop blank entries: ''.split(',') yields [''],
        # and a stray comma would otherwise leave an empty entry that every
        # target string trivially starts with.
        raw_networks = os.getenv('MCP_ALLOWED_NETWORKS', '')
        self.allowed_networks = [
            entry.strip() for entry in raw_networks.split(',') if entry.strip()
        ]
        self.tools_path = Path(os.getenv('MCP_TOOLS_PATH', '/usr/bin'))


config = ToolConfig()
def sanitize_input(value: str, pattern: re.Pattern) -> str:
    """Validate a caller-supplied value against an allow-list pattern.

    Args:
        value: The raw value to validate.
        pattern: Compiled regular expression the whole value must match.

    Returns:
        The value, unchanged, when it is acceptable.

    Raises:
        ValueError: If the value is not a string, does not match the pattern
            in full, or starts with a hyphen.
    """
    if not isinstance(value, str):
        raise ValueError("Input must be a string")
    # fullmatch rather than match: with match(), a pattern ending in `$`
    # also accepts a value that carries a trailing newline.
    if not pattern.fullmatch(value):
        raise ValueError(f"Invalid input format: {value}")
    # Validated values end up as argv entries of command-line tools, where a
    # leading '-' would be interpreted as an option instead of data.
    if value.startswith('-'):
        raise ValueError(f"Invalid input format: {value}")
    return value
def validate_network_target(target: str) -> str:
    """Check a target against the configured network allow-list.

    Blank allow-list entries are ignored. When both the target and an entry
    parse as IP addresses/networks, the target must lie inside that entry's
    network (so ``10.0.0.0/24`` admits ``10.0.0.7`` and ``10.0.0.1`` does not
    admit ``10.0.0.100``). Entries or targets that are not IP literals, such
    as host names or partial prefixes like ``192.168.``, are compared by
    string prefix.

    Args:
        target: Already-sanitized IP address, CIDR range, or host name.

    Returns:
        The target, unchanged, when it is allowed or no allow-list is set.

    Raises:
        ValueError: If an allow-list is configured and the target is not in it.
    """
    import ipaddress

    # Ignore blank entries wherever they appear; an empty string would match
    # every target through startswith('').
    entries = [item.strip() for item in config.allowed_networks if item and item.strip()]
    if not entries:
        return target

    try:
        target_net = ipaddress.ip_network(target, strict=False)
    except ValueError:
        target_net = None  # host name or other non-IP target

    for entry in entries:
        if target_net is not None:
            try:
                allowed_net = ipaddress.ip_network(entry, strict=False)
            except ValueError:
                allowed_net = None  # partial prefix or host name entry
            if allowed_net is not None:
                # subnet_of() raises TypeError across IP versions, so compare versions first
                if (allowed_net.version == target_net.version
                        and target_net.subnet_of(allowed_net)):
                    return target
                continue
        if target.startswith(entry):
            return target
    raise ValueError(f"Target {target} not in allowed networks")
async def run_tool(cmd: List[str], timeout: Optional[int] = None) -> Dict[str, Any]:
    """Run a tool as a subprocess (no shell) and collect its output.

    Args:
        cmd: Program and arguments, passed directly to exec.
        timeout: Seconds to wait before stopping the process; defaults to
            ``config.max_execution_time``.

    Returns:
        Dict with ``returncode``, ``stdout``, ``stderr`` and ``command``.
        On timeout or failure ``returncode`` is -1 and ``stderr`` carries the
        reason. Each output stream is cut to ``config.output_limit``
        characters (a non-positive limit disables the cut).
    """
    if timeout is None:
        timeout = config.max_execution_time
    command_line = " ".join(cmd)

    def _failure(message: str) -> Dict[str, Any]:
        return {
            "returncode": -1,
            "stdout": "",
            "stderr": message,
            "command": command_line
        }

    def _render(raw: bytes) -> str:
        # Cap the output explicitly: the StreamReader `limit` argument does not
        # bound how much data communicate() returns.
        text = raw.decode('utf-8', errors='ignore')
        cap = config.output_limit
        if cap > 0 and len(text) > cap:
            return text[:cap] + "\n[output truncated]"
        return text

    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            # The server speaks MCP over its own stdin/stdout; a child that
            # inherited stdin could consume protocol data or block on input.
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
    except Exception as e:
        return _failure(str(e))

    async def _stop() -> None:
        # Ask politely first, then kill if the process does not exit promptly.
        try:
            process.terminate()
        except ProcessLookupError:
            return  # already exited
        try:
            await asyncio.wait_for(process.wait(), timeout=5)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                return
            await process.wait()

    try:
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        await _stop()
        return _failure(f"Command timed out after {timeout} seconds")
    except asyncio.CancelledError:
        # Do not leave the tool running when the request itself is cancelled.
        await _stop()
        raise
    except Exception as e:
        return _failure(str(e))

    return {
        "returncode": process.returncode,
        "stdout": _render(stdout),
        "stderr": _render(stderr),
        "command": command_line
    }
# Network Scanning Tools
@mcp.tool()
async def nmap_scan(
    target: str,
    scan_type: str = "basic",
    ports: str = "top-1000"
) -> str:
    """
    Perform network scan using nmap
    Args:
        target: IP address, CIDR range, or hostname to scan
        scan_type: Type of scan (basic, stealth, service, vuln); unknown values fall back to basic
        ports: Port specification (top-1000, 1-65535, or specific ports such as 22,80,8000-8100)
    Raises:
        ValueError: If the target or the port specification is invalid, or the
            target is outside the allowed networks
    """
    # A dot alone does not tell an IP from a hostname ("example.com" has one).
    # Only values made purely of digits, dots and '/' must be valid IPv4/CIDR;
    # everything else is validated as a hostname.
    looks_numeric = isinstance(target, str) and re.fullmatch(r'[0-9./]+', target)
    target = sanitize_input(target, SAFE_IP_PATTERN if looks_numeric else SAFE_DOMAIN_PATTERN)
    if target.startswith('-'):
        # The target is a positional argument; a leading '-' would be read as an nmap option.
        raise ValueError(f"Invalid input format: {target}")
    target = validate_network_target(target)

    # Scan type options
    scan_flags = {
        "stealth": ["-sS", "-T4"],
        "service": ["-sV", "-T4"],
        "vuln": ["--script", "vuln", "-T4"],
    }
    cmd = ["nmap", *scan_flags.get(scan_type, ["-T4"])]

    # Port specification
    if ports == "top-1000":
        cmd.append("--top-ports=1000")
    elif isinstance(ports, str) and re.fullmatch(r'[0-9,-]+', ports):
        cmd.extend(["-p", ports])
    else:
        # Fail loudly instead of silently scanning nmap's default port set.
        raise ValueError(f"Invalid port specification: {ports}")

    cmd.append(target)
    result = await run_tool(cmd)
    return f"Nmap scan results:\n{result['stdout']}\n{result['stderr']}"
@mcp.tool()
async def gobuster_dir(
    target: str,
    wordlist: str = "/usr/share/wordlists/dirb/common.txt",
    extensions: str = "php,html,txt",
    threads: int = 10
) -> str:
    """
    Directory/file enumeration using gobuster
    Args:
        target: Target URL (e.g., http://example.com)
        wordlist: Path to wordlist file
        extensions: File extensions to search for (comma-separated)
        threads: Number of concurrent threads (clamped to 1-50)
    Raises:
        ValueError: If the URL or wordlist path is invalid, or the URL's host
            is outside the allowed networks
    """
    from urllib.parse import urlsplit

    if not target.startswith(('http://', 'https://')):
        raise ValueError("Target must be a valid URL with http:// or https://")
    host = urlsplit(target).hostname
    if not host:
        raise ValueError("Target must be a valid URL with http:// or https://")
    # Apply the same allow-list to URL targets that the network tools apply to hosts.
    validate_network_target(host)

    wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN)
    extensions = re.sub(r'[^a-zA-Z0-9,]', '', extensions or '')
    cmd = [
        "gobuster", "dir",
        "-u", target,
        "-w", wordlist,
    ]
    if extensions:
        # Skip -x entirely when nothing usable is left after sanitizing.
        cmd.extend(["-x", extensions])
    cmd.extend(["-t", str(max(1, min(50, threads)))])
    result = await run_tool(cmd)
    return f"Gobuster directory scan results:\n{result['stdout']}\n{result['stderr']}"
# Web Application Testing Tools
@mcp.tool()
async def wpscan_scan(
    target: str,
    enumerate: str = "p,t,u",
    api_token: Optional[str] = None
) -> str:
    """
    WordPress security scan using WPScan
    Args:
        target: WordPress site URL
        enumerate: What to enumerate (p=plugins, t=themes, u=users)
        api_token: WPVulnDB API token for vulnerability data
    Raises:
        ValueError: If the URL is invalid or its host is outside the allowed networks
    """
    from urllib.parse import urlsplit

    if not target.startswith(('http://', 'https://')):
        raise ValueError("Target must be a valid URL")
    host = urlsplit(target).hostname
    if not host:
        raise ValueError("Target must be a valid URL")
    # Apply the same allow-list to URL targets that the network tools apply to hosts.
    validate_network_target(host)

    cmd = ["wpscan", "--url", target]
    if enumerate:
        enum_opts = re.sub(r'[^ptu,]', '', enumerate)
        if enum_opts:
            # Only pass the flag when something remains after sanitizing.
            cmd.extend(["--enumerate", enum_opts])
    if api_token:
        token = re.sub(r'[^a-zA-Z0-9]', '', api_token)
        if token:
            cmd.extend(["--api-token", token])
    result = await run_tool(cmd, timeout=600)  # WordPress scans can take longer
    return f"WPScan results:\n{result['stdout']}\n{result['stderr']}"
@mcp.tool()
async def sqlmap_test(
    target: str,
    parameter: Optional[str] = None,
    cookie: Optional[str] = None,
    risk: int = 1,
    level: int = 1
) -> str:
    """
    SQL injection testing using sqlmap
    Args:
        target: Target URL
        parameter: Specific parameter to test
        cookie: Session cookie if needed
        risk: Risk level (1-3, clamped)
        level: Test level (1-5, clamped)
    Raises:
        ValueError: If the URL is invalid or its host is outside the allowed networks
    """
    from urllib.parse import urlsplit

    if not target.startswith(('http://', 'https://')):
        raise ValueError("Target must be a valid URL")
    host = urlsplit(target).hostname
    if not host:
        raise ValueError("Target must be a valid URL")
    # Apply the same allow-list to URL targets that the network tools apply to hosts.
    validate_network_target(host)

    # --batch keeps sqlmap from prompting for input.
    cmd = ["sqlmap", "-u", target, "--batch"]
    if parameter:
        param_name = re.sub(r'[^a-zA-Z0-9_]', '', parameter)
        if param_name:
            # Only pass the flag when something remains after sanitizing.
            cmd.extend(["-p", param_name])
    if cookie:
        cookie_value = re.sub(r'[^a-zA-Z0-9=;_ -]', '', cookie)
        if cookie_value:
            cmd.extend(["--cookie", cookie_value])
    risk = max(1, min(3, risk))
    level = max(1, min(5, level))
    cmd.extend(["--risk", str(risk), "--level", str(level)])
    result = await run_tool(cmd, timeout=900)  # SQL injection tests can be slow
    return f"SQLMap results:\n{result['stdout']}\n{result['stderr']}"
@mcp.tool()
async def dirb_scan(
    target: str,
    wordlist: str = "/usr/share/wordlists/dirb/common.txt",
    extensions: Optional[str] = None
) -> str:
    """
    Web content scanner using DIRB
    Args:
        target: Target URL
        wordlist: Path to wordlist
        extensions: File extensions to append
    Raises:
        ValueError: If the URL or wordlist path is invalid, or the URL's host
            is outside the allowed networks
    """
    from urllib.parse import urlsplit

    if not target.startswith(('http://', 'https://')):
        raise ValueError("Target must be a valid URL")
    host = urlsplit(target).hostname
    if not host:
        raise ValueError("Target must be a valid URL")
    # Apply the same allow-list to URL targets that the network tools apply to hosts.
    validate_network_target(host)

    wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN)
    if wordlist.startswith('-'):
        # The wordlist is a positional argument; a leading '-' would be read as a dirb option.
        raise ValueError(f"Invalid input format: {wordlist}")
    cmd = ["dirb", target, wordlist]
    if extensions:
        ext_list = re.sub(r'[^a-zA-Z0-9,.]', '', extensions)
        if ext_list:
            # Only pass the flag when something remains after sanitizing.
            cmd.extend(["-X", ext_list])
    result = await run_tool(cmd)
    return f"DIRB scan results:\n{result['stdout']}\n{result['stderr']}"
# Enumeration Tools
@mcp.tool()
async def enum4linux_scan(
    target: str,
    scan_type: str = "basic"
) -> str:
    """
    Enumerate SMB/NetBIOS information with enum4linux
    Args:
        target: Target IP address
        scan_type: Type of enumeration (basic, users, shares, policies);
            any other value is treated as basic
    """
    validated = validate_network_target(sanitize_input(target, SAFE_IP_PATTERN))
    # Map each named mode to its enum4linux switch; "-a" is the basic default.
    mode_switch = {
        "users": "-U",
        "shares": "-S",
        "policies": "-P",
    }.get(scan_type, "-a")
    outcome = await run_tool(["enum4linux", mode_switch, validated])
    return f"Enum4linux results:\n{outcome['stdout']}\n{outcome['stderr']}"
@mcp.tool()
async def searchsploit_query(
    query: str,
    exact: bool = False,
    case_sensitive: bool = False
) -> str:
    """
    Search for exploits using searchsploit
    Args:
        query: Search term for exploits
        exact: Use exact match
        case_sensitive: Case sensitive search
    Raises:
        ValueError: If nothing usable remains of the query after sanitizing,
            or the query starts with a hyphen
    """
    # Keep only letters, digits, whitespace, '-', '_' and '.'
    query = re.sub(r'[^a-zA-Z0-9\s\-_.]', '', query).strip()
    if not query:
        raise ValueError("Invalid search query")
    if query.startswith('-'):
        # The query is passed as one positional argument; a leading '-' would
        # make searchsploit treat it as an option rather than a search term.
        raise ValueError("Invalid search query")
    cmd = ["searchsploit"]
    if exact:
        cmd.append("--exact")
    if case_sensitive:
        cmd.append("--case")
    cmd.append(query)
    result = await run_tool(cmd)
    return f"SearchSploit results:\n{result['stdout']}\n{result['stderr']}"
# Credential Testing Tools
@mcp.tool()
async def crackmapexec_smb(
    target: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    password_list: Optional[str] = None,
    hash_value: Optional[str] = None
) -> str:
    """
    SMB credential testing using CrackMapExec
    Args:
        target: Target IP address or range
        username: Username to test
        password: Password to test
        password_list: Path to password list (used when no password is given)
        hash_value: NTLM hash for pass-the-hash (used when neither password
            nor password list is given)
    Raises:
        ValueError: If the target is invalid or not allowed, or a credential
            contains characters outside the supported set
    """
    def _require_only(label: str, value: str, disallowed: str) -> str:
        # Reject instead of stripping: silently deleting characters would test
        # a different credential than the one supplied. The message names the
        # field only, so the secret itself is never echoed back.
        if re.search(disallowed, value):
            raise ValueError(f"{label} contains unsupported characters")
        return value

    target = sanitize_input(target, SAFE_IP_PATTERN)
    target = validate_network_target(target)
    cmd = ["crackmapexec", "smb", target]
    if username:
        cmd.extend(["-u", _require_only("username", username, r'[^a-zA-Z0-9_@.-]')])
    if password:
        cmd.extend(["-p", _require_only("password", password, r'[^a-zA-Z0-9!@#$%^&*()_+={}|<>?-]')])
    elif password_list:
        password_list = sanitize_input(password_list, SAFE_PATH_PATTERN)
        cmd.extend(["-p", password_list])
    elif hash_value:
        cmd.extend(["-H", _require_only("hash_value", hash_value, r'[^a-fA-F0-9:]')])
    result = await run_tool(cmd)
    return f"CrackMapExec SMB results:\n{result['stdout']}\n{result['stderr']}"
@mcp.tool()
async def john_crack(
    hash_file: str,
    wordlist: str = "/usr/share/wordlists/rockyou.txt",
    format_type: Optional[str] = None
) -> str:
    """
    Crack password hashes with John the Ripper
    Args:
        hash_file: Path to file containing hashes
        wordlist: Path to wordlist file
        format_type: Hash format (optional; omitted from the command when not given)
    """
    hashes_path = sanitize_input(hash_file, SAFE_PATH_PATTERN)
    wordlist_path = sanitize_input(wordlist, SAFE_PATH_PATTERN)

    argv = ["john", f"--wordlist={wordlist_path}"]
    if format_type:
        # Keep only letters, digits and hyphens in the format name.
        cleaned_format = re.sub(r'[^a-zA-Z0-9-]', '', format_type)
        argv.append(f"--format={cleaned_format}")
    argv.append(hashes_path)

    outcome = await run_tool(argv, timeout=1800)  # 30 minutes for cracking
    return f"John the Ripper results:\n{outcome['stdout']}\n{outcome['stderr']}"
@mcp.tool()
async def hashcat_crack(
    hash_file: str,
    wordlist: str,
    attack_mode: int = 0,
    hash_type: Optional[int] = None
) -> str:
    """
    GPU-accelerated password cracking using Hashcat
    Args:
        hash_file: Path to file containing hashes
        wordlist: Path to wordlist file
        attack_mode: Attack mode (0=dictionary, 1=combinator, 3=mask); clamped to 0-9
        hash_type: Hash type number, clamped to 0-99999 (no -m flag is passed when None)
    Raises:
        ValueError: If either path fails validation
    """
    hash_file = sanitize_input(hash_file, SAFE_PATH_PATTERN)
    wordlist = sanitize_input(wordlist, SAFE_PATH_PATTERN)
    cmd = ["hashcat"]
    # Compare against None: 0 is a valid hash mode and must not be dropped
    # by a truthiness test.
    if hash_type is not None:
        hash_type = max(0, min(99999, hash_type))
        cmd.extend(["-m", str(hash_type)])
    attack_mode = max(0, min(9, attack_mode))
    cmd.extend(["-a", str(attack_mode)])
    cmd.extend([hash_file, wordlist])
    result = await run_tool(cmd, timeout=1800)  # 30 minutes for cracking
    return f"Hashcat results:\n{result['stdout']}\n{result['stderr']}"
# Utility Tools
@mcp.tool()
async def netcat_connect(
    target: str,
    port: int,
    listen: bool = False,
    udp: bool = False,
    verbose: bool = True
) -> str:
    """
    Network utility using netcat
    Args:
        target: Target hostname or IP (not used in listen mode)
        port: Port number (clamped to 1-65535)
        listen: Listen mode
        udp: Use UDP instead of TCP
        verbose: Verbose output
    Raises:
        ValueError: In connect mode, if the target is invalid or outside the
            allowed networks
    """
    if not listen:
        # A dot alone does not tell an IP from a hostname ("example.com" has
        # one). Only values made purely of digits, dots and '/' must be valid
        # IPv4; everything else is validated as a hostname.
        looks_numeric = isinstance(target, str) and re.fullmatch(r'[0-9./]+', target)
        target = sanitize_input(target, SAFE_IP_PATTERN if looks_numeric else SAFE_DOMAIN_PATTERN)
        if target.startswith('-'):
            # The target is a positional argument; a leading '-' would be read as an nc option.
            raise ValueError(f"Invalid input format: {target}")
        target = validate_network_target(target)
    port = max(1, min(65535, port))
    cmd = ["nc"]
    if verbose:
        cmd.append("-v")
    if udp:
        cmd.append("-u")
    if listen:
        cmd.extend(["-l", "-p", str(port)])
    else:
        cmd.extend([target, str(port)])
    # Netcat connections should be short-lived for safety
    result = await run_tool(cmd, timeout=30)
    return f"Netcat results:\n{result['stdout']}\n{result['stderr']}"
@mcp.tool()
async def bloodhound_py(
    target: str,
    username: str,
    password: str,
    domain: str,
    output_dir: str = "/tmp/bloodhound"
) -> str:
    """
    Active Directory enumeration using bloodhound-python
    Args:
        target: Domain controller IP (passed as the name server)
        username: Domain username
        password: Domain password
        domain: Domain name
        output_dir: Output directory for results
    Raises:
        ValueError: If any argument fails validation, the target is outside
            the allowed networks, or a credential contains characters outside
            the supported set
    """
    def _require_only(label: str, value: str, disallowed: str) -> str:
        # Reject instead of stripping: silently deleting characters would
        # authenticate with a different credential than the one supplied. The
        # message names the field only, so the secret is never echoed back.
        if not isinstance(value, str) or re.search(disallowed, value):
            raise ValueError(f"{label} contains unsupported characters")
        return value

    target = sanitize_input(target, SAFE_IP_PATTERN)
    target = validate_network_target(target)
    username = _require_only("username", username, r'[^a-zA-Z0-9_@.-]')
    password = _require_only("password", password, r'[^a-zA-Z0-9!@#$%^&*()_+={}|<>?-]')
    domain = sanitize_input(domain, SAFE_DOMAIN_PATTERN)
    output_dir = sanitize_input(output_dir, SAFE_PATH_PATTERN)
    cmd = [
        "bloodhound-python",
        "-d", domain,
        "-u", username,
        "-p", password,
        "-ns", target,
        "-c", "All",
        "--outputdir", output_dir
    ]
    result = await run_tool(cmd, timeout=600)
    return f"BloodHound Python results:\n{result['stdout']}\n{result['stderr']}"
# Main function
def main():
    """Main entry point: log startup information and serve MCP over STDIO."""
    # The server runs on the STDIO transport, so no host/port is bound; the
    # startup message says so instead of naming a network address.
    logger.info("Starting Kali MCP Server (stdio transport)")
    logger.info(
        "Available tools: nmap_scan, gobuster_dir, wpscan_scan, sqlmap_test, dirb_scan, "
        "enum4linux_scan, searchsploit_query, crackmapexec_smb, john_crack, "
        "hashcat_crack, netcat_connect, bloodhound_py"
    )
    # Use STDIO transport for Claude Desktop compatibility
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()