Skip to main content
Glama

Frida MCP

by rmorgans
cli.py27 kB
#!/usr/bin/env python3 """ Command line entry point specifically for Claude Desktop integration. This script is designed to be the target of the command in claude_desktop_config.json. It sets up a basic Frida MCP server with STDIO transport for Claude to communicate with. """ import argparse import logging import threading import time from typing import Any, Dict, List, Optional import frida from mcp.server.fastmcp import FastMCP from pydantic import Field from .device_selection import ( DeviceSelectionError, create_config, describe_devices, initialize_selector, register_remote_device, resolve_device, ) VALID_DEVICE_MODES = {"remote", "usb", "local"} DEFAULT_REMOTE_PORT = 27042 def _validate_remote_address(value: str) -> str: raw_value = value.strip() if not raw_value: raise argparse.ArgumentTypeError("--remote-address requires a host") host, sep, port = raw_value.partition(":") if not host: raise argparse.ArgumentTypeError("--remote-address requires a host before ':'") if not sep: port = str(DEFAULT_REMOTE_PORT) elif not port: port = str(DEFAULT_REMOTE_PORT) elif not port.isdigit(): raise argparse.ArgumentTypeError("--remote-address port must be a number") port_value = int(port) if not (0 < port_value < 65536): raise argparse.ArgumentTypeError("--remote-address port must be 1-65535") return f"{host}:{port_value}" def _validate_fallback_order(value: str) -> List[str]: order = [segment.strip() for segment in value.split(",") if segment.strip()] if not order: raise argparse.ArgumentTypeError("--fallback-order must list at least one mode") invalid = [segment for segment in order if segment not in VALID_DEVICE_MODES] if invalid: allowed = ", ".join(sorted(VALID_DEVICE_MODES)) invalid_list = ", ".join(invalid) raise argparse.ArgumentTypeError( f"--fallback-order contains unsupported mode(s): {invalid_list}. " f"Allowed values: {allowed}" ) return order # Create the MCP server mcp = FastMCP("Frida") logger = logging.getLogger(__name__) # Global dictionary to store scripts and their messages # This allows us to retrieve messages from scripts after they've been created _scripts = {} _script_messages = {} _message_locks = {} global_persistent_scripts = {} # Added for managing persistent scripts def _parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Frida MCP server entry point", allow_abbrev=False, ) parser.add_argument( "--mode", choices=sorted(VALID_DEVICE_MODES), help=( "Preferred device mode for this session. Without this flag, the selector uses " "environment defaults or smart fallbacks." ), ) parser.add_argument( "--remote-address", type=_validate_remote_address, help=( "Remote device address in <host[:port]> form. Defaults to port 27042 when omitted." ), ) parser.add_argument( "--remote-alias", help="Optional alias that can be used as device_id for the remote device.", ) parser.add_argument( "--fallback-order", type=_validate_fallback_order, help=( "Comma separated list describing fallback priority (e.g. 'remote,usb,local')." ), ) parser.add_argument( "--log-level", default="INFO", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help="Set logging verbosity (default: INFO). Use DEBUG for detailed diagnostics.", ) parser.add_argument( "--log-file", help="Optional path to write logs. Useful when stdio is reserved for MCP traffic.", ) return parser.parse_args(argv) def _setup_logging(level: str, log_file: Optional[str] = None) -> None: numeric_level = getattr(logging, level.upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Invalid log level: {level}") handlers: List[logging.Handler] = [logging.StreamHandler()] if log_file: handlers.append(logging.FileHandler(log_file)) logging.basicConfig( level=numeric_level, format="[%(asctime)s] %(levelname)s %(name)s: %(message)s", datefmt="%H:%M:%S", handlers=handlers, force=True, ) logger.debug("Logging configured at %s", level.upper()) if log_file: logger.debug("File logging enabled at %s", log_file) def _prepare_configuration(args: argparse.Namespace) -> None: logger.debug( "Preparing selector config (mode=%s, remote_address=%s, remote_alias=%s, fallback=%s)", args.mode, args.remote_address, args.remote_alias, args.fallback_order, ) config = create_config( mode=args.mode, remote_address=args.remote_address, remote_alias=args.remote_alias, fallback_priority=args.fallback_order, ) initialize_selector(config) logger.debug( "Selector initialised: default_device=%s default_remote=%s fallbacks=%s aliases=%s", getattr(config, "default_device", None), getattr(config, "default_remote", None), config.fallback_order(), config.alias_to_address, ) def _resolve_device_or_raise(device_id: Optional[str] = None) -> Any: """Resolve a Frida device using the smart selector and convert errors.""" try: logger.debug("Resolving device for id=%s", device_id or "<default>") device = resolve_device(device_id) logger.debug( "Resolved device id=%s -> %s (%s)", device_id or "<default>", getattr(device, "id", "<unknown>"), getattr(device, "type", "<unknown>"), ) return device except DeviceSelectionError as exc: if exc.reasons: attempts = "; ".join(exc.reasons) logger.error( "Device resolution failed for id=%s: %s (attempts: %s)", device_id or "<default>", exc, attempts, ) raise ValueError(f"{exc}. Attempts: {attempts}") from exc logger.error( "Device resolution failed for id=%s: %s", device_id or "<default>", exc, ) raise ValueError(str(exc)) from exc @mcp.tool() def enumerate_processes( device_id: Optional[str] = Field( default=None, description="Optional ID of the device to enumerate processes from. Uses smart selection when omitted.", ), ) -> List[Dict[str, Any]]: """List all processes running on the system. Returns: A list of process information dictionaries containing: - pid: Process ID - name: Process name """ device = _resolve_device_or_raise(device_id) processes = device.enumerate_processes() return [{"pid": process.pid, "name": process.name} for process in processes] @mcp.tool() def enumerate_devices() -> List[Dict[str, Any]]: """List all devices connected to the system. Returns: A list of device information dictionaries containing: - id: Device ID - name: Device name - type: Device type - hint: How to reference the device via device_id - alias: Configured alias for remote devices (if any) - default_candidate: Whether the device is the current default choice """ return describe_devices() @mcp.tool() def configure_remote_device( address: str = Field(description="The remote <host>:<port> to add."), alias: Optional[str] = Field( default=None, description="Optional alias used as device_id when targeting this remote.", ), set_as_default: bool = Field( default=False, description="If true, future requests without device_id will prefer this remote.", ), ) -> Dict[str, Any]: """Connect to a remote Frida server and make it available for future requests.""" try: info = register_remote_device(address, alias=alias, set_default=set_as_default) except DeviceSelectionError as exc: if exc.reasons: attempts = "; ".join(exc.reasons) raise ValueError(f"{exc}. Attempts: {attempts}") from exc raise ValueError(str(exc)) from exc response: Dict[str, Any] = {"status": "success"} response.update(info) if set_as_default: response["message"] = "Remote device set as default" return response @mcp.tool() def get_device( device_id: str = Field(description="The ID of the device to get"), ) -> Dict[str, Any]: """Get a device by its ID. Returns: Information about the device """ device = _resolve_device_or_raise(device_id) return { "id": device.id, "name": device.name, "type": device.type, } @mcp.tool() def get_usb_device() -> Dict[str, Any]: """Get the USB device connected to the system. Returns: Information about the USB device """ try: device = frida.get_usb_device() return { "id": device.id, "name": device.name, "type": device.type, } except frida.InvalidArgumentError: raise ValueError("No USB device found") @mcp.tool() def get_local_device() -> Dict[str, Any]: """Get the local device. Returns: Information about the local device """ try: device = frida.get_local_device() return { "id": device.id, "name": device.name, "type": device.type, } except frida.InvalidArgumentError: # Or other relevant Frida exceptions raise ValueError("No local device found or error accessing it.") @mcp.tool() def get_process_by_name( name: str = Field( description="The name (or part of the name) of the process to find. Case-insensitive." ), device_id: Optional[str] = Field( default=None, description="Optional ID of the device to search the process on. Uses smart selection when omitted.", ), ) -> dict: """Find a process by name.""" device = _resolve_device_or_raise(device_id) for proc in device.enumerate_processes(): if name.lower() in proc.name.lower(): return {"pid": proc.pid, "name": proc.name, "found": True} return {"found": False, "error": f"Process '{name}' not found"} @mcp.tool() def attach_to_process( pid: int = Field(description="The ID of the process to attach to."), device_id: Optional[str] = Field( default=None, description="Optional ID of the device where the process is running. Uses smart selection when omitted.", ), ) -> dict: """Attach to a process by ID.""" try: device = _resolve_device_or_raise(device_id) device.attach(pid) return { "pid": pid, "success": True, "is_detached": False, # New session is not detached } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool() def spawn_process( program: str = Field(description="The program or application identifier to spawn."), args: Optional[List[str]] = Field( default=None, description="Optional list of arguments for the program." ), device_id: Optional[str] = Field( default=None, description="Optional ID of the device where the program should be spawned. Uses smart selection when omitted.", ), ) -> Dict[str, Any]: """Spawn a program. Returns: Information about the spawned process """ try: device = _resolve_device_or_raise(device_id) argv = None if args: argv = list(args) if not argv or argv[0] != program: argv.insert(0, program) pid = device.spawn(program, argv=argv) return {"pid": pid} except Exception as e: raise ValueError(f"Failed to spawn {program}: {str(e)}") @mcp.tool() def resume_process( pid: int = Field(description="The ID of the process to resume."), device_id: Optional[str] = Field( default=None, description="Optional ID of the device where the process is running. Uses smart selection when omitted.", ), ) -> Dict[str, Any]: """Resume a process by ID. Returns: Status information """ try: device = _resolve_device_or_raise(device_id) device.resume(pid) return {"success": True, "pid": pid} except Exception as e: raise ValueError(f"Failed to resume process {pid}: {str(e)}") @mcp.tool() def kill_process( pid: int = Field(description="The ID of the process to kill."), device_id: Optional[str] = Field( default=None, description="Optional ID of the device where the process is running. Uses smart selection when omitted.", ), ) -> Dict[str, Any]: """Kill a process by ID. Returns: Status information """ try: device = _resolve_device_or_raise(device_id) device.kill(pid) return {"success": True, "pid": pid} except Exception as e: raise ValueError(f"Failed to kill process {pid}: {str(e)}") @mcp.resource("frida://version") def get_version() -> str: """Get the Frida version.""" return frida.__version__ @mcp.resource("frida://processes") def get_processes_resource() -> str: """Get a list of processes from the currently selected default device as a readable string.""" device = _resolve_device_or_raise() processes = device.enumerate_processes() return "\n".join([f"PID: {p.pid}, Name: {p.name}" for p in processes]) @mcp.resource("frida://devices") def get_devices_resource() -> str: """Get a list of all devices as a readable string.""" devices = frida.enumerate_devices() return "\n".join([f"ID: {d.id}, Name: {d.name}, Type: {d.type}" for d in devices]) @mcp.tool() def create_interactive_session( process_id: int = Field( description="The ID of the process to attach to for creating an interactive session." ), device_id: Optional[str] = Field( default=None, description="Optional ID of the device where the process is running. Uses smart selection when omitted.", ), ) -> Dict[str, Any]: """Create an interactive REPL-like session with a process. This returns a session ID that can be used with execute_in_session to run commands. Returns: Information about the created session """ try: # Attach to process device = _resolve_device_or_raise(device_id) session = device.attach(process_id) # Generate a unique session ID session_id = f"session_{process_id}_{int(time.time())}" # Store the session _scripts[session_id] = session _script_messages[session_id] = [] _message_locks[session_id] = threading.Lock() return { "status": "success", "process_id": process_id, "session_id": session_id, "message": f"Interactive session created for process {process_id}. Use execute_in_session to run JavaScript commands.", } except Exception as e: return {"status": "error", "error": str(e)} @mcp.tool() def execute_in_session( session_id: str = Field( description="The unique identifier of the active Frida session. This ID is obtained when the session is first created." ), javascript_code: str = Field( description="A string containing the JavaScript code to be executed in the target process's context. The script can use Frida's JavaScript API (e.g., Interceptor, Memory, Module, rpc)." ), keep_alive: bool = Field( default=False, description="A boolean flag indicating whether the script should remain loaded in the target process after its initial execution. If False (default), the script is unloaded after initial run. If True, it persists for hooks/RPC and messages are retrieved via get_session_messages. Note: With keep_alive=True, JavaScript code should manage log volume (limits, deduplication) to prevent too many messages.", ), ) -> Dict[str, Any]: """Execute JavaScript code within an existing interactive Frida session. This tool allows for dynamic scripting against a process previously attached to via `create_interactive_session`. """ if session_id not in _scripts: raise ValueError(f"Session with ID {session_id} not found") session = _scripts[session_id] lock = _message_locks[session_id] try: # For interactive use, we need to handle console.log output # and properly format the result # Wrap the code to capture console.log output and return values # This basic wrapper sends back immediate script result/errors and console.log output # For keep_alive=True, subsequent messages from the script (e.g., from Interceptor) # will be handled by the persistent on_message handler. wrapped_code = f""" (function() {{ function formatValue(value) {{ if (value === null) {{ return 'null'; }} if (value === undefined) {{ return 'undefined'; }} var valueType = typeof value; if (valueType === 'string') {{ return value; }} if (valueType === 'number' || valueType === 'boolean' || valueType === 'bigint') {{ return String(value); }} if (valueType === 'symbol' || valueType === 'function') {{ try {{ return value.toString(); }} catch (e) {{ return '[unrepresentable value]'; }} }} if (valueType === 'object') {{ try {{ var cache = new Set(); var json = JSON.stringify(value, function(key, val) {{ if (typeof val === 'object' && val !== null) {{ if (cache.has(val)) {{ return '[Circular]'; }} cache.add(val); }} return val; }}); if (json !== undefined) {{ return json; }} }} catch (jsonErr) {{ // Fall through to more generic formatting below }} try {{ return Object.prototype.toString.call(value); }} catch (descriptorErr) {{ // Fall through to final String conversion }} }} try {{ return String(value); }} catch (stringErr) {{ return '[unrepresentable value]'; }} }} var initialLogs = []; var originalLog = console.log; console.log = function() {{ var args = Array.prototype.slice.call(arguments); var logMsg = args.map(formatValue).join(' '); initialLogs.push(logMsg); originalLog.apply(console, arguments); // Also keep original console behavior }}; var scriptResult; var scriptError; try {{ scriptResult = eval({javascript_code!r}); }} catch (e) {{ scriptError = {{ message: e.toString(), stack: e.stack }}; }} console.log = originalLog; // Restore send({{ // This send is for the initial execution result type: 'execution_receipt', result: scriptError ? undefined : formatValue(scriptResult), error: scriptError, initial_logs: initialLogs }}); }})(); """ script = session.create_script(wrapped_code) # This list captures messages from the initial execution of the script (the wrapper) initial_execution_results = [] def on_initial_message(message, data): # This handler is for the initial execution wrapper's send() if ( message["type"] == "send" and message["payload"]["type"] == "execution_receipt" ): initial_execution_results.append(message["payload"]) elif message["type"] == "error": # Script compilation/syntax errors initial_execution_results.append( {"script_error": message["description"], "details": message} ) # This handler is for persistent messages if keep_alive is true def on_persistent_message(message, data): with lock: _script_messages[session_id].append( { "type": message["type"], "payload": message.get("payload"), "data": data, } ) if keep_alive: # For keep_alive, messages go to the global queue _script_messages # The script object itself will handle these. script.on("message", on_persistent_message) # Store the script object if we need to interact with it later (e.g., specific unload) # For now, it's attached to the session and will be cleaned up when session is detached or process ends. if ( session_id not in global_persistent_scripts ): # Requires global_persistent_scripts dict global_persistent_scripts[session_id] = [] global_persistent_scripts[session_id].append(script) else: # For non-persistent scripts, use the local handler for immediate results script.on("message", on_initial_message) script.load() # For non-persistent scripts, give a short time for the initial_execution_results # For persistent scripts, this sleep is less critical as it's about setting up. if not keep_alive: time.sleep(0.2) # Slightly increased for safety # Process initial results (for both modes, but primarily for non-keep_alive) final_result = {} if initial_execution_results: # Use the first message from the execution_receipt receipt = initial_execution_results[0] if "script_error" in receipt: final_result = { "status": "error", "error": "Script execution error", "details": receipt["script_error"], } elif receipt.get("error"): final_result = { "status": "error", "error": receipt["error"]["message"], "stack": receipt["error"]["stack"], "initial_logs": receipt.get("initial_logs", []), } else: final_result = { "status": "success", "result": receipt["result"], "initial_logs": receipt.get("initial_logs", []), } elif keep_alive: final_result = { "status": "success", "message": "Script loaded persistently. Use get_session_messages to retrieve asynchronous messages.", "initial_logs": [], } else: # No messages received, could be an issue or just a silent script final_result = { "status": "nodata", # Or "success" if empty result is fine "message": "Script loaded but sent no initial messages.", "initial_logs": [], } if not keep_alive: script.unload() final_result["script_unloaded"] = True else: final_result["script_unloaded"] = False final_result["info"] = ( "Script is persistent. Remember to manage its lifecycle if necessary." ) return final_result except frida.InvalidOperationError as e: # E.g. session detached return { "status": "error", "error": f"Frida operation error: {str(e)} (Session may be detached)", } except Exception as e: return {"status": "error", "error": str(e)} @mcp.tool() def get_session_messages( session_id: str = Field( description="The ID of the session to retrieve messages from." ), ) -> Dict[str, Any]: """Retrieve and clear messages sent by persistent scripts in a session. Returns: A list of messages captured since the last call, or an error if the session is not found. """ if session_id not in _scripts: # Check if it was a session that had persistent scripts but might have been cleared or detached if ( session_id in global_persistent_scripts and not global_persistent_scripts[session_id] ): return { "status": "success", "messages": [], "info": "Session had persistent scripts that might be finished or detached.", } raise ValueError( f"Session with ID {session_id} not found or no persistent scripts active." ) if session_id not in _message_locks or session_id not in _script_messages: # This case should ideally not happen if session_id is in _scripts from create_interactive_session return { "status": "error", "error": f"Message queue or lock not found for session {session_id}.", } lock = _message_locks[session_id] with lock: messages = list(_script_messages[session_id]) # Make a copy _script_messages[session_id].clear() # Clear the queue return { "status": "success", "session_id": session_id, "messages_retrieved": len(messages), "messages": messages, } def main(argv: Optional[List[str]] = None) -> None: """Run the CLI entry point for Claude Desktop integration.""" args = _parse_args(argv) _setup_logging(args.log_level, args.log_file) mcp.settings.log_level = args.log_level logger.info("Starting Frida MCP server (mode=%s)", args.mode or "auto") _prepare_configuration(args) logger.debug("Running MCP server on stdio transport") mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rmorgans/frida-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server