Skip to main content
Glama

Frida MCP

by rmorgans
device_selection.py (17.4 kB)
"""Utilities for resolving Frida devices with flexible configuration."""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

try:  # pragma: no cover - allows tests to supply a stub frida module
    import frida  # type: ignore
except ImportError:  # pragma: no cover
    frida = None  # type: ignore

logger = logging.getLogger(__name__)

# Order tried by auto-selection when no explicit fallback list is configured.
_DEFAULT_FALLBACK = ("local", "usb")

# Scheme prefixes seen on remote device ids: older Frida builds report
# "tcp@host:port"; newer ones are understood to report "socket@host:port".
_REMOTE_ID_PREFIXES = ("tcp@", "socket@")

# Identifiers that request automatic selection instead of naming a device.
_AUTO_KEYWORDS = frozenset({"auto", "smart"})

# Identifiers that name a device *type* rather than a specific device.
_DEVICE_KINDS = frozenset({"local", "usb", "remote"})


def _normalize_remote_identifier(identifier: str) -> str:
    """Return *identifier* without surrounding whitespace or a remote-id prefix.

    ``"tcp@host:port"`` and ``"socket@host:port"`` both become ``"host:port"``;
    anything else is returned stripped but otherwise unchanged.
    """
    value = identifier.strip()
    for prefix in _REMOTE_ID_PREFIXES:
        if value.startswith(prefix):
            return value[len(prefix):]
    return value


@dataclass
class DeviceSelectionConfig:
    """Configuration values controlling device resolution."""

    # Identifier used when the caller does not name a device (None -> auto).
    default_device: Optional[str] = None
    # Normalised ``host:port`` used for the bare "remote" identifier.
    default_remote: Optional[str] = None
    # Explicit auto-selection order; empty means "use the built-in default".
    fallback_priority: List[str] = field(default_factory=list)
    # Lower-cased alias -> normalised remote address.
    alias_to_address: Dict[str, str] = field(default_factory=dict)
    # Normalised remote address -> alias as originally spelled.
    address_to_alias: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def load(cls) -> "DeviceSelectionConfig":
        """Build a configuration from defaults plus ``FRIDA_*`` environment variables."""
        config = cls()
        _apply_environment_overrides(config)
        return config

    def register_remote_alias(self, alias: str, address: str) -> None:
        """Record a two-way mapping between *alias* and a remote *address*.

        Blank aliases or addresses are ignored. Alias lookup is
        case-insensitive; the original spelling is kept for display.
        """
        alias_name = alias.strip()
        if not alias_name:
            return
        address_value = _normalize_remote_identifier(address)
        if not address_value:
            return
        self.alias_to_address[alias_name.lower()] = address_value
        self.address_to_alias[address_value] = alias_name

    def fallback_order(self) -> List[str]:
        """Return the de-duplicated, lower-cased auto-selection order.

        Non-string, blank and auto-selection keywords ("auto"/"smart") are
        dropped. When nothing usable remains the built-in default order is
        used, and "remote" is appended whenever a default remote is configured.
        """
        order: List[str] = []
        for candidate in self.fallback_priority or []:
            if not isinstance(candidate, str):
                continue
            normalized = candidate.strip().lower()
            # Auto keywords cannot be resolved to a device, so skip them.
            if not normalized or normalized in _AUTO_KEYWORDS:
                continue
            if normalized not in order:
                order.append(normalized)

        if not order:
            order.extend(_DEFAULT_FALLBACK)

        if self.default_remote and "remote" not in order:
            order.append("remote")
        return order

    def alias_for_address(self, address: str) -> Optional[str]:
        """Return the alias registered for *address*, or ``None`` if there is none."""
        return self.address_to_alias.get(_normalize_remote_identifier(address))


def _apply_environment_overrides(config: DeviceSelectionConfig) -> None:
    """Apply ``FRIDA_*`` environment variables onto *config* in place.

    Reads ``FRIDA_DEFAULT_DEVICE``, ``FRIDA_DEFAULT_REMOTE``,
    ``FRIDA_DEVICE_FALLBACKS`` (comma separated) and every
    ``FRIDA_REMOTE_DEVICE_<ALIAS>`` variable (value is the remote address).
    """
    env_default_device = os.environ.get("FRIDA_DEFAULT_DEVICE")
    if env_default_device:
        config.default_device = env_default_device.strip()

    env_default_remote = os.environ.get("FRIDA_DEFAULT_REMOTE")
    if env_default_remote:
        config.default_remote = _normalize_remote_identifier(env_default_remote)

    env_fallbacks = os.environ.get("FRIDA_DEVICE_FALLBACKS")
    if env_fallbacks:
        config.fallback_priority = [
            entry.strip() for entry in env_fallbacks.split(",") if entry.strip()
        ]

    prefix = "FRIDA_REMOTE_DEVICE_"
    for env_key, env_value in os.environ.items():
        if env_key.startswith(prefix) and env_value:
            # In the variable name "__" encodes a hyphen and "_" a space.
            alias = env_key[len(prefix):].replace("__", "-").replace("_", " ")
            config.register_remote_alias(alias, env_value)


class DeviceSelectionError(RuntimeError):
    """Raised when device resolution fails."""

    def __init__(self, message: str, reasons: Optional[List[str]] = None):
        super().__init__(message)
        # Per-candidate failure descriptions collected during auto-selection.
        self.reasons = reasons or []


def create_config(
    *,
    mode: Optional[str] = None,
    remote_address: Optional[str] = None,
    remote_alias: Optional[str] = None,
    fallback_priority: Optional[List[str]] = None,
) -> DeviceSelectionConfig:
    """Construct a configuration with CLI-style inputs applied over env defaults.

    Args:
        mode: "local", "usb" or "remote" to force a default device type; any
            other value leaves the environment-derived default untouched.
        remote_address: Remote ``host:port`` (a remote-id prefix is accepted)
            to use as the default remote.
        remote_alias: Alias for *remote_address*; defaults to the address.
        fallback_priority: Explicit auto-selection order; ``None`` keeps the
            environment-derived order.

    Returns:
        The resulting :class:`DeviceSelectionConfig`.
    """
    config = DeviceSelectionConfig.load()

    if fallback_priority is not None:
        config.fallback_priority = [
            entry.strip()
            for entry in fallback_priority
            if isinstance(entry, str) and entry.strip()
        ]

    normalized_mode = (mode or "").strip().lower()
    if normalized_mode in _DEVICE_KINDS:
        config.default_device = normalized_mode
        if normalized_mode != "remote":
            # An explicit non-remote mode discards any env-provided remote.
            config.default_remote = None

    normalized_remote = (
        _normalize_remote_identifier(remote_address)
        if isinstance(remote_address, str)
        else ""
    )
    if normalized_remote:
        config.default_remote = normalized_remote
        alias_value = (
            remote_alias.strip()
            if isinstance(remote_alias, str) and remote_alias.strip()
            else normalized_remote
        )
        config.register_remote_alias(alias_value, normalized_remote)
        if not normalized_mode:
            # A remote address without a mode implies remote-by-default.
            config.default_device = "remote"

    return config


class DeviceSelector:
    """Resolve Frida devices using configuration, fallbacks, and aliases."""

    def __init__(
        self,
        config: Optional[DeviceSelectionConfig] = None,
        *,
        frida_module: Optional[object] = None,
        device_manager: Optional[object] = None,
    ):
        """Create a selector.

        Args:
            config: Configuration to use; loaded from the environment if omitted.
            frida_module: Object standing in for the ``frida`` module.
            device_manager: Object standing in for Frida's device manager;
                obtained from the frida module if omitted.

        Raises:
            RuntimeError: If no frida module is importable or supplied.
        """
        self._config = config or DeviceSelectionConfig.load()
        self._frida = frida_module or frida
        if self._frida is None:
            raise RuntimeError(
                "Frida module is not available. Provide a frida_module override or install frida."
            )
        self._device_manager = device_manager or self._frida.get_device_manager()
        # Normalised remote address -> device object already connected.
        self._remote_cache: Dict[str, Any] = {}
        logger.debug(
            "DeviceSelector initialised (default_device=%s, default_remote=%s, fallbacks=%s, aliases=%s)",
            self._config.default_device,
            self._config.default_remote,
            self._config.fallback_order(),
            self._config.alias_to_address,
        )

    def reload(self) -> None:
        """Reload configuration from the environment and drop cached remotes."""
        self._config = DeviceSelectionConfig.load()
        self._remote_cache.clear()

    def register_remote(
        self, address: str, alias: Optional[str] = None, *, set_default: bool = False
    ) -> Any:
        """Register and connect to a remote device, optionally updating defaults.

        Args:
            address: Remote ``host:port`` (a remote-id prefix is accepted).
            alias: Friendly name for the device; defaults to the address.
            set_default: Also make this address the default remote.

        Returns:
            The connected device object.

        Raises:
            DeviceSelectionError: If the address is empty or the device cannot
                be reached.
        """
        normalized = _normalize_remote_identifier(address)
        if not normalized:
            # Reject before touching the configuration so a bad call cannot
            # leave an empty default remote behind.
            raise DeviceSelectionError("Remote address is empty")

        label = alias.strip() if isinstance(alias, str) and alias.strip() else normalized
        self._config.register_remote_alias(label, normalized)

        if set_default:
            self._config.default_remote = normalized
            # Ensure remote appears in fallback ordering.
            # NOTE(review): with no explicit priority list this makes "remote"
            # the only explicit entry, so the built-in local/usb defaults stop
            # applying - confirm that is intended.
            priorities = self._config.fallback_priority
            if priorities is None:
                priorities = self._config.fallback_priority = []
            already_listed = any(
                isinstance(entry, str) and entry.strip().lower() == "remote"
                for entry in priorities
            )
            if not already_listed:
                priorities.append("remote")

        return self._get_remote_device(normalized)

    def get_device(self, device_id: Optional[str] = None) -> Any:
        """Resolve *device_id*, falling back to the default device or auto-selection.

        Raises:
            DeviceSelectionError: If no matching device can be obtained.
        """
        identifier = (device_id or "").strip()
        if identifier:
            return self._get_by_identifier(identifier)

        default_choice = (self._config.default_device or "").strip()
        if default_choice and default_choice.lower() not in _AUTO_KEYWORDS:
            return self._get_by_identifier(default_choice)

        return self._auto_select()

    def describe_devices(self) -> List[Dict[str, Any]]:
        """Describe every enumerated device.

        Each entry has ``id``, ``name``, ``type``, a ``hint`` string showing
        how to address the device, a boolean ``default_candidate`` and, for
        remotes with a registered alias, ``alias``.
        """
        default_choice = (self._config.default_device or "auto").strip() or "auto"
        fallback_order = self._config.fallback_order()

        descriptions: List[Dict[str, Any]] = []
        for device in self._frida.enumerate_devices():
            entry: Dict[str, Any] = {
                "id": device.id,
                "name": device.name,
                "type": device.type,
            }
            entry["hint"] = self._usage_hint(device)
            entry["default_candidate"] = self._is_default_candidate(
                device, default_choice, fallback_order
            )
            alias = self._config.address_to_alias.get(
                _normalize_remote_identifier(device.id)
            )
            if alias:
                entry["alias"] = alias
            descriptions.append(entry)
        return descriptions

    # Internal helpers -------------------------------------------------

    def _get_by_identifier(self, identifier: str, *, allow_auto: bool = True) -> Any:
        """Resolve one identifier: keyword, alias, remote address or exact id.

        ``allow_auto`` is disabled by auto-selection itself so an auto keyword
        cannot recurse back into it.
        """
        cleaned = identifier.strip()
        lowered = cleaned.lower()

        if allow_auto and (not lowered or lowered in _AUTO_KEYWORDS):
            return self._auto_select()
        if lowered == "usb":
            return self._get_usb_device()
        if lowered == "local":
            return self._get_local_device()
        if lowered == "remote":
            return self._get_default_remote_device()
        if lowered.startswith("remote:"):
            return self._get_remote_device(cleaned.split(":", 1)[1])

        address = self._config.alias_to_address.get(lowered)
        if address:
            return self._get_remote_device(address)

        if cleaned.startswith(_REMOTE_ID_PREFIXES):
            # Already a full remote device id; look it up as-is.
            return self._get_by_exact_id(cleaned)

        if "@" not in cleaned and cleaned.count(":") == 1:
            # Treat host:port as remote address
            return self._get_remote_device(cleaned)

        return self._get_by_exact_id(cleaned)

    def _get_by_exact_id(self, device_id: str) -> Any:
        """Fetch a device by its exact Frida id."""
        try:
            return self._frida.get_device(device_id)
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError(f"Device '{device_id}' not found") from exc

    def _get_usb_device(self) -> Any:
        """Fetch the USB device reported by Frida."""
        try:
            return self._frida.get_usb_device()
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError("No USB devices detected") from exc

    def _get_local_device(self) -> Any:
        """Fetch the local device reported by Frida."""
        try:
            return self._frida.get_local_device()
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError("Unable to access local device") from exc

    def _get_default_remote_device(self) -> Any:
        """Return the configured default remote, else the first enumerated remote."""
        if self._config.default_remote:
            return self._get_remote_device(self._config.default_remote)

        device = self._find_first_remote_device()
        if device:
            return device

        raise DeviceSelectionError(
            "No remote devices available. Configure FRIDA_DEFAULT_REMOTE or add a remote address."
        )

    def _get_remote_device(self, address: str) -> Any:
        """Connect to (or reuse) the remote device at *address*.

        Raises:
            DeviceSelectionError: If the address is empty, registration is
                rejected and no matching device is enumerated, or the
                transport fails.
        """
        normalized = _normalize_remote_identifier(address)
        if not normalized:
            raise DeviceSelectionError("Remote address is empty")

        if normalized in self._remote_cache:
            logger.debug("Using cached remote device for %s", normalized)
            return self._remote_cache[normalized]

        logger.debug("Attempting to register remote device at %s", normalized)
        try:
            device = self._device_manager.add_remote_device(normalized)
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            # Registration was rejected; fall back to a device that is
            # already enumerated under this address.
            device = self._find_remote_by_address(normalized)
            if device is None:
                raise DeviceSelectionError(
                    f"Remote device '{normalized}' is not available"
                ) from exc
        except self._frida.TransportError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError(
                f"Unable to connect to remote device '{normalized}': {exc}"
            ) from exc
        else:
            logger.debug("Connected to remote device %s (%s)", device.id, device.type)
            self._config.address_to_alias.setdefault(normalized, normalized)

        self._remote_cache[normalized] = device
        return device

    def _find_remote_by_address(self, address: str) -> Optional[Any]:
        """Return the enumerated remote device whose id maps to *address*, if any."""
        for device in self._frida.enumerate_devices():
            if device.type != "remote":
                continue
            if _normalize_remote_identifier(device.id) == address:
                return device
        return None

    def _find_first_remote_device(self) -> Optional[Any]:
        """Return the first enumerated device of type "remote", if any."""
        for device in self._frida.enumerate_devices():
            if device.type == "remote":
                return device
        return None

    def _auto_select(self) -> Any:
        """Try each fallback candidate in order, then any enumerated device.

        Raises:
            DeviceSelectionError: If nothing can be selected; ``reasons`` lists
                why each fallback candidate failed.
        """
        errors: List[str] = []
        for candidate in self._config.fallback_order():
            try:
                logger.debug("Attempting auto-select candidate '%s'", candidate)
                return self._get_by_identifier(candidate, allow_auto=False)
            except DeviceSelectionError as exc:
                logger.debug("Auto-select candidate '%s' failed: %s", candidate, exc)
                errors.append(f"{candidate}: {exc}")

        for device in self._frida.enumerate_devices():
            try:
                logger.debug("Falling back to enumerated device '%s'", device.id)
                return self._get_by_exact_id(device.id)
            except DeviceSelectionError:
                continue

        raise DeviceSelectionError(
            "Unable to select a Frida device automatically", errors
        )

    def _usage_hint(self, device: Any) -> str:
        """Return a short string telling the user how to address *device*."""
        device_type = (device.type or "").lower()
        if device_type == "local":
            return "device_id='local' or omit"
        if device_type == "usb":
            return "device_id='usb'"
        if device_type == "remote":
            address = _normalize_remote_identifier(device.id)
            alias = self._config.address_to_alias.get(address)
            if alias and alias != address:
                return f"device_id='{alias}' or '{address}'"
            if device.id.startswith(_REMOTE_ID_PREFIXES):
                return f"device_id='{address}'"
        return f"device_id='{device.id}'"

    def _is_default_candidate(
        self, device: Any, default_choice: str, fallback_order: List[str]
    ) -> bool:
        """Tell whether *device* is what an unqualified request would resolve to.

        In auto mode a device qualifies when its type equals the first
        fallback entry. Otherwise the default is matched as a device type, an
        alias, or (case-insensitively) a device id / remote address.
        """
        choice = default_choice.strip().lower()

        if not choice or choice in _AUTO_KEYWORDS:
            # Auto mode: only the highest-priority fallback type is flagged.
            if not fallback_order:
                return False
            primary = fallback_order[0]
            return primary in _DEVICE_KINDS and device.type == primary

        if choice in _DEVICE_KINDS:
            return device.type == choice

        if choice in self._config.alias_to_address:
            return (
                _normalize_remote_identifier(device.id)
                == self._config.alias_to_address[choice]
            )

        # Ids are compared case-insensitively (callers may hand in a
        # lower-cased default) and without any remote prefix so that
        # "host:port", "remote:host:port" and "tcp@host:port" all match.
        target = choice[len("remote:"):] if choice.startswith("remote:") else choice
        device_key = _normalize_remote_identifier(device.id or "").lower()
        return device_key == _normalize_remote_identifier(target)


_selector: Optional[DeviceSelector] = None


def _get_selector() -> DeviceSelector:
    """Return the module-level selector, creating it on first use."""
    global _selector
    if _selector is None:
        _selector = DeviceSelector()
    return _selector


def initialize_selector(
    config: Optional[DeviceSelectionConfig] = None,
    *,
    frida_module: Optional[Any] = None,
    device_manager: Optional[Any] = None,
) -> None:
    """Initialise the module-level selector with an optional configuration."""
    global _selector
    _selector = DeviceSelector(
        config=config, frida_module=frida_module, device_manager=device_manager
    )


def resolve_device(device_id: Optional[str] = None) -> Any:
    """Resolve *device_id* with the module-level selector.

    Raises:
        DeviceSelectionError: If the device cannot be resolved.
    """
    return _get_selector().get_device(device_id)


def describe_devices() -> List[Dict[str, Any]]:
    """Describe all enumerated devices using the module-level selector."""
    return _get_selector().describe_devices()


def reload_configuration() -> None:
    """Reload the module-level selector's configuration from the environment."""
    global _selector
    if _selector is None:
        # A fresh selector already loads the current environment.
        _selector = DeviceSelector()
    else:
        _selector.reload()


def register_remote_device(
    address: str,
    *,
    alias: Optional[str] = None,
    set_default: bool = False,
) -> Dict[str, str]:
    """Register a remote device using the active selector.

    Returns:
        A dict with the device's ``id``, ``name`` and ``type``, the normalised
        ``address``, its ``alias`` (the address when none is registered) and
        the current ``default_remote`` ("" when unset).
    """
    selector = _get_selector()
    device = selector.register_remote(address, alias=alias, set_default=set_default)
    normalized = _normalize_remote_identifier(address)
    resolved_alias = selector._config.alias_for_address(normalized)
    return {
        "id": device.id,
        "name": device.name,
        "type": device.type,
        "address": normalized,
        "alias": resolved_alias or normalized,
        "default_remote": selector._config.default_remote or "",
    }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rmorgans/frida-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.