Skip to main content
Glama
tunnel.pyโ€ข10.9 kB
"""Tunnel provider for OpenAccess MCP.""" import asyncio import time import uuid from dataclasses import dataclass from typing import Optional, Dict, Any, List from contextlib import asynccontextmanager try: import asyncssh except ImportError: asyncssh = None from ..types import SecretData @dataclass class TunnelInfo: """Information about an active tunnel.""" tunnel_id: str tunnel_type: str listen_host: str listen_port: int target_host: Optional[str] target_port: Optional[int] created_at: float ttl_seconds: int profile_id: str status: str = "active" @property def expires_at(self) -> float: """When the tunnel expires.""" return self.created_at + self.ttl_seconds @property def is_expired(self) -> bool: """Whether the tunnel has expired.""" return time.time() > self.expires_at @property def remaining_seconds(self) -> int: """Seconds remaining before expiration.""" remaining = self.expires_at - time.time() return max(0, int(remaining)) class TunnelProvider: """Handles SSH tunnel operations.""" def __init__(self): """Initialize the tunnel provider.""" self._active_tunnels: Dict[str, TunnelInfo] = {} self._tunnel_handles: Dict[str, Any] = {} # SSH tunnel handles self._cleanup_task: Optional[asyncio.Task] = None # Don't start cleanup task in constructor - let it be started when needed async def start_cleanup_task(self): """Start the background cleanup task if not already running.""" if self._cleanup_task is None or self._cleanup_task.done(): self._cleanup_task = asyncio.create_task(self._cleanup_expired_tunnels()) def _start_cleanup_task(self): """Start the background cleanup task (synchronous version for backward compatibility).""" # This method is kept for backward compatibility but doesn't start the task # The task should be started explicitly when needed pass async def create_tunnel( self, host: str, port: int, secret: SecretData, tunnel_type: str, listen_host: str = "127.0.0.1", listen_port: int = 0, target_host: Optional[str] = None, target_port: Optional[int] = None, ttl_seconds: int = 3600, profile_id: str = "unknown" ) -> TunnelInfo: """Create an SSH tunnel.""" if asyncssh is None: raise ImportError("asyncssh is required for tunnel operations") # Validate tunnel type if tunnel_type not in ["local", "remote", "dynamic"]: raise ValueError("Tunnel type must be 'local', 'remote', or 'dynamic'") # Validate parameters for tunnel type if tunnel_type in ["local", "remote"]: if not target_host or not target_port: raise ValueError(f"{tunnel_type} tunnels require target_host and target_port") # Create tunnel ID tunnel_id = str(uuid.uuid4()) # Create tunnel info tunnel_info = TunnelInfo( tunnel_id=tunnel_id, tunnel_type=tunnel_type, listen_host=listen_host, listen_port=listen_port, target_host=target_host, target_port=target_port, created_at=time.time(), ttl_seconds=ttl_seconds, profile_id=profile_id ) try: # Establish SSH connection conn = await self._get_connection(host, port, secret) # Create tunnel based on type if tunnel_type == "local": tunnel_handle = await self._create_local_tunnel( conn, listen_host, listen_port, target_host, target_port ) elif tunnel_type == "remote": tunnel_handle = await self._create_remote_tunnel( conn, listen_host, listen_port, target_host, target_port ) else: # dynamic tunnel_handle = await self._create_dynamic_tunnel( conn, listen_host, listen_port ) # Store tunnel info and handle self._active_tunnels[tunnel_id] = tunnel_info self._tunnel_handles[tunnel_id] = tunnel_handle # Update listen port if it was auto-assigned if listen_port == 0: tunnel_info.listen_port = tunnel_handle.get_port() return tunnel_info except Exception as e: # Clean up on failure self._active_tunnels.pop(tunnel_id, None) self._tunnel_handles.pop(tunnel_id, None) raise RuntimeError(f"Failed to create tunnel: {e}") async def _create_local_tunnel( self, conn, listen_host: str, listen_port: int, target_host: str, target_port: int ): """Create a local port forward tunnel.""" return await conn.create_local_port_forward( listen_host, listen_port, target_host, target_port ) async def _create_remote_tunnel( self, conn, listen_host: str, listen_port: int, target_host: str, target_port: int ): """Create a remote port forward tunnel.""" return await conn.create_remote_port_forward( listen_host, listen_port, target_host, target_port ) async def _create_dynamic_tunnel( self, conn, listen_host: str, listen_port: int ): """Create a dynamic SOCKS tunnel.""" return await conn.create_local_port_forward( listen_host, listen_port, None, None ) async def close_tunnel(self, tunnel_id: str) -> bool: """Close an SSH tunnel.""" if tunnel_id not in self._active_tunnels: return False try: # Close tunnel handle if tunnel_id in self._tunnel_handles: tunnel_handle = self._tunnel_handles[tunnel_id] tunnel_handle.close() del self._tunnel_handles[tunnel_id] # Remove tunnel info del self._active_tunnels[tunnel_id] return True except Exception: return False async def close_all_tunnels(self) -> int: """Close all active tunnels.""" closed_count = 0 tunnel_ids = list(self._active_tunnels.keys()) for tunnel_id in tunnel_ids: if await self.close_tunnel(tunnel_id): closed_count += 1 return closed_count def get_tunnel_info(self, tunnel_id: str) -> Optional[TunnelInfo]: """Get information about a specific tunnel.""" return self._active_tunnels.get(tunnel_id) def list_tunnels(self, profile_id: Optional[str] = None) -> List[TunnelInfo]: """List all active tunnels, optionally filtered by profile.""" tunnels = list(self._active_tunnels.values()) if profile_id: tunnels = [t for t in tunnels if t.profile_id == profile_id] return sorted(tunnels, key=lambda t: t.created_at, reverse=True) def get_tunnel_stats(self) -> Dict[str, Any]: """Get statistics about active tunnels.""" now = time.time() active_tunnels = [t for t in self._active_tunnels.values() if not t.is_expired] expired_tunnels = [t for t in self._active_tunnels.values() if t.is_expired] return { "total_tunnels": len(self._active_tunnels), "active_tunnels": len(active_tunnels), "expired_tunnels": len(expired_tunnels), "tunnel_types": { t.tunnel_type: len([t2 for t2 in active_tunnels if t2.tunnel_type == t.tunnel_type]) for t in active_tunnels }, "profiles": { t.profile_id: len([t2 for t2 in active_tunnels if t2.profile_id == t.profile_id]) for t in active_tunnels } } async def _get_connection(self, host: str, port: int, secret: SecretData): """Get or create an SSH connection.""" # For now, create a new connection for each tunnel # In production, you might want connection pooling if asyncssh is None: raise ImportError("asyncssh is required for tunnel operations") try: if secret.has_key_auth(): conn = await asyncssh.connect( host, port=port, username=secret.username, client_keys=[secret.private_key], passphrase=secret.passphrase, known_hosts=None ) else: conn = await asyncssh.connect( host, port=port, username=secret.username, password=secret.password, known_hosts=None ) return conn except Exception as e: raise RuntimeError(f"Failed to establish SSH connection: {e}") async def _cleanup_expired_tunnels(self) -> None: """Background task to clean up expired tunnels.""" while True: try: # Find expired tunnels expired_tunnels = [ tunnel_id for tunnel_id, tunnel_info in self._active_tunnels.items() if tunnel_info.is_expired ] # Close expired tunnels for tunnel_id in expired_tunnels: await self.close_tunnel(tunnel_id) # Sleep for a bit before next check await asyncio.sleep(60) # Check every minute except asyncio.CancelledError: break except Exception: # Log error and continue await asyncio.sleep(60) async def stop_cleanup_task(self) -> None: """Stop the background cleanup task.""" if self._cleanup_task and not self._cleanup_task.done(): self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass @asynccontextmanager async def tunnel_context(self, tunnel_info: TunnelInfo): """Context manager for automatic tunnel cleanup.""" try: yield tunnel_info finally: await self.close_tunnel(tunnel_info.tunnel_id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keepithuman/openaccess-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server