Skip to main content
Glama
system.py9.22 kB
# SPDX-License-Identifier: GPL-3.0-only OR MIT
"""
System diagnostics and information module.

Provides system health checks, disk space monitoring, and service status.
"""

import logging
import os
import re
from pathlib import Path
from typing import Dict, Any, List

from .utils import (
    IS_ARCH,
    run_command,
    create_error_response,
    check_command_exists
)

logger = logging.getLogger(__name__)


async def get_system_info() -> Dict[str, Any]:
    """
    Get core system information.

    Each probe (kernel, architecture, hostname, uptime, memory) is
    best-effort: a failing command simply omits its key from the result.

    Returns:
        Dict with kernel, architecture, hostname, uptime, memory info and
        an "is_arch_linux" flag, or an error response dict on total failure.
    """
    logger.info("Gathering system information")

    info: Dict[str, Any] = {}

    try:
        # Kernel version
        exit_code, stdout, _ = await run_command(["uname", "-r"], timeout=5, check=False)
        if exit_code == 0:
            info["kernel"] = stdout.strip()

        # Architecture
        exit_code, stdout, _ = await run_command(["uname", "-m"], timeout=5, check=False)
        if exit_code == 0:
            info["architecture"] = stdout.strip()

        # Hostname
        exit_code, stdout, _ = await run_command(["hostname"], timeout=5, check=False)
        if exit_code == 0:
            info["hostname"] = stdout.strip()

        # Uptime (human-readable, e.g. "up 2 days, 3 hours")
        exit_code, stdout, _ = await run_command(["uptime", "-p"], timeout=5, check=False)
        if exit_code == 0:
            info["uptime"] = stdout.strip()

        # Memory info from /proc/meminfo; values there are reported in kB.
        try:
            meminfo_path = Path("/proc/meminfo")
            if meminfo_path.exists():
                with open(meminfo_path, "r") as f:
                    meminfo = f.read()

                mem_total_match = re.search(r"MemTotal:\s+(\d+)", meminfo)
                mem_available_match = re.search(r"MemAvailable:\s+(\d+)", meminfo)

                if mem_total_match:
                    total_kb = int(mem_total_match.group(1))
                    info["memory_total_kb"] = total_kb
                    info["memory_total_mb"] = total_kb // 1024
                if mem_available_match:
                    avail_kb = int(mem_available_match.group(1))
                    info["memory_available_kb"] = avail_kb
                    info["memory_available_mb"] = avail_kb // 1024
        except Exception as e:
            # Memory info is optional; log and continue with what we have.
            logger.warning(f"Failed to read memory info: {e}")

        info["is_arch_linux"] = IS_ARCH

        logger.info("Successfully gathered system information")
        return info

    except Exception as e:
        logger.error(f"Failed to gather system info: {e}")
        return create_error_response(
            "SystemInfoError",
            f"Failed to gather system information: {str(e)}"
        )


async def check_disk_space() -> Dict[str, Any]:
    """
    Check disk space for critical paths.

    Checks /, /home, /var and, on Arch Linux, /var/cache/pacman/pkg.
    Adds a "warning" entry when usage exceeds 80% (low) or 90% (critical).

    Returns:
        Dict with per-path disk usage and the number of paths checked,
        or an error response dict on failure.
    """
    logger.info("Checking disk space")

    paths_to_check = ["/", "/home", "/var"]
    if IS_ARCH:
        paths_to_check.append("/var/cache/pacman/pkg")

    disk_info: Dict[str, Any] = {}

    try:
        for path in paths_to_check:
            # Skip paths that don't exist on this system (e.g. no /home mount).
            if not Path(path).exists():
                continue

            exit_code, stdout, _ = await run_command(
                ["df", "-h", path], timeout=5, check=False
            )

            if exit_code == 0:
                lines = stdout.strip().split('\n')
                if len(lines) >= 2:
                    # df columns: Filesystem Size Used Avail Use% Mounted-on
                    parts = lines[1].split()
                    if len(parts) >= 5:
                        disk_info[path] = {
                            "size": parts[1],
                            "used": parts[2],
                            "available": parts[3],
                            "use_percent": parts[4],
                            "mounted_on": parts[5] if len(parts) > 5 else path
                        }

                        # Guard against non-numeric Use% values ("-" on some
                        # pseudo-filesystems) instead of raising ValueError.
                        pct_str = parts[4].rstrip('%')
                        if pct_str.isdigit():
                            use_pct = int(pct_str)
                            if use_pct > 90:
                                disk_info[path]["warning"] = "Critical: Less than 10% free"
                            elif use_pct > 80:
                                disk_info[path]["warning"] = "Low: Less than 20% free"

        logger.info(f"Checked disk space for {len(disk_info)} paths")
        return {
            "disk_usage": disk_info,
            "paths_checked": len(disk_info)
        }

    except Exception as e:
        logger.error(f"Failed to check disk space: {e}")
        return create_error_response(
            "DiskCheckError",
            f"Failed to check disk space: {str(e)}"
        )


async def get_pacman_cache_stats() -> Dict[str, Any]:
    """
    Analyze pacman package cache.

    Returns:
        Dict with cache directory, package count, and total size in
        bytes/MB/GB, or an error response dict on non-Arch systems or
        when the cache directory is missing.
    """
    if not IS_ARCH:
        return create_error_response(
            "NotSupported",
            "Pacman cache analysis is only available on Arch Linux"
        )

    logger.info("Analyzing pacman cache")

    cache_dir = Path("/var/cache/pacman/pkg")

    try:
        if not cache_dir.exists():
            return create_error_response(
                "NotFound",
                "Pacman cache directory not found"
            )

        # Count packages. The glob *.pkg.tar.* also matches detached
        # signature files (*.pkg.tar.zst.sig), which are not packages —
        # exclude them so count and size reflect actual package archives.
        pkg_files = [
            f for f in cache_dir.glob("*.pkg.tar.*")
            if not f.name.endswith(".sig")
        ]
        pkg_count = len(pkg_files)

        # Calculate total size
        total_size = sum(f.stat().st_size for f in pkg_files)
        total_size_mb = total_size / (1024 * 1024)
        total_size_gb = total_size_mb / 1024

        logger.info(f"Cache: {pkg_count} packages, {total_size_gb:.2f} GB")

        return {
            "cache_dir": str(cache_dir),
            "package_count": pkg_count,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size_mb, 2),
            "total_size_gb": round(total_size_gb, 2)
        }

    except Exception as e:
        logger.error(f"Failed to analyze cache: {e}")
        return create_error_response(
            "CacheAnalysisError",
            f"Failed to analyze pacman cache: {str(e)}"
        )


async def check_failed_services() -> Dict[str, Any]:
    """
    Detect failed systemd services.

    Returns:
        Dict with the failed-service count, a list of failed services
        (unit/load/active/sub fields), and an "all_ok" flag, or an error
        response dict when systemctl is unavailable or the check fails.
    """
    if not check_command_exists("systemctl"):
        return create_error_response(
            "NotSupported",
            "systemctl not available (systemd-based system required)"
        )

    logger.info("Checking for failed services")

    try:
        exit_code, stdout, _ = await run_command(
            ["systemctl", "--failed", "--no-pager"],
            timeout=10,
            check=False
        )

        failed_services: List[Dict[str, str]] = []

        for raw_line in stdout.strip().split('\n'):
            # BUG FIX: systemctl prefixes each *failed unit* line with a '●'
            # bullet; the old parser skipped those lines entirely, so it
            # never reported any failed unit. Strip the bullet and parse.
            line = raw_line.lstrip('● ').strip()

            # Skip the column header and the "N loaded units listed" footer.
            if not line or line.startswith('UNIT'):
                continue
            if 'loaded units listed' in line.lower():
                continue

            parts = line.split()
            if parts and parts[0].endswith('.service'):
                failed_services.append({
                    "unit": parts[0],
                    "load": parts[1] if len(parts) > 1 else "",
                    "active": parts[2] if len(parts) > 2 else "",
                    "sub": parts[3] if len(parts) > 3 else "",
                })

        logger.info(f"Found {len(failed_services)} failed services")
        return {
            "failed_count": len(failed_services),
            "failed_services": failed_services,
            "all_ok": len(failed_services) == 0
        }

    except Exception as e:
        logger.error(f"Failed to check services: {e}")
        return create_error_response(
            "ServiceCheckError",
            f"Failed to check failed services: {str(e)}"
        )


async def get_boot_logs(lines: int = 100) -> Dict[str, Any]:
    """
    Retrieve recent boot logs.

    Args:
        lines: Number of lines to retrieve from the current boot's journal.

    Returns:
        Dict with the line count and the log lines, or an error response
        dict when journalctl is unavailable or the command fails.
    """
    if not check_command_exists("journalctl"):
        return create_error_response(
            "NotSupported",
            "journalctl not available (systemd-based system required)"
        )

    logger.info(f"Retrieving {lines} lines of boot logs")

    try:
        # -b limits output to the current boot; -n caps the line count.
        exit_code, stdout, stderr = await run_command(
            ["journalctl", "-b", "-n", str(lines), "--no-pager"],
            timeout=15,
            check=False
        )

        if exit_code != 0:
            return create_error_response(
                "CommandError",
                f"Failed to retrieve boot logs: {stderr}"
            )

        log_lines = stdout.strip().split('\n')

        logger.info(f"Retrieved {len(log_lines)} lines of boot logs")
        return {
            "line_count": len(log_lines),
            "logs": log_lines
        }

    except Exception as e:
        logger.error(f"Failed to get boot logs: {e}")
        return create_error_response(
            "LogRetrievalError",
            f"Failed to retrieve boot logs: {str(e)}"
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nihalxkumar/arch-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server