Skip to main content
Glama

MCP Ahrefs

by SAGAAIDEV
unified_logger.py — 8.67 kB
""" Unified logging factory with Loguru integration. This module provides a factory for creating correlation-aware loggers that write to pluggable destinations while intercepting both Loguru and standard Python logging. """ import logging import sys import asyncio from typing import Optional, Any, Dict, List from datetime import datetime from loguru import logger from .correlation import get_correlation_id, get_initialization_correlation_id from .destinations.base import LogDestination, LogEntry from .destinations.factory import LogDestinationFactory, DestinationConfig from .destinations.sqlite import SQLiteDestination class UnifiedLogger: """Factory for creating correlation-aware loggers with pluggable destinations.""" _destination: Optional[LogDestination] = None _initialized: bool = False _event_loop: Optional[asyncio.AbstractEventLoop] = None @classmethod def initialize(cls, destination: LogDestination, event_loop: Optional[asyncio.AbstractEventLoop] = None): """Initialize the unified logging system with a specific destination. Args: destination: The LogDestination implementation to use event_loop: Optional event loop for async operations. If not provided, will attempt to get the current running loop when needed. 
""" if cls._initialized: # Clean up previous configuration logger.remove() cls._destination = destination cls._event_loop = event_loop cls._initialized = True # Remove default Loguru handler logger.remove() # Add custom sink that writes to destination logger.add( cls._log_sink, level="DEBUG", enqueue=True, # Thread-safe enqueueing serialize=False # We'll handle serialization ourselves ) # Configure standard library logging to use Loguru logging.basicConfig(handlers=[InterceptHandler()], level=logging.INFO, force=True) # Silence noisy loggers logging.getLogger('asyncio').setLevel(logging.WARNING) logging.getLogger('asyncio.selector_events').setLevel(logging.WARNING) # Also intercept root logger logging.getLogger().handlers = [InterceptHandler()] @classmethod def _log_sink(cls, message): """Custom Loguru sink that writes to the configured destination. This method is called by Loguru for each log message and converts it to our unified LogEntry format before writing to the destination. """ if not cls._destination: return record = message.record # Extract extra data extra = dict(record["extra"]) # Build LogEntry # Get correlation ID from the bound logger context, not current context entry = LogEntry( correlation_id=extra.get("correlation_id") or get_correlation_id() or get_initialization_correlation_id() or "init", timestamp=record["time"].replace(tzinfo=None), # Remove timezone for SQLite level=record["level"].name, log_type=extra.get("log_type", "internal"), message=str(record["message"]), tool_name=extra.get("tool_name"), duration_ms=extra.get("duration_ms"), status=extra.get("status"), input_args=extra.get("input_args"), output_summary=extra.get("output_summary"), error_message=extra.get("error_message"), module=record["module"], function=record["function"], line=record["line"], thread_name=record["thread"].name if record["thread"] else None, process_id=record["process"].id if record["process"] else None, extra_data={k: v for k, v in extra.items() if k not in 
["log_type", "tool_name", "duration_ms", "status", "input_args", "output_summary", "error_message"]} ) # Write to destination asynchronously try: # Try to get the event loop loop = cls._event_loop or asyncio.get_event_loop() # Schedule the write operation if loop.is_running(): # If loop is running, create a task asyncio.create_task(cls._destination.write(entry)) else: # If loop is not running, run until complete loop.run_until_complete(cls._destination.write(entry)) except RuntimeError: # No event loop available, try to create one try: asyncio.run(cls._destination.write(entry)) except Exception as e: # Fall back to stderr if we can't write to destination print(f"Failed to write log entry: {e}", file=sys.stderr) @classmethod def get_logger(cls, name: Optional[str] = None): """Get a correlation-aware logger instance. Args: name: Optional logger name for identification Returns: A Loguru logger instance bound with correlation ID and name """ bindings = { "correlation_id": get_correlation_id() } if name: bindings["logger_name"] = name return logger.bind(**bindings) @classmethod def set_event_loop(cls, loop: asyncio.AbstractEventLoop): """Set the event loop for async operations. This should be called when the application starts its event loop. """ cls._event_loop = loop @classmethod async def close(cls): """Close the logging system and clean up resources.""" if cls._destination: await cls._destination.close() cls._destination = None cls._initialized = False logger.remove() @classmethod def initialize_from_config(cls, destinations_config: List[DestinationConfig], server_config, event_loop: Optional[asyncio.AbstractEventLoop] = None): """Initialize the unified logging system from configuration. This method uses the factory pattern to create destinations based on configuration. 
Args: destinations_config: List of DestinationConfig objects server_config: Server configuration object event_loop: Optional event loop for async operations """ destination = LogDestinationFactory.create_from_config(destinations_config, server_config) cls.initialize(destination, event_loop) @classmethod def initialize_default(cls, server_config, event_loop: Optional[asyncio.AbstractEventLoop] = None): """Initialize with default SQLite destination for backward compatibility. Args: server_config: Server configuration object event_loop: Optional event loop for async operations """ # Create a default SQLite configuration default_config = [DestinationConfig(type='sqlite', enabled=True)] destination = LogDestinationFactory.create_from_config(default_config, server_config) cls.initialize(destination, event_loop) @classmethod def get_available_destinations(cls) -> List[str]: """Get list of available destination types. Returns: List of registered destination type names """ return LogDestinationFactory.get_available_types() class InterceptHandler(logging.Handler): """Intercept standard library logging and route to Loguru. This handler captures all standard logging calls and forwards them to Loguru, preserving the original context and level information. """ def emit(self, record: logging.LogRecord): """Emit a log record by forwarding to Loguru.""" # Get corresponding Loguru level if it exists try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message frame, depth = sys._getframe(6), 6 while frame and frame.f_code.co_filename == logging.__file__: frame = frame.f_back depth += 1 # Log with Loguru, preserving the original context logger.opt(depth=depth, exception=record.exc_info).log( level, record.getMessage() )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SAGAAIDEV/mcp-ahrefs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server