# device_selection.py (17.4 kB)
"""Utilities for resolving Frida devices with flexible configuration."""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
try: # pragma: no cover - allows tests to supply a stub frida module
import frida # type: ignore
except ImportError: # pragma: no cover
frida = None # type: ignore
# Module-level logger named after this module; used for debug traces during device resolution.
logger = logging.getLogger(__name__)
# Device kinds tried, in this order, when DeviceSelectionConfig.fallback_order()
# finds no usable entry in the configured fallback priority.
_DEFAULT_FALLBACK = ("local", "usb")
def _normalize_remote_identifier(identifier: str) -> str:
    """Return *identifier* without surrounding whitespace or a leading ``tcp@``.

    The prefix check is case-sensitive and only the text up to the first
    ``@`` is removed; anything after it is returned untouched.
    """
    trimmed = identifier.strip()
    if not trimmed.startswith("tcp@"):
        return trimmed
    # Frida remote IDs are usually tcp@host:port; keep only the part after "@".
    _scheme, _sep, remainder = trimmed.partition("@")
    return remainder
@dataclass
class DeviceSelectionConfig:
    """Configuration values controlling device resolution."""

    # Identifier resolved when a caller names no device (None means auto-select).
    default_device: Optional[str] = None
    # Normalized address used when the "remote" device kind is requested.
    default_remote: Optional[str] = None
    # Device kinds to try, in order, during automatic selection.
    fallback_priority: List[str] = field(default_factory=list)
    # Lower-cased alias -> normalized remote address.
    alias_to_address: Dict[str, str] = field(default_factory=dict)
    # Normalized remote address -> alias as originally spelled.
    address_to_alias: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def load(cls) -> "DeviceSelectionConfig":
        """Build a default configuration and apply environment overrides to it."""
        config = cls()
        _apply_environment_overrides(config)
        return config

    def register_remote_alias(self, alias: str, address: str) -> None:
        """Record *alias* as a name for the remote at *address*.

        Lookups by alias are case-insensitive; the original spelling is kept
        for display. Blank aliases and blank addresses are ignored.
        """
        alias_name = alias.strip()
        if not alias_name:
            return
        address_value = _normalize_remote_identifier(address)
        if not address_value:
            return
        alias_key = alias_name.lower()

        # When an alias is re-pointed to a different address, drop the reverse
        # entry that still advertises this alias for the old address; otherwise
        # alias_for_address() would suggest a name that now resolves elsewhere.
        previous_address = self.alias_to_address.get(alias_key)
        if previous_address is not None and previous_address != address_value:
            stale_alias = self.address_to_alias.get(previous_address)
            if stale_alias is not None and stale_alias.lower() == alias_key:
                del self.address_to_alias[previous_address]

        self.alias_to_address[alias_key] = address_value
        self.address_to_alias[address_value] = alias_name

    def fallback_order(self) -> List[str]:
        """Return the de-duplicated, lower-cased device kinds to try in order.

        Non-string, blank and ``"auto"`` entries of ``fallback_priority`` are
        skipped. If nothing usable remains, the module default order is used.
        ``"remote"`` is appended whenever a default remote is configured and
        the order does not already contain it.
        """
        order: List[str] = []
        for candidate in self.fallback_priority or []:
            if not isinstance(candidate, str):
                continue
            normalized = candidate.strip().lower()
            if normalized and normalized != "auto" and normalized not in order:
                order.append(normalized)
        if not order:
            order.extend(_DEFAULT_FALLBACK)
        if self.default_remote and "remote" not in order:
            order.append("remote")
        # Every append above is guarded by a membership test, so the list is
        # already free of duplicates and needs no second pass.
        return order

    def alias_for_address(self, address: str) -> Optional[str]:
        """Return the alias registered for *address*, or ``None`` if there is none."""
        normalized = _normalize_remote_identifier(address)
        return self.address_to_alias.get(normalized)
def _apply_environment_overrides(config: DeviceSelectionConfig) -> None:
    """Overlay ``FRIDA_*`` environment variables onto *config* in place.

    Recognised variables:

    * ``FRIDA_DEFAULT_DEVICE`` - stripped and stored as the default device.
    * ``FRIDA_DEFAULT_REMOTE`` - normalized and stored as the default remote.
    * ``FRIDA_DEVICE_FALLBACKS`` - comma-separated fallback priority list.
    * ``FRIDA_REMOTE_DEVICE_<ALIAS>`` - registers ``<ALIAS>`` for the address
      held in the value.

    Unset or empty variables leave the corresponding setting untouched.
    """
    environ = os.environ

    device_override = environ.get("FRIDA_DEFAULT_DEVICE")
    if device_override:
        config.default_device = device_override.strip()

    remote_override = environ.get("FRIDA_DEFAULT_REMOTE")
    if remote_override:
        config.default_remote = _normalize_remote_identifier(remote_override)

    raw_fallbacks = environ.get("FRIDA_DEVICE_FALLBACKS")
    if raw_fallbacks:
        pieces = (piece.strip() for piece in raw_fallbacks.split(","))
        config.fallback_priority = [piece for piece in pieces if piece]

    alias_prefix = "FRIDA_REMOTE_DEVICE_"
    for name, address in environ.items():
        if not address or not name.startswith(alias_prefix):
            continue
        # In the variable name a double underscore encodes "-" and a single
        # underscore encodes a space.
        suffix = name[len(alias_prefix):]
        alias = suffix.replace("__", "-").replace("_", " ")
        config.register_remote_alias(alias, address)
class DeviceSelectionError(RuntimeError):
    """Signal that no device could be resolved.

    ``reasons`` carries the optional list of per-candidate failure messages
    supplied by the raiser; it is an empty list when none were given.
    """

    def __init__(self, message: str, reasons: Optional[List[str]] = None):
        super().__init__(message)
        # A falsy argument (None or an empty list) is replaced by a new list.
        self.reasons = reasons if reasons else []
def create_config(
    *,
    mode: Optional[str] = None,
    remote_address: Optional[str] = None,
    remote_alias: Optional[str] = None,
    fallback_priority: Optional[List[str]] = None,
) -> DeviceSelectionConfig:
    """Build a configuration from the environment, then layer explicit inputs on top.

    Args:
        mode: ``"local"``, ``"usb"`` or ``"remote"`` (case-insensitive) to pin
            the default device; any other value leaves the default alone.
        remote_address: Remote address to use as the default remote. When no
            mode is given, supplying an address also selects ``"remote"``.
        remote_alias: Optional alias for ``remote_address``; the address itself
            is used as the alias when this is blank or not a string.
        fallback_priority: Replacement fallback list; blank and non-string
            entries are discarded. ``None`` keeps the loaded value.

    Returns:
        The resulting ``DeviceSelectionConfig``.
    """
    config = DeviceSelectionConfig.load()

    if fallback_priority is not None:
        cleaned: List[str] = []
        for entry in fallback_priority:
            if not isinstance(entry, str):
                continue
            text = entry.strip()
            if text:
                cleaned.append(text)
        config.fallback_priority = cleaned

    requested_mode = (mode or "").strip().lower()
    if requested_mode in ("local", "usb", "remote"):
        config.default_device = requested_mode
        if requested_mode != "remote":
            # A pinned local/usb mode discards any default remote that was loaded.
            config.default_remote = None

    if isinstance(remote_address, str):
        address = _normalize_remote_identifier(remote_address)
    else:
        address = ""
    if not address:
        return config

    config.default_remote = address
    label = address
    if isinstance(remote_alias, str):
        stripped_alias = remote_alias.strip()
        if stripped_alias:
            label = stripped_alias
    config.register_remote_alias(label, address)
    if not requested_mode:
        config.default_device = "remote"
    return config
class DeviceSelector:
    """Resolve Frida devices using configuration, fallbacks, and aliases."""

    def __init__(
        self,
        config: Optional[DeviceSelectionConfig] = None,
        *,
        frida_module: Optional[object] = None,
        device_manager: Optional[object] = None,
    ):
        """Create a selector.

        Args:
            config: Configuration to use; loaded via
                ``DeviceSelectionConfig.load()`` when omitted.
            frida_module: Object used in place of the imported ``frida`` module.
            device_manager: Device manager to use; obtained from the Frida
                module's ``get_device_manager()`` when omitted.

        Raises:
            RuntimeError: If no Frida module is available.
        """
        self._config = config or DeviceSelectionConfig.load()
        self._frida = frida_module or frida
        if self._frida is None:
            raise RuntimeError(
                "Frida module is not available. Provide a frida_module override or install frida."
            )
        self._device_manager = device_manager or self._frida.get_device_manager()
        # Normalized remote address -> device object already obtained for it.
        self._remote_cache: Dict[str, Any] = {}
        logger.debug(
            "DeviceSelector initialised (default_device=%s, default_remote=%s, fallbacks=%s, aliases=%s)",
            self._config.default_device,
            self._config.default_remote,
            self._config.fallback_order(),
            self._config.alias_to_address,
        )

    def reload(self) -> None:
        """Replace the configuration with a freshly loaded one and drop cached remotes.

        The current configuration object is discarded, including any aliases
        or defaults registered on it at runtime.
        """
        self._config = DeviceSelectionConfig.load()
        self._remote_cache.clear()

    def register_remote(
        self, address: str, alias: Optional[str] = None, *, set_default: bool = False
    ) -> Any:
        """Register and connect to a remote device, optionally updating defaults.

        Args:
            address: Remote address, with or without a ``tcp@`` prefix.
            alias: Optional alias; the normalized address is used when blank.
            set_default: Also make this address the default remote.

        Returns:
            The device object for the remote.

        Raises:
            DeviceSelectionError: If the address is empty or the remote cannot
                be reached.
        """
        normalized = _normalize_remote_identifier(address)
        if not normalized:
            # Reject before touching the configuration so a bad address cannot
            # overwrite the current default remote with an empty string.
            raise DeviceSelectionError("Remote address is empty")
        label = (
            alias.strip() if isinstance(alias, str) and alias.strip() else normalized
        )
        self._config.register_remote_alias(label, normalized)
        if set_default:
            self._config.default_remote = normalized
            self._ensure_remote_in_explicit_fallbacks()
        return self._get_remote_device(normalized)

    def get_device(self, device_id: Optional[str] = None) -> Any:
        """Resolve *device_id*, the configured default, or an automatic choice.

        A blank ``device_id`` falls back to the configured default device; a
        blank, ``"auto"`` or ``"smart"`` default triggers automatic selection.

        Raises:
            DeviceSelectionError: If no matching device can be resolved.
        """
        identifier = (device_id or "").strip()
        if identifier:
            return self._get_by_identifier(identifier)
        default_choice = (self._config.default_device or "").strip()
        if default_choice and default_choice.lower() not in {"auto", "smart"}:
            return self._get_by_identifier(default_choice)
        return self._auto_select()

    def describe_devices(self) -> List[Dict[str, Any]]:
        """Describe every enumerated device.

        Each entry holds the string fields ``id``, ``name``, ``type`` and
        ``hint``, the boolean ``default_candidate``, and ``alias`` when one is
        registered for the device's address.
        """
        devices = self._frida.enumerate_devices()
        descriptions: List[Dict[str, Any]] = []
        default_choice = (
            self._config.default_device or "auto"
        ).strip().lower() or "auto"
        fallback_order = self._config.fallback_order()
        for device in devices:
            entry: Dict[str, Any] = {
                "id": device.id,
                "name": device.name,
                "type": device.type,
            }
            entry["hint"] = self._usage_hint(device)
            entry["default_candidate"] = self._is_default_candidate(
                device, default_choice, fallback_order
            )
            alias = self._config.address_to_alias.get(
                _normalize_remote_identifier(device.id)
            )
            if alias:
                entry["alias"] = alias
            descriptions.append(entry)
        return descriptions

    # Internal helpers -------------------------------------------------
    def _ensure_remote_in_explicit_fallbacks(self) -> None:
        """Append ``"remote"`` to an explicitly configured fallback list if missing.

        An empty list, or one holding only blank/``"auto"`` entries, is left
        alone: ``fallback_order()`` then uses the built-in defaults and adds
        ``"remote"`` itself once a default remote is set. Appending here would
        turn the list into ``["remote"]`` and suppress those defaults.
        """
        priority = self._config.fallback_priority
        if not priority:
            return
        effective = {
            entry.strip().lower() for entry in priority if isinstance(entry, str)
        }
        effective -= {"", "auto"}
        if effective and "remote" not in effective:
            priority.append("remote")

    def _get_by_identifier(self, identifier: str, *, allow_auto: bool = True) -> Any:
        """Resolve a user-supplied identifier to a device.

        Tried in order: auto keywords (when allowed), ``usb``, ``local``,
        ``remote``, ``remote:<address>``, a registered alias, a ``tcp@`` device
        id, a bare ``host:port`` address, and finally an exact device id.
        """
        cleaned = identifier.strip()
        lowered = cleaned.lower()
        if allow_auto and lowered in {"", "auto", "smart"}:
            return self._auto_select()
        if lowered == "usb":
            return self._get_usb_device()
        if lowered == "local":
            return self._get_local_device()
        if lowered == "remote":
            return self._get_default_remote_device()
        if lowered.startswith("remote:"):
            address = cleaned.split(":", 1)[1]
            return self._get_remote_device(address)
        address = self._config.alias_to_address.get(lowered)
        if address:
            return self._get_remote_device(address)
        if cleaned.startswith("tcp@"):
            return self._get_by_exact_id(cleaned)
        if ":" in cleaned and "@" not in cleaned and cleaned.count(":") == 1:
            # Treat host:port as remote address
            return self._get_remote_device(cleaned)
        return self._get_by_exact_id(cleaned)

    def _get_by_exact_id(self, device_id: str) -> Any:
        """Fetch a device by exact id, raising ``DeviceSelectionError`` if unknown."""
        try:
            return self._frida.get_device(device_id)
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError(f"Device '{device_id}' not found") from exc

    def _get_usb_device(self) -> Any:
        """Fetch the USB device, raising ``DeviceSelectionError`` if none is found."""
        try:
            return self._frida.get_usb_device()
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError("No USB devices detected") from exc

    def _get_local_device(self) -> Any:
        """Fetch the local device, raising ``DeviceSelectionError`` on failure."""
        try:
            return self._frida.get_local_device()
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError("Unable to access local device") from exc

    def _get_default_remote_device(self) -> Any:
        """Return the configured default remote, else the first enumerated remote.

        Raises:
            DeviceSelectionError: If neither is available.
        """
        if self._config.default_remote:
            return self._get_remote_device(self._config.default_remote)
        device = self._find_first_remote_device()
        if device:
            return device
        raise DeviceSelectionError(
            "No remote devices available. Configure FRIDA_DEFAULT_REMOTE or add a remote address."
        )

    def _get_remote_device(self, address: str) -> Any:
        """Return the device for a remote address, adding it to the manager if needed.

        Results are cached per normalized address. If the device manager
        rejects the address with ``InvalidArgumentError``, the enumerated
        devices are searched for a remote with a matching id instead.

        Raises:
            DeviceSelectionError: If the address is empty, no matching remote
                exists, or a transport error occurs.
        """
        normalized = _normalize_remote_identifier(address)
        if not normalized:
            raise DeviceSelectionError("Remote address is empty")
        if normalized in self._remote_cache:
            logger.debug("Using cached remote device for %s", normalized)
            return self._remote_cache[normalized]
        logger.debug("Attempting to register remote device at %s", normalized)
        try:
            device = self._device_manager.add_remote_device(normalized)
        except self._frida.InvalidArgumentError as exc:  # type: ignore[attr-defined]
            device = self._find_remote_by_address(normalized)
            if device is None:
                raise DeviceSelectionError(
                    f"Remote device '{normalized}' is not available"
                ) from exc
        except self._frida.TransportError as exc:  # type: ignore[attr-defined]
            raise DeviceSelectionError(
                f"Unable to connect to remote device '{normalized}': {exc}"
            ) from exc
        else:
            logger.debug("Connected to remote device %s (%s)", device.id, device.type)
        # Make sure the address has some display name without overriding an
        # alias that was registered explicitly.
        self._config.address_to_alias.setdefault(normalized, normalized)
        self._remote_cache[normalized] = device
        return device

    def _find_remote_by_address(self, address: str) -> Optional[Any]:
        """Return the enumerated remote device whose id matches *address*, if any."""
        target_id = f"tcp@{address}"
        for device in self._frida.enumerate_devices():
            if device.type != "remote":
                continue
            if (
                device.id == target_id
                or _normalize_remote_identifier(device.id) == address
            ):
                return device
        return None

    def _find_first_remote_device(self) -> Optional[Any]:
        """Return the first enumerated device of type ``"remote"``, if any."""
        for device in self._frida.enumerate_devices():
            if device.type == "remote":
                return device
        return None

    def _auto_select(self) -> Any:
        """Pick a device by walking the fallback order, then any enumerated device.

        Raises:
            DeviceSelectionError: If every candidate fails; ``reasons`` lists
                the failure message of each fallback candidate.
        """
        errors: List[str] = []
        for candidate in self._config.fallback_order():
            try:
                logger.debug("Attempting auto-select candidate '%s'", candidate)
                # allow_auto=False keeps an "auto"-like candidate from recursing.
                return self._get_by_identifier(candidate, allow_auto=False)
            except DeviceSelectionError as exc:
                logger.debug("Auto-select candidate '%s' failed: %s", candidate, exc)
                errors.append(f"{candidate}: {exc}")
        for device in self._frida.enumerate_devices():
            try:
                logger.debug("Falling back to enumerated device '%s'", device.id)
                return self._get_by_exact_id(device.id)
            except DeviceSelectionError:
                continue
        raise DeviceSelectionError(
            "Unable to select a Frida device automatically", errors
        )

    def _usage_hint(self, device: Any) -> str:
        """Return a short hint showing which ``device_id`` value selects *device*."""
        device_type = (device.type or "").lower()
        if device_type == "local":
            return "device_id='local' or omit"
        if device_type == "usb":
            return "device_id='usb'"
        if device_type == "remote":
            address = _normalize_remote_identifier(device.id)
            alias = self._config.address_to_alias.get(address)
            if alias and alias != address:
                return f"device_id='{alias}' or '{address}'"
            if device.id.startswith("tcp@"):
                return f"device_id='{address}'"
            return f"device_id='{device.id}'"
        return f"device_id='{device.id}'"

    def _is_default_candidate(
        self, device: Any, default_choice: str, fallback_order: List[str]
    ) -> bool:
        """Tell whether *device* is what an unqualified request would select.

        Args:
            device: Enumerated device exposing ``id`` and ``type``.
            default_choice: Configured default device; may already be
                lower-cased by the caller.
            fallback_order: Effective fallback order, used in auto mode.
        """
        if default_choice in {"", "auto", "smart"}:
            # Auto mode: the device qualifies when its type is the first
            # entry of the fallback order.
            if not fallback_order:
                return False
            primary = fallback_order[0]
            return primary in {"usb", "local", "remote"} and device.type == primary

        # Direct match: check by device kind, alias, or identifier.
        lowered = default_choice.lower()
        if lowered in {"usb", "local", "remote"}:
            return device.type == lowered
        alias_target = self._config.alias_to_address.get(lowered)
        if alias_target is not None:
            return _normalize_remote_identifier(device.id) == alias_target
        # describe_devices() lower-cases the default before calling, so an
        # exact comparison would never match ids containing upper-case
        # characters; compare case-insensitively as well.
        device_id = device.id or ""
        return device_id == default_choice or device_id.lower() == lowered
# Selector shared by the module-level helper functions; stays None until
# initialize_selector() is called or one of the helpers creates it on first use.
_selector: Optional[DeviceSelector] = None
def initialize_selector(
    config: Optional[DeviceSelectionConfig] = None,
    *,
    frida_module: Optional[Any] = None,
    device_manager: Optional[Any] = None,
) -> None:
    """Create the module-level selector, replacing any existing one.

    All arguments are forwarded unchanged to ``DeviceSelector``.
    """
    global _selector
    options = {
        "config": config,
        "frida_module": frida_module,
        "device_manager": device_manager,
    }
    _selector = DeviceSelector(**options)
def resolve_device(device_id: Optional[str] = None) -> Any:
    """Resolve a device through the module-level selector, creating it on first use.

    ``DeviceSelectionError`` raised by the selector propagates to the caller
    unchanged so callers can handle it uniformly.
    """
    global _selector
    selector = _selector
    if selector is None:
        selector = _selector = DeviceSelector()
    return selector.get_device(device_id)
def describe_devices() -> List[Dict[str, Any]]:
    """Describe all enumerated devices via the module-level selector.

    The selector is created on first use. Each entry is the dictionary built
    by ``DeviceSelector.describe_devices``; its ``default_candidate`` value is
    a ``bool``, so the values are not all strings.
    """
    global _selector
    if _selector is None:
        _selector = DeviceSelector()
    return _selector.describe_devices()
def reload_configuration() -> None:
    """Reload the module-level selector's configuration.

    An existing selector is asked to ``reload()``; if none exists yet, a new
    selector is created instead.
    """
    global _selector
    if _selector is not None:
        _selector.reload()
        return
    _selector = DeviceSelector()
def register_remote_device(
    address: str,
    *,
    alias: Optional[str] = None,
    set_default: bool = False,
) -> Dict[str, str]:
    """Register a remote device with the module-level selector and summarise it.

    The selector is created on first use.

    Returns:
        A dictionary with the device's ``id``, ``name`` and ``type``, the
        normalized ``address``, the ``alias`` recorded for it (the address
        when none is recorded) and the current ``default_remote`` (an empty
        string when unset).
    """
    global _selector
    if _selector is None:
        _selector = DeviceSelector()
    selector = _selector
    device = selector.register_remote(address, alias=alias, set_default=set_default)

    canonical = _normalize_remote_identifier(address)
    config = selector._config
    recorded_alias = config.alias_for_address(canonical)
    summary = {
        "id": device.id,
        "name": device.name,
        "type": device.type,
        "address": canonical,
    }
    summary["alias"] = recorded_alias or canonical
    summary["default_remote"] = config.default_remote or ""
    return summary