Skip to main content
Glama

MCP Developer Server

by RA86-dev
monitoring_tools.py14.7 kB
""" Monitoring and metrics tools for system and container performance """ import json import time import psutil import docker from typing import Dict, Any, List from collections import defaultdict from datetime import datetime class MonitoringTools: """System and container monitoring functionality""" def __init__(self, docker_client, active_containers: dict, logger=None): self.docker_client = docker_client self.active_containers = active_containers self.logger = logger self.monitoring_data = defaultdict(list) self.performance_metrics = { "container_stats": {}, "system_stats": {}, "operation_times": [], "api_response_times": [], "error_rates": defaultdict(int), "resource_usage": defaultdict(list), } def register_tools(self, mcp_server): """Register monitoring tools with the MCP server""" @mcp_server.tool() async def monitor_system_resources() -> str: """Monitor system CPU, memory, disk, and network usage""" try: # CPU usage cpu_percent = psutil.cpu_percent(interval=1) cpu_count = psutil.cpu_count() cpu_freq = psutil.cpu_freq() # Memory usage memory = psutil.virtual_memory() swap = psutil.swap_memory() # Disk usage disk_usage = [] for partition in psutil.disk_partitions(): try: partition_usage = psutil.disk_usage(partition.mountpoint) disk_usage.append( { "device": partition.device, "mountpoint": partition.mountpoint, "fstype": partition.fstype, "total_gb": round(partition_usage.total / (1024**3), 2), "used_gb": round(partition_usage.used / (1024**3), 2), "free_gb": round(partition_usage.free / (1024**3), 2), "percent_used": round( (partition_usage.used / partition_usage.total) * 100, 1, ), } ) except PermissionError: continue # Network usage network = psutil.net_io_counters() system_info = { "timestamp": datetime.now().isoformat(), "cpu": { "percent": cpu_percent, "count": cpu_count, "frequency_mhz": cpu_freq.current if cpu_freq else None, }, "memory": { "total_gb": round(memory.total / (1024**3), 2), "available_gb": round(memory.available / (1024**3), 2), "used_gb": round(memory.used / (1024**3), 2), "percent": memory.percent, }, "swap": { "total_gb": round(swap.total / (1024**3), 2), "used_gb": round(swap.used / (1024**3), 2), "percent": swap.percent, }, "disk": disk_usage, "network": { "bytes_sent": network.bytes_sent, "bytes_recv": network.bytes_recv, "packets_sent": network.packets_sent, "packets_recv": network.packets_recv, }, } # Store for historical tracking self.performance_metrics["system_stats"][ datetime.now().isoformat() ] = system_info return json.dumps(system_info, indent=2) except Exception as e: return f"Error monitoring system resources: {str(e)}" @mcp_server.tool() async def monitor_container_performance(container_id: str) -> str: """Monitor performance metrics for a specific container""" try: container = self._find_container(container_id) if not container: return f"Container {container_id} not found" # Get container stats stats = container.stats(stream=False) # Calculate CPU usage cpu_stats = stats.get("cpu_stats", {}) precpu_stats = stats.get("precpu_stats", {}) cpu_usage = 0.0 if cpu_stats.get("cpu_usage") and precpu_stats.get("cpu_usage"): cpu_delta = ( cpu_stats["cpu_usage"]["total_usage"] - precpu_stats["cpu_usage"]["total_usage"] ) system_delta = cpu_stats.get( "system_cpu_usage", 0 ) - precpu_stats.get("system_cpu_usage", 0) online_cpus = cpu_stats.get( "online_cpus", len(cpu_stats.get("cpu_usage", {}).get("percpu_usage", [1])), ) if system_delta > 0: cpu_usage = (cpu_delta / system_delta) * online_cpus * 100.0 # Memory usage memory_stats = stats.get("memory_stats", {}) memory_usage = memory_stats.get("usage", 0) memory_limit = memory_stats.get("limit", 0) memory_percent = ( (memory_usage / memory_limit * 100) if memory_limit else 0 ) # Network I/O networks = stats.get("networks", {}) network_rx = sum(net.get("rx_bytes", 0) for net in networks.values()) network_tx = sum(net.get("tx_bytes", 0) for net in networks.values()) # Block I/O blkio_stats = stats.get("blkio_stats", {}) blk_read = 0 blk_write = 0 for entry in blkio_stats.get("io_service_bytes_recursive", []): if entry.get("op") == "Read": blk_read += entry.get("value", 0) elif entry.get("op") == "Write": blk_write += entry.get("value", 0) container_performance = { "container_id": container_id[:12], "name": container.name, "timestamp": datetime.now().isoformat(), "cpu": {"usage_percent": round(cpu_usage, 2)}, "memory": { "usage_bytes": memory_usage, "usage_mb": round(memory_usage / (1024**2), 2), "limit_bytes": memory_limit, "limit_mb": round(memory_limit / (1024**2), 2), "usage_percent": round(memory_percent, 2), }, "network": { "rx_bytes": network_rx, "tx_bytes": network_tx, "rx_mb": round(network_rx / (1024**2), 2), "tx_mb": round(network_tx / (1024**2), 2), }, "block_io": { "read_bytes": blk_read, "write_bytes": blk_write, "read_mb": round(blk_read / (1024**2), 2), "write_mb": round(blk_write / (1024**2), 2), }, } # Store for historical tracking self.performance_metrics["container_stats"][ container_id ] = container_performance return json.dumps(container_performance, indent=2) except Exception as e: return f"Error monitoring container performance: {str(e)}" @mcp_server.tool() async def get_server_status() -> str: """Get comprehensive server status and health information""" try: # Docker status docker_info = self.docker_client.info() # Active containers count containers = self.docker_client.containers.list(all=True) running_containers = len( [c for c in containers if c.status == "running"] ) total_containers = len(containers) # System uptime boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime_hours = uptime_seconds / 3600 # Load averages (Unix-like systems only) load_avg = None try: load_avg = psutil.getloadavg() except AttributeError: pass # Windows doesn't have load averages status_info = { "timestamp": datetime.now().isoformat(), "system": { "uptime_hours": round(uptime_hours, 2), "boot_time": datetime.fromtimestamp(boot_time).isoformat(), "load_average": load_avg, "platform": psutil.LINUX or psutil.MACOS or psutil.WINDOWS, }, "docker": { "version": docker_info.get("ServerVersion", "Unknown"), "containers_running": running_containers, "containers_total": total_containers, "images_count": len(self.docker_client.images.list()), "storage_driver": docker_info.get("Driver", "Unknown"), "kernel_version": docker_info.get("KernelVersion", "Unknown"), }, "resources": { "cpu_count": psutil.cpu_count(), "memory_total_gb": round( psutil.virtual_memory().total / (1024**3), 2 ), "disk_total_gb": ( round(psutil.disk_usage("/").total / (1024**3), 2) if psutil.disk_usage("/") else 0 ), }, "active_services": { "mcp_server": True, "docker_daemon": True, "monitoring": True, }, } return json.dumps(status_info, indent=2) except Exception as e: return f"Error getting server status: {str(e)}" @mcp_server.tool() async def create_container_backup( container_id: str, backup_name: str = None ) -> str: """Create a backup of a container and its data""" try: container = self._find_container(container_id) if not container: return f"Container {container_id} not found" if not backup_name: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"{container.name}_backup_{timestamp}" # Commit the container to create an image image = container.commit(repository=backup_name, tag="latest") backup_info = { "backup_name": backup_name, "original_container": container_id, "image_id": image.id, "created": datetime.now().isoformat(), "size_mb": round(image.attrs.get("Size", 0) / (1024**2), 2), } return f"Container backup created: {json.dumps(backup_info, indent=2)}" except Exception as e: return f"Error creating container backup: {str(e)}" @mcp_server.tool() async def create_workspace_backup(backup_name: str = None) -> str: """Create a backup of the shared workspace""" try: import tarfile from pathlib import Path workspace_path = Path("/tmp/workspace") if not workspace_path.exists(): return "Workspace directory does not exist" if not backup_name: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"workspace_backup_{timestamp}.tar.gz" backup_path = Path("/tmp") / backup_name with tarfile.open(backup_path, "w:gz") as tar: tar.add(workspace_path, arcname="workspace") backup_size = backup_path.stat().st_size backup_info = { "backup_name": backup_name, "backup_path": str(backup_path), "created": datetime.now().isoformat(), "size_bytes": backup_size, "size_mb": round(backup_size / (1024**2), 2), } return f"Workspace backup created: {json.dumps(backup_info, indent=2)}" except Exception as e: return f"Error creating workspace backup: {str(e)}" def _find_container(self, container_id: str): """Find a container by ID or name""" try: # Try exact match first if container_id in self.active_containers: return self.active_containers[container_id]["container"] # Try to get from Docker directly try: return self.docker_client.containers.get(container_id) except docker.errors.NotFound: # Try partial ID match containers = self.docker_client.containers.list(all=True) for container in containers: if ( container.id.startswith(container_id) or container.name == container_id ): return container return None except Exception as e: if self.logger: self.logger.error(f"Error finding container: {e}") return None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/RA86-dev/mcpdev-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server