Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
cloudwatch_service.py142 kB
""" CloudWatch Service for CFM Tips MCP Server Clean architecture with specialized classes for each CloudWatch functionality: - CWGeneralSpendTips: General spending analysis and cost optimization - CWMetricsTips: Custom metrics optimization and analysis - CWLogsTips: Logs ingestion, storage, and retention optimization - CWAlarmsTips: Alarms configuration and cost optimization - CWDashboardTips: Dashboard management and optimization Internal components (not exposed outside this file): - CloudWatchDAO: Data access for CloudWatch APIs - AWSPricingDAO: Data access for AWS Pricing APIs - CloudWatchCache: Caching layer for performance """ import logging import boto3 import time import json from typing import Dict, List, Any, Optional from datetime import datetime, timedelta, timezone from botocore.exceptions import ClientError from dataclasses import dataclass, field from playbooks.cloudwatch.cost_controller import CostController, CostPreferences from utils.logging_config import log_cloudwatch_operation logger = logging.getLogger(__name__) @dataclass class CloudWatchOperationResult: """Result of a CloudWatch operation with cost tracking.""" success: bool data: Any = None error_message: Optional[str] = None operation_name: str = "" cost_incurred: bool = False operation_type: str = "free" execution_time: float = 0.0 fallback_used: bool = False primary_data_source: str = "cloudwatch_api" api_calls_made: List[str] = field(default_factory=list) @dataclass class CloudWatchServiceConfig: """Configuration for CloudWatch service operations.""" region: Optional[str] = None max_retries: int = 3 retry_delay: float = 1.0 timeout_seconds: float = 30.0 enable_cost_tracking: bool = True enable_fallback: bool = True cost_preferences: Optional[CostPreferences] = None class CloudWatchCache: """Internal caching layer for CloudWatch data. Not exposed outside this file.""" def __init__(self, ttl_seconds: int = 300): self.ttl_seconds = ttl_seconds self._cache = {} self._timestamps = {} def get(self, key: str) -> Optional[Any]: """Get cached value if not expired.""" if key not in self._cache: return None if time.time() - self._timestamps[key] > self.ttl_seconds: self._invalidate(key) return None return self._cache[key] def set(self, key: str, value: Any) -> None: """Set cached value with timestamp.""" self._cache[key] = value self._timestamps[key] = time.time() def _invalidate(self, key: str) -> None: """Remove expired cache entry.""" self._cache.pop(key, None) self._timestamps.pop(key, None) def clear(self) -> None: """Clear all cached data.""" self._cache.clear() self._timestamps.clear() def get_stats(self) -> Dict[str, Any]: """Get cache statistics.""" current_time = time.time() expired_count = sum( 1 for ts in self._timestamps.values() if current_time - ts > self.ttl_seconds ) return { 'total_entries': len(self._cache), 'expired_entries': expired_count, 'valid_entries': len(self._cache) - expired_count, 'ttl_seconds': self.ttl_seconds } class AWSPricingDAO: """Data Access Object for AWS Pricing API. Not exposed outside this file.""" def __init__(self, region: str = 'us-east-1'): self.region = region self._cache = CloudWatchCache(ttl_seconds=3600) # 1 hour cache for pricing self.pricing_client = boto3.client('pricing', region_name='us-east-1') # Region mapping for pricing API self._region_map = { 'us-east-1': 'US East (N. Virginia)', 'us-east-2': 'US East (Ohio)', 'us-west-1': 'US West (N. California)', 'us-west-2': 'US West (Oregon)', 'eu-central-1': 'Europe (Frankfurt)', 'eu-west-1': 'Europe (Ireland)', 'eu-west-2': 'Europe (London)', 'ap-southeast-1': 'Asia Pacific (Singapore)', 'ap-southeast-2': 'Asia Pacific (Sydney)', 'ap-northeast-1': 'Asia Pacific (Tokyo)', } # Free tier limits self._free_tier = { 'logs_ingestion_gb': 5.0, 'logs_storage_gb': 5.0, 'metrics_count': 10, 'api_requests': 1000000, 'alarms_count': 10, 'dashboards_count': 3, 'dashboard_metrics': 50 } def get_pricing_data(self, component: str) -> Dict[str, Any]: """Get pricing data for CloudWatch components with caching.""" cache_key = f"pricing_{component}_{self.region}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result # Fallback pricing data (typical CloudWatch pricing) pricing_data = { 'logs': { 'ingestion_per_gb': 0.50, 'storage_per_gb_month': 0.03, 'insights_per_gb_scanned': 0.005, 'vended_logs_per_gb': 0.10, 'cross_region_delivery_per_gb': 0.02 }, 'metrics': { 'custom_metrics_per_metric': 0.30, 'detailed_monitoring_per_instance': 2.10, 'high_resolution_metrics_per_metric': 0.30, 'api_requests_per_1000': 0.01, 'get_metric_statistics_per_1000': 0.01, 'put_metric_data_per_1000': 0.01 }, 'alarms': { 'standard_alarms_per_alarm': 0.10, 'high_resolution_alarms_per_alarm': 0.30, 'composite_alarms_per_alarm': 0.50, 'alarm_actions_sns': 0.0, 'alarm_actions_autoscaling': 0.0, 'alarm_actions_ec2': 0.0 }, 'dashboards': { 'dashboard_per_month': 3.00, 'metrics_per_dashboard_free': 50, 'additional_metrics_per_metric': 0.0, 'dashboard_api_requests_per_1000': 0.01 } } result = pricing_data.get(component, {}) self._cache.set(cache_key, result) return result def get_free_tier_limits(self) -> Dict[str, Any]: """Get free tier limits for CloudWatch services.""" return self._free_tier.copy() def calculate_cost(self, component: str, usage: Dict[str, Any]) -> Dict[str, Any]: """Calculate costs for CloudWatch components with free tier consideration.""" pricing = self.get_pricing_data(component) if component == 'logs': return self._calculate_logs_cost(usage, pricing) elif component == 'metrics': return self._calculate_metrics_cost(usage, pricing) elif component == 'alarms': return self._calculate_alarms_cost(usage, pricing) elif component == 'dashboards': return self._calculate_dashboards_cost(usage, pricing) else: return {'status': 'error', 'message': f'Unknown component: {component}'} def _calculate_logs_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]: """Calculate CloudWatch Logs costs.""" ingestion_gb = usage.get('ingestion_gb', 0) storage_gb = usage.get('storage_gb', 0) insights_gb_scanned = usage.get('insights_gb_scanned', 0) ingestion_billable = max(0, ingestion_gb - self._free_tier['logs_ingestion_gb']) storage_billable = max(0, storage_gb - self._free_tier['logs_storage_gb']) ingestion_cost = ingestion_billable * pricing['ingestion_per_gb'] storage_cost = storage_billable * pricing['storage_per_gb_month'] insights_cost = insights_gb_scanned * pricing['insights_per_gb_scanned'] total_cost = ingestion_cost + storage_cost + insights_cost return { 'status': 'success', 'usage': usage, 'billable_usage': { 'ingestion_gb': ingestion_billable, 'storage_gb': storage_billable, 'insights_gb_scanned': insights_gb_scanned }, 'cost_breakdown': { 'ingestion_cost': round(ingestion_cost, 4), 'storage_cost': round(storage_cost, 4), 'insights_cost': round(insights_cost, 4) }, 'total_monthly_cost': round(total_cost, 4), 'total_annual_cost': round(total_cost * 12, 2) } def _calculate_metrics_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]: """Calculate CloudWatch Metrics costs.""" custom_metrics_count = usage.get('custom_metrics_count', 0) api_requests_count = usage.get('api_requests_count', 0) detailed_monitoring_instances = usage.get('detailed_monitoring_instances', 0) metrics_billable = max(0, custom_metrics_count - self._free_tier['metrics_count']) requests_billable = max(0, api_requests_count - self._free_tier['api_requests']) metrics_cost = metrics_billable * pricing['custom_metrics_per_metric'] requests_cost = (requests_billable / 1000) * pricing['api_requests_per_1000'] detailed_monitoring_cost = detailed_monitoring_instances * pricing['detailed_monitoring_per_instance'] total_cost = metrics_cost + requests_cost + detailed_monitoring_cost return { 'status': 'success', 'usage': usage, 'billable_usage': { 'custom_metrics_count': metrics_billable, 'api_requests_count': requests_billable, 'detailed_monitoring_instances': detailed_monitoring_instances }, 'cost_breakdown': { 'metrics_cost': round(metrics_cost, 4), 'requests_cost': round(requests_cost, 4), 'detailed_monitoring_cost': round(detailed_monitoring_cost, 4) }, 'total_monthly_cost': round(total_cost, 4), 'total_annual_cost': round(total_cost * 12, 2) } def _calculate_alarms_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]: """Calculate CloudWatch Alarms costs.""" standard_alarms_count = usage.get('standard_alarms_count', 0) high_resolution_alarms_count = usage.get('high_resolution_alarms_count', 0) composite_alarms_count = usage.get('composite_alarms_count', 0) standard_billable = max(0, standard_alarms_count - self._free_tier['alarms_count']) standard_cost = standard_billable * pricing['standard_alarms_per_alarm'] high_resolution_cost = high_resolution_alarms_count * pricing['high_resolution_alarms_per_alarm'] composite_cost = composite_alarms_count * pricing['composite_alarms_per_alarm'] total_cost = standard_cost + high_resolution_cost + composite_cost return { 'status': 'success', 'usage': usage, 'billable_usage': { 'standard_alarms_count': standard_billable, 'high_resolution_alarms_count': high_resolution_alarms_count, 'composite_alarms_count': composite_alarms_count }, 'cost_breakdown': { 'standard_alarms_cost': round(standard_cost, 4), 'high_resolution_alarms_cost': round(high_resolution_cost, 4), 'composite_alarms_cost': round(composite_cost, 4) }, 'total_monthly_cost': round(total_cost, 4), 'total_annual_cost': round(total_cost * 12, 2) } def _calculate_dashboards_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]: """Calculate CloudWatch Dashboards costs.""" dashboards_count = usage.get('dashboards_count', 0) dashboards_billable = max(0, dashboards_count - self._free_tier['dashboards_count']) dashboards_cost = dashboards_billable * pricing['dashboard_per_month'] return { 'status': 'success', 'usage': usage, 'billable_usage': { 'dashboards_count': dashboards_billable }, 'cost_breakdown': { 'dashboards_cost': round(dashboards_cost, 4) }, 'total_monthly_cost': round(dashboards_cost, 4), 'total_annual_cost': round(dashboards_cost * 12, 2) } class CloudWatchDAO: """Data Access Object for CloudWatch operations. Not exposed outside this file.""" def __init__(self, region: Optional[str] = None, cost_controller: Optional[CostController] = None): self.region = region self.cost_controller = cost_controller or CostController() self._cache = CloudWatchCache() # Initialize AWS clients self.cloudwatch_client = boto3.client('cloudwatch', region_name=self.region) self.logs_client = boto3.client('logs', region_name=self.region) logger.debug(f"CloudWatch DAO initialized for region: {self.region}") async def list_metrics(self, namespace: Optional[str] = None, metric_name: Optional[str] = None, dimensions: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]: """List CloudWatch metrics with caching.""" cache_key = f"metrics_{namespace}_{metric_name}_{hash(str(dimensions))}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result params = {} if namespace: params['Namespace'] = namespace if metric_name: params['MetricName'] = metric_name if dimensions: params['Dimensions'] = dimensions metrics = [] paginator = self.cloudwatch_client.get_paginator('list_metrics') for page in paginator.paginate(**params): metrics.extend(page.get('Metrics', [])) result = { 'metrics': metrics, 'total_count': len(metrics), 'namespace': namespace, 'filtered': bool(namespace or metric_name or dimensions) } self._cache.set(cache_key, result) return result async def describe_alarms(self, alarm_names: Optional[List[str]] = None, alarm_name_prefix: Optional[str] = None, state_value: Optional[str] = None) -> Dict[str, Any]: """Describe CloudWatch alarms with caching.""" cache_key = f"alarms_{hash(str(alarm_names))}_{alarm_name_prefix}_{state_value}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result params = {} if alarm_names: params['AlarmNames'] = alarm_names if alarm_name_prefix: params['AlarmNamePrefix'] = alarm_name_prefix if state_value: params['StateValue'] = state_value alarms = [] paginator = self.cloudwatch_client.get_paginator('describe_alarms') for page in paginator.paginate(**params): alarms.extend(page.get('MetricAlarms', [])) alarms.extend(page.get('CompositeAlarms', [])) result = { 'alarms': alarms, 'total_count': len(alarms), 'filtered': bool(alarm_names or alarm_name_prefix or state_value) } self._cache.set(cache_key, result) return result async def list_dashboards(self, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]: """List CloudWatch dashboards with caching.""" cache_key = f"dashboards_{dashboard_name_prefix}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result params = {} if dashboard_name_prefix: params['DashboardNamePrefix'] = dashboard_name_prefix dashboards = [] paginator = self.cloudwatch_client.get_paginator('list_dashboards') for page in paginator.paginate(**params): dashboards.extend(page.get('DashboardEntries', [])) result = { 'dashboards': dashboards, 'total_count': len(dashboards), 'filtered': bool(dashboard_name_prefix) } self._cache.set(cache_key, result) return result async def describe_log_groups(self, log_group_name_prefix: Optional[str] = None, log_group_names: Optional[List[str]] = None) -> Dict[str, Any]: """Describe CloudWatch log groups with caching.""" cache_key = f"log_groups_{log_group_name_prefix}_{hash(str(log_group_names))}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result params = {} if log_group_name_prefix: params['logGroupNamePrefix'] = log_group_name_prefix if log_group_names: params['logGroupNames'] = log_group_names log_groups = [] paginator = self.logs_client.get_paginator('describe_log_groups') for page in paginator.paginate(**params): log_groups.extend(page.get('logGroups', [])) result = { 'log_groups': log_groups, 'total_count': len(log_groups), 'filtered': bool(log_group_name_prefix or log_group_names) } self._cache.set(cache_key, result) return result async def get_dashboard(self, dashboard_name: str) -> Dict[str, Any]: """Get dashboard configuration with caching.""" cache_key = f"dashboard_config_{dashboard_name}" cached_result = self._cache.get(cache_key) if cached_result: return cached_result response = self.cloudwatch_client.get_dashboard(DashboardName=dashboard_name) result = { 'dashboard_name': dashboard_name, 'dashboard_body': response.get('DashboardBody', '{}'), 'dashboard_arn': response.get('DashboardArn') } self._cache.set(cache_key, result) return result async def get_metric_statistics(self, namespace: str, metric_name: str, dimensions: List[Dict[str, str]], start_time: datetime, end_time: datetime, period: int = 3600, statistics: List[str] = None) -> Dict[str, Any]: """Get metric statistics (paid operation).""" stats_list = statistics or ['Average', 'Sum', 'Maximum'] response = self.cloudwatch_client.get_metric_statistics( Namespace=namespace, MetricName=metric_name, Dimensions=dimensions, StartTime=start_time, EndTime=end_time, Period=period, Statistics=stats_list ) datapoints = response.get('Datapoints', []) datapoints.sort(key=lambda x: x['Timestamp']) return { 'namespace': namespace, 'metric_name': metric_name, 'dimensions': dimensions, 'datapoints': datapoints, 'total_datapoints': len(datapoints), 'period': period, 'statistics': stats_list, 'start_time': start_time.isoformat(), 'end_time': end_time.isoformat() } async def get_metrics_usage_batch(self, metrics: List[Dict[str, Any]], lookback_days: int = 30, batch_size: int = 500) -> None: """ Get usage data for multiple metrics in batches (paid operation). Uses GetMetricData API which supports up to 500 metrics per request. Results are cached to avoid repeated API calls. Updates metrics in place with datapoint_count and usage_estimation_method. Args: metrics: List of metric dictionaries to analyze (modified in place) lookback_days: Number of days to look back for usage data batch_size: Number of metrics per API call (max 500) """ if not metrics: return end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=lookback_days) # Process metrics in batches of up to 500 for batch_start in range(0, len(metrics), batch_size): batch = metrics[batch_start:batch_start + batch_size] # Check cache first for this batch cache_key = f"metrics_usage_{hash(str([(m['namespace'], m['metric_name'], str(m['dimensions'])) for m in batch]))}_{lookback_days}" cached_result = self._cache.get(cache_key) if cached_result: logger.debug(f"Using cached usage data for batch {batch_start//batch_size + 1}") # Apply cached results for i, metric in enumerate(batch): if i < len(cached_result): metric.update(cached_result[i]) continue # Build metric queries for this batch metric_queries = [] for idx, metric in enumerate(batch): metric_queries.append({ 'Id': f'm{idx}', 'MetricStat': { 'Metric': { 'Namespace': metric['namespace'], 'MetricName': metric['metric_name'], 'Dimensions': metric['dimensions'] }, 'Period': 3600, # 1 hour 'Stat': 'SampleCount' } }) # Single batched API call for up to 500 metrics try: logger.debug(f"Fetching usage data for batch {batch_start//batch_size + 1} ({len(batch)} metrics)") response = self.cloudwatch_client.get_metric_data( MetricDataQueries=metric_queries, StartTime=start_time, EndTime=end_time ) # Process results and update metrics in place batch_results = [] for idx, result in enumerate(response.get('MetricDataResults', [])): if idx < len(batch): datapoint_count = len(result.get('Values', [])) # Update metric in place batch[idx]['datapoint_count'] = datapoint_count batch[idx]['usage_period_days'] = lookback_days batch[idx]['usage_estimation_method'] = 'exact_paid' # Store for cache batch_results.append({ 'datapoint_count': datapoint_count, 'usage_period_days': lookback_days, 'usage_estimation_method': 'exact_paid' }) # Cache the results self._cache.set(cache_key, batch_results) except Exception as e: logger.error(f"Failed to get usage data for batch {batch_start//batch_size + 1}: {str(e)}") # Mark all metrics in batch as failed for metric in batch: metric['datapoint_count'] = 0 metric['usage_period_days'] = lookback_days metric['usage_estimation_method'] = 'failed' def clear_cache(self) -> None: """Clear all cached data.""" self._cache.clear() def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics.""" return self._cache.get_stats() class CWGeneralSpendTips: """ CloudWatch general spending analysis with 4 public methods. Public methods: - getLogs(): Returns log groups ordered by spend (descending), paginated - getMetrics(): Returns custom metrics ordered by dimension count (descending), paginated - getDashboards(): Returns dashboards ordered by custom metrics count (descending), paginated - getAlarms(): Returns alarms with cost information, paginated """ def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences): self.dao = dao self.pricing_dao = pricing_dao self.cost_preferences = cost_preferences self._page_size = 10 # Items per page async def getLogs(self, page: int = 1, log_group_name_prefix: Optional[str] = None, can_spend_for_estimate: bool = False, estimate_ingestion_from_metadata: bool = True, lookback_days: int = 30) -> Dict[str, Any]: """ Get log groups ordered by estimated spend (descending), paginated. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names can_spend_for_estimate: If True, uses CloudWatch Metrics API (paid) to get accurate ingestion data from IncomingBytes metric. If False (default), uses free estimation methods. Enabling this provides accurate cost estimates but incurs minimal CloudWatch API charges. estimate_ingestion_from_metadata: If True (default) and can_spend_for_estimate=False, estimates ingestion using free metadata (retention policy and log group age). If False, only storage costs are calculated. This is always free. lookback_days: Number of days to analyze for ingestion (only used if can_spend_for_estimate=True) Returns: Dict with log_groups, pagination, summary, and pricing_info """ try: # Get log groups from DAO log_groups_data = await self.dao.describe_log_groups( log_group_name_prefix=log_group_name_prefix ) log_groups = log_groups_data['log_groups'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('logs') free_tier = self.pricing_dao.get_free_tier_limits() # Calculate spend for each log group log_groups_with_spend = [] total_storage_gb = 0 total_ingestion_gb = 0 for lg in log_groups: stored_bytes = lg.get('storedBytes', 0) stored_gb = stored_bytes / (1024**3) retention_days = lg.get('retentionInDays', 0) log_group_name = lg.get('logGroupName') # Calculate storage cost (always accurate) storage_cost = stored_gb * pricing['storage_per_gb_month'] # Calculate ingestion cost if can_spend_for_estimate: # Option 1: Use CloudWatch Metrics API for accurate data (PAID) try: end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=lookback_days) ingestion_data = await self.dao.get_metric_statistics( namespace='AWS/Logs', metric_name='IncomingBytes', dimensions=[{'Name': 'LogGroupName', 'Value': log_group_name}], start_time=start_time, end_time=end_time, period=86400, # Daily aggregation statistics=['Sum'] ) # Calculate total ingestion in GB total_bytes = sum(dp['Sum'] for dp in ingestion_data['datapoints']) ingestion_gb = total_bytes / (1024**3) # Normalize to monthly rate monthly_ingestion_gb = (ingestion_gb / lookback_days) * 30 ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb'] estimation_method = 'accurate_paid' confidence = 'high' except Exception as e: logger.warning(f"Failed to get ingestion metrics for {log_group_name}: {str(e)}") # Fallback to free estimation if metrics fail if estimate_ingestion_from_metadata: result = self._estimate_ingestion_from_metadata(lg, stored_gb) monthly_ingestion_gb = result['monthly_ingestion_gb'] ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb'] estimation_method = result['estimation_method'] confidence = result['confidence'] else: monthly_ingestion_gb = 0 ingestion_cost = 0 estimation_method = 'storage_only' confidence = 'high' elif estimate_ingestion_from_metadata: # Option 2: Estimate from metadata (FREE - uses retention/age) result = self._estimate_ingestion_from_metadata(lg, stored_gb) monthly_ingestion_gb = result['monthly_ingestion_gb'] ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb'] estimation_method = result['estimation_method'] confidence = result['confidence'] else: # Option 3: Storage only (FREE - most conservative) monthly_ingestion_gb = 0 ingestion_cost = 0 estimation_method = 'storage_only' confidence = 'high' total_cost = storage_cost + ingestion_cost total_storage_gb += stored_gb total_ingestion_gb += monthly_ingestion_gb log_groups_with_spend.append({ 'log_group_name': log_group_name, 'stored_gb': round(stored_gb, 4), 'stored_bytes': stored_bytes, 'retention_days': retention_days if retention_days else 'Never Expire', 'creation_time': lg.get('creationTime'), 'cost_breakdown': { 'storage_cost': round(storage_cost, 4), 'ingestion_cost': round(ingestion_cost, 4), 'ingestion_gb_monthly': round(monthly_ingestion_gb, 4) if monthly_ingestion_gb > 0 else None }, 'estimated_monthly_cost': round(total_cost, 4), 'estimated_annual_cost': round(total_cost * 12, 2), 'estimation_method': estimation_method, 'estimation_confidence': confidence }) # Sort by spend descending log_groups_with_spend.sort(key=lambda x: x['estimated_monthly_cost'], reverse=True) # Paginate total_items = len(log_groups_with_spend) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_logs = log_groups_with_spend[start_idx:end_idx] # Calculate totals total_monthly_cost = sum(lg['estimated_monthly_cost'] for lg in log_groups_with_spend) # Free tier analysis storage_billable = max(0, total_storage_gb - free_tier['logs_storage_gb']) ingestion_billable = max(0, total_ingestion_gb - free_tier['logs_ingestion_gb']) if can_spend_for_estimate else 0 return { 'status': 'success', 'log_groups': paginated_logs, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_log_groups': total_items, 'total_storage_gb': round(total_storage_gb, 4), 'total_ingestion_gb_monthly': round(total_ingestion_gb, 4) if can_spend_for_estimate else None, 'free_tier_limit_gb': free_tier['logs_storage_gb'], 'free_tier_remaining_gb': round(max(0, free_tier['logs_storage_gb'] - total_storage_gb), 4), 'billable_storage_gb': round(storage_billable, 4), 'billable_ingestion_gb': round(ingestion_billable, 4) if can_spend_for_estimate else None, 'total_estimated_monthly_cost': round(total_monthly_cost, 4), 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2), 'estimation_method': 'accurate' if can_spend_for_estimate else 'storage_only' }, 'pricing_info': pricing } except Exception as e: logger.error(f"Error getting logs by spend: {str(e)}") return { 'status': 'error', 'message': str(e), 'log_groups': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } def _estimate_ingestion_from_metadata(self, log_group: Dict[str, Any], stored_gb: float) -> Dict[str, Any]: """ Estimate monthly ingestion using only free metadata (retention policy and age). This is a FREE operation that uses only data from describe_log_groups. Estimation methods (in order of preference): 1. Retention-based: If retention policy is set, assumes steady-state where storage = retention_days × daily_ingestion 2. Age-based: Uses log group age to calculate average daily ingestion rate 3. No estimate: If insufficient data Args: log_group: Log group metadata from describe_log_groups stored_gb: Current storage in GB Returns: Dict with monthly_ingestion_gb, estimation_method, and confidence level """ retention_days = log_group.get('retentionInDays') creation_time = log_group.get('creationTime') # Method 1: Retention-based estimation (most reliable for logs with retention) if retention_days and retention_days > 0: # Steady-state assumption: storage = retention_days × daily_ingestion # Therefore: daily_ingestion = storage / retention_days daily_ingestion_gb = stored_gb / retention_days monthly_ingestion_gb = daily_ingestion_gb * 30 return { 'monthly_ingestion_gb': monthly_ingestion_gb, 'estimation_method': 'retention_based_free', 'confidence': 'medium' } # Method 2: Age-based estimation (for logs without retention) if creation_time: age_days = (datetime.now(timezone.utc) - datetime.fromtimestamp(creation_time / 1000, tz=timezone.utc)).days if age_days >= 30: # Need at least 30 days for reasonable estimate # Average daily ingestion = total storage / age daily_ingestion_gb = stored_gb / age_days monthly_ingestion_gb = daily_ingestion_gb * 30 return { 'monthly_ingestion_gb': monthly_ingestion_gb, 'estimation_method': 'age_based_free', 'confidence': 'low' } elif age_days > 0: # Too new for reliable estimate, but provide rough estimate daily_ingestion_gb = stored_gb / age_days monthly_ingestion_gb = daily_ingestion_gb * 30 return { 'monthly_ingestion_gb': monthly_ingestion_gb, 'estimation_method': 'age_based_free_unreliable', 'confidence': 'very_low' } # Method 3: No estimate possible return { 'monthly_ingestion_gb': 0, 'estimation_method': 'storage_only', 'confidence': 'high' # High confidence in storage cost, no ingestion estimate } async def getMetrics(self, page: int = 1, namespace_filter: Optional[str] = None) -> Dict[str, Any]: """ Get custom metrics ordered by dimension count (descending), paginated. Args: page: Page number (1-based) namespace_filter: Optional filter for namespace Returns: Dict with custom_metrics, pagination, summary, and pricing_info """ try: # Get metrics from DAO metrics_data = await self.dao.list_metrics(namespace=namespace_filter) all_metrics = metrics_data['metrics'] # Filter to custom metrics only (exclude AWS/* namespaces) custom_metrics = [m for m in all_metrics if not m.get('Namespace', '').startswith('AWS/')] # Get pricing data pricing = self.pricing_dao.get_pricing_data('metrics') free_tier = self.pricing_dao.get_free_tier_limits() # Calculate dimension count and cost for each metric metrics_with_info = [] for metric in custom_metrics: dimensions = metric.get('Dimensions', []) dimension_count = len(dimensions) # Cost per metric metric_cost = pricing['custom_metrics_per_metric'] metrics_with_info.append({ 'namespace': metric.get('Namespace'), 'metric_name': metric.get('MetricName'), 'dimensions': dimensions, 'dimension_count': dimension_count, 'estimated_monthly_cost': round(metric_cost, 4), 'estimated_annual_cost': round(metric_cost * 12, 2) }) # Sort by dimension count descending metrics_with_info.sort(key=lambda x: x['dimension_count'], reverse=True) # Paginate total_items = len(metrics_with_info) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_metrics = metrics_with_info[start_idx:end_idx] # Calculate totals total_monthly_cost = total_items * pricing['custom_metrics_per_metric'] billable_metrics = max(0, total_items - free_tier['metrics_count']) billable_cost = billable_metrics * pricing['custom_metrics_per_metric'] return { 'status': 'success', 'custom_metrics': paginated_metrics, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_custom_metrics': total_items, 'free_tier_limit': free_tier['metrics_count'], 'free_tier_remaining': max(0, free_tier['metrics_count'] - total_items), 'billable_metrics': billable_metrics, 'total_estimated_monthly_cost': round(billable_cost, 4), 'total_estimated_annual_cost': round(billable_cost * 12, 2) }, 'pricing_info': pricing } except Exception as e: logger.error(f"Error getting metrics by dimension count: {str(e)}") return { 'status': 'error', 'message': str(e), 'custom_metrics': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def getDashboards(self, page: int = 1, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get dashboards ordered by complexity score (descending), paginated. Complexity score is calculated based on: - Widget count (each widget adds to complexity) - Custom metrics count (higher weight as they cost more) - Total metrics count (operational complexity) Args: page: Page number (1-based) dashboard_name_prefix: Optional filter for dashboard names Returns: Dict with dashboards, pagination, summary, and pricing_info """ try: # Get dashboards from DAO dashboards_data = await self.dao.list_dashboards(dashboard_name_prefix=dashboard_name_prefix) dashboards = dashboards_data['dashboards'] # Get pricing data dashboard_pricing = self.pricing_dao.get_pricing_data('dashboards') metrics_pricing = self.pricing_dao.get_pricing_data('metrics') free_tier = self.pricing_dao.get_free_tier_limits() # Analyze each dashboard dashboards_with_info = [] for dashboard in dashboards: dashboard_name = dashboard.get('DashboardName') # Get dashboard configuration to analyze complexity custom_metrics_count = 0 total_metrics_count = 0 widget_count = 0 try: dashboard_config = await self.dao.get_dashboard(dashboard_name) dashboard_body = json.loads(dashboard_config.get('dashboard_body', '{}')) # Analyze widgets and metrics widgets = dashboard_body.get('widgets', []) widget_count = len(widgets) for widget in widgets: properties = widget.get('properties', {}) metrics = properties.get('metrics', []) # Count all metrics and custom metrics for metric in metrics: if isinstance(metric, list) and len(metric) > 0: total_metrics_count += 1 namespace = metric[0] if isinstance(metric[0], str) else '' if namespace and not namespace.startswith('AWS/'): custom_metrics_count += 1 except Exception as e: logger.warning(f"Could not analyze dashboard {dashboard_name}: {str(e)}") # Calculate complexity score # Formula: (custom_metrics * 3) + (total_metrics * 1) + (widgets * 2) # Custom metrics weighted higher due to cost impact complexity_score = (custom_metrics_count * 3) + (total_metrics_count * 1) + (widget_count * 2) # Calculate costs dashboard_cost = dashboard_pricing['dashboard_per_month'] custom_metrics_cost = custom_metrics_count * metrics_pricing['custom_metrics_per_metric'] total_cost = dashboard_cost + custom_metrics_cost dashboards_with_info.append({ 'dashboard_name': dashboard_name, 'dashboard_arn': dashboard.get('DashboardArn'), 'last_modified': dashboard.get('LastModified'), 'size': dashboard.get('Size'), 'widget_count': widget_count, 'total_metrics_count': total_metrics_count, 'custom_metrics_count': custom_metrics_count, 'complexity_score': complexity_score, 'dashboard_cost': round(dashboard_cost, 4), 'custom_metrics_cost': round(custom_metrics_cost, 4), 'total_estimated_monthly_cost': round(total_cost, 4), 'estimated_annual_cost': round(total_cost * 12, 2) }) # Sort by complexity score descending (combines cost and operational complexity) dashboards_with_info.sort(key=lambda x: x['complexity_score'], reverse=True) # Paginate total_items = len(dashboards_with_info) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_dashboards = dashboards_with_info[start_idx:end_idx] # Calculate totals billable_dashboards = max(0, total_items - free_tier['dashboards_count']) total_dashboard_cost = billable_dashboards * dashboard_pricing['dashboard_per_month'] total_metrics_cost = sum(d['custom_metrics_cost'] for d in dashboards_with_info) total_monthly_cost = total_dashboard_cost + total_metrics_cost return { 'status': 'success', 'dashboards': paginated_dashboards, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_dashboards': total_items, 'free_tier_limit': free_tier['dashboards_count'], 'free_tier_remaining': max(0, free_tier['dashboards_count'] - total_items), 'billable_dashboards': billable_dashboards, 'total_dashboard_cost': round(total_dashboard_cost, 4), 'total_custom_metrics_cost': round(total_metrics_cost, 4), 'total_estimated_monthly_cost': round(total_monthly_cost, 4), 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2) }, 'pricing_info': { 'dashboard': dashboard_pricing, 'metrics': metrics_pricing } } except Exception as e: logger.error(f"Error getting dashboards by custom metrics count: {str(e)}") return { 'status': 'error', 'message': str(e), 'dashboards': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def getAlarms(self, page: int = 1, alarm_name_prefix: Optional[str] = None, state_value: Optional[str] = None) -> Dict[str, Any]: """ Get alarms with cost information, paginated. Args: page: Page number (1-based) alarm_name_prefix: Optional filter for alarm names state_value: Optional filter for alarm state (OK, ALARM, INSUFFICIENT_DATA) Returns: Dict with alarms, pagination, summary, and pricing_info """ try: # Get alarms from DAO alarms_data = await self.dao.describe_alarms( alarm_name_prefix=alarm_name_prefix, state_value=state_value ) alarms = alarms_data['alarms'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('alarms') free_tier = self.pricing_dao.get_free_tier_limits() # Analyze each alarm alarms_with_info = [] standard_count = 0 high_resolution_count = 0 composite_count = 0 for alarm in alarms: # Determine alarm type if 'MetricName' in alarm: period = alarm.get('Period', 300) if period < 300: alarm_type = 'high_resolution' alarm_cost = pricing['high_resolution_alarms_per_alarm'] high_resolution_count += 1 else: alarm_type = 'standard' alarm_cost = pricing['standard_alarms_per_alarm'] standard_count += 1 else: alarm_type = 'composite' alarm_cost = pricing['composite_alarms_per_alarm'] composite_count += 1 # Check if alarm has actions has_actions = bool( alarm.get('AlarmActions') or alarm.get('OKActions') or alarm.get('InsufficientDataActions') ) alarms_with_info.append({ 'alarm_name': alarm.get('AlarmName'), 'alarm_arn': alarm.get('AlarmArn'), 'alarm_type': alarm_type, 'state_value': alarm.get('StateValue'), 'state_reason': alarm.get('StateReason'), 'metric_name': alarm.get('MetricName'), 'namespace': alarm.get('Namespace'), 'period': alarm.get('Period'), 'has_actions': has_actions, 'actions_enabled': alarm.get('ActionsEnabled', False), 'estimated_monthly_cost': round(alarm_cost, 4), 'estimated_annual_cost': round(alarm_cost * 12, 2) }) # Paginate total_items = len(alarms_with_info) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_alarms = alarms_with_info[start_idx:end_idx] # Calculate totals billable_standard = max(0, standard_count - free_tier['alarms_count']) total_monthly_cost = ( billable_standard * pricing['standard_alarms_per_alarm'] + high_resolution_count * pricing['high_resolution_alarms_per_alarm'] + composite_count * pricing['composite_alarms_per_alarm'] ) return { 'status': 'success', 'alarms': paginated_alarms, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_alarms': total_items, 'standard_alarms': standard_count, 'high_resolution_alarms': high_resolution_count, 'composite_alarms': composite_count, 'free_tier_limit': free_tier['alarms_count'], 'free_tier_remaining': max(0, free_tier['alarms_count'] - standard_count), 'billable_standard_alarms': billable_standard, 'total_estimated_monthly_cost': round(total_monthly_cost, 4), 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2) }, 'pricing_info': pricing } except Exception as e: logger.error(f"Error getting alarms: {str(e)}") return { 'status': 'error', 'message': str(e), 'alarms': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } class CWMetricsTips: """CloudWatch custom metrics optimization and analysis.""" def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences): self.dao = dao self.pricing_dao = pricing_dao self.cost_preferences = cost_preferences self._page_size = 10 # Items per page async def listInstancesWithDetailedMonitoring(self, page: int = 1) -> Dict[str, Any]: """ Get paginated list of EC2 instances with detailed monitoring enabled. Detailed monitoring costs $2.10/month per instance vs basic monitoring (free). Args: page: Page number (1-based) Returns: Dict with instances, pagination, summary, and cost_analysis """ try: # Initialize EC2 client ec2_client = boto3.client('ec2', region_name=self.dao.region) # Get all instances instances_with_detailed = [] paginator = ec2_client.get_paginator('describe_instances') for page_response in paginator.paginate(): for reservation in page_response.get('Reservations', []): for instance in reservation.get('Instances', []): # Check if detailed monitoring is enabled monitoring_state = instance.get('Monitoring', {}).get('State', 'disabled') if monitoring_state == 'enabled': instance_id = instance.get('InstanceId') instance_type = instance.get('InstanceType') state = instance.get('State', {}).get('Name') # Get instance name from tags instance_name = None for tag in instance.get('Tags', []): if tag.get('Key') == 'Name': instance_name = tag.get('Value') break instances_with_detailed.append({ 'instance_id': instance_id, 'instance_name': instance_name, 'instance_type': instance_type, 'state': state, 'monitoring_state': monitoring_state, 'launch_time': instance.get('LaunchTime').isoformat() if instance.get('LaunchTime') else None }) # Get pricing pricing = self.pricing_dao.get_pricing_data('metrics') detailed_monitoring_cost = pricing['detailed_monitoring_per_instance'] # Add cost information to each instance for instance in instances_with_detailed: instance['monthly_cost'] = round(detailed_monitoring_cost, 2) instance['annual_cost'] = round(detailed_monitoring_cost * 12, 2) # Paginate total_items = len(instances_with_detailed) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_instances = instances_with_detailed[start_idx:end_idx] # Calculate totals total_monthly_cost = total_items * detailed_monitoring_cost return { 'status': 'success', 'instances': paginated_instances, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_instances_with_detailed_monitoring': total_items, 'cost_per_instance_monthly': round(detailed_monitoring_cost, 2), 'total_monthly_cost': round(total_monthly_cost, 2), 'total_annual_cost': round(total_monthly_cost * 12, 2) }, 'optimization_tip': { 'message': 'Consider disabling detailed monitoring for instances that do not require 1-minute metrics', 'potential_savings_monthly': round(total_monthly_cost, 2), 'potential_savings_annual': round(total_monthly_cost * 12, 2) } } except Exception as e: logger.error(f"Error listing instances with detailed monitoring: {str(e)}") return { 'status': 'error', 'message': str(e), 'instances': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def listCustomMetrics(self, page: int = 1, namespace_filter: Optional[str] = None, can_spend_for_exact_usage_estimate: bool = False, lookback_days: int = 30) -> Dict[str, Any]: """ Get paginated list of custom metrics sorted by dimension count (descending). Calls list_metrics twice: 1. Without RecentlyActive filter to get all metrics 2. With RecentlyActive='PT3H' to flag recently active metrics If can_spend_for_exact_usage_estimate=True, calls GetMetricData (batched) for the current page only to identify actual usage. Args: page: Page number (1-based) namespace_filter: Optional filter for namespace can_spend_for_exact_usage_estimate: If True, uses GetMetricData API (paid) to get exact usage for current page only. If False (default), only shows dimension count and recently active flag. lookback_days: Number of days to analyze for usage (only used if can_spend_for_exact_usage_estimate=True) Returns: Dict with custom_metrics, pagination, summary, and pricing_info """ try: # Call 1: Get all metrics (FREE) all_metrics_params = {} if namespace_filter: all_metrics_params['Namespace'] = namespace_filter all_metrics = [] paginator = self.dao.cloudwatch_client.get_paginator('list_metrics') for page_response in paginator.paginate(**all_metrics_params): all_metrics.extend(page_response.get('Metrics', [])) # Filter to custom metrics only (exclude AWS/* namespaces) custom_metrics = [m for m in all_metrics if not m.get('Namespace', '').startswith('AWS/')] # Call 2: Get recently active metrics (PT3H = Past 3 Hours) (FREE) recently_active_params = {'RecentlyActive': 'PT3H'} if namespace_filter: recently_active_params['Namespace'] = namespace_filter recently_active_metrics = [] for page_response in paginator.paginate(**recently_active_params): recently_active_metrics.extend(page_response.get('Metrics', [])) # Create set of recently active metric identifiers for fast lookup recently_active_set = set() for metric in recently_active_metrics: metric_id = self._get_metric_identifier(metric) recently_active_set.add(metric_id) # Get pricing data pricing = self.pricing_dao.get_pricing_data('metrics') metric_cost = pricing['custom_metrics_per_metric'] # Process each custom metric (FREE) metrics_with_info = [] for metric in custom_metrics: namespace = metric.get('Namespace') metric_name = metric.get('MetricName') dimensions = metric.get('Dimensions', []) dimension_count = len(dimensions) # Check if recently active metric_id = self._get_metric_identifier(metric) is_recently_active = metric_id in recently_active_set metric_info = { 'namespace': namespace, 'metric_name': metric_name, 'dimensions': dimensions, 'dimension_count': dimension_count, 'recently_active': is_recently_active, 'estimated_monthly_cost': round(metric_cost, 4), 'estimated_annual_cost': round(metric_cost * 12, 2), 'usage_estimation_method': 'dimension_count_free' } metrics_with_info.append(metric_info) # Sort by dimension count (FREE) metrics_with_info.sort(key=lambda x: x['dimension_count'], reverse=True) # Paginate (FREE) total_items = len(metrics_with_info) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_metrics = metrics_with_info[start_idx:end_idx] # PAID OPERATION: Analyze current page only (if requested) if can_spend_for_exact_usage_estimate and paginated_metrics: # Use DAO method for batched analysis with caching await self.dao.get_metrics_usage_batch( paginated_metrics, lookback_days=lookback_days ) # Note: DAO updates metrics in place with datapoint_count and usage_estimation_method # Calculate totals free_tier = self.pricing_dao.get_free_tier_limits() billable_metrics = max(0, total_items - free_tier['metrics_count']) total_monthly_cost = billable_metrics * metric_cost recently_active_count = sum(1 for m in metrics_with_info if m['recently_active']) return { 'status': 'success', 'custom_metrics': paginated_metrics, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_custom_metrics': total_items, 'recently_active_metrics': recently_active_count, 'inactive_metrics': total_items - recently_active_count, 'free_tier_limit': free_tier['metrics_count'], 'free_tier_remaining': max(0, free_tier['metrics_count'] - total_items), 'billable_metrics': billable_metrics, 'total_estimated_monthly_cost': round(total_monthly_cost, 4), 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2), 'usage_estimation_method': 'exact_paid_page_only' if can_spend_for_exact_usage_estimate else 'dimension_count_free', 'metrics_analyzed_for_usage': len(paginated_metrics) if can_spend_for_exact_usage_estimate else 0 }, 'pricing_info': pricing, 'optimization_tip': { 'message': f'Found {total_items - recently_active_count} metrics not active in past 3 hours. Consider removing unused metrics.', 'potential_savings_monthly': round((total_items - recently_active_count) * metric_cost, 2), 'potential_savings_annual': round((total_items - recently_active_count) * metric_cost * 12, 2) } } except Exception as e: logger.error(f"Error listing custom metrics: {str(e)}") return { 'status': 'error', 'message': str(e), 'custom_metrics': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } def _get_metric_identifier(self, metric: Dict[str, Any]) -> str: """ Create a unique identifier for a metric based on namespace, name, and dimensions. Args: metric: Metric dictionary from list_metrics Returns: String identifier for the metric """ namespace = metric.get('Namespace', '') metric_name = metric.get('MetricName', '') dimensions = metric.get('Dimensions', []) # Sort dimensions for consistent comparison sorted_dims = sorted(dimensions, key=lambda d: d.get('Name', '')) dim_str = '|'.join(f"{d.get('Name')}={d.get('Value')}" for d in sorted_dims) return f"{namespace}::{metric_name}::{dim_str}" async def analyze_metrics_usage(self, namespace_filter: Optional[str] = None) -> Dict[str, Any]: """Analyze custom metrics usage and optimization opportunities.""" try: metrics_data = await self.dao.list_metrics(namespace=namespace_filter) # Categorize metrics aws_metrics = [m for m in metrics_data['metrics'] if m.get('Namespace', '').startswith('AWS/')] custom_metrics = [m for m in metrics_data['metrics'] if not m.get('Namespace', '').startswith('AWS/')] # Analyze by namespace custom_by_namespace = {} for metric in custom_metrics: namespace = metric.get('Namespace', 'Unknown') if namespace not in custom_by_namespace: custom_by_namespace[namespace] = [] custom_by_namespace[namespace].append(metric) # Calculate costs metrics_cost = self.pricing_dao.calculate_cost('metrics', { 'custom_metrics_count': len(custom_metrics), 'api_requests_count': 100000, 'detailed_monitoring_instances': 0 }) return { 'status': 'success', 'metrics_summary': { 'total_metrics': len(metrics_data['metrics']), 'aws_metrics': len(aws_metrics), 'custom_metrics': len(custom_metrics), 'custom_by_namespace': {ns: len(metrics) for ns, metrics in custom_by_namespace.items()} }, 'cost_analysis': metrics_cost, 'detailed_metrics': { 'custom_metrics': custom_metrics[:50], 'aws_metrics_sample': aws_metrics[:20] } } except Exception as e: logger.error(f"Error analyzing metrics usage: {str(e)}") return {'status': 'error', 'message': str(e)} class CWLogsTips: """CloudWatch Logs optimization and analysis.""" def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences): self.dao = dao self.pricing_dao = pricing_dao self.cost_preferences = cost_preferences self._page_size = 10 # Items per page # Import and initialize VendedLogsDAO from services.cloudwatch_service_vended_log import VendedLogsDAO self._vended_logs_dao = VendedLogsDAO(region=dao.region) async def analyze_logs_usage(self, log_group_names: Optional[List[str]] = None) -> Dict[str, Any]: """Analyze CloudWatch Logs usage and optimization opportunities.""" try: log_groups_data = await self.dao.describe_log_groups(log_group_names=log_group_names) log_groups = log_groups_data['log_groups'] # Analyze retention without_retention = [] long_retention = [] for lg in log_groups: retention_days = lg.get('retentionInDays') if not retention_days: without_retention.append(lg.get('logGroupName')) elif retention_days > 365: long_retention.append({ 'name': lg.get('logGroupName'), 'retention_days': retention_days }) # Calculate costs total_stored_gb = sum(lg.get('storedBytes', 0) for lg in log_groups) / (1024**3) logs_cost = self.pricing_dao.calculate_cost('logs', { 'ingestion_gb': total_stored_gb * 0.1, 'storage_gb': total_stored_gb, 'insights_gb_scanned': 0 }) return { 'status': 'success', 'log_groups_summary': { 'total_log_groups': len(log_groups), 'total_stored_gb': total_stored_gb, 'without_retention_policy': len(without_retention), 'long_retention_groups': len(long_retention) }, 'cost_analysis': logs_cost, 'log_groups_details': log_groups[:50] } except Exception as e: logger.error(f"Error analyzing logs usage: {str(e)}") return {'status': 'error', 'message': str(e)} async def listLogsWithoutRetention(self, page: int = 1, log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get paginated list of log groups without retention policy, sorted by storage size descending. Log groups without retention policies store logs indefinitely, leading to unbounded costs. This method identifies these log groups to help set appropriate retention policies. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names Returns: Dict with log_groups, pagination, summary, and optimization recommendations """ try: # Get all log groups log_groups_data = await self.dao.describe_log_groups( log_group_name_prefix=log_group_name_prefix ) log_groups = log_groups_data['log_groups'] # Filter to log groups without retention logs_without_retention = [] for lg in log_groups: retention_days = lg.get('retentionInDays') if not retention_days: # None or 0 means never expire stored_bytes = lg.get('storedBytes', 0) stored_gb = stored_bytes / (1024**3) # Get pricing pricing = self.pricing_dao.get_pricing_data('logs') storage_cost = stored_gb * pricing['storage_per_gb_month'] logs_without_retention.append({ 'log_group_name': lg.get('logGroupName'), 'stored_gb': round(stored_gb, 4), 'stored_bytes': stored_bytes, 'creation_time': lg.get('creationTime'), 'retention_days': 'Never Expire', 'monthly_storage_cost': round(storage_cost, 4), 'annual_storage_cost': round(storage_cost * 12, 2), 'log_group_class': lg.get('logGroupClass', 'STANDARD') }) # Sort by storage size descending logs_without_retention.sort(key=lambda x: x['stored_gb'], reverse=True) # Paginate total_items = len(logs_without_retention) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_logs = logs_without_retention[start_idx:end_idx] # Calculate totals total_storage_gb = sum(lg['stored_gb'] for lg in logs_without_retention) total_monthly_cost = sum(lg['monthly_storage_cost'] for lg in logs_without_retention) return { 'status': 'success', 'log_groups': paginated_logs, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_log_groups_without_retention': total_items, 'total_storage_gb': round(total_storage_gb, 4), 'total_monthly_cost': round(total_monthly_cost, 4), 'total_annual_cost': round(total_monthly_cost * 12, 2) }, 'optimization_recommendations': { 'message': 'Set retention policies to automatically delete old logs and control costs', 'recommended_retention_days': [7, 14, 30, 60, 90, 120, 180, 365, 400, 545, 731, 1827, 3653], 'common_retention_policies': { 'development_logs': '7-14 days', 'application_logs': '30-90 days', 'audit_logs': '365-3653 days (1-10 years)', 'compliance_logs': '2557-3653 days (7-10 years)' } } } except Exception as e: logger.error(f"Error listing logs without retention: {str(e)}") return { 'status': 'error', 'message': str(e), 'log_groups': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def listVendedLogTargets(self, page: int = 1, log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get paginated list of log groups that can be vended directly to S3 to reduce costs. Vended logs bypass CloudWatch Logs ingestion and storage, significantly reducing costs for high-volume service logs like VPC Flow Logs, ELB Access Logs, etc. This is a complex retrieval task delegated to VendedLogsDAO. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names Returns: Dict with vended_log_opportunities, pagination, summary, and implementation guidance """ try: # Get all log groups log_groups_data = await self.dao.describe_log_groups( log_group_name_prefix=log_group_name_prefix ) log_groups = log_groups_data['log_groups'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('logs') # Analyze vended log opportunities using VendedLogsDAO opportunities = await self._vended_logs_dao.analyze_vended_log_opportunities( log_groups, pricing ) # Sort by monthly savings descending (primary sort key for best ROI) # This ensures highest-value opportunities appear first opportunities.sort(key=lambda x: x['savings']['monthly_savings'], reverse=True) # Paginate total_items = len(opportunities) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_opportunities = opportunities[start_idx:end_idx] # Calculate totals total_monthly_savings = sum(opp['savings']['monthly_savings'] for opp in opportunities) total_annual_savings = sum(opp['savings']['annual_savings'] for opp in opportunities) total_current_cost = sum(opp['current_costs']['total_monthly'] for opp in opportunities) total_vended_cost = sum(opp['vended_costs']['total_monthly'] for opp in opportunities) # Group by service type by_service = {} for opp in opportunities: service = opp['service'] if service not in by_service: by_service[service] = { 'count': 0, 'monthly_savings': 0, 'annual_savings': 0 } by_service[service]['count'] += 1 by_service[service]['monthly_savings'] += opp['savings']['monthly_savings'] by_service[service]['annual_savings'] += opp['savings']['annual_savings'] return { 'status': 'success', 'vended_log_opportunities': paginated_opportunities, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_vended_log_opportunities': total_items, 'total_current_monthly_cost': round(total_current_cost, 4), 'total_vended_monthly_cost': round(total_vended_cost, 4), 'total_monthly_savings': round(total_monthly_savings, 4), 'total_annual_savings': round(total_annual_savings, 2), 'average_cost_reduction_percentage': round( (total_monthly_savings / total_current_cost * 100) if total_current_cost > 0 else 0, 1 ), 'opportunities_by_service': by_service }, 'implementation_guidance': { 'overview': 'Vended logs bypass CloudWatch Logs and go directly to S3, reducing costs by 60-95%', 'benefits': [ 'Significant cost reduction (60-95% savings)', 'No CloudWatch Logs ingestion charges', 'Lower storage costs with S3', 'Can use S3 lifecycle policies for further cost optimization', 'Maintain compliance and audit requirements' ], 'considerations': [ 'Logs will no longer be searchable in CloudWatch Logs Insights', 'Need to use S3 Select, Athena, or other tools for log analysis', 'May require changes to existing log processing workflows', 'Some services require recreation of logging configuration' ], 'next_steps': [ '1. Review vended log opportunities sorted by savings', '2. Check implementation complexity for each service', '3. Follow documentation links for specific setup instructions', '4. Test with non-critical log groups first', '5. Update monitoring and alerting to use S3-based logs' ] } } except Exception as e: logger.error(f"Error listing vended log targets: {str(e)}") return { 'status': 'error', 'message': str(e), 'vended_log_opportunities': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def listInfrequentAccessTargets(self, page: int = 1, log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get paginated list of log groups sorted by retention * storage bytes descending, excluding those already using INFREQUENT_ACCESS log group class. INFREQUENT_ACCESS class reduces storage costs by 50% for logs with retention >= 30 days. This method identifies the best candidates for cost savings. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names Returns: Dict with log_groups, pagination, summary, and optimization recommendations """ try: # Get all log groups log_groups_data = await self.dao.describe_log_groups( log_group_name_prefix=log_group_name_prefix ) log_groups = log_groups_data['log_groups'] # Get pricing pricing = self.pricing_dao.get_pricing_data('logs') standard_storage_cost = pricing['storage_per_gb_month'] infrequent_storage_cost = standard_storage_cost * 0.5 # 50% discount # Filter and analyze candidates infrequent_access_candidates = [] for lg in log_groups: log_group_class = lg.get('logGroupClass', 'STANDARD') retention_days = lg.get('retentionInDays', 0) stored_bytes = lg.get('storedBytes', 0) # Skip if already using INFREQUENT_ACCESS if log_group_class == 'INFREQUENT_ACCESS': continue # Only consider log groups with retention >= 30 days # (INFREQUENT_ACCESS requires minimum 30-day retention) if retention_days and retention_days >= 30: stored_gb = stored_bytes / (1024**3) # Calculate priority score: retention * storage # Higher score = more data stored for longer = more savings potential priority_score = retention_days * stored_bytes # Calculate current and potential costs current_monthly_cost = stored_gb * standard_storage_cost infrequent_monthly_cost = stored_gb * infrequent_storage_cost monthly_savings = current_monthly_cost - infrequent_monthly_cost infrequent_access_candidates.append({ 'log_group_name': lg.get('logGroupName'), 'stored_gb': round(stored_gb, 4), 'stored_bytes': stored_bytes, 'retention_days': retention_days, 'current_log_group_class': log_group_class, 'priority_score': priority_score, 'current_monthly_cost': round(current_monthly_cost, 4), 'infrequent_access_monthly_cost': round(infrequent_monthly_cost, 4), 'monthly_savings': round(monthly_savings, 4), 'annual_savings': round(monthly_savings * 12, 2), 'savings_percentage': 50.0 }) # Sort by priority score descending (retention * storage bytes) infrequent_access_candidates.sort(key=lambda x: x['priority_score'], reverse=True) # Paginate total_items = len(infrequent_access_candidates) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_candidates = infrequent_access_candidates[start_idx:end_idx] # Calculate totals total_monthly_savings = sum(c['monthly_savings'] for c in infrequent_access_candidates) total_annual_savings = sum(c['annual_savings'] for c in infrequent_access_candidates) total_storage_gb = sum(c['stored_gb'] for c in infrequent_access_candidates) return { 'status': 'success', 'log_groups': paginated_candidates, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_infrequent_access_candidates': total_items, 'total_storage_gb': round(total_storage_gb, 4), 'total_monthly_savings_potential': round(total_monthly_savings, 4), 'total_annual_savings_potential': round(total_annual_savings, 2), 'average_savings_percentage': 50.0 }, 'optimization_recommendations': { 'message': 'Switch to INFREQUENT_ACCESS class for 50% storage cost reduction', 'requirements': [ 'Log group must have retention policy of 30 days or more', 'Best for logs that are rarely queried', 'Higher query costs (CloudWatch Logs Insights charges apply)', 'Ideal for compliance/audit logs with long retention' ], 'implementation': { 'method': 'Update log group class using AWS CLI or Console', 'cli_command': 'aws logs put-log-group-policy --log-group-name <name> --log-group-class INFREQUENT_ACCESS', 'reversible': True, 'downtime': 'None - change is immediate' }, 'cost_tradeoffs': { 'storage_savings': '50% reduction in storage costs', 'query_costs': 'CloudWatch Logs Insights queries cost $0.005/GB scanned', 'recommendation': 'Best for logs queried less than 10 times per month' } } } except Exception as e: logger.error(f"Error listing infrequent access targets: {str(e)}") return { 'status': 'error', 'message': str(e), 'log_groups': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def listLogAnomalies(self, page: int = 1, log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get paginated list of log anomalies using ListLogAnomalyDetectors and ListAnomalies. Identifies log groups with anomaly detection enabled and lists detected anomalies to help identify unusual patterns, security issues, or operational problems. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names Returns: Dict with anomalies, pagination, summary, and analysis """ try: # List all anomaly detectors anomaly_detectors = [] paginator = self.dao.logs_client.get_paginator('list_log_anomaly_detectors') for page_response in paginator.paginate(): anomaly_detectors.extend(page_response.get('anomalyDetectors', [])) # Filter by prefix if provided if log_group_name_prefix: anomaly_detectors = [ ad for ad in anomaly_detectors if ad.get('logGroupArnList', []) and any(log_group_name_prefix in arn for arn in ad.get('logGroupArnList', [])) ] # Get anomalies for each detector all_anomalies = [] for detector in anomaly_detectors: detector_arn = detector.get('anomalyDetectorArn') log_group_arns = detector.get('logGroupArnList', []) try: # List anomalies for this detector anomalies_response = self.dao.logs_client.list_anomalies( anomalyDetectorArn=detector_arn ) anomalies = anomalies_response.get('anomalies', []) for anomaly in anomalies: # Extract log group name from ARN log_group_names = [] for arn in log_group_arns: # ARN format: arn:aws:logs:region:account:log-group:log-group-name:* parts = arn.split(':') if len(parts) >= 7: log_group_names.append(':'.join(parts[6:-1])) all_anomalies.append({ 'anomaly_id': anomaly.get('anomalyId'), 'detector_arn': detector_arn, 'log_group_names': log_group_names, 'first_seen': anomaly.get('firstSeen'), 'last_seen': anomaly.get('lastSeen'), 'description': anomaly.get('description'), 'pattern': anomaly.get('patternString'), 'priority': anomaly.get('priority'), 'state': anomaly.get('state'), 'is_pattern_level_suppression': anomaly.get('isPatternLevelSuppression', False), 'detector_evaluation_frequency': detector.get('evaluationFrequency', 'UNKNOWN'), 'detector_status': detector.get('anomalyDetectorStatus') }) except Exception as e: logger.warning(f"Could not list anomalies for detector {detector_arn}: {str(e)}") continue # Sort by last_seen descending (most recent first) all_anomalies.sort(key=lambda x: x.get('last_seen', 0), reverse=True) # Paginate total_items = len(all_anomalies) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_anomalies = all_anomalies[start_idx:end_idx] # Analyze anomaly patterns by_priority = {'HIGH': 0, 'MEDIUM': 0, 'LOW': 0, 'UNKNOWN': 0} by_state = {} for anomaly in all_anomalies: priority = anomaly.get('priority', 'UNKNOWN') state = anomaly.get('state', 'UNKNOWN') by_priority[priority] = by_priority.get(priority, 0) + 1 by_state[state] = by_state.get(state, 0) + 1 return { 'status': 'success', 'anomalies': paginated_anomalies, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_anomalies': total_items, 'total_detectors': len(anomaly_detectors), 'anomalies_by_priority': by_priority, 'anomalies_by_state': by_state }, 'analysis': { 'message': 'Log anomalies indicate unusual patterns in your logs', 'high_priority_count': by_priority.get('HIGH', 0), 'recommendations': [ 'Investigate high-priority anomalies first', 'Review anomaly patterns for security issues', 'Consider creating alarms for recurring anomalies', 'Update application code to address root causes' ] } } except Exception as e: logger.error(f"Error listing log anomalies: {str(e)}") return { 'status': 'error', 'message': str(e), 'anomalies': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } async def listIneffectiveLogAnomalies(self, page: int = 1, log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ Get paginated list of ineffective log anomaly detectors that have found no anomalies and could be disabled to reduce costs. Only includes detectors that have been active for at least half of their anomalyVisibilityTime to prevent flagging newly created detectors. Args: page: Page number (1-based) log_group_name_prefix: Optional filter for log group names Returns: Dict with ineffective_detectors, pagination, summary, and cost savings """ try: # List all anomaly detectors anomaly_detectors = [] paginator = self.dao.logs_client.get_paginator('list_log_anomaly_detectors') for page_response in paginator.paginate(): anomaly_detectors.extend(page_response.get('anomalyDetectors', [])) # Filter by prefix if provided if log_group_name_prefix: anomaly_detectors = [ ad for ad in anomaly_detectors if ad.get('logGroupArnList', []) and any(log_group_name_prefix in arn for arn in ad.get('logGroupArnList', [])) ] # Analyze each detector for effectiveness ineffective_detectors = [] current_time = datetime.now(timezone.utc) for detector in anomaly_detectors: detector_arn = detector.get('anomalyDetectorArn') log_group_arns = detector.get('logGroupArnList', []) creation_time = detector.get('creationTimeStamp') anomaly_visibility_time = detector.get('anomalyVisibilityTime', 7) # Default 7 days # Calculate detector age if creation_time: if isinstance(creation_time, (int, float)): creation_datetime = datetime.fromtimestamp(creation_time / 1000, tz=timezone.utc) else: creation_datetime = creation_time detector_age_days = (current_time - creation_datetime).days # Only consider detectors active for at least half of anomalyVisibilityTime min_age_days = anomaly_visibility_time / 2 if detector_age_days < min_age_days: continue # Skip new detectors try: # Check if detector has found any anomalies anomalies_response = self.dao.logs_client.list_anomalies( anomalyDetectorArn=detector_arn ) anomaly_count = len(anomalies_response.get('anomalies', [])) # If no anomalies found, this detector is ineffective if anomaly_count == 0: # Extract log group names from ARNs log_group_names = [] for arn in log_group_arns: parts = arn.split(':') if len(parts) >= 7: log_group_names.append(':'.join(parts[6:-1])) # Estimate cost (anomaly detection is included in CloudWatch Logs costs, # but disabling unused detectors reduces processing overhead) # Approximate cost: $0.01 per GB of logs analyzed estimated_monthly_cost = 0.50 # Conservative estimate per detector ineffective_detectors.append({ 'detector_arn': detector_arn, 'log_group_names': log_group_names, 'log_group_count': len(log_group_names), 'creation_time': creation_time, 'detector_age_days': detector_age_days, 'anomaly_visibility_time_days': anomaly_visibility_time, 'anomalies_found': 0, 'evaluation_frequency': detector.get('evaluationFrequency', 'UNKNOWN'), 'detector_status': detector.get('anomalyDetectorStatus'), 'estimated_monthly_cost': round(estimated_monthly_cost, 2), 'estimated_annual_cost': round(estimated_monthly_cost * 12, 2), 'recommendation': 'Consider disabling this detector as it has not found any anomalies' }) except Exception as e: logger.warning(f"Could not analyze detector {detector_arn}: {str(e)}") continue # Sort by detector age descending (oldest ineffective detectors first) ineffective_detectors.sort(key=lambda x: x['detector_age_days'], reverse=True) # Paginate total_items = len(ineffective_detectors) total_pages = (total_items + self._page_size - 1) // self._page_size start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_detectors = ineffective_detectors[start_idx:end_idx] # Calculate totals total_monthly_cost = sum(d['estimated_monthly_cost'] for d in ineffective_detectors) total_annual_cost = sum(d['estimated_annual_cost'] for d in ineffective_detectors) return { 'status': 'success', 'ineffective_detectors': paginated_detectors, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_items, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_ineffective_detectors': total_items, 'total_detectors_analyzed': len(anomaly_detectors), 'ineffective_percentage': round( (total_items / len(anomaly_detectors) * 100) if len(anomaly_detectors) > 0 else 0, 1 ), 'estimated_monthly_cost': round(total_monthly_cost, 2), 'estimated_annual_cost': round(total_annual_cost, 2) }, 'optimization_recommendations': { 'message': 'Disable ineffective anomaly detectors to reduce processing costs', 'criteria': f'Detectors included have been active for at least half of their anomalyVisibilityTime and found no anomalies', 'actions': [ 'Review detector configuration and log patterns', 'Consider if anomaly detection is needed for these log groups', 'Disable detectors that are not providing value', 'Monitor for a period before permanent deletion' ], 'cost_savings': { 'monthly': round(total_monthly_cost, 2), 'annual': round(total_annual_cost, 2) } } } except Exception as e: logger.error(f"Error listing ineffective log anomaly detectors: {str(e)}") return { 'status': 'error', 'message': str(e), 'ineffective_detectors': [], 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0} } class CWAlarmsTips: """CloudWatch Alarms optimization and analysis.""" def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences): self.dao = dao self.pricing_dao = pricing_dao self.cost_preferences = cost_preferences self._page_size = 10 # Items per page async def listAlarm(self, page: int = 1, alarm_name_prefix: Optional[str] = None, can_use_expensive_cost_to_estimate: bool = False, lookback_days: int = 30) -> Dict[str, Any]: """ List all alarms ordered by estimated cost (descending), paginated. Args: page: Page number (1-based) alarm_name_prefix: Optional filter for alarm names can_use_expensive_cost_to_estimate: If True, uses CloudWatch Metrics API (paid) to get accurate alarm evaluation counts. If False (default), estimates cost by dimension count. lookback_days: Number of days to analyze for evaluation counts (only used if can_use_expensive_cost_to_estimate=True) Returns: Dict with alarms, pagination, summary, and pricing_info """ try: # Get all alarms from DAO alarms_data = await self.dao.describe_alarms(alarm_name_prefix=alarm_name_prefix) alarms = alarms_data['alarms'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('alarms') # Calculate cost for each alarm alarms_with_cost = [] for alarm in alarms: alarm_info = { 'alarm_name': alarm.get('AlarmName'), 'alarm_arn': alarm.get('AlarmArn'), 'state_value': alarm.get('StateValue'), 'state_reason': alarm.get('StateReason'), 'actions_enabled': alarm.get('ActionsEnabled', False), 'alarm_actions': alarm.get('AlarmActions', []), 'ok_actions': alarm.get('OKActions', []), 'insufficient_data_actions': alarm.get('InsufficientDataActions', []) } # Determine alarm type and calculate cost if 'MetricName' in alarm: # Metric alarm dimensions = alarm.get('Dimensions', []) dimension_count = len(dimensions) period = alarm.get('Period', 300) alarm_info['alarm_type'] = 'metric' alarm_info['metric_name'] = alarm.get('MetricName') alarm_info['namespace'] = alarm.get('Namespace') alarm_info['dimensions'] = dimensions alarm_info['dimension_count'] = dimension_count alarm_info['period'] = period # Determine if high resolution is_high_resolution = period < 300 alarm_info['is_high_resolution'] = is_high_resolution if is_high_resolution: alarm_info['monthly_cost'] = pricing['high_resolution_alarms_per_alarm'] alarm_info['cost_type'] = 'high_resolution' else: alarm_info['monthly_cost'] = pricing['standard_alarms_per_alarm'] alarm_info['cost_type'] = 'standard' # Estimate cost based on dimensions (more dimensions = more complexity) alarm_info['estimated_cost_score'] = dimension_count * alarm_info['monthly_cost'] alarm_info['cost_estimation_method'] = 'dimension_based' elif 'AlarmRule' in alarm: # Composite alarm alarm_info['alarm_type'] = 'composite' alarm_info['alarm_rule'] = alarm.get('AlarmRule') alarm_info['dimension_count'] = 0 alarm_info['monthly_cost'] = pricing['composite_alarms_per_alarm'] alarm_info['cost_type'] = 'composite' alarm_info['estimated_cost_score'] = alarm_info['monthly_cost'] alarm_info['cost_estimation_method'] = 'fixed_composite' else: # Unknown type alarm_info['alarm_type'] = 'unknown' alarm_info['dimension_count'] = 0 alarm_info['monthly_cost'] = 0 alarm_info['cost_type'] = 'unknown' alarm_info['estimated_cost_score'] = 0 alarm_info['cost_estimation_method'] = 'unknown' # If expensive cost estimation is enabled, get actual evaluation counts if can_use_expensive_cost_to_estimate and alarm_info['alarm_type'] == 'metric': try: end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=lookback_days) # Get alarm evaluation metrics eval_data = await self.dao.get_metric_statistics( namespace='AWS/CloudWatch', metric_name='AlarmEvaluations', dimensions=[{'Name': 'AlarmName', 'Value': alarm_info['alarm_name']}], start_time=start_time, end_time=end_time, period=86400, # Daily statistics=['Sum'] ) total_evaluations = sum(dp['Sum'] for dp in eval_data['datapoints']) alarm_info['evaluation_count'] = int(total_evaluations) alarm_info['cost_estimation_method'] = 'actual_evaluations' # Actual cost is still the fixed monthly rate, but we have usage data alarm_info['actual_evaluations_per_day'] = total_evaluations / lookback_days if lookback_days > 0 else 0 except Exception as e: logger.warning(f"Failed to get evaluation metrics for {alarm_info['alarm_name']}: {str(e)}") alarm_info['evaluation_count'] = None alarm_info['cost_estimation_method'] = 'dimension_based_fallback' alarm_info['annual_cost'] = alarm_info['monthly_cost'] * 12 alarms_with_cost.append(alarm_info) # Sort by estimated cost score (descending) alarms_with_cost.sort(key=lambda x: x['estimated_cost_score'], reverse=True) # Calculate pagination total_alarms = len(alarms_with_cost) total_pages = (total_alarms + self._page_size - 1) // self._page_size page = max(1, min(page, total_pages)) if total_pages > 0 else 1 start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_alarms = alarms_with_cost[start_idx:end_idx] # Calculate summary total_monthly_cost = sum(a['monthly_cost'] for a in alarms_with_cost) standard_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'standard') high_res_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'high_resolution') composite_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'composite') free_tier = self.pricing_dao.get_free_tier_limits() billable_standard = max(0, standard_count - free_tier['alarms_count']) return { 'status': 'success', 'alarms': paginated_alarms, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_alarms, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_alarms': total_alarms, 'standard_alarms': standard_count, 'high_resolution_alarms': high_res_count, 'composite_alarms': composite_count, 'billable_standard_alarms': billable_standard, 'total_monthly_cost': round(total_monthly_cost, 2), 'total_annual_cost': round(total_monthly_cost * 12, 2), 'cost_estimation_method': 'actual_evaluations' if can_use_expensive_cost_to_estimate else 'dimension_based' }, 'pricing_info': { 'standard_alarm_cost': pricing['standard_alarms_per_alarm'], 'high_resolution_alarm_cost': pricing['high_resolution_alarms_per_alarm'], 'composite_alarm_cost': pricing['composite_alarms_per_alarm'], 'free_tier_alarms': free_tier['alarms_count'] }, 'filters_applied': { 'alarm_name_prefix': alarm_name_prefix } } except Exception as e: logger.error(f"Error listing alarms: {str(e)}") return {'status': 'error', 'message': str(e)} async def listInvalidAlarm(self, page: int = 1, alarm_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ List alarms with INSUFFICIENT_DATA state, ordered by dimension count (descending), paginated. Alarms in INSUFFICIENT_DATA state may indicate: - Misconfigured metrics or dimensions - Resources that no longer exist - Metrics that are not being published - Potential cost waste on non-functional alarms Args: page: Page number (1-based) alarm_name_prefix: Optional filter for alarm names Returns: Dict with invalid alarms, pagination, summary, and optimization recommendations """ try: # Get alarms with INSUFFICIENT_DATA state alarms_data = await self.dao.describe_alarms( alarm_name_prefix=alarm_name_prefix, state_value='INSUFFICIENT_DATA' ) alarms = alarms_data['alarms'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('alarms') # Process each alarm invalid_alarms = [] for alarm in alarms: alarm_info = { 'alarm_name': alarm.get('AlarmName'), 'alarm_arn': alarm.get('AlarmArn'), 'state_value': alarm.get('StateValue'), 'state_reason': alarm.get('StateReason'), 'state_updated_timestamp': alarm.get('StateUpdatedTimestamp'), 'actions_enabled': alarm.get('ActionsEnabled', False), 'alarm_actions': alarm.get('AlarmActions', []), 'alarm_description': alarm.get('AlarmDescription', '') } # Calculate how long it's been in INSUFFICIENT_DATA state state_updated = alarm.get('StateUpdatedTimestamp') if state_updated: if isinstance(state_updated, str): state_updated = datetime.fromisoformat(state_updated.replace('Z', '+00:00')) days_in_state = (datetime.now(timezone.utc) - state_updated).days alarm_info['days_in_insufficient_data_state'] = days_in_state else: alarm_info['days_in_insufficient_data_state'] = None # Determine alarm type and cost if 'MetricName' in alarm: # Metric alarm dimensions = alarm.get('Dimensions', []) dimension_count = len(dimensions) period = alarm.get('Period', 300) alarm_info['alarm_type'] = 'metric' alarm_info['metric_name'] = alarm.get('MetricName') alarm_info['namespace'] = alarm.get('Namespace') alarm_info['dimensions'] = dimensions alarm_info['dimension_count'] = dimension_count alarm_info['period'] = period # Determine if high resolution is_high_resolution = period < 300 alarm_info['is_high_resolution'] = is_high_resolution if is_high_resolution: alarm_info['monthly_cost'] = pricing['high_resolution_alarms_per_alarm'] alarm_info['cost_type'] = 'high_resolution' else: alarm_info['monthly_cost'] = pricing['standard_alarms_per_alarm'] alarm_info['cost_type'] = 'standard' elif 'AlarmRule' in alarm: # Composite alarm alarm_info['alarm_type'] = 'composite' alarm_info['alarm_rule'] = alarm.get('AlarmRule') alarm_info['dimension_count'] = 0 alarm_info['monthly_cost'] = pricing['composite_alarms_per_alarm'] alarm_info['cost_type'] = 'composite' else: # Unknown type alarm_info['alarm_type'] = 'unknown' alarm_info['dimension_count'] = 0 alarm_info['monthly_cost'] = 0 alarm_info['cost_type'] = 'unknown' alarm_info['annual_cost'] = alarm_info['monthly_cost'] * 12 # Add optimization recommendation if alarm_info.get('days_in_insufficient_data_state', 0) > 7: alarm_info['recommendation'] = 'Consider deleting - in INSUFFICIENT_DATA state for over 7 days' alarm_info['recommendation_priority'] = 'high' elif alarm_info.get('days_in_insufficient_data_state', 0) > 3: alarm_info['recommendation'] = 'Review alarm configuration - prolonged INSUFFICIENT_DATA state' alarm_info['recommendation_priority'] = 'medium' else: alarm_info['recommendation'] = 'Monitor - recently entered INSUFFICIENT_DATA state' alarm_info['recommendation_priority'] = 'low' invalid_alarms.append(alarm_info) # Sort by dimension count (descending) - more complex alarms first invalid_alarms.sort(key=lambda x: x.get('dimension_count', 0), reverse=True) # Calculate pagination total_invalid = len(invalid_alarms) total_pages = (total_invalid + self._page_size - 1) // self._page_size page = max(1, min(page, total_pages)) if total_pages > 0 else 1 start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_alarms = invalid_alarms[start_idx:end_idx] # Calculate summary total_wasted_monthly_cost = sum(a['monthly_cost'] for a in invalid_alarms) high_priority_count = sum(1 for a in invalid_alarms if a.get('recommendation_priority') == 'high') standard_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'standard') high_res_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'high_resolution') composite_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'composite') return { 'status': 'success', 'invalid_alarms': paginated_alarms, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_invalid, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_invalid_alarms': total_invalid, 'standard_alarms': standard_count, 'high_resolution_alarms': high_res_count, 'composite_alarms': composite_count, 'high_priority_recommendations': high_priority_count, 'potential_monthly_savings': round(total_wasted_monthly_cost, 2), 'potential_annual_savings': round(total_wasted_monthly_cost * 12, 2) }, 'optimization_insight': { 'message': f'Found {total_invalid} alarms in INSUFFICIENT_DATA state, potentially wasting ${round(total_wasted_monthly_cost, 2)}/month', 'action': 'Review and delete or fix alarms that have been in INSUFFICIENT_DATA state for extended periods' }, 'filters_applied': { 'alarm_name_prefix': alarm_name_prefix, 'state_filter': 'INSUFFICIENT_DATA' } } except Exception as e: logger.error(f"Error listing invalid alarms: {str(e)}") return {'status': 'error', 'message': str(e)} async def analyze_alarms_usage(self, alarm_names: Optional[List[str]] = None) -> Dict[str, Any]: """Analyze CloudWatch Alarms usage and optimization opportunities.""" try: alarms_data = await self.dao.describe_alarms(alarm_names=alarm_names) alarms = alarms_data['alarms'] # Categorize alarms standard_alarms = 0 high_resolution_alarms = 0 composite_alarms = 0 alarms_without_actions = [] for alarm in alarms: if 'MetricName' in alarm: if alarm.get('Period', 300) < 300: high_resolution_alarms += 1 else: standard_alarms += 1 else: composite_alarms += 1 # Check for actions if not (alarm.get('AlarmActions') or alarm.get('OKActions') or alarm.get('InsufficientDataActions')): alarms_without_actions.append(alarm.get('AlarmName')) # Calculate costs alarms_cost = self.pricing_dao.calculate_cost('alarms', { 'standard_alarms_count': standard_alarms, 'high_resolution_alarms_count': high_resolution_alarms, 'composite_alarms_count': composite_alarms }) return { 'status': 'success', 'alarms_summary': { 'total_alarms': len(alarms), 'standard_alarms': standard_alarms, 'high_resolution_alarms': high_resolution_alarms, 'composite_alarms': composite_alarms, 'alarms_without_actions': len(alarms_without_actions) }, 'cost_analysis': alarms_cost, 'alarms_details': alarms[:50] } except Exception as e: logger.error(f"Error analyzing alarms usage: {str(e)}") return {'status': 'error', 'message': str(e)} class CWDashboardTips: """CloudWatch Dashboards optimization and analysis.""" def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences): self.dao = dao self.pricing_dao = pricing_dao self.cost_preferences = cost_preferences self._page_size = 10 # Items per page async def listDashboard(self, page: int = 1, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]: """ List dashboards ordered by total metric dimensions referenced (descending), paginated. Dashboards with more metric dimensions are typically more complex and may: - Be more expensive to maintain - Have performance issues - Be harder to understand and maintain Args: page: Page number (1-based) dashboard_name_prefix: Optional filter for dashboard names Returns: Dict with dashboards, pagination, summary, and pricing_info """ try: # Get all dashboards from DAO dashboards_data = await self.dao.list_dashboards(dashboard_name_prefix=dashboard_name_prefix) dashboards = dashboards_data['dashboards'] # Get pricing data pricing = self.pricing_dao.get_pricing_data('dashboards') free_tier = self.pricing_dao.get_free_tier_limits() # Process each dashboard dashboards_with_metrics = [] now = datetime.now(timezone.utc) for dashboard in dashboards: dashboard_name = dashboard.get('DashboardName') dashboard_info = { 'dashboard_name': dashboard_name, 'dashboard_arn': dashboard.get('DashboardArn'), 'last_modified': dashboard.get('LastModified'), 'size': dashboard.get('Size', 0) } # Calculate days since last modified last_modified = dashboard.get('LastModified') if last_modified: if isinstance(last_modified, str): last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00')) days_since_modified = (now - last_modified).days dashboard_info['days_since_modified'] = days_since_modified dashboard_info['is_stale'] = days_since_modified > 90 else: dashboard_info['days_since_modified'] = None dashboard_info['is_stale'] = False # Get dashboard body to analyze metrics try: dashboard_config = await self.dao.get_dashboard(dashboard_name) dashboard_body = dashboard_config.get('dashboard_body', '{}') # Parse dashboard body if isinstance(dashboard_body, str): dashboard_body = json.loads(dashboard_body) # Count metrics and dimensions total_metrics = 0 total_dimensions = 0 unique_namespaces = set() unique_metric_names = set() widgets = dashboard_body.get('widgets', []) for widget in widgets: properties = widget.get('properties', {}) metrics = properties.get('metrics', []) for metric in metrics: if isinstance(metric, list) and len(metric) >= 2: # Metric format: [namespace, metric_name, dim1_name, dim1_value, dim2_name, dim2_value, ...] namespace = metric[0] metric_name = metric[1] unique_namespaces.add(namespace) unique_metric_names.add(f"{namespace}/{metric_name}") total_metrics += 1 # Count dimensions (pairs after namespace and metric_name) dimension_count = (len(metric) - 2) // 2 total_dimensions += dimension_count dashboard_info['total_metrics'] = total_metrics dashboard_info['total_dimensions'] = total_dimensions dashboard_info['unique_namespaces'] = len(unique_namespaces) dashboard_info['unique_metric_names'] = len(unique_metric_names) dashboard_info['widget_count'] = len(widgets) dashboard_info['avg_dimensions_per_metric'] = round(total_dimensions / total_metrics, 2) if total_metrics > 0 else 0 except Exception as e: logger.warning(f"Failed to parse dashboard {dashboard_name}: {str(e)}") dashboard_info['total_metrics'] = 0 dashboard_info['total_dimensions'] = 0 dashboard_info['unique_namespaces'] = 0 dashboard_info['unique_metric_names'] = 0 dashboard_info['widget_count'] = 0 dashboard_info['avg_dimensions_per_metric'] = 0 dashboard_info['parse_error'] = str(e) # Calculate cost (fixed per dashboard) dashboard_info['monthly_cost'] = pricing['dashboard_per_month'] dashboard_info['annual_cost'] = pricing['dashboard_per_month'] * 12 dashboards_with_metrics.append(dashboard_info) # Sort by total dimensions (descending) dashboards_with_metrics.sort(key=lambda x: x['total_dimensions'], reverse=True) # Calculate pagination total_dashboards = len(dashboards_with_metrics) total_pages = (total_dashboards + self._page_size - 1) // self._page_size page = max(1, min(page, total_pages)) if total_pages > 0 else 1 start_idx = (page - 1) * self._page_size end_idx = start_idx + self._page_size paginated_dashboards = dashboards_with_metrics[start_idx:end_idx] # Calculate summary billable_dashboards = max(0, total_dashboards - free_tier['dashboards_count']) total_monthly_cost = billable_dashboards * pricing['dashboard_per_month'] total_metrics_all = sum(d['total_metrics'] for d in dashboards_with_metrics) total_dimensions_all = sum(d['total_dimensions'] for d in dashboards_with_metrics) stale_count = sum(1 for d in dashboards_with_metrics if d['is_stale']) return { 'status': 'success', 'dashboards': paginated_dashboards, 'pagination': { 'current_page': page, 'page_size': self._page_size, 'total_items': total_dashboards, 'total_pages': total_pages, 'has_next': page < total_pages, 'has_previous': page > 1 }, 'summary': { 'total_dashboards': total_dashboards, 'billable_dashboards': billable_dashboards, 'free_tier_dashboards': min(total_dashboards, free_tier['dashboards_count']), 'total_metrics': total_metrics_all, 'total_dimensions': total_dimensions_all, 'avg_dimensions_per_dashboard': round(total_dimensions_all / total_dashboards, 2) if total_dashboards > 0 else 0, 'stale_dashboards': stale_count, 'total_monthly_cost': round(total_monthly_cost, 2), 'total_annual_cost': round(total_monthly_cost * 12, 2) }, 'pricing_info': { 'dashboard_cost': pricing['dashboard_per_month'], 'free_tier_dashboards': free_tier['dashboards_count'], 'free_tier_metrics_per_dashboard': free_tier['dashboard_metrics'] }, 'optimization_insights': { 'high_complexity_dashboards': sum(1 for d in dashboards_with_metrics if d['total_dimensions'] > 50), 'stale_dashboards': stale_count, 'recommendation': 'Review dashboards with high dimension counts for optimization opportunities' if total_dimensions_all > 100 else 'Dashboard complexity is reasonable' }, 'filters_applied': { 'dashboard_name_prefix': dashboard_name_prefix } } except Exception as e: logger.error(f"Error listing dashboards: {str(e)}") return {'status': 'error', 'message': str(e)} async def analyze_dashboards_usage(self, dashboard_names: Optional[List[str]] = None) -> Dict[str, Any]: """Analyze CloudWatch Dashboards usage and optimization opportunities.""" try: dashboards_data = await self.dao.list_dashboards() dashboards = dashboards_data['dashboards'] if dashboard_names: dashboards = [d for d in dashboards if d.get('DashboardName') in dashboard_names] # Analyze stale dashboards stale_dashboards = [] now = datetime.now(timezone.utc) for dashboard in dashboards: last_modified = dashboard.get('LastModified') if last_modified: if isinstance(last_modified, str): last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00')) days_since_modified = (now - last_modified).days if days_since_modified > 90: stale_dashboards.append({ 'name': dashboard.get('DashboardName'), 'days_since_modified': days_since_modified }) # Calculate costs dashboards_cost = self.pricing_dao.calculate_cost('dashboards', { 'dashboards_count': len(dashboards) }) return { 'status': 'success', 'dashboards_summary': { 'total_dashboards': len(dashboards), 'exceeds_free_tier': len(dashboards) > 3, 'billable_dashboards': max(0, len(dashboards) - 3), 'stale_dashboards': len(stale_dashboards) }, 'cost_analysis': dashboards_cost, 'dashboards_details': dashboards[:20] } except Exception as e: logger.error(f"Error analyzing dashboards usage: {str(e)}") return {'status': 'error', 'message': str(e)} class CloudWatchService: """ Main CloudWatch service that manages specialized tip classes. This service acts as a factory and lifecycle manager for: - CWGeneralSpendTips: General spending analysis - CWMetricsTips: Metrics optimization - CWLogsTips: Logs optimization - CWAlarmsTips: Alarms optimization - CWDashboardTips: Dashboard optimization Usage: service = CloudWatchService(region='us-east-1') general_tips = service.getGeneralSpendService() result = await general_tips.analyze_overall_spending() """ def __init__(self, config: Optional[CloudWatchServiceConfig] = None, region: Optional[str] = None): """Initialize CloudWatch service with configuration.""" # Handle backward compatibility if region is not None and config is None: config = CloudWatchServiceConfig(region=region) self.config = config or CloudWatchServiceConfig() self.region = self.config.region # Initialize cost controller self.cost_controller = CostController() self.cost_preferences = self.config.cost_preferences or CostPreferences() # Initialize DAOs (shared by all tip classes) self._dao = CloudWatchDAO(region=self.region, cost_controller=self.cost_controller) self._pricing_dao = AWSPricingDAO(region=self.region or 'us-east-1') # Initialize tip classes (lazy loading) self._general_spend_tips = None self._metrics_tips = None self._logs_tips = None self._alarms_tips = None self._dashboard_tips = None log_cloudwatch_operation(logger, "service_initialization", region=self.region or 'default', cost_preferences=str(self.cost_preferences)) def getGeneralSpendService(self) -> CWGeneralSpendTips: """Get general spending analysis service.""" if self._general_spend_tips is None: self._general_spend_tips = CWGeneralSpendTips( self._dao, self._pricing_dao, self.cost_preferences ) return self._general_spend_tips def getMetricsService(self) -> CWMetricsTips: """Get metrics optimization service.""" if self._metrics_tips is None: self._metrics_tips = CWMetricsTips( self._dao, self._pricing_dao, self.cost_preferences ) return self._metrics_tips def getLogsService(self) -> CWLogsTips: """Get logs optimization service.""" if self._logs_tips is None: self._logs_tips = CWLogsTips( self._dao, self._pricing_dao, self.cost_preferences ) return self._logs_tips def getAlarmsService(self) -> CWAlarmsTips: """Get alarms optimization service.""" if self._alarms_tips is None: self._alarms_tips = CWAlarmsTips( self._dao, self._pricing_dao, self.cost_preferences ) return self._alarms_tips def getDashboardsService(self) -> CWDashboardTips: """Get dashboards optimization service.""" if self._dashboard_tips is None: self._dashboard_tips = CWDashboardTips( self._dao, self._pricing_dao, self.cost_preferences ) return self._dashboard_tips @property def pricing(self): """Backward compatibility property for pricing DAO.""" return self._pricing_dao def update_cost_preferences(self, preferences: CostPreferences): """Update cost control preferences for all services.""" self.cost_preferences = preferences # Update preferences in existing tip classes if self._general_spend_tips: self._general_spend_tips.cost_preferences = preferences if self._metrics_tips: self._metrics_tips.cost_preferences = preferences if self._logs_tips: self._logs_tips.cost_preferences = preferences if self._alarms_tips: self._alarms_tips.cost_preferences = preferences if self._dashboard_tips: self._dashboard_tips.cost_preferences = preferences log_cloudwatch_operation(logger, "cost_preferences_update", preferences=str(preferences)) def get_service_statistics(self) -> Dict[str, Any]: """Get service statistics including cache performance.""" return { 'service_info': { 'region': self.region, 'cost_preferences': self.cost_preferences.__dict__, 'initialized_services': { 'general_spend': self._general_spend_tips is not None, 'metrics': self._metrics_tips is not None, 'logs': self._logs_tips is not None, 'alarms': self._alarms_tips is not None, 'dashboards': self._dashboard_tips is not None } }, 'cache_statistics': self._dao.get_cache_stats(), 'cost_control_status': { 'cost_controller_active': True, 'preferences_validated': True, 'transparency_enabled': self.config.enable_cost_tracking } } def clear_cache(self) -> None: """Clear all cached data.""" self._dao.clear_cache() log_cloudwatch_operation(logger, "cache_cleared") # Convenience function for backward compatibility async def create_cloudwatch_service(region: Optional[str] = None, cost_preferences: Optional[CostPreferences] = None) -> CloudWatchService: """Create a configured CloudWatchService instance.""" config = CloudWatchServiceConfig(region=region, cost_preferences=cost_preferences) service = CloudWatchService(config) log_cloudwatch_operation(logger, "service_created", region=region or 'default') return service

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server