Skip to main content
Glama
event_router.py8.62 kB
"""
Event router for the FreeCAD AI addon.

This module provides event routing and broadcasting capabilities for the MCP server.
"""

import asyncio
import logging
from typing import Any, Dict, List, Optional, Set, Callable
import weakref
from datetime import datetime

logger = logging.getLogger(__name__)

# Subscription key that stands for "every event type".
_ALL_EVENTS = "*"


class EventRouter:
    """
    Central event router for broadcasting events to multiple listeners.

    Clients subscribe by id to a set of event types (or to all events via
    the "*" key) and may attach a callback for direct notification.
    Callbacks are held through weak references, so the caller must keep the
    callback (or, for a bound method, its owning object) alive for as long
    as it should receive events.

    State changes are serialized with an ``asyncio.Lock``; the router is
    intended to be driven from coroutines on a single event loop and does
    not add any cross-thread locking of its own.
    """

    def __init__(self, max_history_size: int = 1000):
        """Initialize the event router.

        Args:
            max_history_size: Maximum number of events to keep in history
        """
        # event type -> weak references to the callbacks registered for it
        self.listeners: Dict[str, Set[Callable]] = {}
        # client id -> event types ("*" for all) the client subscribed to
        self.client_subscriptions: Dict[str, Set[str]] = {}
        # oldest event first, newest last
        self.event_history: List[Dict[str, Any]] = []
        self.max_history_size = max_history_size
        self._lock = asyncio.Lock()
        # client id -> event type -> callback refs that client registered,
        # so remove_listener() can unregister exactly those callbacks
        self._client_callbacks: Dict[str, Dict[str, Set[Callable]]] = {}
        # strong references to in-flight coroutine callbacks; the event loop
        # itself only keeps weak references to tasks
        self._background_tasks: Set[asyncio.Task] = set()

    @staticmethod
    def _make_callback_ref(callback: Callable) -> Callable:
        """Return a weak reference suited to ``callback``.

        A plain ``weakref.ref`` to a bound method dies immediately because
        the bound-method object is a temporary, so bound methods get a
        ``WeakMethod`` that lives as long as the owning instance.
        """
        try:
            method_ref = weakref.WeakMethod(callback)
            # WeakMethod hashes through the owning instance; make sure the
            # reference can actually be stored in a set.
            hash(method_ref)
            return method_ref
        except TypeError:
            # Not a bound method (or its owner is unhashable / not
            # weak-referenceable): fall back to a plain weak reference.
            return weakref.ref(callback)

    def _register_callback(self, client_id: str, event_type: str,
                           callback_ref: Callable) -> None:
        """Record ``callback_ref`` for ``event_type`` on behalf of ``client_id``."""
        self.listeners.setdefault(event_type, set()).add(callback_ref)
        owned = self._client_callbacks.setdefault(client_id, {})
        owned.setdefault(event_type, set()).add(callback_ref)

    def _drop_client_callbacks(self, client_id: str,
                               event_types: Optional[List[str]]) -> None:
        """Unregister the callbacks ``client_id`` attached to ``event_types``.

        ``None`` means every event type the client registered callbacks for.
        A callback that another client registered for the same event type
        stays registered.
        """
        owned = self._client_callbacks.get(client_id)
        if not owned:
            return

        if event_types is None:
            targets = list(owned)
        else:
            targets = [t for t in set(event_types) if t in owned]

        for event_type in targets:
            refs = owned.pop(event_type)
            shared: Set[Callable] = set()
            for other_id, other_owned in self._client_callbacks.items():
                if other_id != client_id:
                    shared.update(other_owned.get(event_type, ()))
            registered = self.listeners.get(event_type)
            if registered is not None:
                registered.difference_update(refs - shared)

        if not owned:
            del self._client_callbacks[client_id]

    def _collect_live_callbacks(self, event_type: str) -> List[Callable]:
        """Return the live callbacks for ``event_type`` plus the "*" ones.

        Dead weak references found on the way are pruned. Each callback is
        returned once even if it is registered under both keys.
        """
        live: List[Callable] = []
        for key in dict.fromkeys((event_type, _ALL_EVENTS)):
            registered = self.listeners.get(key)
            if not registered:
                continue

            dead_refs = []
            for callback_ref in registered:
                callback = callback_ref()
                if callback is None:
                    # Callback was garbage collected
                    dead_refs.append(callback_ref)
                elif callback not in live:
                    live.append(callback)

            if dead_refs:
                registered.difference_update(dead_refs)
                for owned in self._client_callbacks.values():
                    owned_refs = owned.get(key)
                    if owned_refs:
                        owned_refs -= {r for r in owned_refs if r() is None}
        return live

    def _on_callback_task_done(self, task: "asyncio.Task") -> None:
        """Release a finished callback task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Error calling event callback: {error}")

    def _dispatch(self, callback: Callable, event_type: str,
                  event_data: Dict[str, Any]) -> None:
        """Invoke one callback; coroutine functions are scheduled as tasks."""
        try:
            if asyncio.iscoroutinefunction(callback):
                task = asyncio.create_task(callback(event_type, event_data))
                # Keep the task alive until it finishes.
                self._background_tasks.add(task)
                task.add_done_callback(self._on_callback_task_done)
            else:
                callback(event_type, event_data)
        except Exception as e:
            # One failing listener must not prevent the others from running.
            logger.error(f"Error calling event callback: {e}")

    async def add_listener(self, client_id: str,
                           event_types: Optional[List[str]] = None,
                           callback: Optional[Callable] = None) -> None:
        """
        Add a listener for specific event types.

        Args:
            client_id: Unique identifier for the client
            event_types: List of event types to listen for (None or an
                empty list means all events)
            callback: Optional callback function for direct notification.
                It is called as ``callback(event_type, event_data)`` and is
                only weakly referenced by the router.
        """
        callback_ref = None
        if callback is not None:
            callback_ref = self._make_callback_ref(callback)

        async with self._lock:
            subscriptions = self.client_subscriptions.setdefault(client_id, set())

            if event_types:
                for event_type in event_types:
                    subscriptions.add(event_type)
                    # The event type gets an entry even without a callback.
                    self.listeners.setdefault(event_type, set())
                    if callback_ref is not None:
                        self._register_callback(client_id, event_type, callback_ref)
            else:
                # Subscribe to all events
                subscriptions.add(_ALL_EVENTS)
                if callback_ref is not None:
                    self._register_callback(client_id, _ALL_EVENTS, callback_ref)

        logger.info(f"Added listener {client_id} for events: {event_types or ['all']}")

    async def remove_listener(self, client_id: str,
                              event_types: Optional[List[str]] = None) -> None:
        """
        Remove a listener from specific event types.

        Callbacks the client registered for the removed event types are
        unregistered as well. Unknown client ids are ignored.

        Args:
            client_id: Unique identifier for the client
            event_types: List of event types to stop listening for (None
                means all events)
        """
        async with self._lock:
            subscriptions = self.client_subscriptions.get(client_id)
            if subscriptions is None:
                return

            if event_types:
                subscriptions.difference_update(event_types)
                self._drop_client_callbacks(client_id, event_types)
            else:
                # Remove from all events
                subscriptions.clear()
                self._drop_client_callbacks(client_id, None)

            # Clean up empty subscriptions
            if not subscriptions:
                del self.client_subscriptions[client_id]

        logger.info(f"Removed listener {client_id} from events: {event_types or ['all']}")

    async def broadcast_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """
        Broadcast an event to all registered listeners.

        The event is appended to the history, and every live callback
        registered for ``event_type`` or for all events is invoked.

        Args:
            event_type: The type of event being broadcast
            event_data: The event data. A "timestamp" key is added to this
                dict in place when it is missing.
        """
        # Add timestamp if not present
        if "timestamp" not in event_data:
            event_data["timestamp"] = datetime.now().isoformat()

        # Create event record
        event_record = {
            "type": event_type,
            "data": event_data,
            "timestamp": event_data.get("timestamp"),
            "listeners_notified": 0
        }

        async with self._lock:
            # Add to history, dropping the oldest entries beyond the cap
            self.event_history.append(event_record)
            overflow = len(self.event_history) - self.max_history_size
            if overflow > 0:
                del self.event_history[:overflow]

            # Count the clients subscribed to this event type
            notified = sum(
                1
                for subscriptions in self.client_subscriptions.values()
                if _ALL_EVENTS in subscriptions or event_type in subscriptions
            )
            event_record["listeners_notified"] = notified

            callbacks = self._collect_live_callbacks(event_type)

        # Notify direct callback listeners
        for callback in callbacks:
            self._dispatch(callback, event_type, event_data)

        logger.debug(f"Broadcast {event_type} event to {notified} listeners")

    async def get_event_history(self, client_id: Optional[str] = None,
                                event_types: Optional[List[str]] = None,
                                limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get event history for a client or specific event types.

        Args:
            client_id: Optional client ID to filter by subscriptions. An id
                with no current subscriptions applies no filter.
            event_types: Optional list of event types to filter by
            limit: Maximum number of events to return; zero or a negative
                value yields an empty list

        Returns:
            List of event records, newest first
        """
        if limit <= 0:
            return []

        async with self._lock:
            subscriptions = None
            if client_id:
                subscriptions = self.client_subscriptions.get(client_id)

            filtered_events: List[Dict[str, Any]] = []
            for event in reversed(self.event_history):
                kind = event["type"]

                # Filter by event types if specified
                if event_types and kind not in event_types:
                    continue

                # Filter by client subscriptions if specified
                if (subscriptions is not None
                        and _ALL_EVENTS not in subscriptions
                        and kind not in subscriptions):
                    continue

                filtered_events.append(event)
                if len(filtered_events) >= limit:
                    break

            return filtered_events

    async def get_active_listeners(self) -> Dict[str, Any]:
        """
        Get information about currently active listeners.

        Returns:
            Dictionary with the client count, a copy of each client's
            subscription set, the event types that have an entry in the
            callback table, and the history length
        """
        async with self._lock:
            return {
                "total_clients": len(self.client_subscriptions),
                # Copy the sets so callers cannot mutate router state.
                "clients": {
                    client_id: set(subscriptions)
                    for client_id, subscriptions in self.client_subscriptions.items()
                },
                "event_types_with_callbacks": list(self.listeners.keys()),
                "total_events_in_history": len(self.event_history)
            }

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the event router.

        Returns:
            Dictionary with statistics; "latest_event" and "oldest_event"
            timestamps are included only when the history is not empty
        """
        stats = {
            "total_clients": len(self.client_subscriptions),
            "total_event_types": len(self.listeners),
            "events_in_history": len(self.event_history),
            "max_history_size": self.max_history_size
        }

        if self.event_history:
            stats["latest_event"] = self.event_history[-1]["timestamp"]
            stats["oldest_event"] = self.event_history[0]["timestamp"]

        return stats

    async def clear_history(self) -> None:
        """Clear the event history."""
        async with self._lock:
            self.event_history.clear()
        logger.info("Event history cleared")

    async def shutdown(self) -> None:
        """Shutdown the event router and clean up resources."""
        async with self._lock:
            self.listeners.clear()
            self.client_subscriptions.clear()
            self.event_history.clear()
            self._client_callbacks.clear()
        logger.info("Event router shutdown complete")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jango-blockchained/mcp-freecad'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.