Skip to main content
Glama

Vectara MCP server

Official
by vectara
health_checks.py (13.4 kB)
""" Health check endpoints for Vectara MCP Server. Provides liveness, readiness, and detailed health status endpoints for production deployment with load balancers and orchestration platforms. """ import asyncio import logging import time from typing import Dict, Any, Optional, List from dataclasses import dataclass from enum import Enum from .connection_manager import get_connection_manager from . import __version__ logger = logging.getLogger(__name__) class HealthStatus(Enum): """Health check status values.""" HEALTHY = "healthy" UNHEALTHY = "unhealthy" DEGRADED = "degraded" UNKNOWN = "unknown" @dataclass class HealthCheck: """Individual health check result.""" name: str status: HealthStatus message: str response_time_ms: Optional[float] = None details: Optional[Dict[str, Any]] = None class HealthChecker: """Manages health checks for the MCP server.""" def __init__(self): """Initialize health checker.""" self.server_start_time = time.time() self.last_check_cache = {} self.cache_ttl = 5 # Cache health checks for 5 seconds async def liveness_check(self) -> Dict[str, Any]: """Basic liveness check - is the server process running and responding? This should be fast and only check if the process is alive. Used by load balancers to determine if traffic should be routed here. Returns: Dict: Liveness status """ return { "status": HealthStatus.HEALTHY.value, "timestamp": time.time(), "uptime_seconds": round(time.time() - self.server_start_time, 2), "version": __version__, "service": "vectara-mcp-server" } async def readiness_check(self) -> Dict[str, Any]: """Readiness check - can the server handle traffic? Checks critical dependencies that must be working for the server to properly handle requests. Used by orchestration platforms. 
Returns: Dict: Readiness status with dependency checks """ checks = [] overall_status = HealthStatus.HEALTHY start_time = time.time() # Check connection manager try: connection_check = await self._check_connection_manager() checks.append(connection_check) if connection_check.status != HealthStatus.HEALTHY: overall_status = HealthStatus.UNHEALTHY except Exception as e: checks.append(HealthCheck( name="connection_manager", status=HealthStatus.UNHEALTHY, message=f"Connection manager check failed: {str(e)}" )) overall_status = HealthStatus.UNHEALTHY # Check Vectara API connectivity try: vectara_check = await self._check_vectara_connectivity() checks.append(vectara_check) if vectara_check.status == HealthStatus.UNHEALTHY: overall_status = HealthStatus.UNHEALTHY elif vectara_check.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY: overall_status = HealthStatus.DEGRADED except Exception as e: checks.append(HealthCheck( name="vectara_api", status=HealthStatus.UNHEALTHY, message=f"Vectara API check failed: {str(e)}" )) overall_status = HealthStatus.UNHEALTHY total_time = round((time.time() - start_time) * 1000, 2) return { "status": overall_status.value, "timestamp": time.time(), "response_time_ms": total_time, "checks": [ { "name": check.name, "status": check.status.value, "message": check.message, "response_time_ms": check.response_time_ms, "details": check.details } for check in checks ] } async def detailed_health_check(self) -> Dict[str, Any]: """Comprehensive health check with all system components. Provides detailed information about all system components, metrics, and configuration. Used for monitoring and debugging. 
Returns: Dict: Detailed health status """ checks = [] metrics = {} overall_status = HealthStatus.HEALTHY start_time = time.time() # Basic server info server_info = { "uptime_seconds": round(time.time() - self.server_start_time, 2), "version": __version__, "service": "vectara-mcp-server", "pid": os.getpid() if hasattr(os, 'getpid') else None } # Connection manager health try: connection_check = await self._check_connection_manager_detailed() checks.append(connection_check) if connection_check.status != HealthStatus.HEALTHY: overall_status = HealthStatus.DEGRADED except Exception as e: checks.append(HealthCheck( name="connection_manager_detailed", status=HealthStatus.UNHEALTHY, message=f"Detailed connection check failed: {str(e)}" )) overall_status = HealthStatus.UNHEALTHY # Vectara API connectivity try: vectara_check = await self._check_vectara_connectivity() checks.append(vectara_check) if vectara_check.status == HealthStatus.UNHEALTHY: overall_status = HealthStatus.UNHEALTHY elif vectara_check.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY: overall_status = HealthStatus.DEGRADED except Exception as e: checks.append(HealthCheck( name="vectara_api_detailed", status=HealthStatus.UNHEALTHY, message=f"Vectara API detailed check failed: {str(e)}" )) overall_status = HealthStatus.UNHEALTHY # Memory usage (if available) try: import psutil process = psutil.Process() metrics["memory"] = { "rss_mb": round(process.memory_info().rss / 1024 / 1024, 2), "vms_mb": round(process.memory_info().vms / 1024 / 1024, 2), "percent": round(process.memory_percent(), 2) } except ImportError: metrics["memory"] = {"error": "psutil not available"} except Exception as e: metrics["memory"] = {"error": str(e)} total_time = round((time.time() - start_time) * 1000, 2) return { "status": overall_status.value, "timestamp": time.time(), "response_time_ms": total_time, "server": server_info, "checks": [ { "name": check.name, "status": check.status.value, "message": 
check.message, "response_time_ms": check.response_time_ms, "details": check.details } for check in checks ], "metrics": metrics } async def _check_connection_manager(self) -> HealthCheck: """Check connection manager basic health.""" start_time = time.time() try: manager = await get_connection_manager() stats = manager.get_stats() response_time = round((time.time() - start_time) * 1000, 2) if stats["session_initialized"]: return HealthCheck( name="connection_manager", status=HealthStatus.HEALTHY, message="Connection manager initialized and ready", response_time_ms=response_time, details={"circuit_breaker_state": stats["circuit_breaker"]["state"]} ) else: return HealthCheck( name="connection_manager", status=HealthStatus.UNHEALTHY, message="Connection manager not initialized", response_time_ms=response_time ) except Exception as e: response_time = round((time.time() - start_time) * 1000, 2) return HealthCheck( name="connection_manager", status=HealthStatus.UNHEALTHY, message=f"Connection manager error: {str(e)}", response_time_ms=response_time ) async def _check_connection_manager_detailed(self) -> HealthCheck: """Check connection manager detailed health.""" start_time = time.time() try: manager = await get_connection_manager() stats = manager.get_stats() response_time = round((time.time() - start_time) * 1000, 2) circuit_state = stats["circuit_breaker"]["state"] failure_count = stats["circuit_breaker"]["failure_count"] if stats["session_initialized"]: if circuit_state == "open": status = HealthStatus.UNHEALTHY message = f"Circuit breaker OPEN with {failure_count} failures" elif circuit_state == "half_open": status = HealthStatus.DEGRADED message = "Circuit breaker testing recovery" elif failure_count > 0: status = HealthStatus.DEGRADED message = f"Recent failures: {failure_count}" else: status = HealthStatus.HEALTHY message = "Connection manager healthy" return HealthCheck( name="connection_manager_detailed", status=status, message=message, 
response_time_ms=response_time, details=stats ) else: return HealthCheck( name="connection_manager_detailed", status=HealthStatus.UNHEALTHY, message="Connection manager not initialized", response_time_ms=response_time ) except Exception as e: response_time = round((time.time() - start_time) * 1000, 2) return HealthCheck( name="connection_manager_detailed", status=HealthStatus.UNHEALTHY, message=f"Connection manager error: {str(e)}", response_time_ms=response_time ) async def _check_vectara_connectivity(self) -> HealthCheck: """Check Vectara API connectivity.""" cache_key = "vectara_connectivity" # Check cache first if cache_key in self.last_check_cache: cached_result, cache_time = self.last_check_cache[cache_key] if time.time() - cache_time < self.cache_ttl: return cached_result start_time = time.time() try: manager = await get_connection_manager() health_result = await manager.health_check("https://api.vectara.io") response_time = round((time.time() - start_time) * 1000, 2) if health_result["status"] == "healthy": status = HealthStatus.HEALTHY message = f"Vectara API accessible ({health_result['response_time_ms']}ms)" else: status = HealthStatus.DEGRADED message = f"Vectara API issues: {health_result.get('error', 'Unknown error')}" result = HealthCheck( name="vectara_api", status=status, message=message, response_time_ms=response_time, details={ "api_response_time_ms": health_result.get("response_time_ms"), "circuit_breaker_state": health_result.get("circuit_breaker_state") } ) # Cache the result self.last_check_cache[cache_key] = (result, time.time()) return result except Exception as e: response_time = round((time.time() - start_time) * 1000, 2) result = HealthCheck( name="vectara_api", status=HealthStatus.UNHEALTHY, message=f"Vectara API connectivity failed: {str(e)}", response_time_ms=response_time ) # Cache the result self.last_check_cache[cache_key] = (result, time.time()) return result # Global health checker instance health_checker = HealthChecker() # 
Convenience functions for FastMCP integration async def get_liveness() -> Dict[str, Any]: """Get liveness status.""" return await health_checker.liveness_check() async def get_readiness() -> Dict[str, Any]: """Get readiness status.""" return await health_checker.readiness_check() async def get_detailed_health() -> Dict[str, Any]: """Get detailed health status.""" return await health_checker.detailed_health_check() # Import os here to avoid issues if not available import os

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vectara/vectara-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.