CFM Tips - Cost Optimization MCP Server

by aws-samples
s3_service.py (59.3 kB)
""" AWS S3 service module with strict no-cost constraints. This module provides functions for interacting with AWS S3, CloudWatch, and Cost Explorer APIs for S3 cost optimization analysis. All S3 operations are strictly limited to no-cost API calls only. CRITICAL: This service enforces NO-COST constraints to prevent customer billing. Only metadata-only S3 operations are allowed. Object-level operations are FORBIDDEN. """ import asyncio import logging import time from typing import Dict, List, Optional, Any, Iterator, Set import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) # CRITICAL: Whitelist of allowed no-cost S3 API operations ALLOWED_S3_OPERATIONS = { 'list_buckets', # List bucket names only - NO COST 'get_bucket_location', # Get bucket region - NO COST 'get_bucket_lifecycle_configuration', # Get lifecycle policies - NO COST 'get_bucket_versioning', # Get versioning status - NO COST 'get_bucket_tagging', # Get bucket tags - NO COST 'list_multipart_uploads', # List incomplete uploads (metadata only) - NO COST 'get_bucket_policy', # Get bucket policy - NO COST 'get_bucket_acl', # Get bucket ACL - NO COST 'get_bucket_encryption', # Get encryption config - NO COST 'get_public_access_block', # Get public access block - NO COST 'get_bucket_notification_configuration', # Get notification config - NO COST 'get_bucket_cors', # Get CORS config - NO COST 'get_bucket_website', # Get website config - NO COST 'get_bucket_logging', # Get logging config - NO COST 'get_bucket_replication', # Get replication config - NO COST 'get_bucket_request_payment', # Get request payment config - NO COST } # CRITICAL: Forbidden operations that incur costs FORBIDDEN_S3_OPERATIONS = { 'list_objects', # COSTS per 1000 requests 'list_objects_v2', # COSTS per 1000 requests 'head_object', # COSTS per request 'get_object', # COSTS per request + data transfer 'get_object_attributes', # COSTS per request 'select_object_content', # COSTS per request + data scanned 'restore_object', # COSTS for retrieval } class S3CostConstraintViolationError(Exception): """Raised when attempting to use a cost-incurring S3 operation.""" pass class S3Service: """ Enhanced S3 service class with strict no-cost constraints and async support. CRITICAL: This service enforces NO-COST operations only to prevent customer billing. All S3 operations are validated against a whitelist of allowed no-cost API calls. """ def __init__(self, region: Optional[str] = None): """ Initialize S3Service with AWS clients and no-cost validation. Args: region: AWS region (optional) """ self.region = region self._operation_call_count = {} # Track API calls for monitoring try: # Initialize AWS clients self.s3_client = boto3.client('s3', region_name=region) self.cloudwatch_client = boto3.client('cloudwatch', region_name=region) # Cost Explorer is only available in us-east-1 self.ce_client = boto3.client('ce', region_name='us-east-1') logger.info(f"S3Service initialized for region: {region or 'default'} with NO-COST constraints enabled") except Exception as e: logger.error(f"Failed to initialize S3Service: {str(e)}") raise def _validate_s3_operation(self, operation_name: str) -> None: """ Validate that an S3 operation is allowed (no-cost only). Args: operation_name: Name of the S3 operation Raises: S3CostConstraintViolationError: If operation would incur costs """ if operation_name in FORBIDDEN_S3_OPERATIONS: error_msg = ( f"FORBIDDEN: S3 operation '{operation_name}' would incur costs. 
" f"This service only allows no-cost operations: {sorted(ALLOWED_S3_OPERATIONS)}" ) logger.error(error_msg) raise S3CostConstraintViolationError(error_msg) if operation_name not in ALLOWED_S3_OPERATIONS: logger.warning(f"S3 operation '{operation_name}' not in whitelist. Proceeding with caution.") # Track operation calls for monitoring self._operation_call_count[operation_name] = self._operation_call_count.get(operation_name, 0) + 1 logger.debug(f"S3 operation '{operation_name}' validated as no-cost (call #{self._operation_call_count[operation_name]})") def get_operation_stats(self) -> Dict[str, int]: """ Get statistics on S3 operations called during this session. Returns: Dictionary of operation names and call counts """ return self._operation_call_count.copy() async def safe_api_call(self, api_func, *args, retry_count: int = 0, max_retries: int = 3, fallback_sources: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]: """ Enhanced wrapper for safe API calls with async support, error handling, and fallback strategies. Args: api_func: The API function to call *args: Positional arguments for the API function retry_count: Current retry attempt max_retries: Maximum number of retries fallback_sources: List of alternative data sources to try on failure **kwargs: Keyword arguments for the API function Returns: Dictionary with API response or error information """ func_name = getattr(api_func, '__name__', 'unknown_function') # Validate S3 operations for cost constraints if hasattr(api_func, '__self__') and hasattr(api_func.__self__, '_service_model'): service_name = api_func.__self__._service_model.service_name if service_name == 's3': try: self._validate_s3_operation(func_name) except S3CostConstraintViolationError as e: return { "status": "error", "message": str(e), "error_code": "COST_CONSTRAINT_VIOLATION", "fallback_available": bool(fallback_sources) } try: # Run API call in thread pool for async compatibility loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, lambda: api_func(*args, **kwargs)) logger.debug(f"Successful API call: {func_name}") return { "status": "success", "data": response, "source": "primary" } except ClientError as e: error_code = e.response.get('Error', {}).get('Code', 'Unknown') error_message = e.response.get('Error', {}).get('Message', str(e)) # Handle specific error cases with enhanced logging if error_code == 'AccessDenied': logger.warning(f"Access denied for API call: {func_name}. Fallback sources: {fallback_sources}") return { "status": "error", "message": f"Insufficient permissions for {func_name}", "error_code": error_code, "fallback_available": bool(fallback_sources), "suggested_fallback": fallback_sources[0] if fallback_sources else None } elif error_code in ['Throttling', 'ThrottlingException', 'RequestLimitExceeded']: if retry_count < max_retries: # Exponential backoff with jitter: 2^retry_count + random(0, 1) import random sleep_time = (2 ** retry_count) + random.random() logger.info(f"Rate limited, retrying {func_name} in {sleep_time:.2f} seconds (attempt {retry_count + 1}/{max_retries})") await asyncio.sleep(sleep_time) return await self.safe_api_call(api_func, *args, retry_count=retry_count + 1, max_retries=max_retries, fallback_sources=fallback_sources, **kwargs) else: logger.error(f"Max retries exceeded for {func_name}. 
Fallback sources: {fallback_sources}") return { "status": "error", "message": f"Rate limit exceeded for {func_name} after {max_retries} retries", "error_code": error_code, "fallback_available": bool(fallback_sources), "suggested_fallback": fallback_sources[0] if fallback_sources else None } elif error_code in ['NoSuchBucket', 'NoSuchKey', 'NoSuchLifecycleConfiguration', 'NoSuchTagSet', 'NoSuchBucketPolicy', 'ServerSideEncryptionConfigurationNotFoundError', 'NoSuchPublicAccessBlockConfiguration']: # These are expected "not found" errors, not actual failures logger.debug(f"Resource not found for {func_name}: {error_code}") return { "status": "success", "data": {}, "message": f"Resource not found: {error_code}", "source": "primary", "resource_not_found": True } else: logger.error(f"AWS API error in {func_name}: {error_message}. Fallback sources: {fallback_sources}") return { "status": "error", "message": f"AWS API error: {error_message}", "error_code": error_code, "fallback_available": bool(fallback_sources), "suggested_fallback": fallback_sources[0] if fallback_sources else None } except Exception as e: logger.error(f"Unexpected error in {func_name}: {str(e)}. Fallback sources: {fallback_sources}") return { "status": "error", "message": f"Unexpected error: {str(e)}", "fallback_available": bool(fallback_sources), "suggested_fallback": fallback_sources[0] if fallback_sources else None } def safe_api_call_sync(self, api_func, *args, retry_count: int = 0, max_retries: int = 3, **kwargs) -> Dict[str, Any]: """ Synchronous version of safe_api_call for backward compatibility. Args: api_func: The API function to call *args: Positional arguments for the API function retry_count: Current retry attempt max_retries: Maximum number of retries **kwargs: Keyword arguments for the API function Returns: Dictionary with API response or error information """ # Run the async version in a new event loop try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete( self.safe_api_call(api_func, *args, retry_count=retry_count, max_retries=max_retries, **kwargs) ) async def list_buckets(self) -> Dict[str, Any]: """ List all S3 buckets with region information (NO-COST operation). 
Returns: Dictionary containing bucket list or error information """ try: # Get bucket list - NO COST operation response = await self.safe_api_call( self.s3_client.list_buckets, fallback_sources=['storage_lens', 'cost_explorer'] ) if response["status"] != "success": return response buckets = response["data"].get("Buckets", []) # Enhance bucket information with regions in parallel enhanced_buckets = [] region_tasks = [] # Create async tasks for getting bucket regions for bucket in buckets: bucket_info = { "Name": bucket["Name"], "CreationDate": bucket["CreationDate"] } enhanced_buckets.append(bucket_info) # Create task for getting bucket region - NO COST operation task = self.safe_api_call( self.s3_client.get_bucket_location, Bucket=bucket["Name"], fallback_sources=['storage_lens'] ) region_tasks.append(task) # Execute region queries in parallel if region_tasks: region_responses = await asyncio.gather(*region_tasks, return_exceptions=True) for i, region_response in enumerate(region_responses): if isinstance(region_response, Exception): enhanced_buckets[i]["Region"] = "unknown" logger.warning(f"Exception getting region for bucket {enhanced_buckets[i]['Name']}: {region_response}") elif region_response.get("status") == "success": location = region_response["data"].get("LocationConstraint") # Handle special case for us-east-1 enhanced_buckets[i]["Region"] = location if location else "us-east-1" else: enhanced_buckets[i]["Region"] = "unknown" logger.warning(f"Could not determine region for bucket {enhanced_buckets[i]['Name']}: {region_response.get('message', 'Unknown error')}") return { "status": "success", "data": { "Buckets": enhanced_buckets, "Owner": response["data"].get("Owner") }, "message": f"Retrieved {len(enhanced_buckets)} buckets with regions", "source": response.get("source", "primary") } except Exception as e: logger.error(f"Error listing buckets: {str(e)}") return { "status": "error", "message": f"Error listing buckets: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } def list_buckets_sync(self) -> Dict[str, Any]: """ Synchronous version of list_buckets for backward compatibility. Returns: Dictionary containing bucket list or error information """ try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(self.list_buckets()) async def get_bucket_metrics(self, bucket_name: str, days: int = 30) -> Dict[str, Any]: """ Get comprehensive metrics for a specific bucket using NO-COST sources only. 
Args: bucket_name: Name of the S3 bucket days: Number of days to look back for metrics Returns: Dictionary containing bucket metrics or error information """ try: metrics = {} # Execute metrics gathering in parallel tasks = [ self.get_bucket_size_metrics(bucket_name, days), self.get_request_metrics(bucket_name, days), self.get_storage_class_distribution_from_storage_lens(bucket_name) ] responses = await asyncio.gather(*tasks, return_exceptions=True) # Process size metrics if not isinstance(responses[0], Exception) and responses[0].get("status") == "success": metrics.update(responses[0]["data"]) # Process request metrics if not isinstance(responses[1], Exception) and responses[1].get("status") == "success": metrics.update(responses[1]["data"]) # Process storage class distribution (may require fallback to Storage Lens) if not isinstance(responses[2], Exception): if responses[2].get("status") == "success": metrics.update(responses[2]["data"]) elif responses[2].get("suggested_fallback") == "storage_lens_service": metrics["StorageClassDistribution"] = { "note": "Requires StorageLensService for no-cost analysis", "fallback_required": True } return { "status": "success", "data": metrics, "message": f"Retrieved metrics for bucket {bucket_name} (no-cost sources only)", "cost_constraint_applied": True } except Exception as e: logger.error(f"Error getting bucket metrics for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting bucket metrics: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_size_metrics(self, bucket_name: str, days: int = 30) -> Dict[str, Any]: """ Get bucket size metrics from CloudWatch (NO-COST operation). Args: bucket_name: Name of the S3 bucket days: Number of days to look back Returns: Dictionary containing size metrics or error information """ try: from datetime import datetime, timedelta end_time = datetime.utcnow() start_time = end_time - timedelta(days=days) # Execute CloudWatch queries in parallel size_task = self.safe_api_call( self.cloudwatch_client.get_metric_statistics, Namespace='AWS/S3', MetricName='BucketSizeBytes', Dimensions=[ {'Name': 'BucketName', 'Value': bucket_name}, {'Name': 'StorageType', 'Value': 'StandardStorage'} ], StartTime=start_time, EndTime=end_time, Period=86400, # Daily Statistics=['Average'], fallback_sources=['storage_lens'] ) objects_task = self.safe_api_call( self.cloudwatch_client.get_metric_statistics, Namespace='AWS/S3', MetricName='NumberOfObjects', Dimensions=[ {'Name': 'BucketName', 'Value': bucket_name}, {'Name': 'StorageType', 'Value': 'AllStorageTypes'} ], StartTime=start_time, EndTime=end_time, Period=86400, # Daily Statistics=['Average'], fallback_sources=['storage_lens'] ) size_response, objects_response = await asyncio.gather(size_task, objects_task) metrics = {} if size_response["status"] == "success": datapoints = size_response["data"].get("Datapoints", []) if datapoints: # Get the most recent datapoint latest_size = max(datapoints, key=lambda x: x['Timestamp']) metrics["SizeBytes"] = latest_size.get("Average", 0) metrics["SizeGB"] = metrics["SizeBytes"] / (1024**3) else: metrics["SizeBytes"] = 0 metrics["SizeGB"] = 0 if objects_response["status"] == "success": datapoints = objects_response["data"].get("Datapoints", []) if datapoints: # Get the most recent datapoint latest_objects = max(datapoints, key=lambda x: x['Timestamp']) metrics["ObjectCount"] = int(latest_objects.get("Average", 0)) else: metrics["ObjectCount"] = 0 return { "status": "success", 
"data": metrics, "message": f"Retrieved size metrics for bucket {bucket_name}", "source": "cloudwatch" } except Exception as e: logger.error(f"Error getting size metrics for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting size metrics: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_request_metrics(self, bucket_name: str, days: int = 30) -> Dict[str, Any]: """ Get request metrics from CloudWatch (NO-COST operation). Args: bucket_name: Name of the S3 bucket days: Number of days to look back Returns: Dictionary containing request metrics or error information """ try: from datetime import datetime, timedelta end_time = datetime.utcnow() start_time = end_time - timedelta(days=days) request_types = ['AllRequests', 'GetRequests', 'PutRequests', 'DeleteRequests', 'HeadRequests', 'PostRequests', 'ListRequests'] # Create tasks for parallel execution tasks = [] for request_type in request_types: task = self.safe_api_call( self.cloudwatch_client.get_metric_statistics, Namespace='AWS/S3', MetricName=request_type, Dimensions=[ {'Name': 'BucketName', 'Value': bucket_name} ], StartTime=start_time, EndTime=end_time, Period=86400, # Daily Statistics=['Sum'], fallback_sources=['storage_lens'] ) tasks.append((request_type, task)) # Execute all requests in parallel responses = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True) metrics = {} for i, (request_type, _) in enumerate(tasks): response = responses[i] if isinstance(response, Exception): metrics[request_type] = 0 logger.warning(f"Exception getting {request_type} metrics for {bucket_name}: {response}") elif response.get("status") == "success": datapoints = response["data"].get("Datapoints", []) total_requests = sum(dp.get("Sum", 0) for dp in datapoints) metrics[request_type] = total_requests else: metrics[request_type] = 0 return { "status": "success", "data": metrics, "message": f"Retrieved request metrics for bucket {bucket_name}", "source": "cloudwatch" } except Exception as e: logger.error(f"Error getting request metrics for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting request metrics: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_storage_class_distribution_from_storage_lens(self, bucket_name: str) -> Dict[str, Any]: """ Get storage class distribution for a bucket from Storage Lens (NO-COST operation). This method replaces the cost-incurring list_objects_v2 approach with Storage Lens data. Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing storage class distribution or error information with fallback guidance """ logger.info(f"Getting storage class distribution for {bucket_name} from Storage Lens (NO-COST)") # This is a placeholder for Storage Lens integration # The actual implementation will be done by StorageLensService return { "status": "error", "message": "Storage class distribution requires Storage Lens integration", "error_code": "REQUIRES_STORAGE_LENS", "fallback_available": True, "suggested_fallback": "storage_lens_service", "cost_constraint": "FORBIDDEN: list_objects_v2 would incur costs", "alternative_approach": "Use StorageLensService.get_storage_class_distribution() instead" } async def get_bucket_lifecycle(self, bucket_name: str) -> Dict[str, Any]: """ Get lifecycle configuration for a bucket (NO-COST operation). 
Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing lifecycle configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_lifecycle_configuration, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": if response.get("resource_not_found"): return { "status": "success", "data": {"Rules": []}, "message": f"No lifecycle configuration found for bucket {bucket_name}", "source": response.get("source", "primary") } return { "status": "success", "data": response["data"], "message": f"Retrieved lifecycle configuration for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting lifecycle configuration for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting lifecycle configuration: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_versioning(self, bucket_name: str) -> Dict[str, Any]: """ Get versioning configuration for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing versioning configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_versioning, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": return { "status": "success", "data": response["data"], "message": f"Retrieved versioning configuration for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting versioning configuration for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting versioning configuration: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_tagging(self, bucket_name: str) -> Dict[str, Any]: """ Get tagging configuration for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing tagging configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_tagging, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": if response.get("resource_not_found"): return { "status": "success", "data": {"TagSet": []}, "message": f"No tags found for bucket {bucket_name}", "source": response.get("source", "primary") } return { "status": "success", "data": response["data"], "message": f"Retrieved tagging configuration for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting tagging configuration for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting tagging configuration: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_multipart_uploads(self, bucket_name: str, max_results: int = 1000) -> Dict[str, Any]: """ Get incomplete multipart uploads for a bucket with pagination (NO-COST operation). 
Args: bucket_name: Name of the S3 bucket max_results: Maximum number of results per page Returns: Dictionary containing multipart uploads or error information """ try: all_uploads = [] key_marker = None upload_id_marker = None while True: params = { 'Bucket': bucket_name, 'MaxUploads': min(max_results, 1000) # AWS limit is 1000 } if key_marker: params['KeyMarker'] = key_marker if upload_id_marker: params['UploadIdMarker'] = upload_id_marker response = await self.safe_api_call( self.s3_client.list_multipart_uploads, fallback_sources=['storage_lens'], **params ) if response["status"] != "success": return response data = response["data"] uploads = data.get("Uploads", []) all_uploads.extend(uploads) # Check if there are more results if data.get("IsTruncated", False): key_marker = data.get("NextKeyMarker") upload_id_marker = data.get("NextUploadIdMarker") else: break # Safety check to prevent infinite loops if len(all_uploads) >= max_results: break return { "status": "success", "data": { "Uploads": all_uploads, "Bucket": bucket_name }, "message": f"Retrieved {len(all_uploads)} incomplete multipart uploads for bucket {bucket_name}", "source": response.get("source", "primary") } except Exception as e: logger.error(f"Error getting multipart uploads for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting multipart uploads: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } def list_objects_v2_paginated(self, bucket_name: str, prefix: str = '', max_keys: int = 1000) -> Iterator[Dict]: """ FORBIDDEN: This method would incur S3 request costs. Use StorageLensService for object-level analysis instead. Args: bucket_name: Name of the S3 bucket prefix: Object key prefix to filter by max_keys: Maximum number of keys per page Yields: Error response indicating cost constraint violation """ logger.error(f"FORBIDDEN: list_objects_v2 would incur costs for bucket {bucket_name}") yield { "status": "error", "message": "FORBIDDEN: list_objects_v2 would incur S3 request costs", "error_code": "COST_CONSTRAINT_VIOLATION", "fallback_available": True, "suggested_fallback": "storage_lens_service", "alternative_approach": "Use StorageLensService.get_object_metrics() for object-level analysis" } async def get_s3_costs(self, start_date: str, end_date: str, next_token: str = None) -> Dict[str, Any]: """ Get S3 cost and usage data from Cost Explorer with pagination (NO-COST operation). 
Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format next_token: Pagination token for next page Returns: Dictionary containing S3 cost data or error information """ try: params = { 'TimePeriod': { 'Start': start_date, 'End': end_date }, 'Granularity': 'MONTHLY', 'Metrics': ['BlendedCost', 'UnblendedCost', 'UsageQuantity'], 'Filter': { 'Dimensions': { 'Key': 'SERVICE', 'Values': ['Amazon Simple Storage Service'] } }, 'GroupBy': [ {'Type': 'DIMENSION', 'Key': 'USAGE_TYPE'}, {'Type': 'DIMENSION', 'Key': 'REGION'} ] } if next_token: params['NextPageToken'] = next_token response = await self.safe_api_call( self.ce_client.get_cost_and_usage, fallback_sources=['storage_lens'], **params ) if response["status"] == "success": return { "status": "success", "data": response["data"], "message": f"Retrieved S3 cost data from {start_date} to {end_date}", "source": response.get("source", "cost_explorer") } else: return response except Exception as e: logger.error(f"Error getting S3 costs: {str(e)}") return { "status": "error", "message": f"Error getting S3 costs: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } def paginate_s3_operation(self, operation_name: str, **kwargs) -> Iterator[Dict]: """ Generic pagination handler for S3 operations. Args: operation_name: Name of the S3 operation to paginate **kwargs: Arguments for the operation Yields: Dictionary pages containing operation results """ try: if hasattr(self.s3_client, 'get_paginator'): try: paginator = self.s3_client.get_paginator(operation_name) page_iterator = paginator.paginate(**kwargs) for page in page_iterator: yield { "status": "success", "data": page } return except Exception: # Fall back to manual pagination if paginator not available pass # Manual pagination fallback next_token = None while True: if next_token: kwargs['NextToken'] = next_token operation = getattr(self.s3_client, operation_name) response = self.safe_api_call(operation, **kwargs) if response["status"] != "success": yield response break yield response # Check for next token (varies by operation) data = response["data"] next_token = data.get('NextToken') or data.get('NextContinuationToken') or data.get('NextMarker') if not next_token or not data.get('IsTruncated', False): break except Exception as e: logger.error(f"Error in paginated S3 operation {operation_name}: {str(e)}") yield { "status": "error", "message": f"Error in paginated operation: {str(e)}" } def paginate_cost_explorer(self, operation_name: str, **kwargs) -> Iterator[Dict]: """ Generic pagination handler for Cost Explorer operations. Args: operation_name: Name of the Cost Explorer operation to paginate **kwargs: Arguments for the operation Yields: Dictionary pages containing operation results """ try: next_token = None while True: if next_token: kwargs['NextPageToken'] = next_token operation = getattr(self.ce_client, operation_name) response = self.safe_api_call(operation, **kwargs) if response["status"] != "success": yield response break yield response # Check for next token data = response["data"] next_token = data.get('NextPageToken') if not next_token: break except Exception as e: logger.error(f"Error in paginated Cost Explorer operation {operation_name}: {str(e)}") yield { "status": "error", "message": f"Error in paginated operation: {str(e)}" } def paginate_cloudwatch_metrics(self, **kwargs) -> Iterator[Dict]: """ Generic pagination handler for CloudWatch metrics operations. 
Args: **kwargs: Arguments for the CloudWatch operation Yields: Dictionary pages containing metrics results """ try: # CloudWatch get_metric_statistics doesn't use traditional pagination # Instead, we need to handle large time ranges by breaking them into chunks from datetime import datetime, timedelta start_time = kwargs.get('StartTime') end_time = kwargs.get('EndTime') period = kwargs.get('Period', 86400) # Default to daily if not start_time or not end_time: # If no time range specified, just make the call directly response = self.safe_api_call( self.cloudwatch_client.get_metric_statistics, **kwargs ) yield response return # Break large time ranges into chunks to avoid API limits chunk_size = timedelta(days=30) # 30-day chunks current_start = start_time while current_start < end_time: current_end = min(current_start + chunk_size, end_time) chunk_kwargs = kwargs.copy() chunk_kwargs['StartTime'] = current_start chunk_kwargs['EndTime'] = current_end response = self.safe_api_call( self.cloudwatch_client.get_metric_statistics, **chunk_kwargs ) yield response if response["status"] != "success": break current_start = current_end except Exception as e: logger.error(f"Error in paginated CloudWatch operation: {str(e)}") yield { "status": "error", "message": f"Error in paginated operation: {str(e)}" } async def get_bucket_policy(self, bucket_name: str) -> Dict[str, Any]: """ Get bucket policy for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing bucket policy or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_policy, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": if response.get("resource_not_found"): return { "status": "success", "data": {"Policy": None}, "message": f"No bucket policy found for bucket {bucket_name}", "source": response.get("source", "primary") } return { "status": "success", "data": response["data"], "message": f"Retrieved bucket policy for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting bucket policy for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting bucket policy: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_acl(self, bucket_name: str) -> Dict[str, Any]: """ Get bucket ACL (Access Control List) for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing bucket ACL or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_acl, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": return { "status": "success", "data": response["data"], "message": f"Retrieved bucket ACL for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting bucket ACL for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting bucket ACL: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } def get_no_cost_operation_summary(self) -> Dict[str, Any]: """ Get a summary of the no-cost constraint implementation. 
Returns: Dictionary containing implementation details and constraints """ return { "status": "success", "data": { "cost_constraint_enabled": True, "allowed_s3_operations": sorted(ALLOWED_S3_OPERATIONS), "forbidden_s3_operations": sorted(FORBIDDEN_S3_OPERATIONS), "operation_call_stats": self.get_operation_stats(), "fallback_sources": ["storage_lens", "cost_explorer", "cloudwatch"], "async_support": True, "parallel_execution": True }, "message": "S3Service configured with strict no-cost constraints", "implementation_notes": [ "All S3 operations validated against whitelist", "Object-level operations (list_objects_v2, head_object, get_object) are FORBIDDEN", "Metadata-only operations (lifecycle, versioning, tagging) are allowed", "Async/await support for parallel execution", "Graceful fallback to alternative data sources", "Enhanced error handling with retry logic" ] } async def get_bucket_encryption(self, bucket_name: str) -> Dict[str, Any]: """ Get bucket encryption configuration for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing bucket encryption configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_encryption, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": if response.get("resource_not_found"): return { "status": "success", "data": {"ServerSideEncryptionConfiguration": None}, "message": f"No encryption configuration found for bucket {bucket_name}", "source": response.get("source", "primary") } return { "status": "success", "data": response["data"], "message": f"Retrieved bucket encryption for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting bucket encryption for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting bucket encryption: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_public_access_block(self, bucket_name: str) -> Dict[str, Any]: """ Get public access block configuration for a bucket (NO-COST operation). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing public access block configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_public_access_block, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": if response.get("resource_not_found"): return { "status": "success", "data": {"PublicAccessBlockConfiguration": None}, "message": f"No public access block configuration found for bucket {bucket_name}", "source": response.get("source", "primary") } return { "status": "success", "data": response["data"], "message": f"Retrieved public access block for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting public access block for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting public access block: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_bucket_notification_configuration(self, bucket_name: str) -> Dict[str, Any]: """ Get bucket notification configuration for a bucket (NO-COST operation). 
Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing bucket notification configuration or error information """ try: response = await self.safe_api_call( self.s3_client.get_bucket_notification_configuration, Bucket=bucket_name, fallback_sources=['storage_lens'] ) if response["status"] == "success": return { "status": "success", "data": response["data"], "message": f"Retrieved notification configuration for bucket {bucket_name}", "source": response.get("source", "primary") } else: return response except Exception as e: logger.error(f"Error getting notification configuration for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting notification configuration: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } async def get_comprehensive_bucket_config(self, bucket_name: str) -> Dict[str, Any]: """ Get comprehensive bucket configuration using parallel execution (ALL NO-COST operations). Args: bucket_name: Name of the S3 bucket Returns: Dictionary containing all bucket configuration data """ try: logger.info(f"Getting comprehensive configuration for bucket {bucket_name} using NO-COST operations") # Execute all configuration queries in parallel tasks = { 'lifecycle': self.get_bucket_lifecycle(bucket_name), 'versioning': self.get_bucket_versioning(bucket_name), 'tagging': self.get_bucket_tagging(bucket_name), 'multipart_uploads': self.get_multipart_uploads(bucket_name), 'policy': self.get_bucket_policy(bucket_name), 'encryption': self.get_bucket_encryption(bucket_name), 'public_access_block': self.get_bucket_public_access_block(bucket_name), 'notification': self.get_bucket_notification_configuration(bucket_name) } # Wait for all tasks to complete results = {} responses = await asyncio.gather(*tasks.values(), return_exceptions=True) for i, (config_type, _) in enumerate(tasks.items()): response = responses[i] if isinstance(response, Exception): results[config_type] = { "status": "error", "message": f"Exception: {str(response)}" } logger.warning(f"Exception getting {config_type} for {bucket_name}: {response}") else: results[config_type] = response # Count successful operations successful_ops = sum(1 for result in results.values() if result.get("status") == "success") return { "status": "success", "data": { "bucket_name": bucket_name, "configurations": results, "operation_stats": self.get_operation_stats() }, "message": f"Retrieved {successful_ops}/{len(tasks)} configurations for bucket {bucket_name}", "cost_constraint_applied": True, "all_operations_no_cost": True } except Exception as e: logger.error(f"Error getting comprehensive bucket config for {bucket_name}: {str(e)}") return { "status": "error", "message": f"Error getting comprehensive bucket config: {str(e)}", "fallback_available": True, "suggested_fallback": "storage_lens" } # Convenience functions for backward compatibility with existing patterns def list_buckets(region: Optional[str] = None) -> Dict[str, Any]: """ List all S3 buckets (synchronous wrapper for async method). Args: region: AWS region (optional) Returns: Dictionary containing bucket list or error information """ service = S3Service(region) return service.list_buckets_sync() def get_bucket_metrics(bucket_name: str, days: int = 30, region: Optional[str] = None) -> Dict[str, Any]: """ Get comprehensive metrics for a specific bucket (synchronous wrapper for async method). 
Args: bucket_name: Name of the S3 bucket days: Number of days to look back for metrics region: AWS region (optional) Returns: Dictionary containing bucket metrics or error information """ service = S3Service(region) try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(service.get_bucket_metrics(bucket_name, days)) def get_s3_costs(start_date: str, end_date: str, region: Optional[str] = None) -> Dict[str, Any]: """ Get S3 cost and usage data from Cost Explorer (synchronous wrapper for async method). Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format region: AWS region (optional) Returns: Dictionary containing S3 cost data or error information """ service = S3Service(region) try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(service.get_s3_costs(start_date, end_date)) def get_comprehensive_bucket_config(bucket_name: str, region: Optional[str] = None) -> Dict[str, Any]: """ Get comprehensive bucket configuration (synchronous wrapper for async method). Args: bucket_name: Name of the S3 bucket region: AWS region (optional) Returns: Dictionary containing all bucket configuration data """ service = S3Service(region) try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(service.get_comprehensive_bucket_config(bucket_name)) # Async convenience functions for new code async def list_buckets_async(region: Optional[str] = None) -> Dict[str, Any]: """ List all S3 buckets (async version). Args: region: AWS region (optional) Returns: Dictionary containing bucket list or error information """ service = S3Service(region) return await service.list_buckets() async def get_bucket_metrics_async(bucket_name: str, days: int = 30, region: Optional[str] = None) -> Dict[str, Any]: """ Get comprehensive metrics for a specific bucket (async version). Args: bucket_name: Name of the S3 bucket days: Number of days to look back for metrics region: AWS region (optional) Returns: Dictionary containing bucket metrics or error information """ service = S3Service(region) return await service.get_bucket_metrics(bucket_name, days) async def get_s3_costs_async(start_date: str, end_date: str, region: Optional[str] = None) -> Dict[str, Any]: """ Get S3 cost and usage data from Cost Explorer (async version). Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format region: AWS region (optional) Returns: Dictionary containing S3 cost data or error information """ service = S3Service(region) return await service.get_s3_costs(start_date, end_date) async def get_comprehensive_bucket_config_async(bucket_name: str, region: Optional[str] = None) -> Dict[str, Any]: """ Get comprehensive bucket configuration (async version). Args: bucket_name: Name of the S3 bucket region: AWS region (optional) Returns: Dictionary containing all bucket configuration data """ service = S3Service(region) return await service.get_comprehensive_bucket_config(bucket_name)
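
The module can be driven either through the async API or through the synchronous module-level wrappers. Below is a minimal usage sketch, assuming the file is importable as s3_service, AWS credentials and a default region are configured, and the bucket name is hypothetical:

import asyncio

from s3_service import S3Service, get_comprehensive_bucket_config


async def main() -> None:
    service = S3Service(region="us-east-1")

    # List buckets with their regions (all no-cost metadata calls)
    buckets = await service.list_buckets()
    print(buckets["message"])

    # Pull every metadata configuration for one bucket in parallel;
    # "my-example-bucket" is a placeholder name
    config = await service.get_comprehensive_bucket_config("my-example-bucket")
    print(config["message"])

    # Inspect which no-cost operations were actually invoked
    print(service.get_operation_stats())


asyncio.run(main())

# Or, from purely synchronous code, via the module-level wrapper:
# result = get_comprehensive_bucket_config("my-example-bucket", region="us-east-1")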
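The cost guard rejects blacklisted operations before any AWS request is made: _validate_s3_operation raises for forbidden names, and safe_api_call converts that into a structured error dict. A small sketch of both paths (same importability assumptions as above; _validate_s3_operation is private, so calling it directly is for illustration only):

from s3_service import S3Service, S3CostConstraintViolationError

service = S3Service(region="us-east-1")

# Forbidden operations raise before any request leaves the process;
# note the call counter is not incremented on the raise path
try:
    service._validate_s3_operation("get_object")
except S3CostConstraintViolationError as e:
    print(f"Blocked: {e}")

# Whitelisted operations pass validation and are counted
service._validate_s3_operation("get_bucket_versioning")
print(service.get_operation_stats())  # {'get_bucket_versioning': 1}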
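For throttling errors, safe_api_call retries with exponential backoff plus jitter: attempt n sleeps 2**n + random() seconds. A quick check of the schedule it produces with the default max_retries=3:

for retry_count in range(3):
    low, high = 2 ** retry_count, 2 ** retry_count + 1
    print(f"retry {retry_count + 1}: sleep in [{low}, {high}) seconds")
# retry 1: sleep in [1, 2) seconds
# retry 2: sleep in [2, 3) seconds
# retry 3: sleep in [4, 5) seconds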
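Because get_metric_statistics has no pagination token, paginate_cloudwatch_metrics instead slices the requested time range into 30-day chunks and yields one response per chunk. A standalone sketch of the slicing logic (dates are illustrative):

from datetime import datetime, timedelta

start, end = datetime(2024, 1, 1), datetime(2024, 4, 1)
chunk = timedelta(days=30)
current = start
while current < end:
    stop = min(current + chunk, end)  # final chunk is clamped to the end time
    print(current.date(), "->", stop.date())
    current = stop
# 2024-01-01 -> 2024-01-31
# 2024-01-31 -> 2024-03-01
# 2024-03-01 -> 2024-03-31
# 2024-03-31 -> 2024-04-01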
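Finally, get_s3_costs wraps Cost Explorer's get_cost_and_usage, filtered to Amazon Simple Storage Service and grouped by usage type and region. A hedged sketch using the synchronous wrapper (the date range is illustrative, and Cost Explorer must be enabled on the account):

from s3_service import get_s3_costs

result = get_s3_costs("2024-01-01", "2024-02-01")
if result["status"] == "success":
    for period in result["data"].get("ResultsByTime", []):
        print(period.get("TimePeriod"), len(period.get("Groups", [])), "usage-type/region groups")
else:
    print(result["message"])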
