
MCP Ahrefs

by SAGAAIDEV
elasticsearch_example.py
""" Example Elasticsearch destination implementation (commented out). This file demonstrates how easy it is to add a new destination. To enable this destination: 1. Uncomment the code 2. Add 'elasticsearch[async]' to pyproject.toml dependencies 3. Register in __init__.py: LogDestinationFactory.register_destination('elasticsearch', ElasticsearchDestination) """ # from typing import List, Dict, Any, Optional # from datetime import datetime # import json # # from elasticsearch import AsyncElasticsearch # from elasticsearch.exceptions import ElasticsearchException # # from .base import LogDestination, LogEntry # # # class ElasticsearchDestination(LogDestination): # """Elasticsearch destination for centralized log aggregation.""" # # def __init__(self, server_config, **settings): # """Initialize Elasticsearch destination. # # Args: # server_config: Server configuration object # **settings: Additional settings including: # - host: Elasticsearch host (default: localhost) # - port: Elasticsearch port (default: 9200) # - index: Index name prefix (default: mcp-logs) # - username: Optional username for authentication # - password: Optional password for authentication # """ # self.config = server_config # self.host = settings.get('host', 'localhost') # self.port = settings.get('port', 9200) # self.index_prefix = settings.get('index', 'mcp-logs') # # # Build connection arguments # hosts = [{'host': self.host, 'port': self.port}] # kwargs = {'hosts': hosts} # # # Add authentication if provided # if 'username' in settings and 'password' in settings: # kwargs['http_auth'] = (settings['username'], settings['password']) # # # Initialize client # self.client = AsyncElasticsearch(**kwargs) # # async def write(self, entry: LogEntry) -> None: # """Write log entry to Elasticsearch.""" # # Use daily indices for easy management # index_name = f"{self.index_prefix}-{datetime.utcnow():%Y.%m.%d}" # # # Convert LogEntry to dict # doc = { # 'correlation_id': entry.correlation_id, # 'timestamp': entry.timestamp, # 'level': entry.level, # 'log_type': entry.log_type, # 'message': entry.message, # 'tool_name': entry.tool_name, # 'duration_ms': entry.duration_ms, # 'status': entry.status, # 'input_args': entry.input_args, # 'output_summary': entry.output_summary, # 'error_message': entry.error_message, # 'module': entry.module, # 'function': entry.function, # 'line': entry.line, # 'thread_name': entry.thread_name, # 'process_id': entry.process_id, # 'extra_data': entry.extra_data # } # # # Remove None values # doc = {k: v for k, v in doc.items() if v is not None} # # try: # await self.client.index(index=index_name, document=doc) # except ElasticsearchException: # # Silently ignore errors for fire-and-forget pattern # pass # # async def query(self, **filters) -> List[LogEntry]: # """Query logs from Elasticsearch.""" # # Build query # query = {'bool': {'must': []}} # # if 'correlation_id' in filters: # query['bool']['must'].append({'term': {'correlation_id': filters['correlation_id']}}) # # if 'tool_name' in filters: # query['bool']['must'].append({'term': {'tool_name': filters['tool_name']}}) # # if 'level' in filters: # query['bool']['must'].append({'term': {'level': filters['level']}}) # # # Time range filter # if 'start_time' in filters or 'end_time' in filters: # time_range = {} # if 'start_time' in filters: # time_range['gte'] = filters['start_time'] # if 'end_time' in filters: # time_range['lte'] = filters['end_time'] # query['bool']['must'].append({'range': {'timestamp': time_range}}) # # # Search all relevant indices 
# index_pattern = f"{self.index_prefix}-*" # # try: # result = await self.client.search( # index=index_pattern, # query=query if query['bool']['must'] else {'match_all': {}}, # size=filters.get('limit', 1000), # sort=[{'timestamp': {'order': 'desc'}}] # ) # # # Convert results to LogEntry objects # entries = [] # for hit in result['hits']['hits']: # source = hit['_source'] # entry = LogEntry( # correlation_id=source['correlation_id'], # timestamp=datetime.fromisoformat(source['timestamp']), # level=source['level'], # log_type=source['log_type'], # message=source['message'], # tool_name=source.get('tool_name'), # duration_ms=source.get('duration_ms'), # status=source.get('status'), # input_args=source.get('input_args'), # output_summary=source.get('output_summary'), # error_message=source.get('error_message'), # module=source.get('module'), # function=source.get('function'), # line=source.get('line'), # thread_name=source.get('thread_name'), # process_id=source.get('process_id'), # extra_data=source.get('extra_data') # ) # entries.append(entry) # # return entries # except ElasticsearchException: # return [] # # async def close(self) -> None: # """Close Elasticsearch connection.""" # await self.client.close()
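Step 3 from the module docstring is a single factory registration. A minimal sketch of that __init__.py change, assuming LogDestinationFactory is importable from a sibling module (the .factory path below is a guess; only the register_destination call itself comes from the docstring):

# __init__.py (sketch): wire up the new destination once it is uncommented.
from .factory import LogDestinationFactory  # assumed module path
from .elasticsearch_example import ElasticsearchDestination

# 'elasticsearch' becomes the settings key that selects this destination;
# the call below is taken verbatim from the module docstring.
LogDestinationFactory.register_destination('elasticsearch', ElasticsearchDestination)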

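And a hedged end-to-end sketch of driving the destination directly, assuming the class above has been uncommented and the 7.x elasticsearch[async] client is installed (the file imports ElasticsearchException, which the 8.x client removed). server_config is stored on self.config but never read by this class, so None stands in; LogEntry's exact constructor signature lives in .base and is assumed here:

import asyncio
from datetime import datetime, timedelta

# ElasticsearchDestination and LogEntry as defined above (assumed in scope).

async def main():
    dest = ElasticsearchDestination(
        None,                # server_config: stored on self.config, unused here
        host='localhost',
        port=9200,
        index='mcp-logs',
        username='elastic',  # optional; enables HTTP basic auth
        password='changeme',
    )

    # Fire-and-forget write: lands in today's daily index (mcp-logs-YYYY.MM.DD).
    await dest.write(LogEntry(
        correlation_id='abc-123',
        timestamp=datetime.utcnow(),
        level='INFO',
        log_type='tool_call',
        message='example entry',  # remaining LogEntry fields assumed to default to None
    ))

    # Query the mcp-logs-* pattern: a term filter plus a one-hour time range.
    entries = await dest.query(
        correlation_id='abc-123',
        start_time=datetime.utcnow() - timedelta(hours=1),
        limit=50,
    )
    print(f"{len(entries)} matching entries")

    await dest.close()

asyncio.run(main())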