Skip to main content
Glama
ccglm_mcp_server_original.py25.9 kB
#!/usr/bin/env python3 """ CCGLM MCP Server - Claude Code con backend GLM vía Z.AI Basado en patrón ccr-mcp con inyección de credenciales GLM """ import asyncio import json import logging import os import signal import sys import subprocess import glob import time import fcntl import atexit from pathlib import Path from typing import Any, Dict, List, Optional, Set from datetime import datetime import shlex from dotenv import load_dotenv import mcp.server.stdio import mcp.types as types from mcp.server import Server # Import enhanced logging utilities from logging_utils import get_logger # Initialize enhanced logger (replaces basic logging setup) ccglm_logger = get_logger() logger = ccglm_logger.logger # Cargar variables de entorno desde .env load_dotenv() # Crear servidor MCP server = Server("ccglm-mcp") # Configuración de timeouts sincronizada con Claude Code (300s) DEFAULT_TIMEOUT = 280 # 4.6 minutos (un poco menos que Claude para evitar race conditions) MAX_TIMEOUT = 295 # 4.9 minutos (margen de seguridad antes del timeout de Claude) CONNECTION_TIMEOUT = 15 # 15 segundos para startup de subprocess # Timeouts específicos por modelo para optimización (siempre < 295s) MODEL_TIMEOUTS = { "glm-4.5-air": 120, # 2 minutos para modelo rápido "glm-4.6": 280 # 4.6 minutos para modelo completo (alineado con Claude) } # Configuración GLM - cargada de variables de entorno o .env GLM_BASE_URL = os.getenv("GLM_BASE_URL", "https://api.z.ai/api/anthropic") GLM_AUTH_TOKEN = os.getenv("GLM_AUTH_TOKEN") # Validar que el token esté configurado if not GLM_AUTH_TOKEN: logger.error("❌ GLM_AUTH_TOKEN no configurado. Debe estar en variables de entorno o archivo .env") sys.exit(1) # Cache para sistema de archivos optimizado _file_cache = { "last_scan": 0, "files": set(), "directory": "" } # Process Manager para evitar leaks y manejar cleanup class ProcessManager: """Gestor centralizado de procesos con cleanup garantizado""" def __init__(self): self.active_processes: Set[asyncio.subprocess.Process] = set() self.shutdown_lock = asyncio.Lock() self._shutting_down = False async def create_process(self, *args, **kwargs) -> asyncio.subprocess.Process: """Crear proceso y registrar para cleanup automático""" process = await asyncio.create_subprocess_exec(*args, **kwargs) self.active_processes.add(process) logger.debug(f"📝 Process {process.pid} registered with ProcessManager") return process async def cleanup_process(self, process: asyncio.subprocess.Process): """Cleanup individual con manejo robusto de errores""" if process not in self.active_processes: return try: self.active_processes.discard(process) if process.returncode is None: # Still running logger.debug(f"🔄 Terminating process {process.pid} gracefully...") process.terminate() try: await asyncio.wait_for(process.wait(), timeout=5) logger.debug(f"✅ Process {process.pid} terminated gracefully") except asyncio.TimeoutError: logger.warning(f"⚡ Force killing process {process.pid}...") process.kill() await process.wait() logger.debug(f"💀 Process {process.pid} force killed") else: logger.debug(f"✅ Process {process.pid} already terminated with code {process.returncode}") except ProcessLookupError: logger.debug(f"👻 Process {process.pid} already gone") except Exception as e: logger.error(f"❌ Error cleaning up process {process.pid}: {e}") async def shutdown_all(self): """Shutdown de todos los procesos con cleanup garantizado""" async with self.shutdown_lock: if self._shutting_down: return self._shutting_down = True logger.info(f"🛑 ProcessManager shutting down {len(self.active_processes)} processes...") # Crear copia para evitar modificar set durante iteración processes_to_cleanup = list(self.active_processes) # Cleanup concurrente con timeout cleanup_tasks = [self.cleanup_process(p) for p in processes_to_cleanup] if cleanup_tasks: try: await asyncio.wait_for( asyncio.gather(*cleanup_tasks, return_exceptions=True), timeout=10 ) except asyncio.TimeoutError: logger.warning("⏰ Process shutdown timeout, some processes may remain") self.active_processes.clear() logger.info("✅ ProcessManager shutdown complete") # Instancia global del ProcessManager process_manager = ProcessManager() # PID file locking para evitar instancias múltiples PID_FILE = "/tmp/ccglm-mcp.pid" _pid_file_handle = None def acquire_pid_lock() -> bool: """Adquirir lock exclusivo usando PID file""" global _pid_file_handle try: _pid_file_handle = open(PID_FILE, 'w') fcntl.flock(_pid_file_handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) _pid_file_handle.write(str(os.getpid())) _pid_file_handle.flush() logger.info(f"🔒 PID lock acquired for process {os.getpid()}") return True except (IOError, OSError): if _pid_file_handle: _pid_file_handle.close() _pid_file_handle = None logger.warning("⚠️ Another CCGLM MCP instance is already running") return False def release_pid_lock(): """Liberar PID file lock""" global _pid_file_handle try: if _pid_file_handle: fcntl.flock(_pid_file_handle.fileno(), fcntl.LOCK_UN) _pid_file_handle.close() _pid_file_handle = None if os.path.exists(PID_FILE): os.remove(PID_FILE) logger.info("🔓 PID lock released") except Exception as e: logger.error(f"❌ Error releasing PID lock: {e}") # Signal handlers para shutdown graceful def setup_signal_handlers(): """Configurar signal handlers para shutdown limpio""" def signal_handler(signum, frame): logger.info(f"📡 Received signal {signum}, initiating graceful shutdown...") asyncio.create_task(process_manager.shutdown_all()) release_pid_lock() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) if hasattr(signal, 'SIGHUP'): signal.signal(signal.SIGHUP, signal_handler) # Registration de cleanup al exit atexit.register(lambda: asyncio.run(process_manager.shutdown_all())) atexit.register(release_pid_lock) def get_current_files(directory: str = ".", use_cache: bool = True) -> Set[str]: """Obtener conjunto de archivos actuales con caché optimizada""" global _file_cache try: current_time = time.time() # Usar caché si es reciente (menos de 5 segundos) y mismo directorio if (use_cache and _file_cache["directory"] == directory and current_time - _file_cache["last_scan"] < 5): return _file_cache["files"].copy() # Escanear todos los archivos necesarios files = set() for root, dirs, filenames in os.walk(directory): # Excluir directorios internos dirs[:] = [d for d in dirs if d not in {'.claude', '.git', 'node_modules', '__pycache__', '.venv', '.next', 'dist', 'build'}] for filename in filenames: files.add(os.path.join(root, filename)) # Actualizar caché _file_cache.update({ "last_scan": current_time, "files": files.copy(), "directory": directory }) return files except Exception as e: logger.warning(f"Error scanning directory {directory}: {e}") return set() def detect_new_files(before: Set[str], after: Set[str]) -> List[str]: """Detectar archivos nuevos comparando dos sets""" new_files = after - before return sorted(list(new_files)) def format_file_summary(new_files: List[str], stdout_text: str) -> str: """Formatear resumen de archivos creados""" if not new_files: return stdout_text # Crear resumen de archivos creados summary_lines = [ f"✅ GLM execution completed successfully!", f"📁 {len(new_files)} files created:" ] for file_path in new_files[:10]: # Limitar a primeros 10 archivos try: file_size = os.path.getsize(file_path) summary_lines.append(f" • {file_path} ({file_size} bytes)") except: summary_lines.append(f" • {file_path}") if len(new_files) > 10: summary_lines.append(f" ... and {len(new_files) - 10} more files") # Agregar el output original si existe y es relevante if stdout_text and len(stdout_text.strip()) > 0: summary_lines.extend([ "", "📝 Original output:", stdout_text ]) return "\n".join(summary_lines) def sanitize_for_log(text: str) -> str: """Sanitizar datos sensibles de los logs""" if GLM_AUTH_TOKEN: text = text.replace(GLM_AUTH_TOKEN, "***REDACTED***") return text def contains_chinese(text: str) -> bool: """ Detecta si el texto contiene caracteres chinos. Rangos Unicode cubiertos: - U+4E00–U+9FFF: CJK Unified Ideographs (caracteres comunes) - U+3400–U+4DBF: CJK Extension A - U+20000–U+2A6DF: CJK Extension B """ if not text: return False for char in text: code = ord(char) # Rango principal CJK if 0x4E00 <= code <= 0x9FFF: return True # CJK Extension A if 0x3400 <= code <= 0x4DBF: return True # CJK Extension B if 0x20000 <= code <= 0x2A6DF: return True return False @server.list_tools() async def list_tools() -> List[types.Tool]: """Listar herramientas disponibles""" return [ types.Tool( name="ccglm", description="Route prompt to GLM-4.6 (default) or glm-4.5-air (fast) via Claude CLI (handles all tasks: code generation, analysis, general queries)", inputSchema={ "type": "object", "properties": { "prompt": { "type": "string", "description": "The prompt to send to GLM" }, "model": { "type": "string", "description": "Model to use: glm-4.5-air (fast) or glm-4.6 (default)", "enum": ["glm-4.5-air", "glm-4.6"], "default": "glm-4.6" } }, "required": ["prompt"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]: """Manejar llamadas a herramientas""" start_time = time.perf_counter() try: # Create request context with enhanced logging context = ccglm_logger.create_request_context(name, arguments) # Log request event ccglm_logger.log_request(context) if name == "ccglm": prompt = arguments.get("prompt", "") # VALIDACIÓN DE IDIOMA if contains_chinese(prompt): error_msg = ( "❌ CCGLM-MCP: Idioma no soportado\n\n" "Los prompts en chino no son aceptados por este servidor.\n" "GLM-4.6 está optimizado para español e inglés.\n\n" "Idiomas permitidos: Español, Inglés\n" "Idiomas bloqueados: Chino (中文/繁體/简体)\n\n" "Sugerencia: Use el modelo Claude principal para procesamiento en chino." ) logger.warning("Prompt rechazado por contener caracteres chinos") validation_result = {"error": error_msg} ccglm_logger.log_response(context, validation_result, start_time) return [types.TextContent(type="text", text=error_msg)] result = await ccglm_route(arguments) else: result = {"error": f"Unknown tool: {name}"} # Log response event ccglm_logger.log_response(context, result, start_time) # Formatear respuesta if isinstance(result, dict): if "error" in result: response = f"❌ Error: {result['error']}" else: # Para GLM, mostrar solo la respuesta response = result.get("response", json.dumps(result, indent=2, ensure_ascii=False)) else: response = str(result) return [types.TextContent(type="text", text=response)] except Exception as e: # Log error event with enhanced logging context = ccglm_logger.create_request_context(name, arguments) ccglm_logger.log_error(context, e, start_time) return [types.TextContent( type="text", text=f"❌ Error executing {name}: {str(e)}" )] async def ccglm_route(args: Dict[str, Any]) -> Dict[str, Any]: """Route prompt to GLM via Claude CLI with Z.AI credentials""" prompt = args.get("prompt", "") start_time = time.time() if not prompt: logger.error("No prompt provided in ccglm request") return {"error": "No prompt provided"} # Create a basic context for subprocess logging (will be enhanced in call_tool) context = { "instance_id": ccglm_logger.instance_id, "pid": ccglm_logger.pid, "tool": "ccglm" } try: # Enhanced subprocess logging cwd = os.getcwd() files_before = get_current_files(cwd) # Log process start ccglm_logger.log_process_event( context, "spawn", cmd_preview="claude --dangerously-skip-permissions -c -p", cwd=cwd, files_before=len(files_before) ) # Preparar environment con credenciales GLM env = os.environ.copy() env["ANTHROPIC_BASE_URL"] = GLM_BASE_URL env["ANTHROPIC_AUTH_TOKEN"] = GLM_AUTH_TOKEN # Seleccionar modelo model = args.get("model", "glm-4.6") env["ANTHROPIC_MODEL"] = model # Debug logging para verificar configuración del modelo logger.info(f"🎯 MODEL DEBUG: Requested={model}, ANTHROPIC_MODEL={env['ANTHROPIC_MODEL']}") logger.info(f"🔧 ENVIRONMENT DEBUG: GLM_BASE_URL={GLM_BASE_URL}") # Determinar timeout basado en modelo seleccionado model_timeout = MODEL_TIMEOUTS.get(model, DEFAULT_TIMEOUT) effective_timeout = min(model_timeout, MAX_TIMEOUT) logger.info(f"⏱️ Using timeout: {effective_timeout}s for model {model}") # Comando Claude CLI con flags requeridos cmd = ["claude", "--dangerously-skip-permissions", "-c", "-p"] # Crear proceso con Process Manager y comunicación stdin process = None try: process = await asyncio.wait_for( process_manager.create_process( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, env=env ), timeout=CONNECTION_TIMEOUT ) logger.info(f"🎯 Process {process.pid} created via ProcessManager") except asyncio.TimeoutError: logger.error(f"❌ Failed to start Claude CLI within {CONNECTION_TIMEOUT}s") return {"error": f"Failed to start Claude CLI within {CONNECTION_TIMEOUT}s"} stdout_text = "" stderr_text = "" files_after = set() new_files = set() try: # Comunicación única y confiable con el subprocess logger.info(f"🔄 Sending prompt to GLM model {model} (timeout: {effective_timeout}s)") stdout, stderr = await asyncio.wait_for( process.communicate(input=prompt.encode('utf-8')), timeout=effective_timeout ) # Decodificar salidas stdout_text = stdout.decode('utf-8', errors='replace').strip() stderr_text = stderr.decode('utf-8', errors='replace').strip() # Capturar archivos después de la ejecución files_after = get_current_files(cwd) new_files = detect_new_files(files_before, files_after) # Log process completion with enhanced details ccglm_logger.log_process_event( context, "exit", exit_code=process.returncode, stdout_len=len(stdout_text), stderr_len=len(stderr_text), stderr_preview=stderr_text[:200] if stderr_text else None, files_after=len(files_after), files_created=len(new_files), new_files=new_files[:10] ) except asyncio.TimeoutError: logger.warning(f"⏰ GLM process timeout after {effective_timeout}s for model {model}") return {"error": f"Request timed out after {effective_timeout}s for model {model}"} except Exception as e: logger.error(f"❌ Unexpected error in GLM communication: {e}") return {"error": f"Communication error: {str(e)}"} finally: # Cleanup garantizado del proceso via ProcessManager if process: await process_manager.cleanup_process(process) # Manejo mejorado de códigos de salida if process.returncode != 0: # Verificar si hay respuesta útil en stdout a pesar del error if stdout_text and len(stdout_text) > 10: logger.warning(f"⚠️ GLM returned error code {process.returncode} but has output ({len(stdout_text)} chars)") # Continuar procesando la respuesta elif new_files: logger.warning(f"⚠️ GLM returned error code {process.returncode} but created {len(new_files)} files") # Continuar procesando aunque no haya stdout else: error_msg = stderr_text or f"GLM exited with code {process.returncode}" logger.error(f"❌ GLM command failed: {sanitize_for_log(error_msg)}") return {"error": f"GLM failed: {sanitize_for_log(error_msg)}"} # Enhanced logging for file creation if new_files: ccglm_logger.log_process_event( context, "file_creation", files_created=len(new_files), new_files=new_files[:10], file_summary=" ".join([os.path.basename(f) for f in new_files[:5]]) ) logger.info(f"✅ Success: GLM created {len(new_files)} files") response_text = format_file_summary(new_files, stdout_text) elif stdout_text and len(stdout_text.strip()) > 0: ccglm_logger.log_process_event( context, "text_output", output_length=len(stdout_text), response_preview=stdout_text[:200] ) logger.info("✅ Success: GLM returned text output") response_text = stdout_text else: ccglm_logger.log_process_event( context, "empty_response", warning="No output or files created" ) logger.warning("⚠️ GLM completed but returned empty response and created no files") response_text = "⚠️ GLM execution completed but returned no output or created files. Check GLM logs for details." final_response = { "response": response_text, "model_requested": model, "model_configured": env["ANTHROPIC_MODEL"], "model_used": model, # Esto debería verificarse con la API en el futuro "success": True, "timestamp": datetime.now().isoformat(), "execution_time": round(time.time() - start_time, 2), "exit_code": process.returncode, "files_created": len(new_files), "new_files": new_files[:10] if new_files else [], # Limitar a primeros 10 "stderr": sanitize_for_log(stderr_text) if stderr_text else None, "debug_info": { "glm_base_url": GLM_BASE_URL, "claude_command": "claude --dangerously-skip-permissions -c -p" } } # Enhanced success logging con métricas detalladas ccglm_logger.log_process_event( context, "success", execution_time=final_response['execution_time'], exit_code=process.returncode, files_created=len(new_files), response_length=len(response_text), model_requested=model, model_configured=env["ANTHROPIC_MODEL"] ) # Logging detallado de rendimiento logger.info(f"🎉 GLM routing completed successfully in {final_response['execution_time']}s") logger.info(f"📊 PERFORMANCE METRICS:") logger.info(f" Model requested: {model}") logger.info(f" Model configured: {env['ANTHROPIC_MODEL']}") logger.info(f" Execution time: {final_response['execution_time']}s") logger.info(f" Response length: {len(response_text)} chars") logger.info(f" Files created: {len(new_files)}") logger.info(f" Exit code: {process.returncode}") # Alertas de rendimiento optimizadas para nuevos timeouts if final_response['execution_time'] > effective_timeout * 0.8: logger.warning(f"⚠️ SLOW RESPONSE: {final_response['execution_time']}s exceeds 80% of timeout ({effective_timeout}s)") elif final_response['execution_time'] > effective_timeout * 0.5: logger.warning(f"⚠️ MODERATE SLOW RESPONSE: {final_response['execution_time']}s exceeds 50% of timeout ({effective_timeout}s)") # Alertas específicas por modelo if model == "glm-4.5-air": if final_response['execution_time'] > 45: logger.warning(f"🚨 FAST MODEL SLOW PERFORMANCE: glm-4.5-air took {final_response['execution_time']}s (timeout: 45s)") elif final_response['execution_time'] > 30: logger.warning(f"⚠️ FAST MODEL MODERATE: glm-4.5-air took {final_response['execution_time']}s (should be <30s)") elif model == "glm-4.6": if final_response['execution_time'] > 180: logger.warning(f"🚨 FULL MODEL SLOW PERFORMANCE: glm-4.6 took {final_response['execution_time']}s (timeout: 180s)") elif final_response['execution_time'] > 120: logger.warning(f"⚠️ FULL MODEL MODERATE: glm-4.6 took {final_response['execution_time']}s (should be <120s)") return final_response except FileNotFoundError: error_msg = "claude command not found. Make sure Claude CLI is installed and in PATH" ccglm_logger.log_process_event( context, "command_not_found", error_msg=error_msg ) logger.error(f"❌ {error_msg}") return {"error": error_msg} except Exception as e: execution_time = time.time() - start_time ccglm_logger.log_process_event( context, "unhandled_exception", execution_time=execution_time, error_type=type(e).__name__, error_message=str(e) ) logger.error(f"💥 GLM routing failed after {execution_time:.2f}s: {e}", exc_info=True) return {"error": f"Unexpected error: {str(e)}"} async def main(): """Main entry point""" # Adquirir PID lock para evitar instancias múltiples if not acquire_pid_lock(): logger.error("❌ Another CCGLM MCP instance is already running") sys.exit(1) # Configurar signal handlers para shutdown graceful setup_signal_handlers() logger.info("CCGLM MCP Server starting...") logger.info("GLM routing mode - routes prompts via Claude CLI to Z.AI GLM backend") logger.info(f"GLM endpoint: {GLM_BASE_URL}") logger.info(f"Timeouts - Default: {DEFAULT_TIMEOUT}s, Max: {MAX_TIMEOUT}s, Models: {MODEL_TIMEOUTS}") # Debug logging inicial para variables de entorno logger.info("🔧 ENVIRONMENT DEBUG AT STARTUP:") logger.info(f" GLM_BASE_URL: {GLM_BASE_URL}") logger.info(f" GLM_AUTH_TOKEN: {'***CONFIGURED***' if GLM_AUTH_TOKEN else 'NOT_CONFIGURED'}") logger.info(f" ANTHROPIC_MODEL (default): {os.getenv('ANTHROPIC_MODEL', 'NOT_SET')}") logger.info(f" ANTHROPIC_BASE_URL (env): {os.getenv('ANTHROPIC_BASE_URL', 'NOT_SET')}") logger.info(f" ANTHROPIC_AUTH_TOKEN (env): {'***SET***' if os.getenv('ANTHROPIC_AUTH_TOKEN') else 'NOT_SET'}") try: async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): logger.info("Server ready, waiting for connections...") await server.run( read_stream, write_stream, server.create_initialization_options() ) finally: # Cleanup garantizado al salir await process_manager.shutdown_all() release_pid_lock() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Server shutdown by user") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nosolosoft/ccglm-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server