Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
base_analyzer.py16.7 kB
""" Base Analyzer Interface for S3 Optimization Abstract base class defining the interface for all S3 optimization analyzers. """ import logging from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional from datetime import datetime logger = logging.getLogger(__name__) class BaseAnalyzer(ABC): """Base class for all S3 analyzers.""" def __init__(self, s3_service=None, pricing_service=None, storage_lens_service=None, performance_monitor=None, memory_manager=None): """ Initialize BaseAnalyzer with performance optimization components. Args: s3_service: S3Service instance for AWS S3 operations pricing_service: S3Pricing instance for cost calculations storage_lens_service: StorageLensService instance for Storage Lens data performance_monitor: Performance monitoring instance memory_manager: Memory management instance """ self.s3_service = s3_service self.pricing_service = pricing_service self.storage_lens_service = storage_lens_service self.performance_monitor = performance_monitor self.memory_manager = memory_manager self.logger = logging.getLogger(self.__class__.__name__) # Analysis metadata self.analysis_type = self.__class__.__name__.replace('Analyzer', '').lower() self.version = "1.0.0" self.last_execution = None self.execution_count = 0 @abstractmethod async def analyze(self, **kwargs) -> Dict[str, Any]: """ Execute the analysis. Args: **kwargs: Analysis-specific parameters Returns: Dictionary containing analysis results """ pass @abstractmethod def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]: """ Generate recommendations from analysis results. Args: analysis_results: Results from the analyze method Returns: List of recommendation dictionaries """ pass def validate_parameters(self, **kwargs) -> Dict[str, Any]: """ Validate input parameters for the analysis. Args: **kwargs: Parameters to validate Returns: Dictionary with validation results """ validation_result = { "valid": True, "errors": [], "warnings": [] } # Common parameter validation region = kwargs.get('region') if region and not isinstance(region, str): validation_result["errors"].append("Region must be a string") validation_result["valid"] = False lookback_days = kwargs.get('lookback_days', 30) if not isinstance(lookback_days, int) or lookback_days <= 0: validation_result["errors"].append("lookback_days must be a positive integer") validation_result["valid"] = False elif lookback_days > 365: validation_result["warnings"].append("lookback_days > 365 may result in large datasets") bucket_names = kwargs.get('bucket_names') if bucket_names is not None: if not isinstance(bucket_names, list): validation_result["errors"].append("bucket_names must be a list") validation_result["valid"] = False elif not all(isinstance(name, str) for name in bucket_names): validation_result["errors"].append("All bucket names must be strings") validation_result["valid"] = False timeout_seconds = kwargs.get('timeout_seconds', 60) if not isinstance(timeout_seconds, (int, float)) or timeout_seconds <= 0: validation_result["errors"].append("timeout_seconds must be a positive number") validation_result["valid"] = False return validation_result def prepare_analysis_context(self, **kwargs) -> Dict[str, Any]: """ Prepare analysis context with common parameters. Args: **kwargs: Analysis parameters Returns: Dictionary containing analysis context """ context = { "analysis_type": self.analysis_type, "analyzer_version": self.version, "region": kwargs.get('region'), "session_id": kwargs.get('session_id'), "lookback_days": kwargs.get('lookback_days', 30), "include_cost_analysis": kwargs.get('include_cost_analysis', True), "bucket_names": kwargs.get('bucket_names'), "timeout_seconds": kwargs.get('timeout_seconds', 60), "started_at": datetime.now().isoformat(), "execution_id": f"{self.analysis_type}_{int(datetime.now().timestamp())}" } return context def handle_analysis_error(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]: """ Handle analysis errors with consistent error reporting. Args: error: Exception that occurred context: Analysis context Returns: Dictionary containing error information """ error_message = str(error) self.logger.error(f"Analysis error in {self.analysis_type}: {error_message}") return { "status": "error", "analysis_type": self.analysis_type, "error_message": error_message, "error_type": error.__class__.__name__, "context": context, "timestamp": datetime.now().isoformat(), "recommendations": [{ "type": "error_resolution", "priority": "high", "title": f"Analysis Error: {self.analysis_type}", "description": f"Analysis failed with error: {error_message}", "action_items": [ "Check AWS credentials and permissions", "Verify network connectivity", "Review input parameters", "Check service quotas and limits" ] }] } def create_recommendation(self, rec_type: str, priority: str, title: str, description: str, potential_savings: Optional[float] = None, implementation_effort: str = "medium", affected_resources: Optional[List[str]] = None, action_items: Optional[List[str]] = None) -> Dict[str, Any]: """ Create a standardized recommendation dictionary. Args: rec_type: Type of recommendation priority: Priority level (high, medium, low) title: Recommendation title description: Detailed description potential_savings: Estimated cost savings implementation_effort: Implementation effort level affected_resources: List of affected resources action_items: List of action items Returns: Standardized recommendation dictionary """ recommendation = { "type": rec_type, "priority": priority, "title": title, "description": description, "implementation_effort": implementation_effort, "analyzer": self.analysis_type, "created_at": datetime.now().isoformat() } if potential_savings is not None: recommendation["potential_savings"] = potential_savings recommendation["potential_savings_formatted"] = f"${potential_savings:.2f}" if affected_resources: recommendation["affected_resources"] = affected_resources recommendation["resource_count"] = len(affected_resources) if action_items: recommendation["action_items"] = action_items return recommendation def log_analysis_start(self, context: Dict[str, Any]): """Log analysis start with context and performance monitoring.""" self.execution_count += 1 self.last_execution = datetime.now() self.logger.info(f"Starting {self.analysis_type} analysis (execution #{self.execution_count})") self.logger.debug(f"Analysis context: {context}") # Record performance metrics if available if self.performance_monitor: self.performance_monitor.record_metric( f"analyzer_{self.analysis_type}_started", 1, tags={"analyzer": self.analysis_type, "execution": str(self.execution_count)} ) def log_analysis_complete(self, context: Dict[str, Any], result: Dict[str, Any]): """Log analysis completion with results summary and performance metrics.""" status = result.get('status', 'unknown') execution_time = result.get('execution_time', 0) recommendation_count = len(result.get('recommendations', [])) self.logger.info( f"Completed {self.analysis_type} analysis - " f"Status: {status}, Time: {execution_time:.2f}s, " f"Recommendations: {recommendation_count}" ) # Record performance metrics if available if self.performance_monitor: self.performance_monitor.record_metric( f"analyzer_{self.analysis_type}_completed", 1, tags={ "analyzer": self.analysis_type, "status": status, "execution": str(self.execution_count) } ) self.performance_monitor.record_metric( f"analyzer_{self.analysis_type}_execution_time", execution_time, tags={"analyzer": self.analysis_type} ) self.performance_monitor.record_metric( f"analyzer_{self.analysis_type}_recommendations", recommendation_count, tags={"analyzer": self.analysis_type} ) def get_analyzer_info(self) -> Dict[str, Any]: """Get information about this analyzer.""" return { "analysis_type": self.analysis_type, "class_name": self.__class__.__name__, "version": self.version, "execution_count": self.execution_count, "last_execution": self.last_execution.isoformat() if self.last_execution else None, "services": { "s3_service": self.s3_service is not None, "pricing_service": self.pricing_service is not None, "storage_lens_service": self.storage_lens_service is not None } } async def execute_with_error_handling(self, **kwargs) -> Dict[str, Any]: """ Execute analysis with comprehensive error handling and performance monitoring. Args: **kwargs: Analysis parameters Returns: Dictionary containing analysis results or error information """ context = self.prepare_analysis_context(**kwargs) # Start memory tracking if memory manager is available memory_tracker_name = None if self.memory_manager: memory_tracker_name = f"analyzer_{self.analysis_type}_{int(datetime.now().timestamp())}" self.memory_manager.start_memory_tracking(memory_tracker_name) try: # Validate parameters validation = self.validate_parameters(**kwargs) if not validation["valid"]: error_result = { "status": "error", "analysis_type": self.analysis_type, "error_message": "Parameter validation failed", "validation_errors": validation["errors"], "context": context, "timestamp": datetime.now().isoformat() } # Stop memory tracking if memory_tracker_name and self.memory_manager: memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name) if memory_stats: error_result["memory_usage"] = memory_stats return error_result # Log warnings if any for warning in validation.get("warnings", []): self.logger.warning(f"Parameter warning: {warning}") # Log analysis start self.log_analysis_start(context) # Register large object for memory management if available if self.memory_manager: try: self.memory_manager.register_large_object( f"analysis_context_{self.analysis_type}_{int(datetime.now().timestamp())}", context, size_mb=0.1, # Small object cleanup_callback=lambda: self.logger.debug(f"Cleaned up {self.analysis_type} analysis context") ) except Exception as e: self.logger.warning(f"Could not register large object with memory manager: {str(e)}") # Execute analysis result = await self.analyze(**kwargs) # Ensure result has required fields if "status" not in result: result["status"] = "success" if "analysis_type" not in result: result["analysis_type"] = self.analysis_type if "timestamp" not in result: result["timestamp"] = datetime.now().isoformat() # Generate recommendations if not present if "recommendations" not in result: result["recommendations"] = self.get_recommendations(result) # Add memory usage statistics if memory_tracker_name and self.memory_manager: memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name) if memory_stats: result["memory_usage"] = memory_stats # Log completion self.log_analysis_complete(context, result) return result except Exception as e: # Stop memory tracking on error if memory_tracker_name and self.memory_manager: memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name) error_result = self.handle_analysis_error(e, context) # Add memory stats to error result if available if memory_tracker_name and self.memory_manager and 'memory_stats' in locals(): error_result["memory_usage"] = memory_stats return error_result class AnalyzerRegistry: """Registry for managing analyzer instances.""" def __init__(self): self._analyzers: Dict[str, BaseAnalyzer] = {} self.logger = logging.getLogger(__name__) def register(self, analyzer: BaseAnalyzer): """Register an analyzer instance.""" analysis_type = analyzer.analysis_type self._analyzers[analysis_type] = analyzer self.logger.info(f"Registered analyzer: {analysis_type}") def get(self, analysis_type: str) -> Optional[BaseAnalyzer]: """Get an analyzer by type.""" return self._analyzers.get(analysis_type) def list_analyzers(self) -> List[str]: """List all registered analyzer types.""" return list(self._analyzers.keys()) def get_analyzer_info(self) -> Dict[str, Any]: """Get information about all registered analyzers.""" return { analysis_type: analyzer.get_analyzer_info() for analysis_type, analyzer in self._analyzers.items() } # Global analyzer registry _analyzer_registry = AnalyzerRegistry() def get_analyzer_registry() -> AnalyzerRegistry: """Get the global analyzer registry.""" return _analyzer_registry

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server