Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
cloudwatch_optimization.py•17.9 kB
"""
CloudWatch Optimization MCP Wrapper Functions

This module provides MCP-compatible wrapper functions for CloudWatch
optimization analysis. These functions follow the same pattern as other
service optimization modules in the CFM Tips project.

Every public wrapper takes the raw MCP ``arguments`` dict, runs one analysis,
and returns the result (or a structured error) as MCP text content. Failures
inside a wrapper's ``try`` block are logged and converted into an error
response rather than raised.
"""

import asyncio
import json
import logging
import time
from typing import Dict, List, Any

from mcp.types import TextContent

from utils.logging_config import log_function_entry, log_function_exit
from utils.error_handler import handle_aws_error, ResponseFormatter
from utils.documentation_links import add_documentation_links

logger = logging.getLogger(__name__)

# Defaults applied when the caller omits the corresponding argument.
_DEFAULT_TIMEOUT_SECONDS = 120
_DEFAULT_PAGE = 1

# Keys consumed by the wrappers themselves; they are stripped before the
# remaining arguments are forwarded to an analyzer method.
_INTERNAL_ARGUMENT_KEYS = frozenset({"page", "timeout_seconds"})


def _success_content(
    result: Any,
    *,
    function_name: str,
    label: str,
    analysis_type: str,
    start_time: float,
) -> List[TextContent]:
    """Wrap a finished analysis result in a success response.

    Builds the success payload, attaches the CloudWatch documentation links,
    logs the function exit with the elapsed time, and converts the payload to
    text content.

    Args:
        result: Value returned by the analysis coroutine.
        function_name: Name of the public wrapper, used for exit logging.
        label: Human-readable analysis name inserted into the message.
        analysis_type: Identifier passed through to the response formatter.
        start_time: ``time.time()`` value captured when the wrapper started.
    """
    formatted_result = ResponseFormatter.success_response(
        data=result,
        message=f"CloudWatch {label} analysis completed successfully",
        analysis_type=analysis_type,
    )
    formatted_result = add_documentation_links(formatted_result, "cloudwatch")

    execution_time = time.time() - start_time
    log_function_exit(logger, function_name, "success", execution_time)

    return ResponseFormatter.to_text_content(formatted_result)


def _timeout_content(
    *, label: str, analysis_type: str, timeout_seconds: Any
) -> List[TextContent]:
    """Log an analysis timeout and return it as a ``timeout_error`` response."""
    error_message = (
        f"CloudWatch {label} analysis timed out after {timeout_seconds} seconds"
    )
    logger.error(error_message)
    return ResponseFormatter.to_text_content(
        ResponseFormatter.error_response(
            error_message, "timeout_error", analysis_type
        )
    )


def _failure_content(
    exc: Exception, *, log_prefix: str, error_type: str, analysis_type: str
) -> List[TextContent]:
    """Log an unexpected failure and return it as an error response.

    The log message text is ``"<log_prefix>: <str(exc)>"``; the traceback is
    attached via ``exc_info`` so the failure origin is not lost.
    """
    logger.error(f"{log_prefix}: {str(exc)}", exc_info=exc)
    return ResponseFormatter.to_text_content(
        ResponseFormatter.error_response(str(exc), error_type, analysis_type)
    )


async def _run_analyzer_analysis(
    arguments: Dict[str, Any],
    *,
    function_name: str,
    analyzer_method: str,
    label: str,
    analysis_type: str,
) -> List[TextContent]:
    """Run one ``CloudWatchOptimizationAnalyzer`` method with a timeout.

    Shared implementation behind the general spend, metrics, logs and
    alarms/dashboards wrappers.

    Args:
        arguments: Raw MCP arguments. ``region``, ``page`` (default 1),
            ``timeout_seconds`` (default 120) and the four ``allow_*`` cost
            preference flags (default ``False``) are read here; every key
            except ``page`` and ``timeout_seconds`` is also forwarded to the
            analyzer method as a keyword argument.
        function_name: Name of the public wrapper, used for entry/exit logs.
        analyzer_method: Name of the analyzer coroutine method to call.
        label: Human-readable analysis name used in messages.
        analysis_type: Identifier passed through to the response formatter.

    Returns:
        Text content holding either the success payload or an error payload
        (``timeout_error`` on timeout, ``analysis_error`` otherwise).
    """
    log_function_entry(logger, function_name, arguments=arguments)
    start_time = time.time()
    # Bound before the try so the timeout handler always has a value to report.
    timeout_seconds = _DEFAULT_TIMEOUT_SECONDS

    try:
        # Imported lazily inside the try so an import failure is reported as
        # an analysis error instead of propagating to the caller.
        from playbooks.cloudwatch.cloudwatch_optimization_analyzer import (
            CloudWatchOptimizationAnalyzer,
        )
        from playbooks.cloudwatch.cost_controller import CostPreferences

        region = arguments.get("region")
        page = arguments.get("page", _DEFAULT_PAGE)
        timeout_seconds = arguments.get("timeout_seconds", _DEFAULT_TIMEOUT_SECONDS)

        # Build cost preferences from arguments; every flag is opt-in.
        cost_preferences = CostPreferences(
            allow_cost_explorer=arguments.get("allow_cost_explorer", False),
            allow_aws_config=arguments.get("allow_aws_config", False),
            allow_cloudtrail=arguments.get("allow_cloudtrail", False),
            allow_minimal_cost_metrics=arguments.get(
                "allow_minimal_cost_metrics", False
            ),
        )

        analyzer = CloudWatchOptimizationAnalyzer(
            region=region, cost_preferences=cost_preferences
        )

        # Remove internal parameters; `page` is passed explicitly below.
        analysis_args = {
            key: value
            for key, value in arguments.items()
            if key not in _INTERNAL_ARGUMENT_KEYS
        }

        analyze = getattr(analyzer, analyzer_method)
        result = await asyncio.wait_for(
            analyze(page=page, **analysis_args),
            timeout=timeout_seconds,
        )

        return _success_content(
            result,
            function_name=function_name,
            label=label,
            analysis_type=analysis_type,
            start_time=start_time,
        )

    except asyncio.TimeoutError:
        return _timeout_content(
            label=label,
            analysis_type=analysis_type,
            timeout_seconds=timeout_seconds,
        )
    except Exception as e:
        return _failure_content(
            e,
            log_prefix=f"CloudWatch {label} analysis failed",
            error_type="analysis_error",
            analysis_type=analysis_type,
        )


@handle_aws_error
async def run_cloudwatch_general_spend_analysis_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run CloudWatch general spend analysis to understand cost breakdown across logs, metrics, alarms, and dashboards.

    Args:
        arguments: MCP arguments; see ``_run_analyzer_analysis`` for the keys
            that are read and forwarded.

    Returns:
        Text content with the analysis result or a structured error.
    """
    return await _run_analyzer_analysis(
        arguments,
        function_name="run_cloudwatch_general_spend_analysis_mcp",
        analyzer_method="analyze_general_spend",
        label="general spend",
        analysis_type="cloudwatch_general_spend",
    )


@handle_aws_error
async def run_cloudwatch_metrics_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run CloudWatch metrics optimization analysis to identify custom metrics cost optimization opportunities.

    Args:
        arguments: MCP arguments; see ``_run_analyzer_analysis`` for the keys
            that are read and forwarded.

    Returns:
        Text content with the analysis result or a structured error.
    """
    return await _run_analyzer_analysis(
        arguments,
        function_name="run_cloudwatch_metrics_optimization_mcp",
        analyzer_method="analyze_metrics_optimization",
        label="metrics optimization",
        analysis_type="cloudwatch_metrics_optimization",
    )


@handle_aws_error
async def run_cloudwatch_logs_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run CloudWatch logs optimization analysis to identify log retention and ingestion cost optimization opportunities.

    Args:
        arguments: MCP arguments; see ``_run_analyzer_analysis`` for the keys
            that are read and forwarded.

    Returns:
        Text content with the analysis result or a structured error.
    """
    return await _run_analyzer_analysis(
        arguments,
        function_name="run_cloudwatch_logs_optimization_mcp",
        analyzer_method="analyze_logs_optimization",
        label="logs optimization",
        analysis_type="cloudwatch_logs_optimization",
    )


@handle_aws_error
async def run_cloudwatch_alarms_and_dashboards_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run CloudWatch alarms and dashboards optimization analysis to identify monitoring efficiency improvements.

    Args:
        arguments: MCP arguments; see ``_run_analyzer_analysis`` for the keys
            that are read and forwarded.

    Returns:
        Text content with the analysis result or a structured error.
    """
    return await _run_analyzer_analysis(
        arguments,
        function_name="run_cloudwatch_alarms_and_dashboards_optimization_mcp",
        analyzer_method="analyze_alarms_optimization",
        label="alarms and dashboards optimization",
        analysis_type="cloudwatch_alarms_dashboards_optimization",
    )


@handle_aws_error
async def run_cloudwatch_comprehensive_optimization_tool_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run comprehensive CloudWatch optimization using the unified optimization tool with intelligent orchestration.

    Args:
        arguments: MCP arguments. ``region`` and ``timeout_seconds``
            (default 120) are read here, and the complete dict is forwarded
            to the tool as keyword arguments.

    Returns:
        Text content with the analysis result, or an error payload
        (``timeout_error`` on timeout, ``analysis_error`` otherwise).
    """
    function_name = "run_cloudwatch_comprehensive_optimization_tool_mcp"
    label = "comprehensive optimization"
    analysis_type = "cloudwatch_comprehensive_optimization"

    log_function_entry(logger, function_name, arguments=arguments)
    start_time = time.time()
    # Bound before the try so the timeout handler always has a value to report.
    timeout_seconds = _DEFAULT_TIMEOUT_SECONDS

    try:
        # Imported lazily inside the try so an import failure is reported as
        # an analysis error instead of propagating to the caller.
        from playbooks.cloudwatch.cloudwatch_optimization_tool import (
            CloudWatchOptimizationTool,
        )

        region = arguments.get("region")
        timeout_seconds = arguments.get("timeout_seconds", _DEFAULT_TIMEOUT_SECONDS)

        tool = CloudWatchOptimizationTool(region=region)

        # NOTE(review): unlike the analyzer wrappers, the full arguments dict
        # (including `timeout_seconds`) is forwarded unfiltered — confirm the
        # tool accepts these keys.
        result = await asyncio.wait_for(
            tool.execute_comprehensive_optimization_analysis(**arguments),
            timeout=timeout_seconds,
        )

        return _success_content(
            result,
            function_name=function_name,
            label=label,
            analysis_type=analysis_type,
            start_time=start_time,
        )

    except asyncio.TimeoutError:
        return _timeout_content(
            label=label,
            analysis_type=analysis_type,
            timeout_seconds=timeout_seconds,
        )
    except Exception as e:
        return _failure_content(
            e,
            log_prefix=f"CloudWatch {label} analysis failed",
            error_type="analysis_error",
            analysis_type=analysis_type,
        )


@handle_aws_error
async def query_cloudwatch_analysis_results_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Query stored CloudWatch analysis results using SQL queries.

    Args:
        arguments: MCP arguments, passed unchanged to
            ``runbook_functions.query_cloudwatch_analysis_results``.

    Returns:
        Whatever the runbook function returns, or a ``query_error`` response
        if it raises.
    """
    log_function_entry(logger, "query_cloudwatch_analysis_results_mcp", arguments=arguments)
    start_time = time.time()

    try:
        # Imported lazily so an import failure becomes an error response.
        from runbook_functions import query_cloudwatch_analysis_results

        # The runbook function is awaited and its result is returned as-is.
        result = await query_cloudwatch_analysis_results(arguments)

        execution_time = time.time() - start_time
        log_function_exit(logger, "query_cloudwatch_analysis_results_mcp", "success", execution_time)

        return result

    except Exception as e:
        return _failure_content(
            e,
            log_prefix="CloudWatch analysis results query failed",
            error_type="query_error",
            analysis_type="cloudwatch_analysis_results",
        )


@handle_aws_error
async def validate_cloudwatch_cost_preferences_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Validate CloudWatch cost preferences and get functionality coverage estimates.

    Args:
        arguments: MCP arguments, passed unchanged to
            ``runbook_functions.validate_cloudwatch_cost_preferences``.

    Returns:
        Whatever the runbook function returns, or a ``validation_error``
        response if it raises.
    """
    log_function_entry(logger, "validate_cloudwatch_cost_preferences_mcp", arguments=arguments)
    start_time = time.time()

    try:
        # Imported lazily so an import failure becomes an error response.
        from runbook_functions import validate_cloudwatch_cost_preferences

        # The runbook function is awaited and its result is returned as-is.
        result = await validate_cloudwatch_cost_preferences(arguments)

        execution_time = time.time() - start_time
        log_function_exit(logger, "validate_cloudwatch_cost_preferences_mcp", "success", execution_time)

        return result

    except Exception as e:
        return _failure_content(
            e,
            log_prefix="CloudWatch cost preferences validation failed",
            error_type="validation_error",
            analysis_type="cloudwatch_cost_preferences",
        )


@handle_aws_error
async def get_cloudwatch_cost_estimate_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
    """Get detailed cost estimate for CloudWatch optimization analysis based on enabled features.

    Args:
        arguments: MCP arguments, passed unchanged to
            ``runbook_functions.get_cloudwatch_cost_estimate``.

    Returns:
        Whatever the runbook function returns, or an ``estimation_error``
        response if it raises.
    """
    log_function_entry(logger, "get_cloudwatch_cost_estimate_mcp", arguments=arguments)
    start_time = time.time()

    try:
        # Imported lazily so an import failure becomes an error response.
        from runbook_functions import get_cloudwatch_cost_estimate

        # The runbook function is awaited and its result is returned as-is.
        result = await get_cloudwatch_cost_estimate(arguments)

        execution_time = time.time() - start_time
        log_function_exit(logger, "get_cloudwatch_cost_estimate_mcp", "success", execution_time)

        return result

    except Exception as e:
        return _failure_content(
            e,
            log_prefix="CloudWatch cost estimate failed",
            error_type="estimation_error",
            analysis_type="cloudwatch_cost_estimate",
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.