Skip to main content
Glama

Fabric MCP Agent

by yingkiat
logging_config.py19.5 kB
""" Comprehensive logging and performance tracking for MCP Agent """ import logging import json import time import uuid from datetime import datetime from typing import Dict, Any, Optional from functools import wraps import inspect import traceback # Configure structured logging class JSONFormatter(logging.Formatter): def format(self, record): log_data = { "timestamp": datetime.utcnow().isoformat(), "level": record.levelname, "logger": record.name, "message": record.getMessage(), } # Add extra fields if present if hasattr(record, 'request_id'): log_data['request_id'] = record.request_id if hasattr(record, 'user_question'): log_data['user_question'] = record.user_question if hasattr(record, 'duration_ms'): log_data['duration_ms'] = record.duration_ms if hasattr(record, 'error_type'): log_data['error_type'] = record.error_type if hasattr(record, 'tool_name'): log_data['tool_name'] = record.tool_name if hasattr(record, 'classification'): log_data['classification'] = record.classification if hasattr(record, 'api_calls'): log_data['api_calls'] = record.api_calls if hasattr(record, 'tokens_used'): log_data['tokens_used'] = record.tokens_used if hasattr(record, 'sql_query'): log_data['sql_query'] = record.sql_query if hasattr(record, 'result_count'): log_data['result_count'] = record.result_count return json.dumps(log_data) def log_request_flow(request_id: str, user_question: str, persona: str, intent_type: str, execution_path: str, direct_tools_result: dict = None, success: bool = True, duration_ms: float = None): """Log consolidated request flow - integrated into session logs""" # Build direct tools summary tools_summary = "none" if direct_tools_result: if direct_tools_result.get("success"): tools_summary = f"success:{direct_tools_result.get('tool_used', 'unknown')}" else: tools_summary = f"failed:{direct_tools_result.get('reason', 'unknown')}" # Add flow data to session log instead of separate flow.log from session_logger import get_session_logger logger = get_session_logger(request_id, user_question) logger.log_event( "REQUEST_FLOW_SUMMARY", { "user_question": user_question[:100], "persona": persona, "intent_type": intent_type, "execution_path": execution_path, "direct_tools": tools_summary, "success": success, "duration_ms": duration_ms } ) def log_negative_case(request_id: str, case_type: str, user_question: str, reason: str, classification: dict = None, additional_context: dict = None): """Log negative use cases for analysis and improvement""" negative_logger = logging.getLogger('negative_cases') negative_logger.error( f"NEGATIVE_CASE: {case_type}", extra={ 'request_id': request_id, 'case_type': case_type, 'user_question': user_question, 'failure_reason': reason, 'classification': classification, 'context': additional_context } ) def log_usage_metrics(request_id: str, metric_type: str, value: Any, persona: str = None, tool_used: str = None): """Log usage metrics for analytics""" usage_logger = logging.getLogger('usage_metrics') usage_logger.info( f"METRIC: {metric_type}", extra={ 'request_id': request_id, 'metric_type': metric_type, 'value': value, 'persona': persona, 'tool_used': tool_used } ) def setup_logging(): """Setup comprehensive logging configuration""" # Create logs directory import os if not os.path.exists('logs'): os.makedirs('logs') # Flow logging integrated into session logs (removed separate flow.log) # Setup negative cases logger for failure tracking negative_logger = logging.getLogger('negative_cases') negative_logger.setLevel(logging.ERROR) negative_handler = logging.FileHandler('logs/negative_cases.log') negative_handler.setFormatter(JSONFormatter()) negative_logger.addHandler(negative_handler) # Setup usage metrics logger (commented out until needed) # usage_logger = logging.getLogger('usage_metrics') # usage_logger.setLevel(logging.INFO) # usage_handler = logging.FileHandler('logs/usage_metrics.log') # usage_handler.setFormatter(JSONFormatter()) # usage_logger.addHandler(usage_handler) # Ensure logs directory exists import os os.makedirs('logs', exist_ok=True) # Main application logger main_logger = logging.getLogger("mcp_agent") main_logger.setLevel(logging.INFO) # Performance logger perf_logger = logging.getLogger("mcp_agent.performance") perf_logger.setLevel(logging.INFO) # Error logger error_logger = logging.getLogger("mcp_agent.errors") error_logger.setLevel(logging.ERROR) # API calls logger api_logger = logging.getLogger("mcp_agent.api") api_logger.setLevel(logging.INFO) # Clear existing handlers for logger in [main_logger, perf_logger, error_logger, api_logger]: logger.handlers.clear() # Console handler with JSON formatting console_handler = logging.StreamHandler() console_handler.setFormatter(JSONFormatter()) # File handlers main_file_handler = logging.FileHandler('logs/mcp_agent.log') main_file_handler.setFormatter(JSONFormatter()) perf_file_handler = logging.FileHandler('logs/performance.log') perf_file_handler.setFormatter(JSONFormatter()) # error_file_handler = logging.FileHandler('logs/errors.log') # error_file_handler.setFormatter(JSONFormatter()) # api_file_handler = logging.FileHandler('logs/api_calls.log') # api_file_handler.setFormatter(JSONFormatter()) # Add handlers main_logger.addHandler(console_handler) main_logger.addHandler(main_file_handler) perf_logger.addHandler(perf_file_handler) # error_logger.addHandler(error_file_handler) # api_logger.addHandler(api_file_handler) return { 'main': main_logger, 'performance': perf_logger, 'errors': error_logger, 'api': api_logger } # Global performance tracker class PerformanceTracker: def __init__(self): self.metrics = {} self.loggers = setup_logging() def start_request(self, request_id: str, endpoint: str, user_question: str = None): """Start tracking a new business question session""" self.metrics[request_id] = { 'request_id': request_id, 'endpoint': endpoint, 'user_question': user_question, 'session_start_time': time.time(), 'api_calls': 0, 'tokens_used': 0, 'tools_executed': [], 'errors': [], 'classification': None, 'sql_queries': [], 'result_counts': [], 'session_phases': { 'intent_classification': {'start': None, 'end': None, 'tokens': 0}, 'sql_generation': {'start': None, 'end': None, 'tokens': 0}, 'sql_execution': {'start': None, 'end': None, 'duration_ms': 0}, 'result_processing': {'start': None, 'end': None} } } self.loggers['main'].info( f"Started request: {endpoint}", extra={ 'request_id': request_id, 'user_question': user_question, 'endpoint': endpoint } ) def log_api_call(self, request_id: str, model: str, prompt_tokens: int, completion_tokens: int, purpose: str): """Log an API call with token usage and track session phases""" if request_id in self.metrics: self.metrics[request_id]['api_calls'] += 1 self.metrics[request_id]['tokens_used'] += prompt_tokens + completion_tokens # Track phase timing and tokens phases = self.metrics[request_id]['session_phases'] current_time = time.time() if purpose == 'intent_classification': if phases['intent_classification']['start'] is None: phases['intent_classification']['start'] = current_time phases['intent_classification']['end'] = current_time phases['intent_classification']['tokens'] += prompt_tokens + completion_tokens elif purpose == 'sql_generation': if phases['sql_generation']['start'] is None: phases['sql_generation']['start'] = current_time phases['sql_generation']['end'] = current_time phases['sql_generation']['tokens'] += prompt_tokens + completion_tokens self.loggers['api'].info( f"API call: {purpose}", extra={ 'request_id': request_id, 'model': model, 'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens, 'total_tokens': prompt_tokens + completion_tokens, 'purpose': purpose } ) def log_tool_execution(self, request_id: str, tool_name: str, duration_ms: float, success: bool, result_count: int = None): """Log tool execution metrics""" if request_id in self.metrics: self.metrics[request_id]['tools_executed'].append({ 'tool': tool_name, 'duration_ms': duration_ms, 'success': success, 'result_count': result_count }) if result_count is not None: self.metrics[request_id]['result_counts'].append(result_count) self.loggers['performance'].info( f"Tool executed: {tool_name}", extra={ 'request_id': request_id, 'tool_name': tool_name, 'duration_ms': duration_ms, 'success': success, 'result_count': result_count } ) def log_classification(self, request_id: str, classification: Dict[str, Any]): """Log intent classification results""" if request_id in self.metrics: self.metrics[request_id]['classification'] = classification self.loggers['main'].info( f"Intent classified: {classification.get('intent', 'unknown')}", extra={ 'request_id': request_id, 'classification': classification } ) def log_sql_query(self, request_id: str, sql: str, execution_time_ms: float, result_count: int): """Log SQL query execution and track session phases""" if request_id in self.metrics: self.metrics[request_id]['sql_queries'].append({ 'sql': sql[:200] + '...' if len(sql) > 200 else sql, # Truncate long queries 'execution_time_ms': execution_time_ms, 'result_count': result_count }) # Track SQL execution phase phases = self.metrics[request_id]['session_phases'] current_time = time.time() if phases['sql_execution']['start'] is None: phases['sql_execution']['start'] = current_time - (execution_time_ms / 1000) phases['sql_execution']['end'] = current_time phases['sql_execution']['duration_ms'] += execution_time_ms self.loggers['performance'].info( "SQL query executed", extra={ 'request_id': request_id, 'sql_query': sql[:200] + '...' if len(sql) > 200 else sql, 'duration_ms': execution_time_ms, 'result_count': result_count } ) def log_error(self, request_id: str, error: Exception, context: str): """Log errors with full context""" error_data = { 'error_type': type(error).__name__, 'error_message': str(error), 'context': context, 'traceback': traceback.format_exc() } if request_id in self.metrics: self.metrics[request_id]['errors'].append(error_data) self.loggers['errors'].error( f"Error in {context}: {str(error)}", extra={ 'request_id': request_id, 'error_type': type(error).__name__, 'context': context, 'traceback': traceback.format_exc() } ) def end_request(self, request_id: str, success: bool = True): """Finish tracking business question session and log final metrics""" if request_id not in self.metrics: return metrics = self.metrics[request_id] session_end_time = time.time() total_session_duration = (session_end_time - metrics['session_start_time']) * 1000 # Convert to ms # Calculate phase durations phases = metrics['session_phases'] phase_durations = {} for phase_name, phase_data in phases.items(): if phase_data['start'] and phase_data['end']: phase_durations[f'{phase_name}_duration_ms'] = (phase_data['end'] - phase_data['start']) * 1000 elif phase_name == 'sql_execution' and phase_data['duration_ms'] > 0: phase_durations[f'{phase_name}_duration_ms'] = phase_data['duration_ms'] # Log business question session summary self.loggers['performance'].info( f"Business Question Session Completed: {metrics['endpoint']}", extra={ 'request_id': request_id, 'session_duration_ms': total_session_duration, 'api_calls': metrics['api_calls'], 'tokens_used': metrics['tokens_used'], 'tools_count': len(metrics['tools_executed']), 'errors_count': len(metrics['errors']), 'success': success, 'user_question': metrics.get('user_question'), 'classification': metrics.get('classification'), 'total_results': sum(metrics['result_counts']) if metrics['result_counts'] else 0, 'phase_breakdown': phase_durations, 'intent_classification_tokens': phases['intent_classification']['tokens'], 'sql_generation_tokens': phases['sql_generation']['tokens'] } ) # Clean up del self.metrics[request_id] def log_direct_tool_success(self, request_id: str, tool_name: str, execution_time_ms: float): """Log successful direct tool execution""" if request_id in self.metrics: self.metrics[request_id]['tools_executed'].append({ 'tool': f"direct_{tool_name}", 'execution_time_ms': execution_time_ms, 'success': True }) self.loggers['main'].info( f"Direct tool executed successfully: {tool_name}", extra={ 'request_id': request_id, 'tool_name': f"direct_{tool_name}", 'execution_time_ms': execution_time_ms, 'tool_type': 'direct' } ) def log_direct_tool_failure(self, request_id: str, tool_name: str, error_msg: str): """Log direct tool execution failure""" if request_id in self.metrics: self.metrics[request_id]['tools_executed'].append({ 'tool': f"direct_{tool_name}", 'success': False, 'error': error_msg }) self.metrics[request_id]['errors'].append({ 'type': 'direct_tool_failure', 'tool': tool_name, 'error': error_msg }) self.loggers['errors'].error( f"Direct tool failed: {tool_name} - {error_msg}", extra={ 'request_id': request_id, 'tool_name': f"direct_{tool_name}", 'error_type': 'direct_tool_failure', 'tool_type': 'direct' } ) def log_direct_tool_fallback(self, request_id: str, reason: str): """Log when direct tools are not used and system falls back to AI workflow""" if request_id in self.metrics: self.metrics[request_id]['tools_executed'].append({ 'tool': 'ai_workflow_fallback', 'reason': reason, 'fallback_used': True }) self.loggers['main'].info( f"Direct tool fallback: {reason}", extra={ 'request_id': request_id, 'fallback_reason': reason, 'execution_path': 'ai_workflow' } ) # Global tracker instance tracker = PerformanceTracker() def track_performance(tool_name: str): """Decorator to track function performance""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Try to extract request_id from arguments request_id = kwargs.get('request_id') or getattr(args[0] if args else None, 'request_id', None) if not request_id: request_id = str(uuid.uuid4())[:8] start_time = time.time() success = True result = None try: result = func(*args, **kwargs) # Try to count results if it's a list or dict with results result_count = None if isinstance(result, dict) and 'results' in result: if isinstance(result['results'], list): result_count = len(result['results']) elif isinstance(result, list): result_count = len(result) return result except Exception as e: success = False tracker.log_error(request_id, e, f"{tool_name}_execution") raise finally: duration_ms = (time.time() - start_time) * 1000 result_count = None if result and isinstance(result, dict) and 'results' in result: if isinstance(result['results'], list): result_count = len(result['results']) tracker.log_tool_execution(request_id, tool_name, duration_ms, success, result_count) return wrapper return decorator def generate_request_id(): """Generate unique request ID""" return str(uuid.uuid4()) # Create logs directory if it doesn't exist import os if not os.path.exists('logs'): os.makedirs('logs')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yingkiat/mcp_fabric_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server