Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
s3_optimization_orchestrator.py166 kB
""" S3 Optimization Orchestrator for CFM Tips MCP Server Main coordination layer for S3 optimization workflows with session integration, performance monitoring, intelligent caching, and memory management. """ import json import logging import time from typing import Dict, List, Any, Optional from datetime import datetime from utils.service_orchestrator import ServiceOrchestrator from utils.session_manager import get_session_manager from .s3_analysis_engine import S3AnalysisEngine from .s3_aggregation_queries import S3AggregationQueries, S3QueryExecutor from utils.performance_monitor import get_performance_monitor from utils.intelligent_cache import get_pricing_cache, get_bucket_metadata_cache, get_analysis_results_cache from utils.memory_manager import get_memory_manager from utils.progressive_timeout import get_timeout_handler from utils.documentation_links import add_documentation_links logger = logging.getLogger(__name__) class S3OptimizationOrchestrator: """Main orchestrator for S3 optimization workflows.""" def __init__(self, region: Optional[str] = None, session_id: Optional[str] = None): """ Initialize S3OptimizationOrchestrator with performance optimizations. Args: region: AWS region for S3 operations session_id: Optional session ID for data persistence """ self.region = region self.session_manager = get_session_manager() # Initialize ServiceOrchestrator (it will create session if session_id is None) self.service_orchestrator = ServiceOrchestrator(session_id) # Get the actual session ID from ServiceOrchestrator self.session_id = self.service_orchestrator.session_id # Initialize performance optimization components self.performance_monitor = get_performance_monitor() self.memory_manager = get_memory_manager() self.timeout_handler = get_timeout_handler() # Initialize caching systems self.pricing_cache = get_pricing_cache() self.bucket_metadata_cache = get_bucket_metadata_cache() self.analysis_results_cache = get_analysis_results_cache() # Register cache instances with memory manager for cleanup self.memory_manager.add_cache_reference(self.pricing_cache) self.memory_manager.add_cache_reference(self.bucket_metadata_cache) self.memory_manager.add_cache_reference(self.analysis_results_cache) # Set up performance monitor integration self.pricing_cache.set_performance_monitor(self.performance_monitor) self.bucket_metadata_cache.set_performance_monitor(self.performance_monitor) self.analysis_results_cache.set_performance_monitor(self.performance_monitor) self.memory_manager.set_performance_monitor(self.performance_monitor) self.timeout_handler.set_performance_monitor(self.performance_monitor) # Initialize analysis engine with all analyzers and performance components self.analysis_engine = S3AnalysisEngine( region=region, performance_monitor=self.performance_monitor, memory_manager=self.memory_manager, timeout_handler=self.timeout_handler, pricing_cache=self.pricing_cache, bucket_metadata_cache=self.bucket_metadata_cache, analysis_results_cache=self.analysis_results_cache ) logger.info(f"S3OptimizationOrchestrator initialized with performance optimizations for region: {region or 'default'}, session: {self.session_id}") async def execute_analysis(self, analysis_type: str, **kwargs) -> Dict[str, Any]: """ Execute a specific S3 analysis with performance optimizations. Args: analysis_type: Type of analysis to execute **kwargs: Analysis-specific parameters Returns: Dictionary containing analysis results """ start_time = time.time() # Start performance monitoring monitoring_session = self.performance_monitor.start_analysis_monitoring( analysis_type, f"single_{int(start_time)}" ) # Start memory tracking memory_tracker = self.memory_manager.start_memory_tracking(f"analysis_{analysis_type}") logger.info(f"Starting S3 analysis with performance optimizations: {analysis_type}") try: # Check cache for recent results first cache_key = [analysis_type, kwargs.get('region', self.region), kwargs] cached_result = self.analysis_results_cache.get(cache_key) if cached_result is not None: logger.info(f"Retrieved {analysis_type} analysis from cache") self.performance_monitor.record_cache_hit("analysis_results", analysis_type) # Update execution time and return cached result cached_result["orchestrator_execution_time"] = time.time() - start_time cached_result["session_id"] = self.session_id cached_result["from_cache"] = True # End monitoring self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True) self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}") return cached_result self.performance_monitor.record_cache_miss("analysis_results", analysis_type) # Validate analysis type if not self._is_valid_analysis_type(analysis_type): error_result = { "status": "error", "message": f"Invalid analysis type: {analysis_type}", "analysis_type": analysis_type, "available_types": self.analysis_engine.analyzer_registry.list_analyzers() } self.performance_monitor.end_analysis_monitoring( monitoring_session, success=False, error_message="Invalid analysis type" ) self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}") return error_result # Calculate intelligent timeout timeout_seconds = self.timeout_handler.get_timeout_for_analysis(analysis_type, **kwargs) kwargs['timeout_seconds'] = timeout_seconds logger.info(f"Calculated timeout for {analysis_type}: {timeout_seconds:.1f}s") # Prepare analysis parameters with performance optimizations analysis_params = self._prepare_analysis_params(analysis_type, **kwargs) analysis_params['monitoring_session'] = monitoring_session analysis_params['memory_tracker'] = memory_tracker # Execute analysis using the analysis engine result = await self.analysis_engine.run_analysis(analysis_type, **analysis_params) # Cache successful results if result.get('status') == 'success': # Cache result with appropriate TTL based on analysis type cache_ttl = self._get_cache_ttl_for_analysis(analysis_type) self.analysis_results_cache.put( cache_key, result.copy(), ttl_seconds=cache_ttl, tags={"analysis_type": analysis_type, "region": self.region} ) logger.debug(f"Cached {analysis_type} analysis result (TTL: {cache_ttl}s)") # Store results if requested and analysis was successful if kwargs.get('store_results', True) and result.get('status') == 'success': self._store_analysis_results(analysis_type, result) execution_time = time.time() - start_time result["orchestrator_execution_time"] = execution_time result["session_id"] = self.session_id result["from_cache"] = False # Record performance metrics self.timeout_handler.record_execution_time( analysis_type, execution_time, self.timeout_handler.get_complexity_level(analysis_type, **kwargs) ) # End monitoring success = result.get('status') == 'success' self.performance_monitor.end_analysis_monitoring( monitoring_session, success=success, error_message=result.get('error_message') if not success else None ) memory_stats = self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}") if memory_stats: result["memory_usage"] = memory_stats logger.info(f"Completed S3 analysis: {analysis_type} in {execution_time:.2f}s") return result except Exception as e: execution_time = time.time() - start_time logger.error(f"Error in S3 analysis {analysis_type}: {str(e)}") # End monitoring with error self.performance_monitor.end_analysis_monitoring( monitoring_session, success=False, error_message=str(e) ) self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}") return { "status": "error", "analysis_type": analysis_type, "message": f"Analysis failed: {str(e)}", "execution_time": execution_time, "session_id": self.session_id, "timestamp": datetime.now().isoformat(), "from_cache": False } async def execute_comprehensive_analysis(self, **kwargs) -> Dict[str, Any]: """ Execute all S3 analyses in parallel with performance optimizations and intelligent resource management. Args: **kwargs: Analysis parameters Returns: Dictionary containing comprehensive analysis results """ start_time = time.time() # Start comprehensive performance monitoring monitoring_session = self.performance_monitor.start_analysis_monitoring( "comprehensive", f"comprehensive_{int(start_time)}" ) # Start memory tracking for comprehensive analysis memory_tracker = self.memory_manager.start_memory_tracking("comprehensive_analysis") logger.info("Starting comprehensive S3 analysis with performance optimizations and parallel execution") try: # Check cache for recent comprehensive analysis cache_key = ["comprehensive", kwargs.get('region', self.region), kwargs] cached_result = self.analysis_results_cache.get(cache_key) if cached_result is not None: logger.info("Retrieved comprehensive analysis from cache") self.performance_monitor.record_cache_hit("analysis_results", "comprehensive") # Update execution time and return cached result cached_result["orchestrator_execution_time"] = time.time() - start_time cached_result["session_id"] = self.session_id cached_result["from_cache"] = True # End monitoring self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True) self.memory_manager.stop_memory_tracking("comprehensive_analysis") return cached_result self.performance_monitor.record_cache_miss("analysis_results", "comprehensive") # Get all available analysis types from the engine with priority information available_analyses = self.analysis_engine.get_available_analyses() analysis_types = [analysis["analysis_type"] for analysis in available_analyses] logger.info(f"Executing {len(analysis_types)} analyses with intelligent prioritization: {analysis_types}") # Calculate intelligent timeout for comprehensive analysis comprehensive_timeout = self.timeout_handler.get_timeout_for_analysis("comprehensive", **kwargs) kwargs['total_timeout'] = comprehensive_timeout logger.info(f"Calculated comprehensive analysis timeout: {comprehensive_timeout:.1f}s") # Prepare analysis parameters with performance optimizations analysis_params = self._prepare_comprehensive_analysis_params(**kwargs) analysis_params['monitoring_session'] = monitoring_session analysis_params['memory_tracker'] = memory_tracker # Register large object for memory management if available if self.memory_manager: try: self.memory_manager.register_large_object( f"comprehensive_analysis_{int(start_time)}", analysis_params, size_mb=1.0, # Estimated size cleanup_callback=lambda: logger.debug("Cleaned up comprehensive analysis parameters") ) except Exception as e: logger.warning(f"Could not register large object with memory manager: {str(e)}") # Create parallel analysis tasks using analysis engine service_calls = self.analysis_engine.create_parallel_analysis_tasks( analysis_types=analysis_types, **analysis_params ) logger.info(f"Created {len(service_calls)} intelligently prioritized parallel tasks") # Execute analyses in parallel using ServiceOrchestrator with session-sql integration execution_results = self.service_orchestrator.execute_parallel_analysis( service_calls=service_calls, store_results=kwargs.get('store_results', True), timeout=comprehensive_timeout ) logger.info(f"Parallel execution completed: {execution_results['successful']}/{execution_results['total_tasks']} successful") # Record performance metrics self.performance_monitor.record_metric( "comprehensive_analysis_success_rate", (execution_results['successful'] / execution_results['total_tasks'] * 100) if execution_results['total_tasks'] > 0 else 0, tags={"session_id": self.session_id} ) # Aggregate results using enhanced aggregation with cross-analyzer insights aggregated_results = self.aggregate_results_with_insights( results=execution_results.get('results', {}), include_cross_analysis=kwargs.get('include_cross_analysis', True) ) # Store aggregated results with session-sql integration if kwargs.get('store_results', True): self._store_comprehensive_results(aggregated_results, execution_results) # Execute cross-analysis aggregation queries for deeper insights cross_analysis_data = {} if kwargs.get('include_cross_analysis', True) and execution_results.get('stored_tables'): cross_analysis_data = self._execute_cross_analysis_queries(execution_results['stored_tables']) execution_time = time.time() - start_time # Create comprehensive result comprehensive_result = { "status": "success", "analysis_type": "comprehensive", "execution_summary": execution_results, "aggregated_results": aggregated_results, "cross_analysis_data": cross_analysis_data, "execution_time": execution_time, "session_id": self.session_id, "timestamp": datetime.now().isoformat(), "from_cache": False, "performance_optimizations": { "intelligent_timeout": comprehensive_timeout, "cache_enabled": True, "memory_management": True, "progressive_timeouts": True }, "analysis_metadata": { "total_analyses": len(analysis_types), "successful_analyses": aggregated_results.get("aggregation_metadata", {}).get("successful_analyses", 0), "failed_analyses": aggregated_results.get("aggregation_metadata", {}).get("failed_analyses", 0), "total_potential_savings": aggregated_results.get("total_potential_savings", 0), "stored_tables": execution_results.get('stored_tables', []), "task_prioritization": self._get_task_prioritization_summary(available_analyses) } } # Cache the comprehensive result cache_ttl = self._get_cache_ttl_for_analysis("comprehensive") self.analysis_results_cache.put( cache_key, comprehensive_result.copy(), ttl_seconds=cache_ttl, tags={"analysis_type": "comprehensive", "region": self.region} ) # Record performance metrics self.timeout_handler.record_execution_time( "comprehensive", execution_time, self.timeout_handler.get_complexity_level("comprehensive", **kwargs) ) # End monitoring self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True) memory_stats = self.memory_manager.stop_memory_tracking("comprehensive_analysis") if memory_stats: comprehensive_result["memory_usage"] = memory_stats logger.info(f"Completed comprehensive S3 analysis with optimizations in {execution_time:.2f}s") return comprehensive_result except Exception as e: execution_time = time.time() - start_time logger.error(f"Error in comprehensive S3 analysis: {str(e)}") # End monitoring with error self.performance_monitor.end_analysis_monitoring( monitoring_session, success=False, error_message=str(e) ) self.memory_manager.stop_memory_tracking("comprehensive_analysis") return { "status": "error", "analysis_type": "comprehensive", "message": f"Comprehensive analysis failed: {str(e)}", "execution_time": execution_time, "session_id": self.session_id, "timestamp": datetime.now().isoformat(), "from_cache": False } def get_analysis_results(self, query: str) -> List[Dict[str, Any]]: """ Query stored analysis results. Args: query: SQL query to execute Returns: List of query results """ try: return self.service_orchestrator.query_session_data(query) except Exception as e: logger.error(f"Error querying analysis results: {str(e)}") return [] def get_stored_tables(self) -> List[str]: """ Get list of tables stored in the session. Returns: List of table names """ try: return self.service_orchestrator.get_stored_tables() except Exception as e: logger.error(f"Error getting stored tables: {str(e)}") return [] def _is_valid_analysis_type(self, analysis_type: str) -> bool: """Validate analysis type using the analysis engine registry.""" if analysis_type == "comprehensive": return True return self.analysis_engine.analyzer_registry.get(analysis_type) is not None def get_analyzer_registry(self) -> Dict[str, Any]: """ Get comprehensive information about the analyzer registry and registered analyzers. Returns: Dictionary containing analyzer registry information """ try: # Get basic registry info registry_info = { "registry_status": "active", "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()), "registered_analyzers": self.analysis_engine.analyzer_registry.get_analyzer_info(), "analysis_priorities": self.analysis_engine.analysis_priorities, "available_analyses": self.analysis_engine.get_available_analyses(), "registry_timestamp": datetime.now().isoformat() } # Get comprehensive health status from analysis engine health_status = self.analysis_engine.get_analyzer_health_status() registry_info["health_status"] = health_status # Add loading results if available if hasattr(self.analysis_engine, 'analyzer_loading_results'): registry_info["loading_results"] = self.analysis_engine.analyzer_loading_results # Add execution history summary if hasattr(self.analysis_engine, 'execution_history') and self.analysis_engine.execution_history: recent_executions = self.analysis_engine.execution_history[-10:] # Last 10 executions registry_info["recent_executions"] = recent_executions # Calculate execution statistics successful_executions = [e for e in recent_executions if e.get('status') == 'success'] registry_info["execution_statistics"] = { "recent_success_rate": len(successful_executions) / len(recent_executions) * 100 if recent_executions else 0, "total_executions": len(self.analysis_engine.execution_history), "recent_executions": len(recent_executions) } # Add performance optimization status registry_info["performance_optimizations"] = { "performance_monitor_enabled": self.performance_monitor is not None, "memory_manager_enabled": self.memory_manager is not None, "timeout_handler_enabled": self.timeout_handler is not None, "caching_enabled": { "pricing_cache": self.pricing_cache is not None, "bucket_metadata_cache": self.bucket_metadata_cache is not None, "analysis_results_cache": self.analysis_results_cache is not None } } return registry_info except Exception as e: logger.error(f"Error getting analyzer registry info: {str(e)}") return { "registry_status": "error", "error": str(e), "error_type": e.__class__.__name__, "registry_timestamp": datetime.now().isoformat() } def reload_analyzer(self, analysis_type: str) -> Dict[str, Any]: """ Reload a specific analyzer through the analysis engine. Args: analysis_type: Type of analyzer to reload Returns: Dictionary containing reload results """ try: logger.info(f"Orchestrator reloading analyzer: {analysis_type}") # Use analysis engine's reload method reload_result = self.analysis_engine.reload_analyzer(analysis_type) # Clear related caches if reload was successful if reload_result.get("status") == "success": try: # Clear analysis results cache for this analyzer if self.analysis_results_cache: cache_keys_to_clear = [] cache_dict = getattr(self.analysis_results_cache, '_cache', {}) for key in cache_dict.keys(): if isinstance(key, (list, tuple)) and len(key) > 0 and key[0] == analysis_type: cache_keys_to_clear.append(key) for key in cache_keys_to_clear: if hasattr(self.analysis_results_cache, 'invalidate'): self.analysis_results_cache.invalidate(key) elif hasattr(self.analysis_results_cache, 'delete'): self.analysis_results_cache.delete(key) logger.info(f"Cleared {len(cache_keys_to_clear)} cache entries for {analysis_type}") reload_result["cache_entries_cleared"] = len(cache_keys_to_clear) except Exception as cache_error: logger.warning(f"Error clearing cache for {analysis_type}: {str(cache_error)}") reload_result["cache_clear_warning"] = str(cache_error) return reload_result except Exception as e: logger.error(f"Error in orchestrator reload for {analysis_type}: {str(e)}") return { "status": "error", "message": f"Orchestrator reload failed for {analysis_type}: {str(e)}", "error_type": e.__class__.__name__, "reloaded_at": datetime.now().isoformat() } def handle_analyzer_failure(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]: """ Handle analyzer failures with comprehensive error handling and recovery strategies. Args: analysis_type: Type of analysis that failed error: Exception that occurred context: Analysis context Returns: Dictionary containing error handling results and recovery recommendations """ try: logger.error(f"Handling analyzer failure for {analysis_type}: {str(error)}") # Use analysis engine's error handling if available if hasattr(self.analysis_engine, 'handle_analyzer_failure'): return self.analysis_engine.handle_analyzer_failure(analysis_type, error, context) # Fallback error handling error_message = str(error) error_type = error.__class__.__name__ # Determine recovery strategy recovery_strategy = self._determine_recovery_strategy(error_type, error_message) # Create comprehensive error result error_result = { "status": "error", "analysis_type": analysis_type, "error_message": error_message, "error_type": error_type, "error_category": recovery_strategy["category"], "recovery_strategy": recovery_strategy["strategy"], "context": context, "timestamp": datetime.now().isoformat(), "session_id": self.session_id, "orchestrator_handled": True, "recommendations": [] } # Add specific recommendations based on error type if "permission" in error_message.lower() or "access" in error_message.lower(): error_result["recommendations"].extend([ { "type": "permission_fix", "priority": "high", "title": "Fix AWS Permissions", "description": f"Analysis {analysis_type} failed due to permission issues", "action_items": [ "Check IAM permissions for S3, Cost Explorer, and Storage Lens", "Verify AWS credentials are valid and not expired", "Ensure required service permissions are granted", "Check if MFA is required for API access" ] } ]) elif "timeout" in error_message.lower(): error_result["recommendations"].extend([ { "type": "timeout_optimization", "priority": "medium", "title": "Optimize Timeout Settings", "description": f"Analysis {analysis_type} timed out during execution", "action_items": [ "Increase timeout_seconds parameter", "Reduce lookback_days to limit data volume", "Filter to specific bucket_names if possible", "Run analysis during off-peak hours" ] } ]) elif "rate" in error_message.lower() or "throttl" in error_message.lower(): error_result["recommendations"].extend([ { "type": "rate_limit_handling", "priority": "medium", "title": "Handle API Rate Limits", "description": f"Analysis {analysis_type} hit API rate limits", "action_items": [ "Implement exponential backoff retry logic", "Reduce concurrent analysis execution", "Spread analysis execution over time", "Consider using AWS SDK retry configuration" ] } ]) # Record error in performance monitor if available if self.performance_monitor: self.performance_monitor.record_metric( f"orchestrator_analyzer_failure_{analysis_type}", 1, tags={ "error_type": error_type, "error_category": recovery_strategy["category"], "session_id": self.session_id } ) # Attempt automatic recovery if strategy suggests it if recovery_strategy.get("auto_recovery", False): recovery_result = self._attempt_auto_recovery(analysis_type, error, context) error_result["auto_recovery_attempted"] = True error_result["auto_recovery_result"] = recovery_result return error_result except Exception as handling_error: logger.error(f"Error in analyzer failure handling: {str(handling_error)}") return { "status": "error", "analysis_type": analysis_type, "error_message": f"Original error: {str(error)}. Handling error: {str(handling_error)}", "error_type": "FailureHandlingError", "timestamp": datetime.now().isoformat(), "session_id": self.session_id, "critical_error": True } def _determine_recovery_strategy(self, error_type: str, error_message: str) -> Dict[str, Any]: """ Determine recovery strategy based on error type and message. Args: error_type: Type of exception error_message: Error message Returns: Dictionary containing recovery strategy information """ error_lower = error_message.lower() if "permission" in error_lower or "access" in error_lower or "credential" in error_lower: return { "category": "permission_error", "strategy": "Check and fix AWS permissions", "auto_recovery": False, "severity": "high" } elif "timeout" in error_lower: return { "category": "timeout_error", "strategy": "Reduce scope and increase timeout", "auto_recovery": True, "severity": "medium" } elif "throttl" in error_lower or "rate" in error_lower: return { "category": "rate_limit_error", "strategy": "Implement backoff and retry", "auto_recovery": True, "severity": "medium" } elif "network" in error_lower or "connection" in error_lower: return { "category": "network_error", "strategy": "Retry with exponential backoff", "auto_recovery": True, "severity": "medium" } elif "service" in error_lower and "unavailable" in error_lower: return { "category": "service_error", "strategy": "Wait and retry, use fallback data sources", "auto_recovery": True, "severity": "high" } else: return { "category": "unknown_error", "strategy": "Manual investigation required", "auto_recovery": False, "severity": "high" } def _attempt_auto_recovery(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]: """ Attempt automatic recovery for recoverable errors. Args: analysis_type: Type of analysis that failed error: Exception that occurred context: Analysis context Returns: Dictionary containing recovery attempt results """ try: logger.info(f"Attempting auto-recovery for {analysis_type}") recovery_actions = [] # Try reloading the analyzer reload_result = self.reload_analyzer(analysis_type) recovery_actions.append({ "action": "reload_analyzer", "result": reload_result.get("status"), "details": reload_result }) # Clear related caches if self.analysis_results_cache: try: cache_keys_cleared = 0 cache_dict = getattr(self.analysis_results_cache, '_cache', {}) for key in list(cache_dict.keys()): if isinstance(key, (list, tuple)) and len(key) > 0 and key[0] == analysis_type: if hasattr(self.analysis_results_cache, 'invalidate'): self.analysis_results_cache.invalidate(key) elif hasattr(self.analysis_results_cache, 'delete'): self.analysis_results_cache.delete(key) cache_keys_cleared += 1 recovery_actions.append({ "action": "clear_cache", "result": "success", "details": {"cache_keys_cleared": cache_keys_cleared} }) except Exception as cache_error: recovery_actions.append({ "action": "clear_cache", "result": "error", "details": {"error": str(cache_error)} }) return { "status": "completed", "recovery_actions": recovery_actions, "timestamp": datetime.now().isoformat(), "analysis_type": analysis_type } except Exception as recovery_error: logger.error(f"Error during auto-recovery for {analysis_type}: {str(recovery_error)}") return { "status": "failed", "error": str(recovery_error), "timestamp": datetime.now().isoformat(), "analysis_type": analysis_type } def get_analyzer_diagnostics(self) -> Dict[str, Any]: """ Get comprehensive diagnostics for all analyzers and the analysis engine. Returns: Dictionary containing diagnostic information """ try: diagnostics = { "diagnostics_timestamp": datetime.now().isoformat(), "orchestrator_info": { "region": self.region, "session_id": self.session_id, "performance_optimizations_enabled": True }, "analysis_engine_info": { "region": self.analysis_engine.region, "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()), "execution_history_count": len(getattr(self.analysis_engine, 'execution_history', [])) } } # Get health status health_status = self.analysis_engine.get_analyzer_health_status() diagnostics["health_status"] = health_status # Get loading results if hasattr(self.analysis_engine, 'analyzer_loading_results'): diagnostics["loading_results"] = self.analysis_engine.analyzer_loading_results # Test analyzer connectivity connectivity_tests = {} for analysis_type in self.analysis_engine.analyzer_registry.list_analyzers(): analyzer = self.analysis_engine.analyzer_registry.get(analysis_type) connectivity_test = { "analyzer_valid": self.analysis_engine._validate_analyzer(analyzer), "services_connected": { "s3_service": analyzer.s3_service is not None, "pricing_service": analyzer.pricing_service is not None, "storage_lens_service": analyzer.storage_lens_service is not None }, "performance_components": { "performance_monitor": hasattr(analyzer, 'performance_monitor') and analyzer.performance_monitor is not None, "memory_manager": hasattr(analyzer, 'memory_manager') and analyzer.memory_manager is not None, "timeout_handler": hasattr(analyzer, 'timeout_handler') and analyzer.timeout_handler is not None } } connectivity_tests[analysis_type] = connectivity_test diagnostics["connectivity_tests"] = connectivity_tests # Get cache statistics cache_stats = {} if self.pricing_cache: try: cache_size = len(getattr(self.pricing_cache, '_cache', {})) except: cache_size = 0 cache_stats["pricing_cache"] = { "size": cache_size, "hit_rate": getattr(self.pricing_cache, 'hit_rate', 0), "enabled": True } if self.bucket_metadata_cache: try: cache_size = len(getattr(self.bucket_metadata_cache, '_cache', {})) except: cache_size = 0 cache_stats["bucket_metadata_cache"] = { "size": cache_size, "hit_rate": getattr(self.bucket_metadata_cache, 'hit_rate', 0), "enabled": True } if self.analysis_results_cache: try: cache_size = len(getattr(self.analysis_results_cache, '_cache', {})) except: cache_size = 0 cache_stats["analysis_results_cache"] = { "size": cache_size, "hit_rate": getattr(self.analysis_results_cache, 'hit_rate', 0), "enabled": True } diagnostics["cache_statistics"] = cache_stats return diagnostics except Exception as e: logger.error(f"Error getting analyzer diagnostics: {str(e)}") return { "diagnostics_timestamp": datetime.now().isoformat(), "status": "error", "error_message": str(e), "error_type": e.__class__.__name__ } def reload_analyzers(self) -> Dict[str, Any]: """ Reload all analyzers in the registry with fresh instances. Returns: Dictionary containing reload results """ try: logger.info("Reloading analyzers in registry") # Get current analyzer count old_count = len(self.analysis_engine.analyzer_registry.list_analyzers()) # Clear existing registry self.analysis_engine.analyzer_registry._analyzers.clear() # Reinitialize analyzers self.analysis_engine._initialize_analyzers() # Get new analyzer count new_count = len(self.analysis_engine.analyzer_registry.list_analyzers()) reload_result = { "status": "success", "message": "Analyzers reloaded successfully", "old_analyzer_count": old_count, "new_analyzer_count": new_count, "reloaded_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(), "reload_timestamp": datetime.now().isoformat() } logger.info(f"Successfully reloaded {new_count} analyzers") return reload_result except Exception as e: logger.error(f"Error reloading analyzers: {str(e)}") return { "status": "error", "message": f"Failed to reload analyzers: {str(e)}", "reload_timestamp": datetime.now().isoformat() } def handle_analyzer_failure(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]: """ Handle analyzer failures with comprehensive error handling and recovery strategies. Args: analysis_type: Type of analysis that failed error: Exception that occurred context: Analysis context Returns: Dictionary containing error handling results and recovery recommendations """ error_message = str(error) error_type = error.__class__.__name__ logger.error(f"Analyzer failure in {analysis_type}: {error_message}") # Determine error category and recovery strategy recovery_strategy = self._determine_recovery_strategy(error_type, error_message) # Create comprehensive error result error_result = { "status": "error", "analysis_type": analysis_type, "error_message": error_message, "error_type": error_type, "error_category": recovery_strategy["category"], "recovery_strategy": recovery_strategy["strategy"], "context": context, "timestamp": datetime.now().isoformat(), "session_id": self.session_id, "recommendations": [] } # Add specific recommendations based on error type if "permission" in error_message.lower() or "access" in error_message.lower(): error_result["recommendations"].extend([ { "type": "permission_fix", "priority": "high", "title": "Fix AWS Permissions", "description": f"Analysis {analysis_type} failed due to permission issues", "action_items": [ "Check IAM permissions for S3, Cost Explorer, and Storage Lens", "Verify AWS credentials are valid and not expired", "Ensure required service permissions are granted", "Check if MFA is required for API access" ] } ]) elif "timeout" in error_message.lower(): error_result["recommendations"].extend([ { "type": "timeout_optimization", "priority": "medium", "title": "Optimize Timeout Settings", "description": f"Analysis {analysis_type} timed out during execution", "action_items": [ "Increase timeout_seconds parameter", "Reduce lookback_days to limit data volume", "Filter to specific bucket_names if possible", "Run analysis during off-peak hours" ] } ]) elif "rate" in error_message.lower() or "throttl" in error_message.lower(): error_result["recommendations"].extend([ { "type": "rate_limit_handling", "priority": "medium", "title": "Handle API Rate Limits", "description": f"Analysis {analysis_type} hit API rate limits", "action_items": [ "Implement exponential backoff retry logic", "Reduce concurrent analysis execution", "Spread analysis execution over time", "Consider using AWS SDK retry configuration" ] } ]) else: error_result["recommendations"].extend([ { "type": "general_troubleshooting", "priority": "medium", "title": "General Troubleshooting", "description": f"Analysis {analysis_type} failed with unexpected error", "action_items": [ "Check AWS service status and availability", "Verify network connectivity", "Review analysis parameters for validity", "Check system resources and memory usage", "Try running analysis with reduced scope" ] } ]) # Record error in performance monitor if available if self.performance_monitor: self.performance_monitor.record_metric( f"analyzer_failure_{analysis_type}", 1, tags={ "error_type": error_type, "error_category": recovery_strategy["category"], "session_id": self.session_id } ) # Attempt automatic recovery if strategy suggests it if recovery_strategy["auto_recovery"]: recovery_result = self._attempt_auto_recovery(analysis_type, error, context) error_result["auto_recovery_attempted"] = True error_result["auto_recovery_result"] = recovery_result return error_result def _determine_recovery_strategy(self, error_type: str, error_message: str) -> Dict[str, Any]: """ Determine recovery strategy based on error type and message. Args: error_type: Type of exception error_message: Error message content Returns: Dictionary containing recovery strategy information """ error_message_lower = error_message.lower() # Permission/Access errors if any(keyword in error_message_lower for keyword in ["permission", "access", "denied", "unauthorized", "forbidden"]): return { "category": "permission_error", "strategy": "check_permissions", "auto_recovery": False, "severity": "high" } # Timeout errors elif any(keyword in error_message_lower for keyword in ["timeout", "timed out", "deadline"]): return { "category": "timeout_error", "strategy": "increase_timeout_or_reduce_scope", "auto_recovery": True, "severity": "medium" } # Rate limiting errors elif any(keyword in error_message_lower for keyword in ["rate", "throttl", "limit", "quota"]): return { "category": "rate_limit_error", "strategy": "implement_backoff_retry", "auto_recovery": True, "severity": "medium" } # Network/connectivity errors elif any(keyword in error_message_lower for keyword in ["network", "connection", "dns", "resolve"]): return { "category": "network_error", "strategy": "check_connectivity", "auto_recovery": True, "severity": "medium" } # Service unavailable errors elif any(keyword in error_message_lower for keyword in ["unavailable", "service", "maintenance"]): return { "category": "service_error", "strategy": "retry_later", "auto_recovery": True, "severity": "low" } # Data/validation errors elif any(keyword in error_message_lower for keyword in ["invalid", "validation", "parameter", "format"]): return { "category": "validation_error", "strategy": "check_parameters", "auto_recovery": False, "severity": "medium" } # Memory/resource errors elif any(keyword in error_message_lower for keyword in ["memory", "resource", "capacity"]): return { "category": "resource_error", "strategy": "optimize_resource_usage", "auto_recovery": True, "severity": "high" } # Unknown errors else: return { "category": "unknown_error", "strategy": "general_troubleshooting", "auto_recovery": False, "severity": "medium" } def register_custom_analyzer(self, analyzer: 'BaseAnalyzer') -> Dict[str, Any]: """ Dynamically register a custom analyzer with the orchestrator. Args: analyzer: BaseAnalyzer instance to register Returns: Dictionary containing registration results """ try: # Validate analyzer if not hasattr(analyzer, 'analyze') or not hasattr(analyzer, 'get_recommendations'): return { "status": "error", "message": "Analyzer must implement analyze() and get_recommendations() methods", "timestamp": datetime.now().isoformat() } # Add performance optimization components if available if self.performance_monitor: analyzer.performance_monitor = self.performance_monitor if self.memory_manager: analyzer.memory_manager = self.memory_manager # Register with the analysis engine self.analysis_engine.analyzer_registry.register(analyzer) # Update analysis priorities if not already defined analysis_type = analyzer.analysis_type if analysis_type not in self.analysis_engine.analysis_priorities: self.analysis_engine.analysis_priorities[analysis_type] = { "priority": 2, # Default medium priority "cost_impact": "unknown", "execution_time_estimate": 30, "dependencies": [], "description": f"Custom analyzer: {analyzer.__class__.__name__}" } logger.info(f"Successfully registered custom analyzer: {analysis_type}") return { "status": "success", "message": f"Custom analyzer '{analysis_type}' registered successfully", "analyzer_type": analysis_type, "analyzer_class": analyzer.__class__.__name__, "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error registering custom analyzer: {str(e)}") return { "status": "error", "message": f"Failed to register custom analyzer: {str(e)}", "timestamp": datetime.now().isoformat() } def unregister_analyzer(self, analysis_type: str) -> Dict[str, Any]: """ Unregister an analyzer from the orchestrator. Args: analysis_type: Type of analysis to unregister Returns: Dictionary containing unregistration results """ try: if analysis_type not in self.analysis_engine.analyzer_registry.list_analyzers(): return { "status": "error", "message": f"Analyzer '{analysis_type}' is not registered", "available_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(), "timestamp": datetime.now().isoformat() } # Remove from registry del self.analysis_engine.analyzer_registry._analyzers[analysis_type] # Remove from priorities if it exists if analysis_type in self.analysis_engine.analysis_priorities: del self.analysis_engine.analysis_priorities[analysis_type] logger.info(f"Successfully unregistered analyzer: {analysis_type}") return { "status": "success", "message": f"Analyzer '{analysis_type}' unregistered successfully", "remaining_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error unregistering analyzer {analysis_type}: {str(e)}") return { "status": "error", "message": f"Failed to unregister analyzer '{analysis_type}': {str(e)}", "timestamp": datetime.now().isoformat() } def get_analyzer_execution_history(self, analysis_type: Optional[str] = None) -> Dict[str, Any]: """ Get execution history for analyzers. Args: analysis_type: Optional specific analyzer type to get history for Returns: Dictionary containing execution history """ try: if analysis_type: # Get history for specific analyzer analyzer = self.analysis_engine.analyzer_registry.get(analysis_type) if not analyzer: return { "status": "error", "message": f"Analyzer '{analysis_type}' not found", "available_analyzers": self.analysis_engine.analyzer_registry.list_analyzers() } return { "status": "success", "analysis_type": analysis_type, "execution_count": analyzer.execution_count, "last_execution": analyzer.last_execution.isoformat() if analyzer.last_execution else None, "analyzer_info": analyzer.get_analyzer_info() } else: # Get history for all analyzers all_history = {} for analyzer_type in self.analysis_engine.analyzer_registry.list_analyzers(): analyzer = self.analysis_engine.analyzer_registry.get(analyzer_type) all_history[analyzer_type] = { "execution_count": analyzer.execution_count, "last_execution": analyzer.last_execution.isoformat() if analyzer.last_execution else None, "analyzer_class": analyzer.__class__.__name__ } # Add engine-level execution history engine_history = getattr(self.analysis_engine, 'execution_history', []) return { "status": "success", "analyzer_history": all_history, "engine_execution_history": engine_history[-10:], # Last 10 executions "total_analyzers": len(all_history), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error getting analyzer execution history: {str(e)}") return { "status": "error", "message": f"Failed to get execution history: {str(e)}", "timestamp": datetime.now().isoformat() } def _attempt_auto_recovery(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]: """ Attempt automatic recovery from analyzer failures. Args: analysis_type: Type of analysis that failed error: Exception that occurred context: Analysis context Returns: Dictionary containing recovery attempt results """ recovery_result = { "attempted": True, "success": False, "strategy_used": "none", "timestamp": datetime.now().isoformat() } try: error_message = str(error).lower() # Timeout recovery: reduce scope and retry if "timeout" in error_message: recovery_result["strategy_used"] = "reduce_scope_retry" # Reduce lookback days original_lookback = context.get("lookback_days", 30) reduced_lookback = max(7, original_lookback // 2) # Reduce bucket scope if applicable original_buckets = context.get("bucket_names") reduced_buckets = original_buckets[:5] if original_buckets and len(original_buckets) > 5 else original_buckets logger.info(f"Attempting timeout recovery for {analysis_type}: reducing lookback from {original_lookback} to {reduced_lookback} days") # This would be implemented as a retry mechanism in the actual execution flow recovery_result["recovery_parameters"] = { "reduced_lookback_days": reduced_lookback, "reduced_bucket_scope": reduced_buckets, "original_lookback_days": original_lookback, "original_bucket_count": len(original_buckets) if original_buckets else 0 } recovery_result["success"] = True # Mark as successful strategy identification # Rate limit recovery: implement backoff elif any(keyword in error_message for keyword in ["rate", "throttl", "limit"]): recovery_result["strategy_used"] = "exponential_backoff" # Calculate backoff delay backoff_delay = min(60, 2 ** context.get("retry_count", 0)) recovery_result["recovery_parameters"] = { "backoff_delay_seconds": backoff_delay, "retry_count": context.get("retry_count", 0) + 1, "max_retries": 3 } recovery_result["success"] = True # Memory recovery: trigger cleanup elif "memory" in error_message: recovery_result["strategy_used"] = "memory_cleanup" if self.memory_manager: cleanup_result = self.memory_manager.force_cleanup("aggressive") recovery_result["recovery_parameters"] = { "cleanup_performed": True, "cleanup_result": cleanup_result } recovery_result["success"] = True else: recovery_result["recovery_parameters"] = { "cleanup_performed": False, "reason": "memory_manager_not_available" } # Network recovery: simple retry with delay elif any(keyword in error_message for keyword in ["network", "connection", "dns"]): recovery_result["strategy_used"] = "network_retry" recovery_result["recovery_parameters"] = { "retry_delay_seconds": 10, "retry_count": context.get("retry_count", 0) + 1, "max_retries": 2 } recovery_result["success"] = True return recovery_result except Exception as recovery_error: logger.error(f"Error during auto-recovery attempt: {str(recovery_error)}") recovery_result["error"] = str(recovery_error) return recovery_result def _prepare_analysis_params(self, analysis_type: str, **kwargs) -> Dict[str, Any]: """Prepare parameters for specific analysis type.""" base_params = { 'region': self.region, 'session_id': self.session_id, 'lookback_days': kwargs.get('lookback_days', 30), 'include_cost_analysis': kwargs.get('include_cost_analysis', True), 'bucket_names': kwargs.get('bucket_names'), 'timeout_seconds': kwargs.get('timeout_seconds', 60) } # Add analysis-specific parameters if analysis_type == "storage_class": base_params.update({ 'min_object_size_mb': kwargs.get('min_object_size_mb', 1), 'include_recommendations': kwargs.get('include_recommendations', True) }) elif analysis_type == "archive_optimization": base_params.update({ 'min_age_days': kwargs.get('min_age_days', 180), 'archive_tier_preference': kwargs.get('archive_tier_preference', 'auto'), 'include_compliance_check': kwargs.get('include_compliance_check', True) }) elif analysis_type == "api_cost": base_params.update({ 'request_threshold': kwargs.get('request_threshold', 10000), 'include_cloudfront_analysis': kwargs.get('include_cloudfront_analysis', True) }) elif analysis_type == "multipart_cleanup": base_params.update({ 'min_age_days': kwargs.get('min_age_days', 7), 'max_results_per_bucket': kwargs.get('max_results_per_bucket', 1000) }) elif analysis_type == "governance": base_params.update({ 'check_lifecycle_policies': kwargs.get('check_lifecycle_policies', True), 'check_versioning': kwargs.get('check_versioning', True), 'check_tagging': kwargs.get('check_tagging', True) }) return base_params def _prepare_comprehensive_analysis_params(self, **kwargs) -> Dict[str, Any]: """Prepare parameters for comprehensive analysis.""" return { 'region': self.region, 'session_id': self.session_id, 'lookback_days': kwargs.get('lookback_days', 30), 'include_cost_analysis': kwargs.get('include_cost_analysis', True), 'bucket_names': kwargs.get('bucket_names'), 'timeout_seconds': kwargs.get('timeout_seconds', 60), 'store_results': kwargs.get('store_results', True), 'include_cross_analysis': kwargs.get('include_cross_analysis', True) } def _create_prioritized_analysis_tasks(self, analysis_types: List[str], available_analyses: List[Dict[str, Any]], **kwargs) -> List[Dict[str, Any]]: """ Create prioritized analysis tasks for parallel execution based on cost impact and execution time. Args: analysis_types: List of analysis types to execute available_analyses: List of analysis metadata with priority information **kwargs: Analysis parameters Returns: List of prioritized service call definitions for parallel execution """ service_calls = [] # Create priority mapping for quick lookup priority_map = { analysis["analysis_type"]: analysis for analysis in available_analyses } # Sort analysis types by priority (highest first), then by execution time (shortest first) sorted_analyses = sorted( analysis_types, key=lambda x: ( priority_map.get(x, {}).get("priority", 1), -priority_map.get(x, {}).get("execution_time_estimate", 30) # Negative for ascending order ), reverse=True ) logger.info(f"Task prioritization order: {sorted_analyses}") # Create service call definitions with proper prioritization for i, analysis_type in enumerate(sorted_analyses): analysis_info = priority_map.get(analysis_type, {}) # Create synchronous wrapper function for the analysis (ServiceOrchestrator expects sync functions) def create_analysis_wrapper(atype=analysis_type, params=kwargs.copy()): def analysis_wrapper(): # Run async analysis in sync context import asyncio try: # Get or create event loop try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the analysis if loop.is_running(): # If loop is already running, we need to use a different approach import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, self.analysis_engine.run_analysis(atype, **params)) return future.result(timeout=params.get("timeout_seconds", 60)) else: return loop.run_until_complete(self.analysis_engine.run_analysis(atype, **params)) except Exception as e: logger.error(f"Error in analysis wrapper for {atype}: {str(e)}") return { "status": "error", "analysis_type": atype, "error_message": str(e), "timestamp": datetime.now().isoformat() } return analysis_wrapper # Calculate dynamic timeout based on priority and estimated execution time base_timeout = analysis_info.get("execution_time_estimate", 30) priority_multiplier = 1.0 + (analysis_info.get("priority", 1) * 0.2) # Higher priority gets more time dynamic_timeout = int(base_timeout * priority_multiplier) + 15 # Add buffer service_call = { "service": "s3_analysis_engine", "operation": analysis_type, "function": create_analysis_wrapper(), "args": (), "kwargs": {}, "timeout": kwargs.get("timeout_seconds", dynamic_timeout), "priority": analysis_info.get("priority", 1), "metadata": { "analysis_type": analysis_type, "cost_impact": analysis_info.get("cost_impact", "unknown"), "execution_time_estimate": analysis_info.get("execution_time_estimate", 30), "execution_order": i + 1, "dependencies": analysis_info.get("dependencies", []), "description": analysis_info.get("description", "S3 optimization analysis"), "dynamic_timeout": dynamic_timeout, "priority_multiplier": priority_multiplier } } service_calls.append(service_call) return service_calls def _get_task_prioritization_summary(self, available_analyses: List[Dict[str, Any]]) -> Dict[str, Any]: """ Get summary of task prioritization for reporting. Args: available_analyses: List of analysis metadata Returns: Dictionary containing prioritization summary """ prioritization_summary = { "high_priority_analyses": [], "medium_priority_analyses": [], "low_priority_analyses": [], "prioritization_criteria": { "cost_impact": "Higher cost impact gets higher priority", "execution_time": "Shorter execution time gets higher priority within same cost impact", "dependencies": "Analyses with dependencies are scheduled after their dependencies" } } for analysis in available_analyses: priority = analysis.get("priority", 1) analysis_summary = { "analysis_type": analysis["analysis_type"], "priority": priority, "cost_impact": analysis.get("cost_impact", "unknown"), "execution_time_estimate": analysis.get("execution_time_estimate", 30) } if priority >= 4: prioritization_summary["high_priority_analyses"].append(analysis_summary) elif priority >= 2: prioritization_summary["medium_priority_analyses"].append(analysis_summary) else: prioritization_summary["low_priority_analyses"].append(analysis_summary) return prioritization_summary def _execute_cross_analysis_queries(self, stored_tables: List[str]) -> Dict[str, Any]: """ Execute cross-analysis aggregation queries for deeper insights. Args: stored_tables: List of table names stored in the session Returns: Dictionary containing cross-analysis query results """ cross_analysis_results = {} try: # Import aggregation queries from .s3_aggregation_queries import S3AggregationQueries # Get cross-analysis queries based on available tables aggregation_queries = self._get_cross_analysis_queries_for_tables(stored_tables) if aggregation_queries: logger.info(f"Executing {len(aggregation_queries)} cross-analysis queries") # Execute aggregation queries using ServiceOrchestrator cross_analysis_results = self.service_orchestrator.aggregate_results(aggregation_queries) logger.info(f"Completed cross-analysis queries: {len(cross_analysis_results)} results") else: logger.info("No cross-analysis queries available for current table set") except Exception as e: logger.error(f"Error executing cross-analysis queries: {str(e)}") cross_analysis_results = {"error": str(e)} return cross_analysis_results def _get_cross_analysis_queries_for_tables(self, stored_tables: List[str]) -> List[Dict[str, str]]: """ Get cross-analysis queries adapted for the specific tables available in the session. Args: stored_tables: List of table names stored in the session Returns: List of query definitions adapted for available tables """ queries = [] # Filter tables to S3-related ones s3_tables = [table for table in stored_tables if table.startswith('s3_')] if not s3_tables: return queries # Get table names by analysis type table_map = {} for table in s3_tables: if 'general_spend' in table: table_map['general_spend'] = table elif 'storage_class' in table: table_map['storage_class'] = table elif 'archive_optimization' in table: table_map['archive_optimization'] = table elif 'api_cost' in table: table_map['api_cost'] = table elif 'multipart_cleanup' in table: table_map['multipart_cleanup'] = table elif 'governance' in table: table_map['governance'] = table elif 'comprehensive' in table: table_map['comprehensive'] = table # Query 1: Recommendations by priority (if we have recommendation tables) recommendation_tables = [table for table in s3_tables if 'comprehensive' not in table] if recommendation_tables: union_clauses = [] for table in recommendation_tables: union_clauses.append(f""" SELECT priority, analysis_type, title, description, potential_savings, implementation_effort FROM "{table}" WHERE record_type = 'recommendation' """) if union_clauses: queries.append({ "name": "recommendations_by_priority", "query": f""" SELECT priority, COUNT(*) as recommendation_count, SUM(CASE WHEN potential_savings > 0 THEN potential_savings ELSE 0 END) as total_potential_savings, AVG(CASE WHEN potential_savings > 0 THEN potential_savings ELSE NULL END) as avg_potential_savings FROM ( {' UNION ALL '.join(union_clauses)} ) all_recommendations GROUP BY priority ORDER BY CASE priority WHEN 'high' THEN 3 WHEN 'medium' THEN 2 WHEN 'low' THEN 1 ELSE 0 END DESC """ }) # Query 2: Top optimization opportunities (if we have comprehensive table) if 'comprehensive' in table_map: queries.append({ "name": "top_optimization_opportunities", "query": f""" SELECT title, description, potential_savings, source_analysis, implementation_effort, priority, rank FROM "{table_map['comprehensive']}" WHERE record_type = 'optimization_opportunity' ORDER BY potential_savings DESC, rank ASC LIMIT 10 """ }) # Query 3: Analysis execution summary metadata_tables = [table for table in s3_tables if 'comprehensive' not in table] if metadata_tables: union_clauses = [] for table in metadata_tables: union_clauses.append(f""" SELECT analysis_type, status, execution_time, recommendations_count, data_sources, timestamp FROM "{table}" WHERE record_type = 'metadata' """) if union_clauses: queries.append({ "name": "analysis_execution_summary", "query": f""" SELECT analysis_type, status, execution_time, recommendations_count, data_sources, timestamp FROM ( {' UNION ALL '.join(union_clauses)} ) metadata ORDER BY CASE status WHEN 'success' THEN 1 WHEN 'error' THEN 2 ELSE 3 END, execution_time ASC """ }) # Query 4: Total potential savings by analysis type if recommendation_tables: union_clauses = [] for table in recommendation_tables: union_clauses.append(f""" SELECT analysis_type, potential_savings, priority, implementation_effort FROM "{table}" WHERE record_type = 'recommendation' """) if union_clauses: queries.append({ "name": "total_savings_by_analysis", "query": f""" SELECT analysis_type, COUNT(*) as recommendation_count, SUM(CASE WHEN potential_savings > 0 THEN potential_savings ELSE 0 END) as total_potential_savings, AVG(CASE WHEN potential_savings > 0 THEN potential_savings ELSE NULL END) as avg_potential_savings, MAX(potential_savings) as max_potential_savings, COUNT(CASE WHEN priority = 'high' THEN 1 END) as high_priority_count, COUNT(CASE WHEN priority = 'medium' THEN 1 END) as medium_priority_count, COUNT(CASE WHEN priority = 'low' THEN 1 END) as low_priority_count FROM ( {' UNION ALL '.join(union_clauses)} ) all_recommendations GROUP BY analysis_type ORDER BY total_potential_savings DESC """ }) # Query 5: Cross-analysis insights (if we have comprehensive table) if 'comprehensive' in table_map: queries.append({ "name": "cross_analysis_insights", "query": f""" SELECT insight_type, title, description, recommendation, analyses_involved, timestamp FROM "{table_map['comprehensive']}" WHERE record_type = 'cross_analysis_insight' ORDER BY timestamp DESC """ }) logger.info(f"Generated {len(queries)} cross-analysis queries for {len(s3_tables)} S3 tables") return queries def _get_analysis_priority(self, analysis_type: str) -> int: """Get priority for analysis type (higher number = higher priority).""" return self.analysis_engine.analysis_priorities.get(analysis_type, {}).get("priority", 1) def _get_cache_ttl_for_analysis(self, analysis_type: str) -> int: """ Get appropriate cache TTL for analysis type. Args: analysis_type: Type of analysis Returns: TTL in seconds """ # Different analysis types have different cache lifetimes ttl_mapping = { "general_spend": 1800, # 30 minutes - cost data changes frequently "storage_class": 3600, # 1 hour - storage class analysis is more stable "archive_optimization": 7200, # 2 hours - archive recommendations change slowly "api_cost": 1800, # 30 minutes - API costs can fluctuate "multipart_cleanup": 900, # 15 minutes - multipart uploads change frequently "governance": 3600, # 1 hour - governance policies are relatively stable "comprehensive": 1800 # 30 minutes - comprehensive analysis includes dynamic data } return ttl_mapping.get(analysis_type, 1800) # Default 30 minutes def get_performance_summary(self) -> Dict[str, Any]: """Get comprehensive performance summary from all optimization components.""" try: return { "timestamp": datetime.now().isoformat(), "session_id": self.session_id, "region": self.region, "performance_monitor": self.performance_monitor.get_performance_summary(), "memory_manager": self.memory_manager.get_memory_statistics(), "timeout_handler": self.timeout_handler.get_performance_statistics(), "caches": { "pricing_cache": self.pricing_cache.get_statistics(), "bucket_metadata_cache": self.bucket_metadata_cache.get_statistics(), "analysis_results_cache": self.analysis_results_cache.get_statistics() } } except Exception as e: logger.error(f"Error getting performance summary: {e}") return {"error": str(e)} def optimize_performance(self): """Optimize performance by running optimization routines on all components.""" try: logger.info("Running performance optimization") # Optimize timeout configuration based on historical data self.timeout_handler.optimize_configuration() # Force gentle memory cleanup self.memory_manager.force_cleanup("gentle") # Optimize caches for cache in [self.pricing_cache, self.bucket_metadata_cache, self.analysis_results_cache]: if hasattr(cache, '_optimize_cache'): cache._optimize_cache() logger.info("Performance optimization completed") except Exception as e: logger.error(f"Error during performance optimization: {e}") def clear_caches(self, cache_types: Optional[List[str]] = None): """ Clear specified caches or all caches. Args: cache_types: List of cache types to clear (None for all) """ cache_map = { "pricing": self.pricing_cache, "bucket_metadata": self.bucket_metadata_cache, "analysis_results": self.analysis_results_cache } if cache_types is None: cache_types = list(cache_map.keys()) for cache_type in cache_types: if cache_type in cache_map: cache_map[cache_type].clear() logger.info(f"Cleared {cache_type} cache") else: logger.warning(f"Unknown cache type: {cache_type}") def _store_analysis_results(self, analysis_type: str, result: Dict[str, Any]): """Store analysis results in session database with proper schema for cross-analysis queries.""" try: table_name = f"s3_{analysis_type}_{int(time.time())}" # Convert result to structured format for storage data_to_store = [] # Store main result metadata with consistent column names data_to_store.append({ "record_type": "metadata", "analysis_type": analysis_type, "status": result.get("status"), "execution_time": result.get("execution_time", 0), "timestamp": result.get("timestamp", datetime.now().isoformat()), "session_id": self.session_id, "data_sources": str(result.get("data_sources", [])), "recommendations_count": len(result.get("recommendations", [])), # Add placeholder columns for cross-analysis compatibility "priority": None, "title": None, "description": None, "potential_savings": 0, "implementation_effort": None, "recommendation_id": None, "rec_type": None }) # Store recommendations separately for easier querying with consistent schema for i, rec in enumerate(result.get("recommendations", [])): rec_data = { "record_type": "recommendation", "analysis_type": analysis_type, "recommendation_id": i, "rec_type": rec.get("type", ""), "priority": rec.get("priority", "medium"), "title": rec.get("title", ""), "description": rec.get("description", ""), "potential_savings": rec.get("potential_savings", 0), "implementation_effort": rec.get("implementation_effort", "medium"), "timestamp": datetime.now().isoformat(), # Add metadata columns for consistency "status": None, "execution_time": None, "session_id": self.session_id, "data_sources": None, "recommendations_count": None } data_to_store.append(rec_data) # Store analysis data summary with consistent schema if result.get("data"): data_summary = { "record_type": "data_summary", "analysis_type": analysis_type, "data_keys": str(list(result["data"].keys())), "timestamp": datetime.now().isoformat(), # Add placeholder columns for consistency "status": None, "execution_time": None, "session_id": self.session_id, "data_sources": None, "recommendations_count": None, "priority": None, "title": None, "description": None, "potential_savings": 0, "implementation_effort": None, "recommendation_id": None, "rec_type": None } data_to_store.append(data_summary) success = self.session_manager.store_data( self.session_id, table_name, data_to_store ) if success: logger.info(f"Stored {analysis_type} results ({len(data_to_store)} records) in table {table_name}") else: logger.warning(f"Failed to store {analysis_type} results") except Exception as e: logger.error(f"Error storing analysis results for {analysis_type}: {str(e)}") def _store_comprehensive_results(self, aggregated_results: Dict[str, Any], execution_results: Dict[str, Any]): """Store comprehensive analysis results with execution metadata in session database.""" try: table_name = f"s3_comprehensive_{int(time.time())}" data_to_store = [] # Store aggregation metadata with execution information metadata = aggregated_results.get("aggregation_metadata", {}) execution_summary = execution_results.get("execution_summary", execution_results) data_to_store.append({ "record_type": "comprehensive_metadata", "total_analyses": metadata.get("total_analyses", 0), "successful_analyses": metadata.get("successful_analyses", 0), "failed_analyses": metadata.get("failed_analyses", 0), "total_potential_savings": aggregated_results.get("total_potential_savings", 0), "aggregated_at": metadata.get("aggregated_at", datetime.now().isoformat()), "session_id": self.session_id, "parallel_execution_time": execution_summary.get("total_execution_time", 0), "successful_tasks": execution_summary.get("successful", 0), "failed_tasks": execution_summary.get("failed", 0), "timeout_tasks": execution_summary.get("timeout", 0), "stored_tables_count": len(execution_summary.get("stored_tables", [])) }) # Store execution task details for performance analysis for task_id, task_result in execution_summary.get("results", {}).items(): task_data = { "record_type": "task_execution", "task_id": task_id, "service": task_result.get("service", ""), "operation": task_result.get("operation", ""), "status": task_result.get("status", ""), "execution_time": task_result.get("execution_time", 0), "error": task_result.get("error", ""), "stored_table": task_result.get("stored_table", ""), "timestamp": datetime.now().isoformat() } data_to_store.append(task_data) # Store top optimization opportunities with enhanced metadata for i, opportunity in enumerate(aggregated_results.get("optimization_opportunities", [])[:10]): opp_data = { "record_type": "optimization_opportunity", "rank": opportunity.get("rank", i + 1), "title": opportunity.get("title", ""), "description": opportunity.get("description", ""), "potential_savings": opportunity.get("potential_savings", 0), "implementation_effort": opportunity.get("implementation_effort", "medium"), "source_analysis": opportunity.get("source_analysis", ""), "priority": opportunity.get("priority", "medium"), "timestamp": datetime.now().isoformat() } data_to_store.append(opp_data) # Store cross-analysis insights for i, insight in enumerate(aggregated_results.get("cross_analysis_insights", [])): insight_data = { "record_type": "cross_analysis_insight", "insight_id": i, "insight_type": insight.get("type", ""), "title": insight.get("title", ""), "description": insight.get("description", ""), "recommendation": insight.get("recommendation", ""), "analyses_involved": str(insight.get("analyses_involved", [])), "timestamp": datetime.now().isoformat() } data_to_store.append(insight_data) # Store cost insights summary cost_insights = aggregated_results.get("cost_insights", {}) if cost_insights and not cost_insights.get("error"): cost_data = { "record_type": "cost_insights_summary", "total_storage_cost": cost_insights.get("total_storage_cost", 0), "total_transfer_cost": cost_insights.get("total_transfer_cost", 0), "total_api_cost": cost_insights.get("total_api_cost", 0), "highest_cost_area": cost_insights.get("highest_cost_areas", [{}])[0].get("0", "Unknown") if cost_insights.get("highest_cost_areas") else "Unknown", "optimization_potential_count": len(cost_insights.get("cost_optimization_potential", {})), "timestamp": datetime.now().isoformat() } data_to_store.append(cost_data) success = self.session_manager.store_data( self.session_id, table_name, data_to_store ) if success: logger.info(f"Stored comprehensive results ({len(data_to_store)} records) in table {table_name}") else: logger.warning("Failed to store comprehensive results") except Exception as e: logger.error(f"Error storing comprehensive results: {str(e)}") def get_available_analyses(self) -> List[Dict[str, Any]]: """ Get list of available analyses with metadata. Returns: List of analysis information dictionaries """ return self.analysis_engine.get_available_analyses() def generate_cross_analyzer_insights(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]: """ Generate cross-analyzer insights by analyzing relationships between different analysis results. Args: analysis_results: Dictionary of analysis results from multiple analyzers Returns: List of cross-analyzer insight dictionaries """ insights = [] try: # Extract successful results for cross-analysis successful_results = {} for task_id, result in analysis_results.items(): # Handle both TaskResult objects and direct dictionaries if hasattr(result, 'data') and result.data and hasattr(result, 'status') and result.status == "success": analysis_type = result.data.get("analysis_type", "unknown") successful_results[analysis_type] = result.data elif isinstance(result, dict) and result.get("status") == "success": analysis_type = result.get("analysis_type", "unknown") successful_results[analysis_type] = result logger.info(f"Generating cross-analyzer insights from {len(successful_results)} successful analyses") # Insight 1: Storage cost optimization correlation if "general_spend" in successful_results and "storage_class" in successful_results: spend_data = successful_results["general_spend"].get("data", {}) storage_data = successful_results["storage_class"].get("data", {}) total_storage_cost = spend_data.get("total_storage_cost", 0) storage_recommendations = successful_results["storage_class"].get("recommendations", []) storage_savings = sum(rec.get("potential_savings", 0) for rec in storage_recommendations) if total_storage_cost > 0 and storage_savings > 0: savings_percentage = (storage_savings / total_storage_cost) * 100 insights.append({ "type": "cost_optimization_correlation", "title": "Storage Class Optimization Impact", "description": f"Storage class optimization could reduce total storage costs by {savings_percentage:.1f}% (${storage_savings:.2f} out of ${total_storage_cost:.2f})", "recommendation": "Prioritize storage class optimization as it offers significant cost reduction potential", "analyses_involved": ["general_spend", "storage_class"], "metrics": { "total_storage_cost": total_storage_cost, "potential_storage_savings": storage_savings, "savings_percentage": savings_percentage }, "priority": "high" if savings_percentage > 20 else "medium", "confidence": "high" }) # Insight 2: Governance and cost correlation if "governance" in successful_results and "multipart_cleanup" in successful_results: governance_violations = len(successful_results["governance"].get("recommendations", [])) multipart_savings = sum( rec.get("potential_savings", 0) for rec in successful_results["multipart_cleanup"].get("recommendations", []) ) if governance_violations > 0 and multipart_savings > 0: insights.append({ "type": "governance_cost_correlation", "title": "Governance Gaps Leading to Cost Waste", "description": f"Found {governance_violations} governance violations and ${multipart_savings:.2f} in multipart upload waste", "recommendation": "Implement lifecycle policies to prevent future cost waste from incomplete uploads", "analyses_involved": ["governance", "multipart_cleanup"], "metrics": { "governance_violations": governance_violations, "multipart_waste_cost": multipart_savings }, "priority": "high" if multipart_savings > 100 else "medium", "confidence": "high" }) # Insight 3: Archive optimization opportunity if "storage_class" in successful_results and "archive_optimization" in successful_results: storage_recs = successful_results["storage_class"].get("recommendations", []) archive_recs = successful_results["archive_optimization"].get("recommendations", []) # Look for overlapping optimization opportunities storage_savings = sum(rec.get("potential_savings", 0) for rec in storage_recs) archive_savings = sum(rec.get("potential_savings", 0) for rec in archive_recs) if storage_savings > 0 and archive_savings > 0: combined_savings = storage_savings + archive_savings insights.append({ "type": "combined_optimization_opportunity", "title": "Combined Storage and Archive Optimization", "description": f"Combining storage class optimization (${storage_savings:.2f}) with archive strategies (${archive_savings:.2f}) could save ${combined_savings:.2f} total", "recommendation": "Implement a phased approach: optimize storage classes first, then implement archive policies for long-term data", "analyses_involved": ["storage_class", "archive_optimization"], "metrics": { "storage_class_savings": storage_savings, "archive_savings": archive_savings, "combined_savings": combined_savings }, "priority": "high" if combined_savings > 500 else "medium", "confidence": "medium" }) # Insight 4: API cost vs storage cost balance if "general_spend" in successful_results and "api_cost" in successful_results: spend_data = successful_results["general_spend"].get("data", {}) api_recs = successful_results["api_cost"].get("recommendations", []) total_api_cost = spend_data.get("total_api_cost", 0) total_storage_cost = spend_data.get("total_storage_cost", 0) api_savings = sum(rec.get("potential_savings", 0) for rec in api_recs) if total_api_cost > 0 and total_storage_cost > 0: api_percentage = (total_api_cost / (total_api_cost + total_storage_cost)) * 100 if api_percentage > 30: # API costs are significant insights.append({ "type": "cost_distribution_analysis", "title": "High API Cost Ratio Detected", "description": f"API costs represent {api_percentage:.1f}% of total S3 costs (${total_api_cost:.2f} API vs ${total_storage_cost:.2f} storage)", "recommendation": "Focus on API cost optimization through caching, request consolidation, and CloudFront integration", "analyses_involved": ["general_spend", "api_cost"], "metrics": { "api_cost_percentage": api_percentage, "total_api_cost": total_api_cost, "total_storage_cost": total_storage_cost, "api_optimization_potential": api_savings }, "priority": "high" if api_percentage > 50 else "medium", "confidence": "high" }) # Insight 5: Comprehensive optimization priority ranking if len(successful_results) >= 3: # Calculate total potential savings by analysis type savings_by_analysis = {} for analysis_type, result in successful_results.items(): recommendations = result.get("recommendations", []) total_savings = sum(rec.get("potential_savings", 0) for rec in recommendations) savings_by_analysis[analysis_type] = total_savings # Sort by savings potential sorted_savings = sorted(savings_by_analysis.items(), key=lambda x: x[1], reverse=True) if sorted_savings and sorted_savings[0][1] > 0: top_analysis = sorted_savings[0][0] top_savings = sorted_savings[0][1] total_savings = sum(savings_by_analysis.values()) insights.append({ "type": "optimization_priority_ranking", "title": "Optimization Priority Recommendations", "description": f"Based on potential savings analysis, prioritize {top_analysis} optimization (${top_savings:.2f} of ${total_savings:.2f} total potential)", "recommendation": f"Start with {top_analysis} optimization for maximum impact, then proceed with other optimizations in order of savings potential", "analyses_involved": list(successful_results.keys()), "metrics": { "priority_ranking": [{"analysis": analysis, "savings": savings} for analysis, savings in sorted_savings], "total_potential_savings": total_savings, "top_opportunity": {"analysis": top_analysis, "savings": top_savings} }, "priority": "high", "confidence": "high" }) # Insight 6: Data freshness and analysis reliability analysis_timestamps = {} for analysis_type, result in successful_results.items(): timestamp = result.get("timestamp") if timestamp: analysis_timestamps[analysis_type] = timestamp if len(analysis_timestamps) > 1: # Check for timestamp consistency (all analyses should be recent) from datetime import datetime, timedelta current_time = datetime.now() old_analyses = [] for analysis_type, timestamp_str in analysis_timestamps.items(): try: analysis_time = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) if current_time - analysis_time > timedelta(hours=24): old_analyses.append(analysis_type) except: pass if old_analyses: insights.append({ "type": "data_freshness_warning", "title": "Analysis Data Freshness Concern", "description": f"Some analyses may be using stale data: {', '.join(old_analyses)}", "recommendation": "Re-run analyses with stale data to ensure recommendations are based on current information", "analyses_involved": old_analyses, "metrics": { "stale_analyses": old_analyses, "total_analyses": len(analysis_timestamps) }, "priority": "medium", "confidence": "medium" }) logger.info(f"Generated {len(insights)} cross-analyzer insights") return insights except Exception as e: logger.error(f"Error generating cross-analyzer insights: {str(e)}") return [{ "type": "insight_generation_error", "title": "Cross-Analyzer Insight Generation Failed", "description": f"Failed to generate cross-analyzer insights: {str(e)}", "recommendation": "Review analysis results manually for optimization opportunities", "analyses_involved": list(analysis_results.keys()) if analysis_results else [], "priority": "low", "confidence": "low" }] def aggregate_results_with_insights(self, results: Dict[str, Any], include_cross_analysis: bool = True) -> Dict[str, Any]: """ Enhanced result aggregation with cross-analyzer insights and comprehensive analysis. Args: results: Dictionary of analysis results by task ID include_cross_analysis: Whether to include cross-analysis insights Returns: Enhanced aggregated analysis results """ try: # Start with base aggregation from analysis engine base_aggregation = self.analysis_engine.aggregate_analysis_results(results, include_cross_analysis) # Enhance with orchestrator-level insights if include_cross_analysis: cross_insights = self.generate_cross_analyzer_insights(results) base_aggregation["cross_analysis_insights"] = cross_insights # Update optimization opportunities with cross-analyzer insights insight_opportunities = [] for insight in cross_insights: if insight.get("type") in ["cost_optimization_correlation", "combined_optimization_opportunity", "optimization_priority_ranking"]: opportunity = { "title": insight["title"], "description": insight["description"], "recommendation": insight["recommendation"], "potential_savings": insight.get("metrics", {}).get("combined_savings") or insight.get("metrics", {}).get("potential_storage_savings", 0), "priority": insight["priority"], "source_analysis": "cross_analyzer_insight", "analyses_involved": insight["analyses_involved"], "confidence": insight.get("confidence", "medium"), "insight_type": insight["type"] } insight_opportunities.append(opportunity) # Merge with existing opportunities existing_opportunities = base_aggregation.get("optimization_opportunities", []) all_opportunities = existing_opportunities + insight_opportunities # Sort by potential savings and priority priority_weights = {"high": 3, "medium": 2, "low": 1} all_opportunities.sort( key=lambda x: ( priority_weights.get(x.get("priority", "medium"), 2), x.get("potential_savings", 0) ), reverse=True ) base_aggregation["optimization_opportunities"] = all_opportunities # Add orchestrator-specific metadata base_aggregation["orchestrator_metadata"] = { "orchestrator_class": "S3OptimizationOrchestrator", "session_id": self.session_id, "region": self.region, "performance_optimizations_enabled": True, "cross_analyzer_insights_count": len(base_aggregation.get("cross_analysis_insights", [])), "enhanced_aggregation": True, "aggregation_timestamp": datetime.now().isoformat() } # Add performance summary if available if self.performance_monitor: try: performance_summary = self.get_performance_summary() base_aggregation["performance_summary"] = performance_summary except Exception as e: logger.warning(f"Could not include performance summary: {str(e)}") return base_aggregation except Exception as e: logger.error(f"Error in enhanced result aggregation: {str(e)}") # Fall back to base aggregation try: return self.analysis_engine.aggregate_analysis_results(results, include_cross_analysis) except Exception as fallback_error: logger.error(f"Fallback aggregation also failed: {str(fallback_error)}") return { "status": "error", "message": f"Result aggregation failed: {str(e)}", "fallback_error": str(fallback_error), "timestamp": datetime.now().isoformat() } def get_engine_status(self) -> Dict[str, Any]: """ Get status information about the analysis engine. Returns: Dictionary containing engine status """ return self.analysis_engine.get_engine_status() def cleanup_session(self): """Clean up the orchestrator session.""" try: if self.service_orchestrator: self.service_orchestrator.cleanup_session() if self.analysis_engine: self.analysis_engine.cleanup() logger.info("S3OptimizationOrchestrator session cleaned up") except Exception as e: logger.error(f"Error cleaning up S3OptimizationOrchestrator session: {str(e)}") def get_session_info(self) -> Dict[str, Any]: """Get information about the current session.""" try: session_info = self.service_orchestrator.get_session_info() session_info["analysis_engine_status"] = self.analysis_engine.get_engine_status() return session_info except Exception as e: logger.error(f"Error getting session info: {str(e)}") return {"error": f"Failed to get session info: {str(e)}"} def get_orchestrator_status(self) -> Dict[str, Any]: """ Get comprehensive status of the S3OptimizationOrchestrator including analyzer integration. Returns: Dictionary containing complete orchestrator status """ try: status = { "orchestrator_info": { "class_name": "S3OptimizationOrchestrator", "session_id": self.session_id, "region": self.region, "initialized_at": datetime.now().isoformat(), "performance_optimizations_enabled": True }, "analyzer_integration": {}, "service_integration": {}, "performance_components": {}, "session_integration": {}, "capabilities": {}, "health_status": "unknown" } # Get analyzer integration status try: analyzer_registry_info = self.get_analyzer_registry() status["analyzer_integration"] = { "status": "active" if analyzer_registry_info.get("registry_status") == "active" else "inactive", "total_analyzers": analyzer_registry_info.get("total_analyzers", 0), "registered_analyzers": list(analyzer_registry_info.get("registered_analyzers", {}).keys()), "analyzer_health": analyzer_registry_info.get("analyzer_health", {}), "analysis_priorities_configured": len(self.analysis_engine.analysis_priorities) > 0 } except Exception as e: status["analyzer_integration"] = {"status": "error", "error": str(e)} # Get service integration status try: engine_status = self.analysis_engine.get_engine_status() status["service_integration"] = { "s3_service": engine_status.get("services_status", {}).get("s3_service", False), "pricing_service": engine_status.get("services_status", {}).get("pricing_service", False), "storage_lens_service": engine_status.get("services_status", {}).get("storage_lens_service", False), "service_orchestrator": self.service_orchestrator is not None, "analysis_engine": self.analysis_engine is not None } except Exception as e: status["service_integration"] = {"status": "error", "error": str(e)} # Get performance components status try: status["performance_components"] = { "performance_monitor": self.performance_monitor is not None, "memory_manager": self.memory_manager is not None, "timeout_handler": self.timeout_handler is not None, "pricing_cache": self.pricing_cache is not None, "bucket_metadata_cache": self.bucket_metadata_cache is not None, "analysis_results_cache": self.analysis_results_cache is not None } # Add cache statistics if available if self.pricing_cache: status["performance_components"]["pricing_cache_stats"] = self.pricing_cache.get_statistics() if self.bucket_metadata_cache: status["performance_components"]["bucket_cache_stats"] = self.bucket_metadata_cache.get_statistics() if self.analysis_results_cache: status["performance_components"]["results_cache_stats"] = self.analysis_results_cache.get_statistics() except Exception as e: status["performance_components"] = {"status": "error", "error": str(e)} # Get session integration status try: session_info = self.service_orchestrator.get_session_info() stored_tables = self.get_stored_tables() s3_tables = [table for table in stored_tables if table.startswith('s3_')] status["session_integration"] = { "session_active": session_info.get("error") is None, "session_id": self.session_id, "total_tables": len(stored_tables), "s3_analysis_tables": len(s3_tables), "session_manager_available": self.session_manager is not None, "cross_analysis_ready": len(s3_tables) > 0 } except Exception as e: status["session_integration"] = {"status": "error", "error": str(e)} # Get capabilities try: available_analyses = self.analysis_engine.get_available_analyses() status["capabilities"] = { "total_analysis_types": len(available_analyses), "available_analyses": [analysis["analysis_type"] for analysis in available_analyses], "parallel_execution": True, "cross_analyzer_insights": True, "performance_optimizations": True, "dynamic_analyzer_loading": True, "comprehensive_error_handling": True, "session_sql_integration": True, "intelligent_caching": True, "memory_management": True, "progressive_timeouts": True } except Exception as e: status["capabilities"] = {"status": "error", "error": str(e)} # Determine overall health status try: analyzer_ok = status["analyzer_integration"].get("status") == "active" services_ok = all(status["service_integration"].values()) if isinstance(status["service_integration"], dict) else False performance_ok = any(status["performance_components"].values()) if isinstance(status["performance_components"], dict) else False session_ok = status["session_integration"].get("session_active", False) if analyzer_ok and services_ok and performance_ok and session_ok: status["health_status"] = "healthy" elif analyzer_ok and services_ok: status["health_status"] = "functional" else: status["health_status"] = "degraded" except Exception as e: status["health_status"] = "unknown" status["health_check_error"] = str(e) return status except Exception as e: logger.error(f"Error getting orchestrator status: {str(e)}") return { "orchestrator_info": { "class_name": "S3OptimizationOrchestrator", "session_id": self.session_id, "region": self.region }, "status": "error", "error": str(e), "timestamp": datetime.now().isoformat() } def get_parallel_execution_status(self) -> Dict[str, Any]: """ Get detailed status of parallel execution framework integration. Returns: Dictionary containing parallel execution status and metrics """ try: # Get parallel executor status parallel_executor = self.service_orchestrator.parallel_executor executor_status = parallel_executor.get_status() # Get session manager status session_info = self.session_manager.get_session_info(self.session_id) # Get analysis engine status engine_status = self.analysis_engine.get_engine_status() # Get stored tables for cross-analysis stored_tables = self.get_stored_tables() s3_tables = [table for table in stored_tables if table.startswith('s3_')] integration_status = { "integration_info": { "orchestrator_class": "S3OptimizationOrchestrator", "session_id": self.session_id, "region": self.region, "integration_complete": True, "timestamp": datetime.now().isoformat() }, "parallel_executor": { "status": "active" if executor_status.get("executor_alive", False) else "inactive", "max_workers": executor_status.get("max_workers", 0), "active_tasks": executor_status.get("active_tasks", 0), "completed_tasks": executor_status.get("completed_tasks", 0), "status_breakdown": executor_status.get("status_breakdown", {}) }, "session_sql_integration": { "session_active": session_info.get("error") is None, "session_created_at": session_info.get("created_at"), "session_last_accessed": session_info.get("last_accessed"), "total_tables": len(stored_tables), "s3_analysis_tables": len(s3_tables), "stored_table_names": s3_tables[:10] # Show first 10 S3 tables }, "analysis_engine": { "registered_analyzers": len(engine_status.get("registered_analyzers", {})), "analyzer_types": list(engine_status.get("registered_analyzers", {}).keys()), "execution_history_count": len(engine_status.get("execution_history", [])), "services_available": engine_status.get("services_status", {}) }, "task_prioritization": { "prioritization_enabled": True, "priority_factors": [ "cost_impact", "execution_time_estimate", "dependency_level", "context_adjustments" ], "analysis_priorities": self.analysis_engine.analysis_priorities }, "cross_analysis_capabilities": { "cross_analysis_enabled": True, "aggregation_queries_available": len(s3_tables) > 0, "insight_generation_enabled": True, "available_query_types": [ "recommendations_by_priority", "top_optimization_opportunities", "analysis_execution_summary", "total_savings_by_analysis", "cross_analysis_insights" ] if s3_tables else [] }, "integration_health": { "all_components_active": all([ executor_status.get("executor_alive", False), session_info.get("error") is None, len(engine_status.get("registered_analyzers", {})) > 0 ]), "ready_for_analysis": True, "estimated_capacity": executor_status.get("max_workers", 0), "last_health_check": datetime.now().isoformat() } } return integration_status except Exception as e: logger.error(f"Error getting parallel execution status: {str(e)}") return { "integration_info": { "orchestrator_class": "S3OptimizationOrchestrator", "session_id": self.session_id, "integration_complete": False, "error": str(e), "timestamp": datetime.now().isoformat() }, "error": f"Failed to get parallel execution status: {str(e)}" } def validate_analyzer_integration(self) -> Dict[str, Any]: """ Validate that all analyzers are properly integrated and functional. Returns: Dictionary containing analyzer integration validation results """ validation_results = { "validation_timestamp": datetime.now().isoformat(), "overall_status": "unknown", "analyzer_validations": {}, "integration_tests": {}, "recommendations": [] } try: # Validate each registered analyzer for analysis_type in self.analysis_engine.analyzer_registry.list_analyzers(): analyzer = self.analysis_engine.analyzer_registry.get(analysis_type) analyzer_validation = { "status": "unknown", "checks": {}, "errors": [], "warnings": [] } # Check 1: Analyzer instance validity try: if analyzer and hasattr(analyzer, 'analyze') and hasattr(analyzer, 'get_recommendations'): analyzer_validation["checks"]["instance_valid"] = True else: analyzer_validation["checks"]["instance_valid"] = False analyzer_validation["errors"].append("Analyzer missing required methods") except Exception as e: analyzer_validation["checks"]["instance_valid"] = False analyzer_validation["errors"].append(f"Instance validation error: {str(e)}") # Check 2: Service dependencies try: services_available = { "s3_service": analyzer.s3_service is not None, "pricing_service": analyzer.pricing_service is not None, "storage_lens_service": analyzer.storage_lens_service is not None } analyzer_validation["checks"]["services"] = services_available missing_services = [service for service, available in services_available.items() if not available] if missing_services: analyzer_validation["warnings"].append(f"Missing services: {', '.join(missing_services)}") except Exception as e: analyzer_validation["checks"]["services"] = {} analyzer_validation["errors"].append(f"Service check error: {str(e)}") # Check 3: Performance optimization components try: performance_components = { "performance_monitor": hasattr(analyzer, 'performance_monitor') and analyzer.performance_monitor is not None, "memory_manager": hasattr(analyzer, 'memory_manager') and analyzer.memory_manager is not None } analyzer_validation["checks"]["performance_components"] = performance_components missing_components = [comp for comp, available in performance_components.items() if not available] if missing_components: analyzer_validation["warnings"].append(f"Missing performance components: {', '.join(missing_components)}") except Exception as e: analyzer_validation["checks"]["performance_components"] = {} analyzer_validation["errors"].append(f"Performance component check error: {str(e)}") # Check 4: Priority configuration try: priority_info = self.analysis_engine.analysis_priorities.get(analysis_type, {}) analyzer_validation["checks"]["priority_configured"] = bool(priority_info) if not priority_info: analyzer_validation["warnings"].append("No priority configuration found") else: required_fields = ["priority", "cost_impact", "execution_time_estimate"] missing_fields = [field for field in required_fields if field not in priority_info] if missing_fields: analyzer_validation["warnings"].append(f"Missing priority fields: {', '.join(missing_fields)}") except Exception as e: analyzer_validation["checks"]["priority_configured"] = False analyzer_validation["errors"].append(f"Priority check error: {str(e)}") # Check 5: Parameter validation capability try: test_params = {"region": "us-east-1", "lookback_days": 30} validation_result = analyzer.validate_parameters(**test_params) analyzer_validation["checks"]["parameter_validation"] = validation_result.get("valid", False) if not validation_result.get("valid", False): analyzer_validation["errors"].append("Parameter validation failed") except Exception as e: analyzer_validation["checks"]["parameter_validation"] = False analyzer_validation["errors"].append(f"Parameter validation check error: {str(e)}") # Determine analyzer status if analyzer_validation["errors"]: analyzer_validation["status"] = "invalid" elif analyzer_validation["warnings"]: analyzer_validation["status"] = "partial" else: analyzer_validation["status"] = "valid" validation_results["analyzer_validations"][analysis_type] = analyzer_validation # Run integration tests validation_results["integration_tests"] = self._run_analyzer_integration_tests() # Determine overall status analyzer_statuses = [val["status"] for val in validation_results["analyzer_validations"].values()] if all(status == "valid" for status in analyzer_statuses): if validation_results["integration_tests"].get("all_tests_passed", False): validation_results["overall_status"] = "valid" else: validation_results["overall_status"] = "partial" validation_results["recommendations"].append("Some integration tests failed") elif any(status == "invalid" for status in analyzer_statuses): validation_results["overall_status"] = "invalid" validation_results["recommendations"].append("Critical analyzer validation failures detected") else: validation_results["overall_status"] = "partial" validation_results["recommendations"].append("Some analyzers have warnings or missing components") # Add specific recommendations for analysis_type, analyzer_val in validation_results["analyzer_validations"].items(): if analyzer_val["status"] == "invalid": validation_results["recommendations"].append(f"Fix critical issues with {analysis_type} analyzer") elif analyzer_val["warnings"]: validation_results["recommendations"].append(f"Address warnings for {analysis_type} analyzer") return validation_results except Exception as e: logger.error(f"Error during analyzer integration validation: {str(e)}") validation_results["overall_status"] = "error" validation_results["error"] = str(e) return validation_results def _run_analyzer_integration_tests(self) -> Dict[str, Any]: """ Run integration tests for analyzer functionality. Returns: Dictionary containing integration test results """ test_results = { "tests_run": 0, "tests_passed": 0, "tests_failed": 0, "test_details": {}, "all_tests_passed": False } try: # Test 1: Analyzer registry functionality test_results["tests_run"] += 1 try: analyzer_list = self.analysis_engine.analyzer_registry.list_analyzers() if len(analyzer_list) > 0: test_results["tests_passed"] += 1 test_results["test_details"]["registry_functionality"] = { "status": "passed", "message": f"Registry contains {len(analyzer_list)} analyzers" } else: test_results["tests_failed"] += 1 test_results["test_details"]["registry_functionality"] = { "status": "failed", "message": "No analyzers registered" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["registry_functionality"] = { "status": "failed", "message": f"Registry test error: {str(e)}" } # Test 2: Priority system functionality test_results["tests_run"] += 1 try: available_analyses = self.analysis_engine.get_available_analyses() if len(available_analyses) > 0 and all("priority" in analysis for analysis in available_analyses): test_results["tests_passed"] += 1 test_results["test_details"]["priority_system"] = { "status": "passed", "message": f"Priority system working for {len(available_analyses)} analyses" } else: test_results["tests_failed"] += 1 test_results["test_details"]["priority_system"] = { "status": "failed", "message": "Priority system not properly configured" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["priority_system"] = { "status": "failed", "message": f"Priority system test error: {str(e)}" } # Test 3: Task creation functionality test_results["tests_run"] += 1 try: analyzer_types = self.analysis_engine.analyzer_registry.list_analyzers() if analyzer_types: tasks = self.analysis_engine.create_parallel_analysis_tasks( analyzer_types[:2], # Test with first 2 analyzers region="us-east-1", lookback_days=7 ) if len(tasks) > 0: test_results["tests_passed"] += 1 test_results["test_details"]["task_creation"] = { "status": "passed", "message": f"Successfully created {len(tasks)} parallel tasks" } else: test_results["tests_failed"] += 1 test_results["test_details"]["task_creation"] = { "status": "failed", "message": "No tasks created" } else: test_results["tests_failed"] += 1 test_results["test_details"]["task_creation"] = { "status": "failed", "message": "No analyzers available for task creation" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["task_creation"] = { "status": "failed", "message": f"Task creation test error: {str(e)}" } # Test 4: Cross-analyzer insight generation (mock test) test_results["tests_run"] += 1 try: # Create mock results for testing mock_results = { "task1": { "status": "success", "analysis_type": "general_spend", "data": {"total_storage_cost": 1000}, "recommendations": [{"potential_savings": 100}] }, "task2": { "status": "success", "analysis_type": "storage_class", "data": {"optimization_opportunities": 5}, "recommendations": [{"potential_savings": 200}] } } insights = self.generate_cross_analyzer_insights(mock_results) if len(insights) > 0: test_results["tests_passed"] += 1 test_results["test_details"]["cross_analyzer_insights"] = { "status": "passed", "message": f"Generated {len(insights)} cross-analyzer insights" } else: test_results["tests_failed"] += 1 test_results["test_details"]["cross_analyzer_insights"] = { "status": "failed", "message": "No cross-analyzer insights generated" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["cross_analyzer_insights"] = { "status": "failed", "message": f"Cross-analyzer insights test error: {str(e)}" } # Determine overall test status test_results["all_tests_passed"] = test_results["tests_failed"] == 0 return test_results except Exception as e: logger.error(f"Error running analyzer integration tests: {str(e)}") return { "tests_run": 0, "tests_passed": 0, "tests_failed": 1, "test_details": {"error": {"status": "failed", "message": str(e)}}, "all_tests_passed": False } def validate_integration(self) -> Dict[str, Any]: """ Validate that all components of the parallel execution framework are properly integrated. Returns: Dictionary containing validation results """ validation_results = { "validation_timestamp": datetime.now().isoformat(), "overall_status": "unknown", "component_validations": {}, "integration_tests": {}, "recommendations": [] } try: # Validate ServiceOrchestrator integration try: orchestrator_info = self.service_orchestrator.get_session_info() validation_results["component_validations"]["service_orchestrator"] = { "status": "valid", "session_active": True, "message": "ServiceOrchestrator integration successful" } except Exception as e: validation_results["component_validations"]["service_orchestrator"] = { "status": "invalid", "error": str(e), "message": "ServiceOrchestrator integration failed" } validation_results["recommendations"].append("Check ServiceOrchestrator initialization") # Validate ParallelExecutor integration try: executor_status = self.service_orchestrator.parallel_executor.get_status() if executor_status.get("executor_alive", False): validation_results["component_validations"]["parallel_executor"] = { "status": "valid", "max_workers": executor_status.get("max_workers", 0), "message": "ParallelExecutor integration successful" } else: validation_results["component_validations"]["parallel_executor"] = { "status": "invalid", "message": "ParallelExecutor is not active" } validation_results["recommendations"].append("Restart ParallelExecutor") except Exception as e: validation_results["component_validations"]["parallel_executor"] = { "status": "invalid", "error": str(e), "message": "ParallelExecutor integration failed" } validation_results["recommendations"].append("Check ParallelExecutor configuration") # Validate SessionManager integration try: session_info = self.session_manager.get_session_info(self.session_id) if session_info.get("error") is None: validation_results["component_validations"]["session_manager"] = { "status": "valid", "session_id": self.session_id, "message": "SessionManager integration successful" } else: validation_results["component_validations"]["session_manager"] = { "status": "invalid", "error": session_info.get("error"), "message": "SessionManager session invalid" } validation_results["recommendations"].append("Recreate session or check SessionManager") except Exception as e: validation_results["component_validations"]["session_manager"] = { "status": "invalid", "error": str(e), "message": "SessionManager integration failed" } validation_results["recommendations"].append("Check SessionManager initialization") # Validate S3AnalysisEngine integration try: engine_status = self.analysis_engine.get_engine_status() analyzer_count = len(engine_status.get("registered_analyzers", {})) if analyzer_count > 0: validation_results["component_validations"]["analysis_engine"] = { "status": "valid", "registered_analyzers": analyzer_count, "message": "S3AnalysisEngine integration successful" } else: validation_results["component_validations"]["analysis_engine"] = { "status": "invalid", "message": "No analyzers registered in S3AnalysisEngine" } validation_results["recommendations"].append("Check analyzer registration") except Exception as e: validation_results["component_validations"]["analysis_engine"] = { "status": "invalid", "error": str(e), "message": "S3AnalysisEngine integration failed" } validation_results["recommendations"].append("Check S3AnalysisEngine initialization") # Run integration tests validation_results["integration_tests"] = self._run_integration_tests() # Determine overall status all_valid = all( comp.get("status") == "valid" for comp in validation_results["component_validations"].values() ) if all_valid and validation_results["integration_tests"].get("all_tests_passed", False): validation_results["overall_status"] = "valid" elif all_valid: validation_results["overall_status"] = "partial" validation_results["recommendations"].append("Some integration tests failed") else: validation_results["overall_status"] = "invalid" validation_results["recommendations"].append("Critical component validation failures") return validation_results except Exception as e: logger.error(f"Error during integration validation: {str(e)}") validation_results["overall_status"] = "error" validation_results["error"] = str(e) return validation_results def _run_integration_tests(self) -> Dict[str, Any]: """ Run integration tests to verify parallel execution framework functionality. Returns: Dictionary containing test results """ test_results = { "test_timestamp": datetime.now().isoformat(), "tests_run": 0, "tests_passed": 0, "tests_failed": 0, "test_details": {}, "all_tests_passed": False } try: # Test 1: Session data storage and retrieval test_results["tests_run"] += 1 try: test_data = [{"test_key": "test_value", "timestamp": datetime.now().isoformat()}] success = self.session_manager.store_data(self.session_id, "integration_test", test_data) if success: query_result = self.service_orchestrator.query_session_data( "SELECT * FROM integration_test LIMIT 1" ) if query_result and len(query_result) > 0: test_results["tests_passed"] += 1 test_results["test_details"]["session_storage"] = { "status": "passed", "message": "Session storage and retrieval working" } else: test_results["tests_failed"] += 1 test_results["test_details"]["session_storage"] = { "status": "failed", "message": "Data stored but query failed" } else: test_results["tests_failed"] += 1 test_results["test_details"]["session_storage"] = { "status": "failed", "message": "Failed to store test data" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["session_storage"] = { "status": "failed", "error": str(e) } # Test 2: Analyzer registry functionality test_results["tests_run"] += 1 try: available_analyzers = self.analysis_engine.analyzer_registry.list_analyzers() if len(available_analyzers) >= 6: # Should have all 6 S3 analyzers test_results["tests_passed"] += 1 test_results["test_details"]["analyzer_registry"] = { "status": "passed", "analyzer_count": len(available_analyzers), "analyzers": available_analyzers } else: test_results["tests_failed"] += 1 test_results["test_details"]["analyzer_registry"] = { "status": "failed", "message": f"Expected 6 analyzers, found {len(available_analyzers)}" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["analyzer_registry"] = { "status": "failed", "error": str(e) } # Test 3: Task prioritization test_results["tests_run"] += 1 try: test_analyses = ["general_spend", "storage_class", "governance"] priority_order = self.analysis_engine._calculate_advanced_task_priority(test_analyses) if len(priority_order) == len(test_analyses) and all("priority_score" in item for item in priority_order): test_results["tests_passed"] += 1 test_results["test_details"]["task_prioritization"] = { "status": "passed", "priority_order": [item["analysis_type"] for item in priority_order] } else: test_results["tests_failed"] += 1 test_results["test_details"]["task_prioritization"] = { "status": "failed", "message": "Task prioritization failed" } except Exception as e: test_results["tests_failed"] += 1 test_results["test_details"]["task_prioritization"] = { "status": "failed", "error": str(e) } # Determine overall test status test_results["all_tests_passed"] = test_results["tests_failed"] == 0 return test_results except Exception as e: logger.error(f"Error running integration tests: {str(e)}") test_results["error"] = str(e) return test_results # ============================================================================= # MCP Wrapper Functions # ============================================================================= async def run_s3_comprehensive_analysis(arguments: Dict[str, Any]) -> List[Any]: """Run comprehensive S3 cost optimization analysis.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute comprehensive analysis result = await orchestrator.execute_comprehensive_analysis(**arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "comprehensive_analysis" }, indent=2) }] async def run_s3_general_spend_analysis(arguments: Dict[str, Any]) -> List[Any]: """Analyze overall S3 spending patterns and usage.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute general spend analysis result = await orchestrator.execute_analysis("general_spend", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "run_s3_general_spend_analysis" }, indent=2) }] async def run_s3_storage_class_selection(arguments: Dict[str, Any]) -> List[Any]: """Provide guidance on choosing the most cost-effective storage class.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute storage class analysis (covers both selection and validation) result = await orchestrator.execute_analysis("storage_class", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "storage_class_selection" }, indent=2) }] async def run_s3_storage_class_validation(arguments: Dict[str, Any]) -> List[Any]: """Validate that existing data is stored in the most appropriate storage class.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute storage class analysis (covers both selection and validation) result = await orchestrator.execute_analysis("storage_class", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "storage_class_validation" }, indent=2) }] async def run_s3_archive_optimization(arguments: Dict[str, Any]) -> List[Any]: """Identify and optimize long-term archive data storage.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute archive optimization result = await orchestrator.execute_analysis("archive_optimization", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "archive_optimization" }, indent=2) }] async def run_s3_api_cost_minimization(arguments: Dict[str, Any]) -> List[Any]: """Minimize S3 API request charges through access pattern optimization.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute API cost analysis result = await orchestrator.execute_analysis("api_cost", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "api_cost_minimization" }, indent=2) }] async def run_s3_multipart_cleanup(arguments: Dict[str, Any]) -> List[Any]: """Identify and clean up incomplete multipart uploads.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute multipart cleanup analysis result = await orchestrator.execute_analysis("multipart_cleanup", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "multipart_cleanup" }, indent=2) }] async def run_s3_governance_check(arguments: Dict[str, Any]) -> List[Any]: """Implement S3 cost controls and governance policy compliance checking.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute governance check result = await orchestrator.execute_analysis("governance", **arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "run_s3_governance_check" }, indent=2) }] async def run_s3_comprehensive_optimization_tool(arguments: Dict[str, Any]) -> List[Any]: """Run comprehensive S3 optimization with unified tool.""" try: from .s3_comprehensive_optimization_tool import S3ComprehensiveOptimizationTool region = arguments.get('region') tool = S3ComprehensiveOptimizationTool(region=region) # Execute comprehensive optimization result = await tool.execute_comprehensive_optimization(**arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "run_s3_comprehensive_optimization_tool.execution" }, indent=2) }] async def run_s3_quick_analysis(arguments: Dict[str, Any]) -> List[Any]: """Run a quick S3 analysis focusing on the most impactful optimizations.""" try: region = arguments.get('region') orchestrator = S3OptimizationOrchestrator(region=region) # Execute a subset of high-impact analyses with short timeout quick_analyses = ["general_spend", "multipart_cleanup", "governance"] results = {} for analysis_type in quick_analyses: try: result = await orchestrator.execute_analysis( analysis_type, timeout_seconds=10, # Quick timeout **arguments ) results[analysis_type] = result except Exception as e: results[analysis_type] = { "status": "error", "message": str(e) } # Aggregate quick results quick_result = { "status": "success", "analysis_type": "quick_analysis", "results": results, "message": f"Quick analysis completed for {len(quick_analyses)} analyses" } # Add documentation links quick_result = add_documentation_links(quick_result, "s3") return [{ "type": "text", "text": json.dumps(quick_result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "quick_analysis" }, indent=2) }] async def run_s3_bucket_analysis(arguments: Dict[str, Any]) -> List[Any]: """Analyze specific S3 buckets for optimization opportunities.""" try: region = arguments.get('region') bucket_names = arguments.get('bucket_names', []) if not bucket_names: return [{ "type": "text", "text": json.dumps({ "status": "error", "message": "bucket_names parameter is required", "context": "bucket_analysis" }, indent=2) }] orchestrator = S3OptimizationOrchestrator(region=region) # Execute comprehensive analysis for specific buckets result = await orchestrator.execute_comprehensive_analysis(**arguments) # Add documentation links result = add_documentation_links(result, "s3") return [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] except Exception as e: return [{ "type": "text", "text": json.dumps({ "status": "error", "error_code": getattr(e, 'code', 'UnknownError'), "message": str(e), "context": "bucket_analysis" }, indent=2) }]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server