Skip to main content
Glama

Stealth Browser MCP

by vibheksoni
dynamic_hook_system.py23.8 kB
""" Dynamic Hook System - AI-Generated Request Hooks This system allows AI to create custom hook functions that process network requests in real-time with no pending state. Hooks are Python functions generated by AI that can modify, block, redirect, or fulfill requests dynamically. """ import asyncio import uuid import fnmatch from datetime import datetime from typing import Dict, List, Any, Callable, Optional, Union from dataclasses import dataclass, asdict import nodriver as uc from debug_logger import debug_logger import ast import sys from io import StringIO import contextlib @dataclass class RequestInfo: """Request information passed to hook functions.""" request_id: str instance_id: str url: str method: str headers: Dict[str, str] post_data: Optional[str] = None resource_type: Optional[str] = None stage: str = "request" # "request" or "response" def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for AI function processing.""" return asdict(self) @dataclass class HookAction: """Action returned by hook functions.""" action: str # "continue", "block", "redirect", "fulfill", "modify" url: Optional[str] = None # For redirect/modify method: Optional[str] = None # For modify headers: Optional[Dict[str, str]] = None # For modify/fulfill body: Optional[str] = None # For fulfill status_code: Optional[int] = None # For fulfill post_data: Optional[str] = None # For modify class DynamicHook: """A dynamic hook with AI-generated function.""" def __init__(self, hook_id: str, name: str, requirements: Dict[str, Any], function_code: str, priority: int = 100): self.hook_id = hook_id self.name = name self.requirements = requirements self.function_code = function_code self.priority = priority # Lower number = higher priority self.created_at = datetime.now() self.trigger_count = 0 self.last_triggered: Optional[datetime] = None self.status = "active" self.request_stage = requirements.get('stage', 'request') # 'request' or 'response' self._compiled_function = self._compile_function() def _compile_function(self) -> Callable: """Compile the AI-generated function.""" try: namespace = { 'HookAction': HookAction, 'datetime': datetime, 'fnmatch': fnmatch, '__builtins__': { 'len': len, 'str': str, 'int': int, 'float': float, 'bool': bool, 'dict': dict, 'list': list, 'tuple': tuple, 'print': lambda *args: debug_logger.log_info("hook_function", self.name, " ".join(map(str, args))) } } exec(self.function_code, namespace) if 'process_request' not in namespace: raise ValueError("Function must define 'process_request(request)'") return namespace['process_request'] except Exception as e: debug_logger.log_error("dynamic_hook", "compile_function", f"Failed to compile function for hook {self.name}: {e}") return lambda request: HookAction(action="continue") def matches(self, request: RequestInfo) -> bool: """Check if this hook matches the request.""" try: # Check URL pattern if 'url_pattern' in self.requirements: if not fnmatch.fnmatch(request.url, self.requirements['url_pattern']): return False # Check method if 'method' in self.requirements: if request.method.upper() != self.requirements['method'].upper(): return False # Check resource type if 'resource_type' in self.requirements: if request.resource_type != self.requirements['resource_type']: return False # Check stage if 'stage' in self.requirements: if request.stage != self.requirements['stage']: return False # Check custom conditions (if any) if 'custom_condition' in self.requirements: condition_code = self.requirements['custom_condition'] namespace = {'request': request, '__builtins__': {'len': len, 'str': str}} try: result = eval(condition_code, namespace) if not result: return False except: return False return True except Exception as e: debug_logger.log_error("dynamic_hook", "matches", f"Error matching hook {self.name}: {e}") return False def process(self, request: RequestInfo) -> HookAction: """Execute the hook function.""" try: self.trigger_count += 1 self.last_triggered = datetime.now() debug_logger.log_info("dynamic_hook", "process", f"Processing request {request.url} with hook {self.name}") result = self._compiled_function(request.to_dict()) if isinstance(result, dict): result = HookAction(**result) elif not isinstance(result, HookAction): debug_logger.log_error("dynamic_hook", "process", f"Hook {self.name} returned invalid type: {type(result)}") return HookAction(action="continue") debug_logger.log_info("dynamic_hook", "process", f"Hook {self.name} returned action: {result.action}") return result except Exception as e: debug_logger.log_error("dynamic_hook", "process", f"Error executing hook {self.name}: {e}") return HookAction(action="continue") class DynamicHookSystem: """Real-time dynamic hook processing system.""" def __init__(self): self.hooks: Dict[str, DynamicHook] = {} self.instance_hooks: Dict[str, List[str]] = {} # instance_id -> list of hook_ids self._lock = asyncio.Lock() async def setup_interception(self, tab, instance_id: str): """Set up request and response interception for a browser tab.""" try: all_hooks = [] instance_hook_ids = self.instance_hooks.get(instance_id, []) for hook_id in instance_hook_ids: hook = self.hooks.get(hook_id) if hook and hook.status == "active": all_hooks.append(hook) for hook_id, hook in self.hooks.items(): if hook.status == "active" and hook_id not in instance_hook_ids: if not hasattr(hook, 'instance_ids') or not hook.instance_ids: all_hooks.append(hook) request_patterns = [] response_patterns = [] for hook in all_hooks: url_pattern = hook.requirements.get('url_pattern', '*') resource_type = hook.requirements.get('resource_type') stage = hook.request_stage if stage == 'response': pattern = uc.cdp.fetch.RequestPattern( url_pattern=url_pattern, resource_type=getattr(uc.cdp.network.ResourceType, resource_type.upper()) if resource_type else None, request_stage=uc.cdp.fetch.RequestStage.RESPONSE ) response_patterns.append(pattern) else: pattern = uc.cdp.fetch.RequestPattern( url_pattern=url_pattern, resource_type=getattr(uc.cdp.network.ResourceType, resource_type.upper()) if resource_type else None, request_stage=uc.cdp.fetch.RequestStage.REQUEST ) request_patterns.append(pattern) all_patterns = request_patterns + response_patterns if not all_patterns: all_patterns = [ uc.cdp.fetch.RequestPattern(url_pattern='*', request_stage=uc.cdp.fetch.RequestStage.REQUEST), uc.cdp.fetch.RequestPattern(url_pattern='*', request_stage=uc.cdp.fetch.RequestStage.RESPONSE) ] await tab.send(uc.cdp.fetch.enable(patterns=all_patterns)) tab.add_handler( uc.cdp.fetch.RequestPaused, lambda event: asyncio.create_task(self._on_request_paused(tab, event, instance_id)) ) debug_logger.log_info("dynamic_hook_system", "setup_interception", f"Set up interception for instance {instance_id} with {len(all_patterns)} patterns ({len(request_patterns)} request, {len(response_patterns)} response)") except Exception as e: debug_logger.log_error("dynamic_hook_system", "setup_interception", f"Failed to setup interception: {e}") async def _on_request_paused(self, tab, event, instance_id: str): """Handle intercepted requests and responses - process hooks immediately.""" try: # Determine if this is request stage or response stage # According to nodriver docs: "The stage of the request can be determined by presence of responseErrorReason # and responseStatusCode -- the request is at the response stage if either of these fields is present" is_response_stage = (hasattr(event, 'response_status_code') and event.response_status_code is not None) or \ (hasattr(event, 'response_error_reason') and event.response_error_reason is not None) stage = "response" if is_response_stage else "request" request = RequestInfo( request_id=str(event.request_id), instance_id=instance_id, url=event.request.url, method=event.request.method, headers=dict(event.request.headers) if hasattr(event.request, 'headers') else {}, post_data=event.request.post_data if hasattr(event.request, 'post_data') else None, resource_type=str(event.resource_type) if hasattr(event, 'resource_type') else None, stage=stage ) debug_logger.log_info("dynamic_hook_system", "_on_request_paused", f"Intercepted {stage}: {request.method} {request.url}") if is_response_stage and hasattr(event, 'response_status_code'): debug_logger.log_info("dynamic_hook_system", "_on_request_paused", f"Response status: {event.response_status_code}") await self._process_request_hooks(tab, request, event) except Exception as e: debug_logger.log_error("dynamic_hook_system", "_on_request_paused", f"Error processing {stage if 'stage' in locals() else 'request'}: {e}") try: await tab.send(uc.cdp.fetch.continue_request(request_id=event.request_id)) except: pass async def _process_request_hooks(self, tab, request: RequestInfo, event=None): """Process hooks for a request/response in real-time with priority chain processing.""" try: instance_hook_ids = self.instance_hooks.get(request.instance_id, []) matching_hooks = [] for hook_id in instance_hook_ids: hook = self.hooks.get(hook_id) if hook and hook.status == "active" and hook.request_stage == request.stage and hook.matches(request): matching_hooks.append(hook) matching_hooks.sort(key=lambda h: h.priority) if not matching_hooks: debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"No matching hooks for {request.stage} stage: {request.url}") if request.stage == "response": await tab.send(uc.cdp.fetch.continue_response(request_id=uc.cdp.fetch.RequestId(request.request_id))) else: await tab.send(uc.cdp.fetch.continue_request(request_id=uc.cdp.fetch.RequestId(request.request_id))) return debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Found {len(matching_hooks)} matching hooks for {request.stage} stage: {request.url}") response_body = None if request.stage == "response" and event: try: body_result = await tab.send(uc.cdp.fetch.get_response_body(request_id=uc.cdp.fetch.RequestId(request.request_id))) response_body = body_result[0] # body content debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Retrieved response body ({len(response_body)} chars)") except Exception as e: debug_logger.log_error("dynamic_hook_system", "_process_request_hooks", f"Failed to get response body: {e}") hook = matching_hooks[0] request_data = request.to_dict() if response_body: request_data['response_body'] = response_body request_data['response_status_code'] = getattr(event, 'response_status_code', None) response_headers = {} if hasattr(event, 'response_headers') and event.response_headers: try: if isinstance(event.response_headers, dict): response_headers = event.response_headers elif hasattr(event.response_headers, 'items'): for header in event.response_headers: if hasattr(header, 'name') and hasattr(header, 'value'): response_headers[header.name] = header.value else: response_headers = {} except Exception: response_headers = {} request_data['response_headers'] = response_headers action = hook._compiled_function(request_data) if isinstance(action, dict): action = HookAction(**action) hook.trigger_count += 1 hook.last_triggered = datetime.now() debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Hook {hook.name} returned action: {action.action}") await self._execute_hook_action(tab, request, action, event if request.stage == "response" else None) except Exception as e: debug_logger.log_error("dynamic_hook_system", "_process_request_hooks", f"Error processing hooks: {e}") try: if request.stage == "response": await tab.send(uc.cdp.fetch.continue_response(request_id=uc.cdp.fetch.RequestId(request.request_id))) else: await tab.send(uc.cdp.fetch.continue_request(request_id=uc.cdp.fetch.RequestId(request.request_id))) except: pass async def create_hook(self, name: str, requirements: Dict[str, Any], function_code: str, instance_ids: Optional[List[str]] = None, priority: int = 100) -> str: """Create a new dynamic hook.""" try: hook_id = str(uuid.uuid4()) hook = DynamicHook(hook_id, name, requirements, function_code, priority) async with self._lock: self.hooks[hook_id] = hook if instance_ids: for instance_id in instance_ids: if instance_id not in self.instance_hooks: self.instance_hooks[instance_id] = [] self.instance_hooks[instance_id].append(hook_id) else: for instance_id in self.instance_hooks: self.instance_hooks[instance_id].append(hook_id) debug_logger.log_info("dynamic_hook_system", "create_hook", f"Created hook {name} with ID {hook_id}") return hook_id except Exception as e: debug_logger.log_error("dynamic_hook_system", "create_hook", f"Failed to create hook {name}: {e}") raise def list_hooks(self) -> List[Dict[str, Any]]: """List all hooks.""" return [ { "hook_id": hook.hook_id, "name": hook.name, "requirements": hook.requirements, "priority": hook.priority, "status": hook.status, "trigger_count": hook.trigger_count, "last_triggered": hook.last_triggered.isoformat() if hook.last_triggered else None, "created_at": hook.created_at.isoformat() } for hook in self.hooks.values() ] def get_hook_details(self, hook_id: str) -> Optional[Dict[str, Any]]: """Get detailed hook information.""" hook = self.hooks.get(hook_id) if not hook: return None return { "hook_id": hook.hook_id, "name": hook.name, "requirements": hook.requirements, "function_code": hook.function_code, "priority": hook.priority, "status": hook.status, "trigger_count": hook.trigger_count, "last_triggered": hook.last_triggered.isoformat() if hook.last_triggered else None, "created_at": hook.created_at.isoformat() } async def remove_hook(self, hook_id: str) -> bool: """Remove a hook.""" try: async with self._lock: if hook_id in self.hooks: del self.hooks[hook_id] for instance_id in self.instance_hooks: if hook_id in self.instance_hooks[instance_id]: self.instance_hooks[instance_id].remove(hook_id) debug_logger.log_info("dynamic_hook_system", "remove_hook", f"Removed hook {hook_id}") return True return False except Exception as e: debug_logger.log_error("dynamic_hook_system", "remove_hook", f"Failed to remove hook {hook_id}: {e}") return False def add_instance(self, instance_id: str): """Add a new browser instance.""" if instance_id not in self.instance_hooks: self.instance_hooks[instance_id] = [] async def _execute_hook_action(self, tab, request: RequestInfo, action: HookAction, event=None): """Execute a hook action for either request or response stage.""" try: request_id = uc.cdp.fetch.RequestId(request.request_id) if action.action == "block": await tab.send(uc.cdp.fetch.fail_request( request_id=request_id, error_reason=uc.cdp.network.ErrorReason.BLOCKED_BY_CLIENT )) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Blocked {request.stage} {request.url}") elif action.action == "fulfill": headers = [] if action.headers: for name, value in action.headers.items(): headers.append(uc.cdp.fetch.HeaderEntry(name=name, value=value)) import base64 body_bytes = (action.body or "").encode('utf-8') body_base64 = base64.b64encode(body_bytes).decode('ascii') await tab.send(uc.cdp.fetch.fulfill_request( request_id=request_id, response_code=action.status_code or 200, response_headers=headers, body=body_base64 )) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Fulfilled {request.stage} {request.url}") elif action.action == "redirect" and request.stage == "request": await tab.send(uc.cdp.fetch.continue_request( request_id=request_id, url=action.url )) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Redirected request {request.url} to {action.url}") elif action.action == "modify": if request.stage == "response": response_headers = [] if action.headers: for name, value in action.headers.items(): response_headers.append(uc.cdp.fetch.HeaderEntry(name=name, value=value)) await tab.send(uc.cdp.fetch.continue_response( request_id=request_id, response_code=action.status_code, response_headers=response_headers if response_headers else None )) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Modified response for {request.url}") else: headers = [] if action.headers: for name, value in action.headers.items(): headers.append(uc.cdp.fetch.HeaderEntry(name=name, value=value)) await tab.send(uc.cdp.fetch.continue_request( request_id=request_id, url=action.url or request.url, method=action.method or request.method, headers=headers if headers else None, post_data=action.post_data )) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Modified request {request.url}") else: if request.stage == "response": await tab.send(uc.cdp.fetch.continue_response(request_id=request_id)) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Continued response {request.url}") else: await tab.send(uc.cdp.fetch.continue_request(request_id=request_id)) debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Continued request {request.url}") except Exception as e: debug_logger.log_error("dynamic_hook_system", "_execute_hook_action", f"Error executing {request.stage} action: {e}") try: if request.stage == "response": await tab.send(uc.cdp.fetch.continue_response(request_id=uc.cdp.fetch.RequestId(request.request_id))) else: await tab.send(uc.cdp.fetch.continue_request(request_id=uc.cdp.fetch.RequestId(request.request_id))) except: pass dynamic_hook_system = DynamicHookSystem()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vibheksoni/stealth-browser-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server