elasticsearch_example.py•6.3 kB
"""
Example Elasticsearch destination implementation (commented out).
This file demonstrates how easy it is to add a new destination.
To enable this destination:
1. Uncomment the code
2. Add 'elasticsearch[async]' to pyproject.toml dependencies
3. Register in __init__.py: LogDestinationFactory.register_destination('elasticsearch', ElasticsearchDestination)
"""
# from typing import List, Dict, Any, Optional
# from datetime import datetime
# import json
#
# from elasticsearch import AsyncElasticsearch
# from elasticsearch.exceptions import ElasticsearchException
#
# from .base import LogDestination, LogEntry
#
#
# class ElasticsearchDestination(LogDestination):
# """Elasticsearch destination for centralized log aggregation."""
#
# def __init__(self, server_config, **settings):
# """Initialize Elasticsearch destination.
#
# Args:
# server_config: Server configuration object
# **settings: Additional settings including:
# - host: Elasticsearch host (default: localhost)
# - port: Elasticsearch port (default: 9200)
# - index: Index name prefix (default: mcp-logs)
# - username: Optional username for authentication
# - password: Optional password for authentication
# """
# self.config = server_config
# self.host = settings.get('host', 'localhost')
# self.port = settings.get('port', 9200)
# self.index_prefix = settings.get('index', 'mcp-logs')
#
# # Build connection arguments
# hosts = [{'host': self.host, 'port': self.port}]
# kwargs = {'hosts': hosts}
#
# # Add authentication if provided
# if 'username' in settings and 'password' in settings:
# kwargs['http_auth'] = (settings['username'], settings['password'])
#
# # Initialize client
# self.client = AsyncElasticsearch(**kwargs)
#
# async def write(self, entry: LogEntry) -> None:
# """Write log entry to Elasticsearch."""
# # Use daily indices for easy management
# index_name = f"{self.index_prefix}-{datetime.utcnow():%Y.%m.%d}"
#
# # Convert LogEntry to dict
# doc = {
# 'correlation_id': entry.correlation_id,
# 'timestamp': entry.timestamp,
# 'level': entry.level,
# 'log_type': entry.log_type,
# 'message': entry.message,
# 'tool_name': entry.tool_name,
# 'duration_ms': entry.duration_ms,
# 'status': entry.status,
# 'input_args': entry.input_args,
# 'output_summary': entry.output_summary,
# 'error_message': entry.error_message,
# 'module': entry.module,
# 'function': entry.function,
# 'line': entry.line,
# 'thread_name': entry.thread_name,
# 'process_id': entry.process_id,
# 'extra_data': entry.extra_data
# }
#
# # Remove None values
# doc = {k: v for k, v in doc.items() if v is not None}
#
# try:
# await self.client.index(index=index_name, document=doc)
# except ElasticsearchException:
# # Silently ignore errors for fire-and-forget pattern
# pass
#
# async def query(self, **filters) -> List[LogEntry]:
# """Query logs from Elasticsearch."""
# # Build query
# query = {'bool': {'must': []}}
#
# if 'correlation_id' in filters:
# query['bool']['must'].append({'term': {'correlation_id': filters['correlation_id']}})
#
# if 'tool_name' in filters:
# query['bool']['must'].append({'term': {'tool_name': filters['tool_name']}})
#
# if 'level' in filters:
# query['bool']['must'].append({'term': {'level': filters['level']}})
#
# # Time range filter
# if 'start_time' in filters or 'end_time' in filters:
# time_range = {}
# if 'start_time' in filters:
# time_range['gte'] = filters['start_time']
# if 'end_time' in filters:
# time_range['lte'] = filters['end_time']
# query['bool']['must'].append({'range': {'timestamp': time_range}})
#
# # Search all relevant indices
# index_pattern = f"{self.index_prefix}-*"
#
# try:
# result = await self.client.search(
# index=index_pattern,
# query=query if query['bool']['must'] else {'match_all': {}},
# size=filters.get('limit', 1000),
# sort=[{'timestamp': {'order': 'desc'}}]
# )
#
# # Convert results to LogEntry objects
# entries = []
# for hit in result['hits']['hits']:
# source = hit['_source']
# entry = LogEntry(
# correlation_id=source['correlation_id'],
# timestamp=datetime.fromisoformat(source['timestamp']),
# level=source['level'],
# log_type=source['log_type'],
# message=source['message'],
# tool_name=source.get('tool_name'),
# duration_ms=source.get('duration_ms'),
# status=source.get('status'),
# input_args=source.get('input_args'),
# output_summary=source.get('output_summary'),
# error_message=source.get('error_message'),
# module=source.get('module'),
# function=source.get('function'),
# line=source.get('line'),
# thread_name=source.get('thread_name'),
# process_id=source.get('process_id'),
# extra_data=source.get('extra_data')
# )
# entries.append(entry)
#
# return entries
# except ElasticsearchException:
# return []
#
# async def close(self) -> None:
# """Close Elasticsearch connection."""
# await self.client.close()