Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
analysis_engine.py82.9 kB
""" CloudWatch Analysis Engine for coordinating all analyzers with parallel execution. This module provides the core analysis engine that coordinates all CloudWatch analyzers with the ServiceOrchestrator for parallel execution and session-sql integration. Features: - Analyzer registry and dynamic loading capabilities - Comprehensive error handling and fallback coordination - Performance optimization integration (caching, memory management, timeouts) - Session-sql integration for storing analysis results - Cross-analysis insights and correlation detection """ import logging import asyncio import importlib import inspect from typing import Dict, List, Any, Optional, Callable, Type, Union from datetime import datetime, timedelta from dataclasses import dataclass from concurrent.futures import TimeoutError as ConcurrentTimeoutError from playbooks.cloudwatch.base_analyzer import BaseAnalyzer, get_analyzer_registry from playbooks.cloudwatch.general_spend_analyzer import GeneralSpendAnalyzer from playbooks.cloudwatch.metrics_optimization_analyzer import MetricsOptimizationAnalyzer from playbooks.cloudwatch.logs_optimization_analyzer import LogsOptimizationAnalyzer from playbooks.cloudwatch.alarms_and_dashboards_analyzer import AlarmsAndDashboardsAnalyzer from playbooks.cloudwatch.cost_controller import CostController, CostPreferences from services.cloudwatch_pricing import CloudWatchPricing from utils.service_orchestrator import ServiceOrchestrator from utils.parallel_executor import create_task, ParallelTask from utils.performance_monitor import get_performance_monitor from utils.memory_manager import get_memory_manager from utils.intelligent_cache import get_analysis_results_cache from utils.logging_config import log_cloudwatch_operation logger = logging.getLogger(__name__) @dataclass class AnalyzerConfig: """Configuration for analyzer instances.""" analyzer_class: Type[BaseAnalyzer] enabled: bool = True priority: int = 1 timeout_seconds: float = 60.0 retry_attempts: int = 2 cache_results: bool = True dependencies: List[str] = None def __post_init__(self): if self.dependencies is None: self.dependencies = [] @dataclass class EngineConfig: """Configuration for the CloudWatch Analysis Engine.""" max_parallel_analyzers: int = 4 default_timeout_seconds: float = 120.0 enable_caching: bool = True enable_performance_monitoring: bool = True enable_memory_management: bool = True cache_ttl_seconds: int = 3600 retry_failed_analyzers: bool = True cross_analysis_insights: bool = True class CloudWatchAnalysisEngine: """ Core analysis engine that coordinates all CloudWatch analyzers. This engine provides: - Analyzer registry and dynamic loading capabilities - Integration with ServiceOrchestrator for parallel execution - Comprehensive error handling and fallback coordination - Performance optimization integration (caching, memory management, timeouts) - Session-sql integration for storing analysis results - Cross-analysis insights and correlation detection """ def __init__(self, region: str = None, session_id: str = None, config: EngineConfig = None, performance_monitor=None, memory_manager=None, timeout_handler=None, pricing_cache=None, analysis_results_cache=None): """ Initialize the CloudWatch Analysis Engine with performance optimizations. Args: region: AWS region for analysis session_id: Session ID for data persistence config: Engine configuration performance_monitor: Performance monitoring instance memory_manager: Memory management instance timeout_handler: Timeout handling instance pricing_cache: Pricing cache instance analysis_results_cache: Analysis results cache instance """ self.region = region self.session_id = session_id self.config = config or EngineConfig() self.logger = logging.getLogger(__name__) # Initialize performance optimization components self.performance_monitor = performance_monitor or get_performance_monitor() self.memory_manager = memory_manager or get_memory_manager() self.pricing_cache = pricing_cache self.analysis_results_cache = analysis_results_cache or get_analysis_results_cache() self.timeout_handler = timeout_handler # Initialize core services from services.cloudwatch_service import CloudWatchService, CloudWatchServiceConfig cloudwatch_config = CloudWatchServiceConfig(region=region) self.cloudwatch_service = CloudWatchService(config=cloudwatch_config) self.pricing_service = CloudWatchPricing(region=region) self.cost_controller = CostController() # Initialize service orchestrator for parallel execution self.service_orchestrator = ServiceOrchestrator(session_id=session_id) # Performance optimization components with enhanced configuration self.performance_monitor = performance_monitor or (get_performance_monitor() if self.config.enable_performance_monitoring else None) self.memory_manager = memory_manager or (get_memory_manager() if self.config.enable_memory_management else None) self.cache = analysis_results_cache or (get_analysis_results_cache() if self.config.enable_caching else None) self.timeout_handler = timeout_handler # Initialize CloudWatch-specific performance optimizations self._setup_cloudwatch_performance_optimizations() # Integrate performance components if self.performance_monitor and self.memory_manager: self.memory_manager.set_performance_monitor(self.performance_monitor) if self.cache and self.performance_monitor: self.cache.set_performance_monitor(self.performance_monitor) if self.cache and self.memory_manager: self.memory_manager.add_cache_reference(self.cache) if self.timeout_handler and self.performance_monitor: self.timeout_handler.set_performance_monitor(self.performance_monitor) # Analyzer registry and configurations self.analyzer_registry = get_analyzer_registry() self.analyzer_configs: Dict[str, AnalyzerConfig] = {} self.analyzer_instances: Dict[str, BaseAnalyzer] = {} # Initialize analyzers self._initialize_default_analyzers() self._load_analyzer_instances() # Engine state self.engine_id = f"cloudwatch_engine_{int(datetime.now().timestamp())}" self.active_analyses: Dict[str, Dict[str, Any]] = {} # Performance tracking self.engine_start_time = datetime.now() self.total_analyses_run = 0 self.successful_analyses = 0 self.failed_analyses = 0 log_cloudwatch_operation(self.logger, "analysis_engine_initialized", region=region, session_id=session_id, engine_id=self.engine_id, analyzers_count=len(self.analyzer_instances), performance_monitoring=self.config.enable_performance_monitoring, memory_management=self.config.enable_memory_management, caching_enabled=self.config.enable_caching, progressive_timeouts=self.timeout_handler is not None, cloudwatch_optimizations=True) @property def analyzers(self) -> Dict[str, BaseAnalyzer]: """ Property to access analyzer instances for backward compatibility. Returns: Dictionary of analyzer instances keyed by analyzer name """ return self.analyzer_instances def _initialize_default_analyzers(self): """Initialize default analyzer configurations.""" default_configs = { 'general_spend': AnalyzerConfig( analyzer_class=GeneralSpendAnalyzer, enabled=True, priority=4, # Highest priority - foundational analysis timeout_seconds=90.0, retry_attempts=2, cache_results=True, dependencies=[] ), 'logs_optimization': AnalyzerConfig( analyzer_class=LogsOptimizationAnalyzer, enabled=True, priority=3, # High priority - often highest cost impact timeout_seconds=120.0, retry_attempts=2, cache_results=True, dependencies=['general_spend'] ), 'metrics_optimization': AnalyzerConfig( analyzer_class=MetricsOptimizationAnalyzer, enabled=True, priority=2, # Medium priority timeout_seconds=90.0, retry_attempts=2, cache_results=True, dependencies=[] ), 'alarms_and_dashboards': AnalyzerConfig( analyzer_class=AlarmsAndDashboardsAnalyzer, enabled=True, priority=1, # Lower priority - efficiency focused timeout_seconds=60.0, retry_attempts=2, cache_results=True, dependencies=['metrics_optimization'] ) } for analyzer_name, config in default_configs.items(): self.register_analyzer(analyzer_name, config) def _load_analyzer_instances(self): """Load analyzer instances based on configurations.""" for analyzer_name, config in self.analyzer_configs.items(): if not config.enabled: continue try: # Create analyzer instance analyzer = config.analyzer_class( cost_explorer_service=None, # Will be injected per analysis config_service=None, # Will be injected per analysis metrics_service=None, # Will be injected per analysis cloudwatch_service=self.cloudwatch_service, pricing_service=self.pricing_service, performance_monitor=self.performance_monitor, memory_manager=self.memory_manager ) # Register with global registry self.analyzer_registry.register(analyzer) # Store in local instances self.analyzer_instances[analyzer_name] = analyzer log_cloudwatch_operation(self.logger, "analyzer_loaded", analyzer_name=analyzer_name, analyzer_class=config.analyzer_class.__name__, priority=config.priority, timeout=config.timeout_seconds) except Exception as e: self.logger.error(f"Failed to load analyzer {analyzer_name}: {str(e)}") # Continue loading other analyzers def register_analyzer(self, analyzer_name: str, config: AnalyzerConfig): """ Register an analyzer configuration. Args: analyzer_name: Unique name for the analyzer config: Analyzer configuration """ self.analyzer_configs[analyzer_name] = config self.logger.info(f"Registered analyzer configuration: {analyzer_name}") def load_analyzer_dynamically(self, module_path: str, class_name: str, analyzer_name: str, config: AnalyzerConfig = None) -> bool: """ Dynamically load an analyzer from a module. Args: module_path: Python module path (e.g., 'playbooks.cloudwatch.custom_analyzer') class_name: Name of the analyzer class analyzer_name: Unique name for the analyzer config: Optional analyzer configuration Returns: True if successfully loaded, False otherwise """ try: # Import the module module = importlib.import_module(module_path) # Get the analyzer class analyzer_class = getattr(module, class_name) # Verify it's a BaseAnalyzer subclass if not issubclass(analyzer_class, BaseAnalyzer): raise ValueError(f"{class_name} is not a subclass of BaseAnalyzer") # Create configuration if not provided if config is None: config = AnalyzerConfig( analyzer_class=analyzer_class, enabled=True, priority=1, timeout_seconds=self.config.default_timeout_seconds ) else: config.analyzer_class = analyzer_class # Register the analyzer self.register_analyzer(analyzer_name, config) # Load the instance self._load_single_analyzer(analyzer_name, config) self.logger.info(f"Dynamically loaded analyzer: {analyzer_name} from {module_path}.{class_name}") return True except Exception as e: self.logger.error(f"Failed to dynamically load analyzer {analyzer_name}: {str(e)}") return False def _setup_cloudwatch_performance_optimizations(self): """Set up CloudWatch-specific performance optimizations.""" try: # Configure memory management for CloudWatch data processing if self.memory_manager: # Register CloudWatch-specific cleanup callbacks self.memory_manager.register_cleanup_callback(self._cleanup_cloudwatch_analysis_data) # Set CloudWatch-specific memory thresholds from utils.memory_manager import MemoryThreshold cloudwatch_thresholds = MemoryThreshold( warning_percent=70.0, critical_percent=80.0, cleanup_percent=85.0, max_memory_mb=1536 # 1.5GB for analysis engine ) self.memory_manager.thresholds = cloudwatch_thresholds # Configure progressive timeouts for CloudWatch analyses if self.timeout_handler: from utils.progressive_timeout import TimeoutConfiguration, ComplexityLevel # CloudWatch analysis-specific timeout configuration timeout_config = TimeoutConfiguration( base_timeout=60.0, complexity_multiplier={ ComplexityLevel.VERY_LOW: 0.5, ComplexityLevel.LOW: 1.0, ComplexityLevel.MEDIUM: 2.0, ComplexityLevel.HIGH: 3.5, ComplexityLevel.VERY_HIGH: 6.0 }, data_size_multiplier=0.2, bucket_count_multiplier=0.0, # Not applicable historical_performance_weight=0.5, system_load_weight=0.3, min_timeout=20.0, max_timeout=900.0, # 15 minutes max grace_period=30.0 ) self.timeout_handler.config = timeout_config # Set up intelligent caching for CloudWatch metadata if self.cache: # Register CloudWatch-specific cache warming self.cache.register_warming_function("cloudwatch_analysis_patterns", self._warm_analysis_patterns_cache) self.logger.info("CloudWatch-specific performance optimizations configured") except Exception as e: self.logger.error(f"Error setting up CloudWatch performance optimizations: {str(e)}") def _cleanup_cloudwatch_analysis_data(self): """Cleanup callback for CloudWatch analysis data.""" try: # Clean up temporary analysis data if hasattr(self, '_temp_analysis_data'): self._temp_analysis_data.clear() # Clean up analyzer-specific temporary data for analyzer in self.analyzer_instances.values(): if hasattr(analyzer, 'cleanup_temporary_data'): analyzer.cleanup_temporary_data() # Force garbage collection import gc collected = gc.collect() self.logger.debug(f"CloudWatch analysis cleanup collected {collected} objects") except Exception as e: self.logger.error(f"Error in CloudWatch analysis data cleanup: {str(e)}") def _warm_analysis_patterns_cache(self, cache, region: str = None): """Warm cache with common CloudWatch analysis patterns.""" try: if not region: region = self.region self.logger.info(f"Warming CloudWatch analysis patterns cache for region: {region}") # Cache common analysis patterns and templates patterns = [ f"general_spend_pattern_{region}", f"logs_optimization_pattern_{region}", f"metrics_optimization_pattern_{region}", f"alarms_dashboards_pattern_{region}" ] for pattern in patterns: cache.put(pattern, { "pattern": True, "region": region, "warmed_at": datetime.now().isoformat() }, ttl_seconds=3600) except Exception as e: self.logger.error(f"Error warming analysis patterns cache: {str(e)}") def _load_single_analyzer(self, analyzer_name: str, config: AnalyzerConfig): """Load a single analyzer instance.""" try: analyzer = config.analyzer_class( cost_explorer_service=None, config_service=None, metrics_service=None, cloudwatch_service=self.cloudwatch_service, pricing_service=self.pricing_service, performance_monitor=self.performance_monitor, memory_manager=self.memory_manager ) self.analyzer_registry.register(analyzer) self.analyzer_instances[analyzer_name] = analyzer except Exception as e: self.logger.error(f"Failed to load analyzer instance {analyzer_name}: {str(e)}") raise async def run_analysis(self, analysis_type: str, **kwargs) -> Dict[str, Any]: """ Run a specific analysis type with comprehensive error handling and performance optimization. Args: analysis_type: Type of analysis to run **kwargs: Analysis parameters including cost preferences Returns: Dictionary containing analysis results """ start_time = datetime.now() execution_id = f"{analysis_type}_{int(start_time.timestamp())}" # Start performance monitoring monitoring_session = None if self.performance_monitor: monitoring_session = self.performance_monitor.start_analysis_monitoring( analysis_type, execution_id ) # Start memory tracking memory_tracker = None if self.memory_manager: memory_tracker = self.memory_manager.start_memory_tracking( f"analysis_{analysis_type}_{execution_id}" ) try: # Validate analyzer exists and is enabled if analysis_type not in self.analyzer_instances: return self._create_error_result( analysis_type, start_time, execution_id, f'Unknown analysis type: {analysis_type}', available_types=list(self.analyzer_instances.keys()), monitoring_session=monitoring_session, memory_tracker=memory_tracker ) # Get analyzer configuration config = self.analyzer_configs.get(analysis_type) if not config or not config.enabled: return self._create_error_result( analysis_type, start_time, execution_id, f'Analyzer {analysis_type} is disabled', monitoring_session=monitoring_session, memory_tracker=memory_tracker ) # Check cache first cache_key = None if self.cache and config.cache_results: cache_key = self._generate_cache_key(analysis_type, kwargs) cached_result = self.cache.get(cache_key) if cached_result: self.logger.info(f"Cache hit for analysis {analysis_type}") if self.performance_monitor: self.performance_monitor.record_cache_hit("analysis_results", analysis_type) # Add cache metadata cached_result['cache_metadata'] = { 'cache_hit': True, 'cached_at': cached_result.get('timestamp'), 'retrieved_at': datetime.now().isoformat() } # End monitoring self._end_monitoring(monitoring_session, memory_tracker, True) return cached_result else: if self.performance_monitor: self.performance_monitor.record_cache_miss("analysis_results", analysis_type) # Validate and sanitize cost preferences cost_preferences = self._validate_cost_preferences(kwargs) # Check dependencies dependency_results = await self._check_dependencies(analysis_type, config, **kwargs) # Track active analysis self.active_analyses[execution_id] = { 'analysis_type': analysis_type, 'start_time': start_time, 'status': 'running', 'monitoring_session': monitoring_session, 'memory_tracker': memory_tracker } log_cloudwatch_operation(self.logger, "analysis_start", analysis_type=analysis_type, execution_id=execution_id, cost_preferences=str(cost_preferences), dependencies_met=len(dependency_results)) # Get the analyzer analyzer = self.analyzer_instances[analysis_type] # Calculate intelligent timeout for this analysis intelligent_timeout = self._calculate_intelligent_timeout(analysis_type, config, **kwargs) kwargs['calculated_timeout'] = intelligent_timeout # Run the analysis with timeout and retry logic result = await self._run_analysis_with_retry( analyzer, analysis_type, config, dependency_results, **kwargs ) # Add comprehensive metadata result = self._add_analysis_metadata( result, analysis_type, start_time, execution_id, cost_preferences, dependency_results ) # Cache the result if successful if (self.cache and config.cache_results and result.get('status') == 'success'): self.cache.put( cache_key, result, ttl_seconds=self.config.cache_ttl_seconds, tags={'analysis_type': analysis_type, 'region': self.region} ) # Update statistics self.total_analyses_run += 1 if result.get('status') == 'success': self.successful_analyses += 1 else: self.failed_analyses += 1 log_cloudwatch_operation(self.logger, "analysis_complete", analysis_type=analysis_type, execution_id=execution_id, status=result.get('status', 'unknown'), execution_time=result.get('execution_time', 0)) # End monitoring self._end_monitoring(monitoring_session, memory_tracker, result.get('status') == 'success') # Remove from active analyses self.active_analyses.pop(execution_id, None) return result except Exception as e: self.logger.error(f"Analysis {analysis_type} failed with exception: {str(e)}") # Update statistics self.total_analyses_run += 1 self.failed_analyses += 1 # End monitoring self._end_monitoring(monitoring_session, memory_tracker, False, str(e)) # Remove from active analyses self.active_analyses.pop(execution_id, None) import traceback full_traceback = traceback.format_exc() return self._create_error_result( analysis_type, start_time, execution_id, str(e), error_type=type(e).__name__, full_traceback=full_traceback, monitoring_session=monitoring_session, memory_tracker=memory_tracker ) def _validate_cost_preferences(self, kwargs: Dict[str, Any]) -> CostPreferences: """Validate and sanitize cost preferences.""" raw_preferences = { 'allow_cost_explorer': kwargs.get('allow_cost_explorer', False), 'allow_aws_config': kwargs.get('allow_aws_config', False), 'allow_cloudtrail': kwargs.get('allow_cloudtrail', False), 'allow_minimal_cost_metrics': kwargs.get('allow_minimal_cost_metrics', False), } try: return self.cost_controller.validate_and_sanitize_preferences(raw_preferences) except ValueError as e: raise ValueError(f'Invalid cost preferences: {str(e)}') def _generate_cache_key(self, analysis_type: str, kwargs: Dict[str, Any]) -> List[Any]: """Generate cache key for analysis results.""" # Include relevant parameters that affect the analysis cache_components = [ 'cloudwatch_analysis', analysis_type, self.region, kwargs.get('lookback_days', 30), kwargs.get('allow_cost_explorer', False), kwargs.get('allow_aws_config', False), kwargs.get('allow_cloudtrail', False), kwargs.get('allow_minimal_cost_metrics', False), # Add specific resource filters if provided tuple(sorted(kwargs.get('log_group_names', []))), tuple(sorted(kwargs.get('alarm_names', []))), tuple(sorted(kwargs.get('dashboard_names', []))) ] return cache_components async def _check_dependencies(self, analysis_type: str, config: AnalyzerConfig, **kwargs) -> Dict[str, Any]: """Check and resolve analyzer dependencies.""" dependency_results = {} for dependency in config.dependencies: if dependency in self.analyzer_instances: try: # Run dependency analysis if not already available dep_result = await self.run_analysis(dependency, **kwargs) dependency_results[dependency] = dep_result if dep_result.get('status') != 'success': self.logger.warning( f"Dependency {dependency} for {analysis_type} failed: " f"{dep_result.get('error_message', 'Unknown error')}" ) except Exception as e: self.logger.error(f"Failed to resolve dependency {dependency}: {str(e)}") dependency_results[dependency] = { 'status': 'error', 'error_message': str(e) } return dependency_results async def _run_analysis_with_retry(self, analyzer: BaseAnalyzer, analysis_type: str, config: AnalyzerConfig, dependency_results: Dict[str, Any], **kwargs) -> Dict[str, Any]: """Run analysis with retry logic and intelligent timeout handling.""" last_error = None # Use calculated timeout or fallback to config timeout_seconds = kwargs.get('calculated_timeout', kwargs.get('timeout_seconds', config.timeout_seconds)) for attempt in range(config.retry_attempts + 1): try: # Register large object for memory management if self.memory_manager: analysis_id = f"{analysis_type}_attempt_{attempt}_{int(datetime.now().timestamp())}" self.memory_manager.register_large_object( analysis_id, kwargs, size_mb=0.5, # Estimated size cleanup_callback=lambda: self.logger.debug(f"Cleaned up analysis data for {analysis_id}") ) # Run analysis with intelligent timeout result = await asyncio.wait_for( analyzer.execute_with_error_handling(**kwargs), timeout=timeout_seconds ) # Record successful execution time for future timeout calculations if self.timeout_handler and 'execution_time' in result: from utils.progressive_timeout import ComplexityLevel complexity = self.timeout_handler.get_complexity_level(analysis_type, **kwargs) self.timeout_handler.record_execution_time( analysis_type, result['execution_time'], complexity ) # Add dependency results to the analysis result if dependency_results: result['dependency_results'] = dependency_results # Add timeout metadata result['timeout_metadata'] = { 'calculated_timeout': timeout_seconds, 'attempt_number': attempt + 1, 'intelligent_timeout_used': 'calculated_timeout' in kwargs } return result except (asyncio.TimeoutError, ConcurrentTimeoutError) as e: last_error = f"Analysis timed out after {timeout_seconds} seconds" self.logger.warning(f"Attempt {attempt + 1} for {analysis_type} timed out after {timeout_seconds}s") # Record timeout for future timeout calculations if self.timeout_handler: from utils.progressive_timeout import ComplexityLevel complexity = self.timeout_handler.get_complexity_level(analysis_type, **kwargs) # Record timeout as a very long execution time to adjust future timeouts self.timeout_handler.record_execution_time( analysis_type, timeout_seconds * 1.5, # Indicate it would have taken longer complexity ) if attempt < config.retry_attempts: # Exponential backoff for retries with increased timeout await asyncio.sleep(2 ** attempt) timeout_seconds *= 1.5 # Increase timeout for retry continue else: break except Exception as e: last_error = str(e) self.logger.error(f"Attempt {attempt + 1} for {analysis_type} failed: {str(e)}") if attempt < config.retry_attempts: await asyncio.sleep(2 ** attempt) continue else: break # All attempts failed return { 'status': 'error', 'error_message': last_error or 'Analysis failed after all retry attempts', 'analysis_type': analysis_type, 'retry_attempts': config.retry_attempts, 'dependency_results': dependency_results, 'timeout_metadata': { 'final_timeout_used': timeout_seconds, 'total_attempts': config.retry_attempts + 1, 'intelligent_timeout_used': 'calculated_timeout' in kwargs } } def _add_analysis_metadata(self, result: Dict[str, Any], analysis_type: str, start_time: datetime, execution_id: str, cost_preferences: CostPreferences, dependency_results: Dict[str, Any]) -> Dict[str, Any]: """Add comprehensive metadata to analysis results.""" execution_time = (datetime.now() - start_time).total_seconds() result['engine_metadata'] = { 'analysis_engine_version': '2.0.0', 'engine_id': self.engine_id, 'execution_id': execution_id, 'session_id': self.session_id, 'region': self.region, 'cost_preferences': cost_preferences.__dict__, 'execution_time': execution_time, 'dependencies_count': len(dependency_results), 'cache_enabled': self.config.enable_caching, 'performance_monitoring': self.config.enable_performance_monitoring, 'memory_management': self.config.enable_memory_management } # Ensure required fields exist if 'timestamp' not in result: result['timestamp'] = datetime.now().isoformat() if 'execution_time' not in result: result['execution_time'] = execution_time return result def _create_error_result(self, analysis_type: str, start_time: datetime, execution_id: str, error_message: str, error_type: str = None, available_types: List[str] = None, full_traceback: str = None, monitoring_session: str = None, memory_tracker: str = None) -> Dict[str, Any]: """Create standardized error result with full exception details.""" execution_time = (datetime.now() - start_time).total_seconds() result = { 'status': 'error', 'error_message': error_message, 'analysis_type': analysis_type, 'timestamp': start_time.isoformat(), 'execution_time': execution_time, 'engine_metadata': { 'analysis_engine_version': '2.0.0', 'engine_id': self.engine_id, 'execution_id': execution_id, 'session_id': self.session_id, 'region': self.region } } if error_type: result['error_type'] = error_type if available_types: result['available_types'] = available_types if full_traceback: result['full_exception_details'] = { 'traceback': full_traceback, 'error_location': self._extract_error_location(full_traceback) } # End monitoring for error case self._end_monitoring(monitoring_session, memory_tracker, False, error_message) return result def _extract_error_location(self, traceback_str: str) -> Dict[str, Any]: """Extract error location information from traceback.""" try: lines = traceback_str.strip().split('\n') # Find the last "File" line which indicates where the error occurred for line in reversed(lines): if line.strip().startswith('File "'): # Extract file, line number, and function import re match = re.search(r'File "([^"]+)", line (\d+), in (.+)', line) if match: return { "file": match.group(1), "line": int(match.group(2)), "function": match.group(3) } except Exception: pass return {"file": "unknown", "line": 0, "function": "unknown"} def _calculate_intelligent_timeout(self, analysis_type: str, config: AnalyzerConfig, **kwargs) -> float: """Calculate intelligent timeout based on analysis complexity and historical performance.""" try: if self.timeout_handler: # Create analysis context for timeout calculation context = self.timeout_handler.create_analysis_context(analysis_type, **kwargs) # Calculate timeout using progressive timeout handler timeout_result = self.timeout_handler.calculate_timeout(context) self.logger.debug(f"Calculated intelligent timeout for {analysis_type}: {timeout_result.final_timeout:.1f}s") self.logger.debug(f"Timeout reasoning: {'; '.join(timeout_result.reasoning)}") return timeout_result.final_timeout else: # Fallback to configuration timeout return config.timeout_seconds except Exception as e: self.logger.warning(f"Error calculating intelligent timeout for {analysis_type}: {str(e)}") return config.timeout_seconds def _end_monitoring(self, monitoring_session: str = None, memory_tracker: str = None, success: bool = True, error_message: str = None): """End performance and memory monitoring.""" if monitoring_session and self.performance_monitor: self.performance_monitor.end_analysis_monitoring( monitoring_session, success, error_message ) if memory_tracker and self.memory_manager: self.memory_manager.stop_memory_tracking(memory_tracker) async def run_comprehensive_analysis(self, **kwargs) -> Dict[str, Any]: """ Run all enabled CloudWatch analyses in parallel with intelligent orchestration. Args: **kwargs: Analysis parameters including cost preferences Returns: Dictionary containing comprehensive analysis results """ start_time = datetime.now() execution_id = f"comprehensive_{int(start_time.timestamp())}" # Start comprehensive monitoring monitoring_session = None if self.performance_monitor: monitoring_session = self.performance_monitor.start_analysis_monitoring( "comprehensive_analysis", execution_id ) memory_tracker = None if self.memory_manager: memory_tracker = self.memory_manager.start_memory_tracking( f"comprehensive_analysis_{execution_id}" ) try: # Validate cost preferences cost_preferences = self._validate_cost_preferences(kwargs) log_cloudwatch_operation(self.logger, "comprehensive_analysis_start", execution_id=execution_id, enabled_analyzers=len(self.analyzer_instances), cost_preferences=str(cost_preferences)) # Check cache for comprehensive analysis cache_key = None if self.cache and self.config.enable_caching: cache_key = self._generate_cache_key("comprehensive", kwargs) cached_result = self.cache.get(cache_key) if cached_result: self.logger.info("Cache hit for comprehensive analysis") if self.performance_monitor: self.performance_monitor.record_cache_hit("analysis_results", "comprehensive") cached_result['cache_metadata'] = { 'cache_hit': True, 'cached_at': cached_result.get('timestamp'), 'retrieved_at': datetime.now().isoformat() } self._end_monitoring(monitoring_session, memory_tracker, True) return cached_result else: if self.performance_monitor: self.performance_monitor.record_cache_miss("analysis_results", "comprehensive") # Get enabled analyzers sorted by priority and dependencies execution_plan = self._create_execution_plan() if not execution_plan: return self._create_error_result( "comprehensive", start_time, execution_id, "No enabled analyzers found", monitoring_session=monitoring_session, memory_tracker=memory_tracker ) # Execute analyses according to plan analysis_results = await self._execute_analysis_plan(execution_plan, **kwargs) # Generate cross-analysis insights cross_insights = {} if self.config.cross_analysis_insights: cross_insights = self._generate_cross_analysis_insights(analysis_results) # Create comprehensive result comprehensive_result = self._create_comprehensive_result( analysis_results, cross_insights, cost_preferences, start_time, execution_id, execution_plan ) # Cache the result if successful if (self.cache and self.config.enable_caching and comprehensive_result.get('status') in ['success', 'partial']): self.cache.put( cache_key, comprehensive_result, ttl_seconds=self.config.cache_ttl_seconds, tags={'analysis_type': 'comprehensive', 'region': self.region} ) # Update statistics self.total_analyses_run += 1 if comprehensive_result.get('status') == 'success': self.successful_analyses += 1 else: self.failed_analyses += 1 log_cloudwatch_operation(self.logger, "comprehensive_analysis_complete", execution_id=execution_id, status=comprehensive_result.get('status'), total_analyses=len(execution_plan), successful_analyses=comprehensive_result.get('analysis_summary', {}).get('successful_analyses', 0), execution_time=comprehensive_result.get('total_execution_time', 0)) self._end_monitoring(monitoring_session, memory_tracker, comprehensive_result.get('status') in ['success', 'partial']) return comprehensive_result except Exception as e: self.logger.error(f"Comprehensive analysis failed: {str(e)}") self.total_analyses_run += 1 self.failed_analyses += 1 self._end_monitoring(monitoring_session, memory_tracker, False, str(e)) return self._create_error_result( "comprehensive", start_time, execution_id, str(e), error_type=type(e).__name__, monitoring_session=monitoring_session, memory_tracker=memory_tracker ) def _create_execution_plan(self) -> List[Dict[str, Any]]: """Create intelligent execution plan based on priorities and dependencies.""" enabled_analyzers = { name: config for name, config in self.analyzer_configs.items() if config.enabled and name in self.analyzer_instances } if not enabled_analyzers: return [] # Sort by priority (higher priority first) and handle dependencies execution_plan = [] executed = set() # Create dependency graph dependency_graph = {} for name, config in enabled_analyzers.items(): dependency_graph[name] = [ dep for dep in config.dependencies if dep in enabled_analyzers ] # Topological sort with priority consideration while len(executed) < len(enabled_analyzers): # Find analyzers with no unmet dependencies ready_analyzers = [] for name, config in enabled_analyzers.items(): if name not in executed: unmet_deps = [ dep for dep in dependency_graph[name] if dep not in executed ] if not unmet_deps: ready_analyzers.append((name, config)) if not ready_analyzers: # Circular dependency or other issue remaining = set(enabled_analyzers.keys()) - executed self.logger.warning(f"Circular dependencies detected for analyzers: {remaining}") # Add remaining analyzers anyway for name in remaining: config = enabled_analyzers[name] ready_analyzers.append((name, config)) # Sort ready analyzers by priority ready_analyzers.sort(key=lambda x: x[1].priority, reverse=True) # Add to execution plan for name, config in ready_analyzers: if name not in executed: execution_plan.append({ 'analyzer_name': name, 'config': config, 'dependencies': dependency_graph[name].copy() }) executed.add(name) return execution_plan async def _execute_analysis_plan(self, execution_plan: List[Dict[str, Any]], **kwargs) -> Dict[str, Any]: """Execute analyses according to the execution plan.""" analysis_results = {} dependency_results = {} # Determine if we can run analyses in parallel max_parallel = min(self.config.max_parallel_analyzers, len(execution_plan)) if max_parallel > 1: # Parallel execution for independent analyses return await self._execute_parallel_analyses(execution_plan, **kwargs) else: # Sequential execution return await self._execute_sequential_analyses(execution_plan, **kwargs) async def _execute_parallel_analyses(self, execution_plan: List[Dict[str, Any]], **kwargs) -> Dict[str, Any]: """Execute analyses in parallel where possible.""" analysis_results = {} # Group analyses by dependency level dependency_levels = [] remaining_plan = execution_plan.copy() completed = set() while remaining_plan: current_level = [] next_remaining = [] for plan_item in remaining_plan: analyzer_name = plan_item['analyzer_name'] dependencies = plan_item['dependencies'] # Check if all dependencies are completed if all(dep in completed for dep in dependencies): current_level.append(plan_item) else: next_remaining.append(plan_item) if current_level: dependency_levels.append(current_level) completed.update(item['analyzer_name'] for item in current_level) remaining_plan = next_remaining else: # No progress possible, add remaining items dependency_levels.append(next_remaining) break # Execute each dependency level for level_index, level_analyses in enumerate(dependency_levels): self.logger.info(f"Executing dependency level {level_index + 1} with {len(level_analyses)} analyses") # Create tasks for this level level_tasks = [] for plan_item in level_analyses: analyzer_name = plan_item['analyzer_name'] # Create analysis task async def run_single_analysis(name=analyzer_name): return await self.run_analysis(name, **kwargs) level_tasks.append(run_single_analysis()) # Execute level in parallel level_results = await asyncio.gather(*level_tasks, return_exceptions=True) # Process results for i, result in enumerate(level_results): analyzer_name = level_analyses[i]['analyzer_name'] if isinstance(result, Exception): analysis_results[analyzer_name] = { 'status': 'error', 'error_message': str(result), 'analysis_type': analyzer_name } else: analysis_results[analyzer_name] = result return analysis_results async def _execute_sequential_analyses(self, execution_plan: List[Dict[str, Any]], **kwargs) -> Dict[str, Any]: """Execute analyses sequentially.""" analysis_results = {} for plan_item in execution_plan: analyzer_name = plan_item['analyzer_name'] try: result = await self.run_analysis(analyzer_name, **kwargs) analysis_results[analyzer_name] = result except Exception as e: self.logger.error(f"Sequential analysis {analyzer_name} failed: {str(e)}") analysis_results[analyzer_name] = { 'status': 'error', 'error_message': str(e), 'analysis_type': analyzer_name } return analysis_results def _create_comprehensive_result(self, analysis_results: Dict[str, Any], cross_insights: Dict[str, Any], cost_preferences: CostPreferences, start_time: datetime, execution_id: str, execution_plan: List[Dict[str, Any]]) -> Dict[str, Any]: """Create comprehensive analysis result.""" execution_time = (datetime.now() - start_time).total_seconds() # Analyze results successful_analyses = [] failed_analyses = [] partial_analyses = [] for analyzer_name, result in analysis_results.items(): status = result.get('status', 'unknown') if status == 'success': successful_analyses.append(analyzer_name) elif status == 'partial': partial_analyses.append(analyzer_name) else: failed_analyses.append(analyzer_name) # Determine overall status if failed_analyses and not successful_analyses and not partial_analyses: overall_status = 'error' elif failed_analyses or partial_analyses: overall_status = 'partial' else: overall_status = 'success' # Aggregate cost information total_cost_incurred = any( result.get('cost_incurred', False) for result in analysis_results.values() ) all_cost_operations = [] for result in analysis_results.values(): all_cost_operations.extend(result.get('cost_incurring_operations', [])) # Generate comprehensive recommendations comprehensive_recommendations = self._generate_comprehensive_recommendations( analysis_results, cross_insights ) return { 'status': overall_status, 'analysis_type': 'comprehensive', 'timestamp': start_time.isoformat(), 'total_execution_time': execution_time, 'cost_incurred': total_cost_incurred, 'cost_incurring_operations': list(set(all_cost_operations)), 'primary_data_source': 'multiple', 'individual_analyses': analysis_results, 'cross_analysis_insights': cross_insights, 'comprehensive_recommendations': comprehensive_recommendations, 'analysis_summary': { 'total_analyses': len(execution_plan), 'successful_analyses': len(successful_analyses), 'partial_analyses': len(partial_analyses), 'failed_analyses': len(failed_analyses), 'successful_types': successful_analyses, 'partial_types': partial_analyses, 'failed_types': failed_analyses }, 'engine_metadata': { 'analysis_engine_version': '2.0.0', 'engine_id': self.engine_id, 'execution_id': execution_id, 'session_id': self.session_id, 'region': self.region, 'cost_preferences': cost_preferences.__dict__, 'execution_plan_length': len(execution_plan), 'parallel_execution': self.config.max_parallel_analyzers > 1, 'cross_insights_enabled': self.config.cross_analysis_insights } } def _get_analysis_priority(self, analysis_type: str) -> int: """Get priority for analysis type (higher number = higher priority).""" priority_map = { 'general_spend': 4, # Highest priority - foundational analysis 'logs_optimization': 3, # High priority - often highest cost impact 'metrics_optimization': 2, # Medium priority 'alarms_and_dashboards': 1 # Lower priority - efficiency focused } return priority_map.get(analysis_type, 1) def _aggregate_analysis_results(self, execution_results: Dict[str, Any], cost_preferences: CostPreferences, start_time: datetime, **kwargs) -> Dict[str, Any]: """Aggregate results from all analyses into comprehensive report.""" # Initialize comprehensive result structure comprehensive_result = { 'status': 'success', 'analysis_type': 'comprehensive', 'timestamp': start_time.isoformat(), 'total_execution_time': (datetime.now() - start_time).total_seconds(), 'cost_incurred': False, 'cost_incurring_operations': [], 'primary_data_source': 'cloudwatch_config', 'fallback_used': False, 'individual_analyses': {}, 'aggregated_insights': {}, 'comprehensive_recommendations': [], 'session_metadata': { 'session_id': self.session_id, 'stored_tables': execution_results.get('stored_tables', []), 'execution_summary': execution_results }, 'cost_summary': self.cost_controller.get_cost_summary(cost_preferences) } # Process individual analysis results successful_analyses = [] failed_analyses = [] for task_id, task_result in execution_results.get('results', {}).items(): if task_result['status'] == 'success': # Extract analysis type from task_id or operation analysis_type = self._extract_analysis_type_from_task(task_id, task_result) if analysis_type: # Get the actual analysis result data (would need to be extracted from stored data) analysis_data = self._get_analysis_data_from_session(task_result.get('stored_table')) if analysis_data: comprehensive_result['individual_analyses'][analysis_type] = analysis_data successful_analyses.append(analysis_type) # Aggregate cost information if analysis_data.get('cost_incurred', False): comprehensive_result['cost_incurred'] = True cost_ops = analysis_data.get('cost_incurring_operations', []) comprehensive_result['cost_incurring_operations'].extend(cost_ops) # Check for fallback usage if analysis_data.get('fallback_used', False): comprehensive_result['fallback_used'] = True # Update primary data source if Cost Explorer was used if analysis_data.get('primary_data_source') == 'cost_explorer': comprehensive_result['primary_data_source'] = 'cost_explorer' else: analysis_type = self._extract_analysis_type_from_task(task_id, task_result) if analysis_type: failed_analyses.append(analysis_type) comprehensive_result['individual_analyses'][analysis_type] = { 'status': 'error', 'error_message': task_result.get('error', 'Unknown error'), 'analysis_type': analysis_type } # Generate aggregated insights comprehensive_result['aggregated_insights'] = self._generate_cross_analysis_insights( comprehensive_result['individual_analyses'] ) # Generate comprehensive recommendations comprehensive_result['comprehensive_recommendations'] = self._generate_comprehensive_recommendations( comprehensive_result['individual_analyses'], comprehensive_result['aggregated_insights'] ) # Update overall status if failed_analyses and not successful_analyses: comprehensive_result['status'] = 'error' elif failed_analyses: comprehensive_result['status'] = 'partial' comprehensive_result['analysis_summary'] = { 'total_analyses': len(self.analyzers), 'successful_analyses': len(successful_analyses), 'failed_analyses': len(failed_analyses), 'successful_types': successful_analyses, 'failed_types': failed_analyses } return comprehensive_result def _extract_analysis_type_from_task(self, task_id: str, task_result: Dict[str, Any]) -> Optional[str]: """Extract analysis type from task ID or result.""" # Try to extract from task_id (format: cloudwatch_{analysis_type}_{timestamp}) if 'cloudwatch_' in task_id: parts = task_id.split('_') if len(parts) >= 3: return '_'.join(parts[1:-1]) # Everything between 'cloudwatch' and timestamp # Try to extract from operation name operation = task_result.get('operation', '') if operation.startswith('analyze_'): return operation[8:] # Remove 'analyze_' prefix return None def _get_analysis_data_from_session(self, table_name: Optional[str]) -> Optional[Dict[str, Any]]: """Get analysis data from session storage.""" if not table_name or not self.session_id: return None try: # Query the stored analysis data query = f'SELECT * FROM "{table_name}" LIMIT 1' results = self.service_orchestrator.query_session_data(query) if results: # The first result should contain the analysis data return results[0] except Exception as e: self.logger.error(f"Error retrieving analysis data from {table_name}: {str(e)}") return None def _generate_cross_analysis_insights(self, individual_analyses: Dict[str, Any]) -> Dict[str, Any]: """Generate insights that span across multiple analyses.""" insights = { 'cost_correlations': [], 'optimization_synergies': [], 'resource_relationships': [], 'efficiency_patterns': [] } try: # Analyze cost correlations between different CloudWatch components if ('general_spend' in individual_analyses and 'logs_optimization' in individual_analyses): general_data = individual_analyses['general_spend'].get('data', {}) logs_data = individual_analyses['logs_optimization'].get('data', {}) # Look for high log costs correlation with overall spend cost_breakdown = general_data.get('cost_breakdown', {}) logs_costs = cost_breakdown.get('logs_costs', {}) if logs_costs.get('estimated_monthly', 0) > 50: # $50+ monthly insights['cost_correlations'].append({ 'type': 'high_logs_cost_impact', 'description': 'High CloudWatch Logs costs significantly impact overall spend', 'estimated_impact': logs_costs.get('estimated_monthly', 0), 'recommendation': 'Prioritize logs optimization for maximum cost reduction' }) # Analyze optimization synergies if ('metrics_optimization' in individual_analyses and 'alarms_and_dashboards' in individual_analyses): metrics_data = individual_analyses['metrics_optimization'].get('data', {}) alarms_data = individual_analyses['alarms_and_dashboards'].get('data', {}) # Look for unused metrics that could affect alarms metrics_config = metrics_data.get('metrics_configuration_analysis', {}) alarm_efficiency = alarms_data.get('alarm_efficiency', {}) custom_metrics = metrics_config.get('metrics_analysis', {}).get('custom_metrics_count', 0) unused_alarms = alarm_efficiency.get('unused_alarms_count', 0) if custom_metrics > 10 and unused_alarms > 5: insights['optimization_synergies'].append({ 'type': 'metrics_alarms_cleanup_synergy', 'description': 'Cleaning up unused alarms and custom metrics together provides compound savings', 'custom_metrics': custom_metrics, 'unused_alarms': unused_alarms, 'recommendation': 'Coordinate metrics and alarms cleanup for maximum efficiency' }) except Exception as e: self.logger.error(f"Error generating cross-analysis insights: {str(e)}") return insights def _generate_comprehensive_recommendations(self, individual_analyses: Dict[str, Any], aggregated_insights: Dict[str, Any]) -> List[Dict[str, Any]]: """Generate comprehensive recommendations based on all analyses.""" recommendations = [] try: # Collect all individual recommendations all_individual_recommendations = [] for analysis_type, analysis_data in individual_analyses.items(): if analysis_data.get('status') == 'success': individual_recs = analysis_data.get('recommendations', []) for rec in individual_recs: rec['source_analysis'] = analysis_type all_individual_recommendations.append(rec) # Prioritize recommendations by cost impact high_impact_recs = [r for r in all_individual_recommendations if r.get('priority') == 'high' or r.get('potential_savings', 0) > 100] if high_impact_recs: recommendations.append({ 'type': 'high_impact_optimization', 'priority': 'critical', 'title': 'High-Impact Cost Optimization Opportunities', 'description': f'Found {len(high_impact_recs)} high-impact optimization opportunities', 'total_potential_savings': sum(r.get('potential_savings', 0) for r in high_impact_recs), 'recommendations': high_impact_recs[:5] # Top 5 }) # Add cross-analysis recommendations for insight_category, insights in aggregated_insights.items(): if insights: recommendations.append({ 'type': f'cross_analysis_{insight_category}', 'priority': 'medium', 'title': f'Cross-Analysis Insights: {insight_category.replace("_", " ").title()}', 'description': f'Found {len(insights)} insights from cross-analysis', 'insights': insights }) # Add implementation strategy recommendation if len(individual_analyses) > 2: recommendations.append({ 'type': 'implementation_strategy', 'priority': 'low', 'title': 'Recommended Implementation Strategy', 'description': 'Suggested order for implementing CloudWatch optimizations', 'strategy': [ '1. Start with logs optimization (typically highest cost impact)', '2. Clean up unused alarms and dashboards (quick wins)', '3. Optimize custom metrics (ongoing cost reduction)', '4. Implement monitoring governance (prevent future waste)' ] }) except Exception as e: self.logger.error(f"Error generating comprehensive recommendations: {str(e)}") return recommendations def get_available_analyses(self) -> List[str]: """Get list of available analysis types.""" return list(self.analyzer_instances.keys()) def get_enabled_analyses(self) -> List[str]: """Get list of enabled analysis types.""" return [ name for name, config in self.analyzer_configs.items() if config.enabled and name in self.analyzer_instances ] def get_analysis_info(self, analysis_type: str) -> Dict[str, Any]: """Get comprehensive information about a specific analysis type.""" if analysis_type not in self.analyzer_instances: return { 'error': f'Unknown analysis type: {analysis_type}', 'available_types': list(self.analyzer_instances.keys()) } analyzer = self.analyzer_instances[analysis_type] config = self.analyzer_configs.get(analysis_type) info = { 'analysis_type': analysis_type, 'version': getattr(analyzer, 'version', 'unknown'), 'description': analyzer.__class__.__doc__ or 'No description available', 'class_name': analyzer.__class__.__name__, 'analyzer_info': analyzer.get_analyzer_info() } if config: info['configuration'] = { 'enabled': config.enabled, 'priority': config.priority, 'timeout_seconds': config.timeout_seconds, 'retry_attempts': config.retry_attempts, 'cache_results': config.cache_results, 'dependencies': config.dependencies } return info def enable_analyzer(self, analysis_type: str) -> bool: """Enable a specific analyzer.""" if analysis_type not in self.analyzer_configs: self.logger.error(f"Cannot enable unknown analyzer: {analysis_type}") return False config = self.analyzer_configs[analysis_type] config.enabled = True # Load instance if not already loaded if analysis_type not in self.analyzer_instances: try: self._load_single_analyzer(analysis_type, config) except Exception as e: self.logger.error(f"Failed to load analyzer {analysis_type}: {str(e)}") config.enabled = False return False self.logger.info(f"Enabled analyzer: {analysis_type}") return True def disable_analyzer(self, analysis_type: str) -> bool: """Disable a specific analyzer.""" if analysis_type not in self.analyzer_configs: self.logger.error(f"Cannot disable unknown analyzer: {analysis_type}") return False self.analyzer_configs[analysis_type].enabled = False self.logger.info(f"Disabled analyzer: {analysis_type}") return True def get_engine_status(self) -> Dict[str, Any]: """Get comprehensive engine status and health information.""" uptime = (datetime.now() - self.engine_start_time).total_seconds() # Calculate success rate success_rate = 0.0 if self.total_analyses_run > 0: success_rate = (self.successful_analyses / self.total_analyses_run) * 100 # Get active analyses active_count = len(self.active_analyses) # Get cache statistics cache_stats = {} if self.cache: cache_stats = self.cache.get_statistics() # Get memory statistics memory_stats = {} if self.memory_manager: memory_stats = self.memory_manager.get_memory_statistics() # Get performance statistics performance_stats = {} if self.performance_monitor: performance_stats = self.performance_monitor.get_performance_summary() return { 'engine_info': { 'engine_version': '2.0.0', 'engine_id': self.engine_id, 'region': self.region, 'session_id': self.session_id, 'uptime_seconds': uptime, 'started_at': self.engine_start_time.isoformat() }, 'analyzer_status': { 'total_analyzers': len(self.analyzer_configs), 'enabled_analyzers': len(self.get_enabled_analyses()), 'loaded_analyzers': len(self.analyzer_instances), 'analyzer_types': list(self.analyzer_instances.keys()), 'enabled_types': self.get_enabled_analyses() }, 'execution_statistics': { 'total_analyses_run': self.total_analyses_run, 'successful_analyses': self.successful_analyses, 'failed_analyses': self.failed_analyses, 'success_rate_percent': success_rate, 'active_analyses': active_count }, 'configuration': { 'max_parallel_analyzers': self.config.max_parallel_analyzers, 'default_timeout_seconds': self.config.default_timeout_seconds, 'enable_caching': self.config.enable_caching, 'enable_performance_monitoring': self.config.enable_performance_monitoring, 'enable_memory_management': self.config.enable_memory_management, 'cache_ttl_seconds': self.config.cache_ttl_seconds, 'cross_analysis_insights': self.config.cross_analysis_insights }, 'service_orchestrator_status': self.service_orchestrator.get_session_info(), 'cache_statistics': cache_stats, 'memory_statistics': memory_stats, 'performance_statistics': performance_stats } def get_active_analyses(self) -> Dict[str, Any]: """Get information about currently running analyses.""" active_info = {} for execution_id, analysis_info in self.active_analyses.items(): runtime = (datetime.now() - analysis_info['start_time']).total_seconds() active_info[execution_id] = { 'analysis_type': analysis_info['analysis_type'], 'start_time': analysis_info['start_time'].isoformat(), 'runtime_seconds': runtime, 'status': analysis_info['status'], 'has_monitoring': analysis_info.get('monitoring_session') is not None, 'has_memory_tracking': analysis_info.get('memory_tracker') is not None } return active_info def cancel_analysis(self, execution_id: str) -> bool: """ Cancel a running analysis (best effort). Args: execution_id: Execution ID of the analysis to cancel Returns: True if cancellation was attempted, False if analysis not found """ if execution_id not in self.active_analyses: return False analysis_info = self.active_analyses[execution_id] analysis_info['status'] = 'cancelled' # End monitoring monitoring_session = analysis_info.get('monitoring_session') memory_tracker = analysis_info.get('memory_tracker') self._end_monitoring(monitoring_session, memory_tracker, False, "Analysis cancelled") # Remove from active analyses del self.active_analyses[execution_id] self.logger.info(f"Cancelled analysis: {execution_id}") return True def clear_cache(self, analysis_type: str = None) -> bool: """ Clear analysis results cache. Args: analysis_type: Specific analysis type to clear, or None for all Returns: True if cache was cleared successfully """ if not self.cache: return False try: if analysis_type: # Clear specific analysis type cleared = self.cache.invalidate_by_tags({'analysis_type': analysis_type}) self.logger.info(f"Cleared {cleared} cache entries for {analysis_type}") else: # Clear all analysis results self.cache.clear() self.logger.info("Cleared all analysis results cache") return True except Exception as e: self.logger.error(f"Failed to clear cache: {str(e)}") return False def warm_cache(self, analysis_types: List[str] = None, **kwargs) -> Dict[str, bool]: """ Warm the cache by running analyses and caching results. Args: analysis_types: List of analysis types to warm, or None for all enabled **kwargs: Parameters for the analyses Returns: Dictionary mapping analysis types to success status """ if not self.cache: return {} if analysis_types is None: analysis_types = self.get_enabled_analyses() warming_results = {} for analysis_type in analysis_types: if analysis_type not in self.analyzer_instances: warming_results[analysis_type] = False continue try: # Run analysis to warm cache asyncio.create_task(self.run_analysis(analysis_type, **kwargs)) warming_results[analysis_type] = True self.logger.info(f"Started cache warming for {analysis_type}") except Exception as e: self.logger.error(f"Failed to warm cache for {analysis_type}: {str(e)}") warming_results[analysis_type] = False return warming_results def shutdown(self): """Shutdown the analysis engine and cleanup resources.""" self.logger.info(f"Shutting down CloudWatch Analysis Engine {self.engine_id}") # Cancel active analyses for execution_id in list(self.active_analyses.keys()): self.cancel_analysis(execution_id) # Cleanup service orchestrator if hasattr(self.service_orchestrator, 'cleanup_session'): self.service_orchestrator.cleanup_session() # Shutdown performance components if self.performance_monitor and hasattr(self.performance_monitor, 'shutdown'): self.performance_monitor.shutdown() if self.memory_manager and hasattr(self.memory_manager, 'shutdown'): self.memory_manager.shutdown() if self.cache and hasattr(self.cache, 'shutdown'): self.cache.shutdown() self.logger.info("CloudWatch Analysis Engine shutdown complete") # Global analysis engine instance management _analysis_engines: Dict[str, CloudWatchAnalysisEngine] = {} def get_analysis_engine(region: str = None, session_id: str = None, config: EngineConfig = None) -> CloudWatchAnalysisEngine: """ Get or create a CloudWatch Analysis Engine instance. Args: region: AWS region session_id: Session ID for data persistence config: Engine configuration Returns: CloudWatchAnalysisEngine instance """ engine_key = f"{region or 'default'}_{session_id or 'default'}" if engine_key not in _analysis_engines: _analysis_engines[engine_key] = CloudWatchAnalysisEngine( region=region, session_id=session_id, config=config ) return _analysis_engines[engine_key] def shutdown_all_engines(): """Shutdown all analysis engine instances.""" for engine_key, engine in _analysis_engines.items(): try: engine.shutdown() except Exception as e: logger.error(f"Error shutting down engine {engine_key}: {str(e)}") _analysis_engines.clear() logger.info("All CloudWatch Analysis Engines shut down") # Convenience functions for common operations async def run_cloudwatch_analysis(analysis_type: str, region: str = None, session_id: str = None, **kwargs) -> Dict[str, Any]: """ Convenience function to run a single CloudWatch analysis. Args: analysis_type: Type of analysis to run region: AWS region session_id: Session ID **kwargs: Analysis parameters Returns: Analysis results """ engine = get_analysis_engine(region=region, session_id=session_id) return await engine.run_analysis(analysis_type, **kwargs) async def run_comprehensive_cloudwatch_analysis(region: str = None, session_id: str = None, **kwargs) -> Dict[str, Any]: """ Convenience function to run comprehensive CloudWatch analysis. Args: region: AWS region session_id: Session ID **kwargs: Analysis parameters Returns: Comprehensive analysis results """ engine = get_analysis_engine(region=region, session_id=session_id) return await engine.run_comprehensive_analysis(**kwargs) def get_cloudwatch_analysis_status(region: str = None, session_id: str = None) -> Dict[str, Any]: """ Get status of CloudWatch analysis engine. Args: region: AWS region session_id: Session ID Returns: Engine status information """ engine_key = f"{region or 'default'}_{session_id or 'default'}" if engine_key not in _analysis_engines: return { 'engine_exists': False, 'message': 'No analysis engine found for the specified region and session' } engine = _analysis_engines[engine_key] status = engine.get_engine_status() status['engine_exists'] = True return status

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server