# dynamic_hook_system.py (23.8 kB)
"""
Dynamic Hook System - AI-Generated Request Hooks
This system allows AI to create custom hook functions that process network requests
in real-time with no pending state. Hooks are Python functions generated by AI
that can modify, block, redirect, or fulfill requests dynamically.
"""
import asyncio
import uuid
import fnmatch
from datetime import datetime
from typing import Dict, List, Any, Callable, Optional, Union
from dataclasses import dataclass, asdict
import nodriver as uc
from debug_logger import debug_logger
import ast
import sys
from io import StringIO
import contextlib
@dataclass
class RequestInfo:
    """Snapshot of an intercepted request that is handed to hook functions.

    The first five fields are required and positional; the remaining ones
    default to ``None`` / ``"request"``.
    """

    request_id: str
    instance_id: str
    url: str
    method: str
    headers: Dict[str, str]
    post_data: Optional[str] = None
    resource_type: Optional[str] = None
    # Interception stage: "request" or "response".
    stage: str = "request"

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary for AI function processing."""
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
@dataclass
class HookAction:
    """Decision returned by a hook function for one intercepted request.

    Only ``action`` is required; every other field is optional and is read
    only by the action kinds noted next to it.
    """

    # One of "continue", "block", "redirect", "fulfill", "modify".
    action: str
    # Target URL (redirect / modify).
    url: Optional[str] = None
    # Replacement HTTP method (modify).
    method: Optional[str] = None
    # Header name -> value mapping (modify / fulfill).
    headers: Optional[Dict[str, str]] = None
    # Response body (fulfill).
    body: Optional[str] = None
    # HTTP status code (fulfill).
    status_code: Optional[int] = None
    # Replacement request payload (modify).
    post_data: Optional[str] = None
class DynamicHook:
    """A single hook: match requirements plus an AI-generated handler.

    ``function_code`` is Python source that must define
    ``process_request(request)``. The handler is called with the request as a
    plain dict and is expected to return a ``HookAction`` or a dict of
    ``HookAction`` fields.

    ``requirements`` may contain ``url_pattern`` (fnmatch glob), ``method``,
    ``resource_type``, ``stage`` and ``custom_condition`` (a Python
    expression evaluated with ``request`` in scope). A key that is absent or
    set to ``None`` places no constraint on the request.
    """

    def __init__(self, hook_id: str, name: str, requirements: Dict[str, Any],
                 function_code: str, priority: int = 100):
        self.hook_id = hook_id
        self.name = name
        self.requirements = requirements
        self.function_code = function_code
        self.priority = priority  # Lower number = higher priority
        self.created_at = datetime.now()
        self.trigger_count = 0
        self.last_triggered: Optional[datetime] = None
        self.status = "active"
        self.request_stage = requirements.get('stage', 'request')  # 'request' or 'response'
        self._compiled_function = self._compile_function()

    def _compile_function(self) -> Callable:
        """Compile ``function_code`` and return its ``process_request`` callable.

        On any failure the error is logged and a fallback handler that always
        returns ``HookAction(action="continue")`` is returned instead, so a
        broken hook never blocks traffic.
        """
        # SECURITY: exec() runs generated code inside this process. The
        # trimmed-down __builtins__ below limits the names available to the
        # hook but is NOT a sandbox; only load code from a trusted source.
        namespace = {
            'HookAction': HookAction,
            'datetime': datetime,
            'fnmatch': fnmatch,
            '__builtins__': {
                'len': len, 'str': str, 'int': int, 'float': float,
                'bool': bool, 'dict': dict, 'list': list, 'tuple': tuple,
                # print() inside a hook is routed to the debug log; keyword
                # arguments such as sep=/end= are accepted and ignored.
                'print': lambda *args, **_kwargs: debug_logger.log_info(
                    "hook_function", self.name, " ".join(map(str, args))),
            },
        }
        try:
            exec(self.function_code, namespace)
            if 'process_request' not in namespace:
                raise ValueError("Function must define 'process_request(request)'")
            handler = namespace['process_request']
            if not callable(handler):
                raise ValueError("'process_request' must be callable")
            return handler
        except Exception as e:
            debug_logger.log_error("dynamic_hook", "compile_function", f"Failed to compile function for hook {self.name}: {e}")
            return lambda request: HookAction(action="continue")

    @staticmethod
    def _normalize_resource_type(value: Any) -> str:
        """Reduce a resource-type spelling to a comparable lowercase token.

        ``"Document"``, ``"document"``, ``"DOCUMENT"`` and an enum repr such
        as ``"ResourceType.DOCUMENT"`` all normalize to ``"document"``;
        underscores are dropped so ``EVENT_SOURCE`` equals ``EventSource``.
        """
        return str(value).rsplit('.', 1)[-1].replace('_', '').lower()

    def matches(self, request: RequestInfo) -> bool:
        """Return True when ``request`` satisfies every requirement of this hook.

        Any error raised while matching is logged and treated as "no match".
        """
        try:
            reqs = self.requirements

            # URL glob
            url_pattern = reqs.get('url_pattern')
            if url_pattern is not None and not fnmatch.fnmatch(request.url, url_pattern):
                return False

            # HTTP method (case-insensitive)
            method = reqs.get('method')
            if method is not None and request.method.upper() != method.upper():
                return False

            # Resource type. The value on the request may be an enum repr
            # while requirements use a short name, so both sides are
            # normalized before comparing.
            wanted_type = reqs.get('resource_type')
            if wanted_type is not None:
                if request.resource_type is None:
                    return False
                if (self._normalize_resource_type(request.resource_type)
                        != self._normalize_resource_type(wanted_type)):
                    return False

            # Interception stage
            wanted_stage = reqs.get('stage')
            if wanted_stage is not None and request.stage != wanted_stage:
                return False

            # Custom condition (if any)
            condition_code = reqs.get('custom_condition')
            if condition_code is not None:
                # SECURITY: eval() of a generated expression; the restricted
                # builtins are not a sandbox.
                namespace = {'request': request, '__builtins__': {'len': len, 'str': str}}
                try:
                    if not eval(condition_code, namespace):
                        return False
                except Exception as e:
                    # A failing condition means "no match", but the failure is
                    # logged instead of being silently swallowed.
                    debug_logger.log_error("dynamic_hook", "matches", f"Custom condition failed for hook {self.name}: {e}")
                    return False

            return True
        except Exception as e:
            debug_logger.log_error("dynamic_hook", "matches", f"Error matching hook {self.name}: {e}")
            return False

    def process(self, request: RequestInfo) -> HookAction:
        """Run the compiled handler on ``request`` and return its ``HookAction``.

        Updates ``trigger_count`` and ``last_triggered`` before calling the
        handler. A dict result is converted to ``HookAction``; any other
        non-``HookAction`` result, or an exception from the handler, yields
        ``HookAction(action="continue")``.
        """
        try:
            self.trigger_count += 1
            self.last_triggered = datetime.now()
            debug_logger.log_info("dynamic_hook", "process", f"Processing request {request.url} with hook {self.name}")

            result = self._compiled_function(request.to_dict())
            if isinstance(result, dict):
                result = HookAction(**result)
            elif not isinstance(result, HookAction):
                debug_logger.log_error("dynamic_hook", "process", f"Hook {self.name} returned invalid type: {type(result)}")
                return HookAction(action="continue")

            debug_logger.log_info("dynamic_hook", "process", f"Hook {self.name} returned action: {result.action}")
            return result
        except Exception as e:
            debug_logger.log_error("dynamic_hook", "process", f"Error executing hook {self.name}: {e}")
            return HookAction(action="continue")
class DynamicHookSystem:
    """Registry and real-time dispatcher for dynamic hooks.

    Hooks are stored in ``hooks`` (hook_id -> DynamicHook). ``instance_hooks``
    maps a browser instance id to the hook ids attached to it. A hook created
    without ``instance_ids`` is treated as global and applies to every
    instance, including instances registered after the hook was created.
    """

    def __init__(self):
        self.hooks: Dict[str, DynamicHook] = {}
        self.instance_hooks: Dict[str, List[str]] = {}  # instance_id -> list of hook_ids
        self._lock = asyncio.Lock()
        # The event loop keeps only weak references to tasks, so in-flight
        # handler tasks are pinned here until they complete.
        self._handler_tasks = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _spawn_handler(self, coro):
        """Schedule ``coro`` as a task and hold a reference until it is done."""
        task = asyncio.create_task(coro)
        self._handler_tasks.add(task)
        task.add_done_callback(self._handler_tasks.discard)
        return task

    def _active_hooks_for_instance(self, instance_id: str) -> List[DynamicHook]:
        """Return the active hooks that apply to ``instance_id``, without duplicates.

        Hooks attached to the instance come first (in attachment order),
        followed by global hooks, i.e. hooks that carry no ``instance_ids``.
        """
        selected: Dict[str, DynamicHook] = {}
        for hook_id in self.instance_hooks.get(instance_id, []):
            hook = self.hooks.get(hook_id)
            if hook and hook.status == "active":
                selected.setdefault(hook_id, hook)
        for hook_id, hook in self.hooks.items():
            if hook_id in selected or hook.status != "active":
                continue
            if not getattr(hook, 'instance_ids', None):
                selected[hook_id] = hook
        return list(selected.values())

    @staticmethod
    def _resolve_resource_type(name):
        """Map a requirement's resource-type name to the CDP enum member.

        Returns None (no resource-type filter on the pattern) when ``name`` is
        empty or unknown, so that one bad value cannot abort interception
        setup; ``DynamicHook.matches`` still filters at dispatch time.
        """
        if not name:
            return None
        member = getattr(uc.cdp.network.ResourceType, str(name).upper(), None)
        if member is None:
            debug_logger.log_error("dynamic_hook_system", "setup_interception", f"Unknown resource type '{name}'; pattern will not filter by resource type")
        return member

    @staticmethod
    def _header_entries(headers: Optional[Dict[str, str]]) -> list:
        """Convert a name -> value mapping into a list of CDP HeaderEntry objects."""
        if not headers:
            return []
        return [uc.cdp.fetch.HeaderEntry(name=name, value=value) for name, value in headers.items()]

    @staticmethod
    def _b64(data) -> str:
        """Base64-encode ``data`` (str is encoded as UTF-8 first) into ASCII text."""
        import base64
        raw = data if isinstance(data, (bytes, bytearray)) else str(data).encode('utf-8')
        return base64.b64encode(raw).decode('ascii')

    @staticmethod
    def _response_headers_to_dict(raw_headers) -> Dict[str, str]:
        """Normalize response headers from an event into a plain dict.

        Accepts a mapping or an iterable of objects exposing ``name`` and
        ``value``; anything else yields an empty dict.
        """
        if not raw_headers:
            return {}
        try:
            if hasattr(raw_headers, 'items'):
                return dict(raw_headers.items())
            return {
                header.name: header.value
                for header in raw_headers
                if hasattr(header, 'name') and hasattr(header, 'value')
            }
        except Exception:
            return {}

    async def _resume(self, tab, request_id, stage: str):
        """Let a paused request or response proceed unmodified."""
        if stage == "response":
            await tab.send(uc.cdp.fetch.continue_response(request_id=request_id))
        else:
            await tab.send(uc.cdp.fetch.continue_request(request_id=request_id))

    async def _resume_after_error(self, tab, request: RequestInfo, source: str):
        """Best-effort resume after a failure; a second failure is only logged."""
        try:
            await self._resume(tab, uc.cdp.fetch.RequestId(request.request_id), request.stage)
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", source, f"Fallback continue failed for {request.url}: {e}")

    async def _collect_response_data(self, tab, request: RequestInfo, event) -> Dict[str, Any]:
        """Gather response-stage fields to expose to the hook function.

        Always includes ``response_status_code`` and ``response_headers``;
        ``response_body`` is included only when the body could be fetched.
        """
        data: Dict[str, Any] = {}
        try:
            body_result = await tab.send(uc.cdp.fetch.get_response_body(request_id=uc.cdp.fetch.RequestId(request.request_id)))
            # NOTE(review): only element 0 (the body) is used; per the CDP
            # docs the result also reports whether the body is base64-encoded
            # - confirm hooks only need text bodies.
            body = body_result[0]
            if body is not None:
                data['response_body'] = body
                debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Retrieved response body ({len(body)} chars)")
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "_process_request_hooks", f"Failed to get response body: {e}")
        data['response_status_code'] = getattr(event, 'response_status_code', None)
        data['response_headers'] = self._response_headers_to_dict(getattr(event, 'response_headers', None))
        return data

    @staticmethod
    def _describe(hook: DynamicHook, include_code: bool = False) -> Dict[str, Any]:
        """Build the serializable description shared by the listing APIs."""
        info: Dict[str, Any] = {
            "hook_id": hook.hook_id,
            "name": hook.name,
            "requirements": hook.requirements,
        }
        if include_code:
            info["function_code"] = hook.function_code
        info.update({
            "priority": hook.priority,
            "status": hook.status,
            "trigger_count": hook.trigger_count,
            "last_triggered": hook.last_triggered.isoformat() if hook.last_triggered else None,
            "created_at": hook.created_at.isoformat(),
        })
        return info

    # ------------------------------------------------------------------
    # Interception
    # ------------------------------------------------------------------

    async def setup_interception(self, tab, instance_id: str):
        """Set up request and response interception for a browser tab.

        Enables the CDP Fetch domain with one pattern per applicable hook (or
        catch-all patterns for both stages when there are none) and registers
        a ``RequestPaused`` handler. Failures are logged, not raised.
        """
        try:
            request_patterns = []
            response_patterns = []
            for hook in self._active_hooks_for_instance(instance_id):
                is_response = hook.request_stage == 'response'
                pattern = uc.cdp.fetch.RequestPattern(
                    url_pattern=hook.requirements.get('url_pattern', '*'),
                    resource_type=self._resolve_resource_type(hook.requirements.get('resource_type')),
                    request_stage=(uc.cdp.fetch.RequestStage.RESPONSE if is_response
                                   else uc.cdp.fetch.RequestStage.REQUEST),
                )
                (response_patterns if is_response else request_patterns).append(pattern)

            all_patterns = request_patterns + response_patterns
            if not all_patterns:
                all_patterns = [
                    uc.cdp.fetch.RequestPattern(url_pattern='*', request_stage=uc.cdp.fetch.RequestStage.REQUEST),
                    uc.cdp.fetch.RequestPattern(url_pattern='*', request_stage=uc.cdp.fetch.RequestStage.RESPONSE),
                ]

            await tab.send(uc.cdp.fetch.enable(patterns=all_patterns))
            tab.add_handler(
                uc.cdp.fetch.RequestPaused,
                lambda event: self._spawn_handler(self._on_request_paused(tab, event, instance_id)),
            )
            debug_logger.log_info("dynamic_hook_system", "setup_interception", f"Set up interception for instance {instance_id} with {len(all_patterns)} patterns ({len(request_patterns)} request, {len(response_patterns)} response)")
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "setup_interception", f"Failed to setup interception: {e}")

    async def _on_request_paused(self, tab, event, instance_id: str):
        """Handle intercepted requests and responses - process hooks immediately."""
        stage = "request"
        try:
            # The paused event is at the response stage when either the
            # response status code or the response error reason is present.
            is_response_stage = (
                getattr(event, 'response_status_code', None) is not None
                or getattr(event, 'response_error_reason', None) is not None
            )
            stage = "response" if is_response_stage else "request"

            request = RequestInfo(
                request_id=str(event.request_id),
                instance_id=instance_id,
                url=event.request.url,
                method=event.request.method,
                headers=dict(event.request.headers) if hasattr(event.request, 'headers') else {},
                post_data=getattr(event.request, 'post_data', None),
                resource_type=str(event.resource_type) if hasattr(event, 'resource_type') else None,
                stage=stage,
            )
            debug_logger.log_info("dynamic_hook_system", "_on_request_paused", f"Intercepted {stage}: {request.method} {request.url}")
            if is_response_stage and hasattr(event, 'response_status_code'):
                debug_logger.log_info("dynamic_hook_system", "_on_request_paused", f"Response status: {event.response_status_code}")

            await self._process_request_hooks(tab, request, event)
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "_on_request_paused", f"Error processing {stage}: {e}")
            # Never leave the request paused: try to let it through.
            try:
                await tab.send(uc.cdp.fetch.continue_request(request_id=event.request_id))
            except Exception as resume_error:
                debug_logger.log_error("dynamic_hook_system", "_on_request_paused", f"Fallback continue failed: {resume_error}")

    async def _process_request_hooks(self, tab, request: RequestInfo, event=None):
        """Run the best-matching hook for a paused request/response.

        Matching hooks are ordered by ``priority`` (lower value first) and
        only the first one is executed; lower-priority matches are not
        chained. When nothing matches, or on any error, the request is
        continued unmodified.
        """
        try:
            candidates = [
                hook for hook in self._active_hooks_for_instance(request.instance_id)
                if hook.request_stage == request.stage and hook.matches(request)
            ]
            candidates.sort(key=lambda h: h.priority)

            if not candidates:
                debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"No matching hooks for {request.stage} stage: {request.url}")
                await self._resume(tab, uc.cdp.fetch.RequestId(request.request_id), request.stage)
                return

            debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Found {len(candidates)} matching hooks for {request.stage} stage: {request.url}")

            request_data = request.to_dict()
            if request.stage == "response" and event is not None:
                # Status code and headers are exposed even when the body is
                # empty or could not be retrieved.
                request_data.update(await self._collect_response_data(tab, request, event))

            hook = candidates[0]
            action = hook._compiled_function(request_data)
            if isinstance(action, dict):
                action = HookAction(**action)
            hook.trigger_count += 1
            hook.last_triggered = datetime.now()

            if not isinstance(action, HookAction):
                debug_logger.log_error("dynamic_hook_system", "_process_request_hooks", f"Hook {hook.name} returned invalid type: {type(action)}")
                action = HookAction(action="continue")

            debug_logger.log_info("dynamic_hook_system", "_process_request_hooks", f"Hook {hook.name} returned action: {action.action}")
            await self._execute_hook_action(tab, request, action, event if request.stage == "response" else None)
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "_process_request_hooks", f"Error processing hooks: {e}")
            await self._resume_after_error(tab, request, "_process_request_hooks")

    # ------------------------------------------------------------------
    # Hook management
    # ------------------------------------------------------------------

    async def create_hook(self, name: str, requirements: Dict[str, Any], function_code: str,
                          instance_ids: Optional[List[str]] = None, priority: int = 100) -> str:
        """Create a new dynamic hook and return its id.

        With ``instance_ids`` the hook is attached to those instances only;
        without it the hook is global and is also attached to every currently
        registered instance. Errors are logged and re-raised.
        """
        try:
            hook_id = str(uuid.uuid4())
            hook = DynamicHook(hook_id, name, requirements, function_code, priority)
            # Record the binding on the hook itself; an empty list marks the
            # hook as global for _active_hooks_for_instance.
            hook.instance_ids = list(instance_ids) if instance_ids else []

            async with self._lock:
                self.hooks[hook_id] = hook
                targets = hook.instance_ids if hook.instance_ids else list(self.instance_hooks)
                for instance_id in targets:
                    attached = self.instance_hooks.setdefault(instance_id, [])
                    if hook_id not in attached:
                        attached.append(hook_id)

            debug_logger.log_info("dynamic_hook_system", "create_hook", f"Created hook {name} with ID {hook_id}")
            return hook_id
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "create_hook", f"Failed to create hook {name}: {e}")
            raise

    def list_hooks(self) -> List[Dict[str, Any]]:
        """List all hooks (without their function code)."""
        return [self._describe(hook) for hook in self.hooks.values()]

    def get_hook_details(self, hook_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed hook information, or None if ``hook_id`` is unknown."""
        hook = self.hooks.get(hook_id)
        if not hook:
            return None
        return self._describe(hook, include_code=True)

    async def remove_hook(self, hook_id: str) -> bool:
        """Remove a hook. Returns True if it existed, False otherwise or on error."""
        try:
            async with self._lock:
                if hook_id not in self.hooks:
                    return False
                del self.hooks[hook_id]
                # Drop every reference, including accidental duplicates.
                for attached in self.instance_hooks.values():
                    if hook_id in attached:
                        attached[:] = [hid for hid in attached if hid != hook_id]
            debug_logger.log_info("dynamic_hook_system", "remove_hook", f"Removed hook {hook_id}")
            return True
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "remove_hook", f"Failed to remove hook {hook_id}: {e}")
            return False

    def add_instance(self, instance_id: str):
        """Add a new browser instance (no-op if it is already registered)."""
        self.instance_hooks.setdefault(instance_id, [])

    # ------------------------------------------------------------------
    # Action execution
    # ------------------------------------------------------------------

    async def _execute_hook_action(self, tab, request: RequestInfo, action: HookAction, event=None):
        """Execute a hook action for either request or response stage.

        Supported actions: ``block``, ``fulfill``, ``redirect`` (request stage
        only) and ``modify``. Anything else, including ``continue`` and a
        ``redirect`` at the response stage, lets the request proceed
        unmodified. ``event`` is accepted for interface compatibility and is
        not used. On failure the request is continued on a best-effort basis.
        """
        try:
            request_id = uc.cdp.fetch.RequestId(request.request_id)
            kind = action.action

            if kind == "block":
                await tab.send(uc.cdp.fetch.fail_request(
                    request_id=request_id,
                    error_reason=uc.cdp.network.ErrorReason.BLOCKED_BY_CLIENT,
                ))
                debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Blocked {request.stage} {request.url}")

            elif kind == "fulfill":
                await tab.send(uc.cdp.fetch.fulfill_request(
                    request_id=request_id,
                    response_code=action.status_code or 200,
                    response_headers=self._header_entries(action.headers),
                    body=self._b64(action.body or ""),
                ))
                debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Fulfilled {request.stage} {request.url}")

            elif kind == "redirect" and request.stage == "request":
                await tab.send(uc.cdp.fetch.continue_request(
                    request_id=request_id,
                    url=action.url,
                ))
                debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Redirected request {request.url} to {action.url}")

            elif kind == "modify":
                if request.stage == "response":
                    # Only status code and headers can be overridden here;
                    # action.body is not applied at this stage.
                    await tab.send(uc.cdp.fetch.continue_response(
                        request_id=request_id,
                        response_code=action.status_code,
                        response_headers=self._header_entries(action.headers) or None,
                    ))
                    debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Modified response for {request.url}")
                else:
                    # CDP binary fields travel as base64 over JSON, so the
                    # replacement payload is encoded the same way the fulfill
                    # body is.
                    encoded_post_data = self._b64(action.post_data) if action.post_data is not None else None
                    await tab.send(uc.cdp.fetch.continue_request(
                        request_id=request_id,
                        url=action.url or request.url,
                        method=action.method or request.method,
                        headers=self._header_entries(action.headers) or None,
                        post_data=encoded_post_data,
                    ))
                    debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Modified request {request.url}")

            else:
                await self._resume(tab, request_id, request.stage)
                if request.stage == "response":
                    debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Continued response {request.url}")
                else:
                    debug_logger.log_info("dynamic_hook_system", "_execute_hook_action", f"Continued request {request.url}")
        except Exception as e:
            debug_logger.log_error("dynamic_hook_system", "_execute_hook_action", f"Error executing {request.stage} action: {e}")
            await self._resume_after_error(tab, request, "_execute_hook_action")
# Module-level DynamicHookSystem instance, created when this module is imported.
dynamic_hook_system = DynamicHookSystem()