server.py
#!/usr/bin/env python3
"""
S3 Butler - MCP server for S3 bucket management and analytics
"""
from fastmcp import FastMCP
import os
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
import json
from dotenv import load_dotenv
import boto3
import s3

load_dotenv()

# Import ClickHouse only if configured
try:
    from clickhouse_config import get_clickhouse_config
    import clickhouse_connect
    CLICKHOUSE_AVAILABLE = get_clickhouse_config().enabled
except ImportError:
    CLICKHOUSE_AVAILABLE = False
except Exception:
    CLICKHOUSE_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("S3Butler")

mcp = FastMCP("S3Butler")


@mcp.tool()
def get_team_name() -> str:
    """Get the name of the team or organization

    This tool returns the team or organization name associated with this
    S3 Butler instance. Currently returns a hardcoded value but can be
    configured to return dynamic team information.

    Returns:
        str: The team/organization name (default: 'team1')
    """
    return "team1"


# S3/IAM Tools
# Use environment variables for S3/IAM configuration
s3_access_key = os.getenv('S3_ACCESS_KEY')
s3_secret_key = os.getenv('S3_SECRET_KEY')
iam_endpoint = os.getenv('IAM_ENDPOINT', 'http://127.0.0.1:8600')
s3_endpoint = os.getenv('S3_ENDPOINT', 'http://127.0.0.1:8000')

if s3_access_key and s3_secret_key:
    session = boto3.Session(
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key
    )
    iam_client = session.client('iam', endpoint_url=iam_endpoint)
    s3_client = session.client('s3', endpoint_url=s3_endpoint)
else:
    # Fallback to profile if env vars not set
    session = boto3.Session(profile_name='scality-dev')
    iam_client = session.client('iam', endpoint_url=iam_endpoint)
    s3_client = session.client('s3', endpoint_url=s3_endpoint)


# Create a wrapper function that FastMCP can use
@mcp.tool()
def get_iam_policies_for_bucket(bucket_name: str = "") -> str:
    """Retrieve all IAM policies that grant access to a specific S3 bucket

    This tool scans all IAM users in your account and identifies which
    policies (both inline and attached) grant permissions to the specified
    S3 bucket. Useful for auditing bucket access and understanding
    permission boundaries.

    Args:
        bucket_name: The name of the S3 bucket to check policies for
            (e.g., 'my-data-bucket').
            Required parameter - must provide a valid bucket name.

    Returns:
        JSON string containing a dictionary where keys are usernames and
        values are their policies that reference the bucket.
        Returns error message if bucket name is not provided.
    """
    if not bucket_name or bucket_name.strip() == "":
        return json.dumps({
            "error": "Bucket name is required",
            "message": "Please provide a bucket name to check which IAM policies grant access to it.",
            "usage": "Specify a bucket name (e.g., 'finance', 'engineering')"
        })
    return s3.get_iam_policies_for_bucket(iam_client, bucket_name)


@mcp.tool()
def list_buckets() -> str:
    """List all S3 buckets accessible with the configured credentials

    This tool retrieves all S3 buckets that the configured AWS credentials
    have permission to list. Provides bucket names, creation dates, and
    owner information. Useful for discovering available storage resources
    and auditing bucket inventory.

    Returns:
        JSON string containing:
        - 'buckets': List of bucket objects with 'name' and 'creation_date'
        - 'count': Total number of buckets found
        - 'owner': Bucket owner information (DisplayName and ID)
        Returns error message if S3 access fails.
""" try: response = s3_client.list_buckets() buckets = [] for bucket in response.get('Buckets', []): buckets.append({ 'name': bucket['Name'], 'creation_date': bucket['CreationDate'].isoformat() if bucket.get('CreationDate') else None }) result = { 'buckets': buckets, 'count': len(buckets), 'owner': response.get('Owner', {}) } logger.info(f"Listed {len(buckets)} buckets") return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error listing buckets: {e}") return json.dumps({ "error": "Failed to list buckets", "message": str(e) }) # ClickHouse Tools - only registered if ClickHouse is configured if CLICKHOUSE_AVAILABLE: def create_clickhouse_client(): """Create a ClickHouse client connection.""" config = get_clickhouse_config() client_config = config.get_client_config() logger.info( f"Creating ClickHouse client connection to {client_config['host']}:{client_config['port']} " f"(secure={client_config['secure']}, database={client_config.get('database', 'default')})" ) try: client = clickhouse_connect.get_client(**client_config) # Test the connection version = client.server_version logger.info(f"Successfully connected to ClickHouse server version {version}") return client except Exception as e: logger.error(f"Failed to connect to ClickHouse: {str(e)}") raise @mcp.tool() def get_top_buckets_by_operations( limit: int = 10, hours_back: Optional[int] = None, start_time: Optional[str] = None, end_time: Optional[str] = None ) -> List[Dict[str, Any]]: """Get the top N buckets ranked by total number of operations This analytics tool queries ClickHouse to identify which S3 buckets have the most activity based on total operation count. Useful for identifying hot buckets, understanding usage patterns, and capacity planning. Time range defaults to last 10 days if not specified. End time always defaults to current time if not provided. 
        Args:
            limit: Number of top buckets to return (default: 10, max recommended: 100)
            hours_back: Number of hours to look back from now (e.g., 24 for last day)
                Mutually exclusive with start_time/end_time
            start_time: Start time in ISO format (e.g., '2024-01-01T00:00:00Z')
                If provided without end_time, end_time defaults to now
            end_time: End time in ISO format (e.g., '2024-01-31T23:59:59Z')
                Optional, defaults to current time

        Returns:
            List[Dict[str, Any]]: List of dictionaries, each containing:
            - 'bucket_name': Name of the S3 bucket
            - 'number_of_operations': Total operation count in the time period
            Sorted by operation count (descending)
        """
        logger.info(f"Getting top {limit} buckets by operations")

        # Build time filter with defaults
        time_conditions = []
        if hours_back:
            time_conditions.append(
                f"timestamp >= now() - INTERVAL {hours_back} HOUR"
            )
            time_conditions.append("timestamp <= now()")
        elif start_time or end_time:
            if start_time:
                time_conditions.append(f"timestamp >= '{start_time}'")
            if end_time:
                time_conditions.append(f"timestamp <= '{end_time}'")
            else:
                time_conditions.append("timestamp <= now()")
        else:
            # Default to last 10 days
            time_conditions.append("timestamp >= now() - INTERVAL 10 DAY")
            time_conditions.append("timestamp <= now()")

        # Build the query based on the Grafana template for operations
        where_clause = "WHERE " + " AND ".join(time_conditions) + " AND "
        query = f"""
            SELECT
                bucketName,
                sum(number_of_op) as number_of_operations
            FROM logs.cloudserver_aggregated_federated
            {where_clause} bucketName <> ''
            GROUP BY bucketName
            ORDER BY number_of_operations DESC, bucketName ASC
            LIMIT {limit}
        """

        client = create_clickhouse_client()
        try:
            result = client.query(query)
            buckets = []
            for row in result.result_rows:
                buckets.append({
                    "bucket_name": row[0],
                    "number_of_operations": row[1]
                })
            logger.info(f"Found {len(buckets)} buckets")
            return buckets
        except Exception as e:
            logger.error(f"Error getting top buckets by operations: {e}")
            raise ValueError(f"Failed to get top buckets by operations: {str(e)}")

    @mcp.tool()
    def get_top_buckets_by_inbound_traffic(
        limit: int = 10,
        hours_back: Optional[int] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get the top N buckets ranked by inbound traffic (data uploaded)

        This analytics tool queries ClickHouse to identify which S3 buckets
        receive the most data uploads, tracking PutObject and UploadPart
        operations. Useful for understanding data ingestion patterns,
        identifying heavy write workloads, and storage growth trends.

        Time range defaults to last 10 days if not specified.
        End time always defaults to current time if not provided.
        Args:
            limit: Number of top buckets to return (default: 10, max recommended: 100)
            hours_back: Number of hours to look back from now (e.g., 24 for last day)
                Mutually exclusive with start_time/end_time
            start_time: Start time in ISO format (e.g., '2024-01-01T00:00:00Z')
                If provided without end_time, end_time defaults to now
            end_time: End time in ISO format (e.g., '2024-01-31T23:59:59Z')
                Optional, defaults to current time

        Returns:
            List[Dict[str, Any]]: List of dictionaries, each containing:
            - 'bucket_name': Name of the S3 bucket
            - 'total_bytes': Total bytes uploaded (raw number)
            - 'inbound_traffic': Human-readable size (e.g., '1.5 GB', '500 MB')
            Sorted by total bytes uploaded (descending)
        """
        logger.info(f"Getting top {limit} buckets by inbound traffic")

        # Build time filter with defaults
        time_conditions = []
        if hours_back:
            time_conditions.append(
                f"timestamp >= now() - INTERVAL {hours_back} HOUR"
            )
            time_conditions.append("timestamp <= now()")
        elif start_time or end_time:
            if start_time:
                time_conditions.append(f"timestamp >= '{start_time}'")
            if end_time:
                time_conditions.append(f"timestamp <= '{end_time}'")
            else:
                time_conditions.append("timestamp <= now()")
        else:
            # Default to last 10 days
            time_conditions.append("timestamp >= now() - INTERVAL 10 DAY")
            time_conditions.append("timestamp <= now()")

        # Build the query based on the Grafana template for inbound traffic
        where_clause = "WHERE " + " AND ".join(time_conditions) + " AND "
        query = f"""
            SELECT
                bucketName,
                sum(contentLength) AS totalContentLength,
                formatReadableSize(sum(contentLength)) AS readableSize
            FROM logs.cloudserver_aggregated_federated
            {where_clause} action IN ('PutObject', 'UploadPart')
            GROUP BY bucketName
            ORDER BY totalContentLength DESC, bucketName ASC
            LIMIT {limit}
        """

        client = create_clickhouse_client()
        try:
            result = client.query(query)
            buckets = []
            for row in result.result_rows:
                buckets.append({
                    "bucket_name": row[0],
                    "total_bytes": row[1],
                    "inbound_traffic": row[2]
                })
            logger.info(f"Found {len(buckets)} buckets")
            return buckets
        except Exception as e:
            logger.error(f"Error getting top buckets by inbound traffic: {e}")
            raise ValueError(f"Failed to get top buckets by inbound traffic: {str(e)}")

    @mcp.tool()
    def get_top_buckets_by_outbound_traffic(
        limit: int = 10,
        hours_back: Optional[int] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get the top N buckets ranked by outbound traffic (data downloaded)

        This analytics tool queries ClickHouse to identify which S3 buckets
        serve the most data downloads, tracking GetObject operations. Useful
        for understanding data consumption patterns, identifying high-demand
        content, and optimizing CDN strategies.

        Time range defaults to last 10 days if not specified.
        End time always defaults to current time if not provided.
        Args:
            limit: Number of top buckets to return (default: 10, max recommended: 100)
            hours_back: Number of hours to look back from now (e.g., 24 for last day)
                Mutually exclusive with start_time/end_time
            start_time: Start time in ISO format (e.g., '2024-01-01T00:00:00Z')
                If provided without end_time, end_time defaults to now
            end_time: End time in ISO format (e.g., '2024-01-31T23:59:59Z')
                Optional, defaults to current time

        Returns:
            List[Dict[str, Any]]: List of dictionaries, each containing:
            - 'bucket_name': Name of the S3 bucket
            - 'total_bytes': Total bytes downloaded (raw number)
            - 'outbound_traffic': Human-readable size (e.g., '10.5 TB', '750 GB')
            Sorted by total bytes downloaded (descending)
        """
        logger.info(f"Getting top {limit} buckets by outbound traffic")

        # Build time filter with defaults
        time_conditions = []
        if hours_back:
            time_conditions.append(
                f"timestamp >= now() - INTERVAL {hours_back} HOUR"
            )
            time_conditions.append("timestamp <= now()")
        elif start_time or end_time:
            if start_time:
                time_conditions.append(f"timestamp >= '{start_time}'")
            if end_time:
                time_conditions.append(f"timestamp <= '{end_time}'")
            else:
                time_conditions.append("timestamp <= now()")
        else:
            # Default to last 10 days
            time_conditions.append("timestamp >= now() - INTERVAL 10 DAY")
            time_conditions.append("timestamp <= now()")

        # Build the query based on the Grafana template for outbound traffic
        where_clause = "WHERE " + " AND ".join(time_conditions) + " AND "
        query = f"""
            SELECT
                bucketName,
                sum(contentLength) AS totalContentLength,
                formatReadableSize(sum(contentLength)) AS readableSize
            FROM logs.cloudserver_aggregated_federated
            {where_clause} action = 'GetObject'
            GROUP BY bucketName
            ORDER BY totalContentLength DESC, bucketName ASC
            LIMIT {limit}
        """

        client = create_clickhouse_client()
        try:
            result = client.query(query)
            buckets = []
            for row in result.result_rows:
                buckets.append({
                    "bucket_name": row[0],
                    "total_bytes": row[1],
                    "outbound_traffic": row[2]
                })
            logger.info(f"Found {len(buckets)} buckets")
            return buckets
        except Exception as e:
            logger.error(f"Error getting top buckets by outbound traffic: {e}")
            raise ValueError(f"Failed to get top buckets by outbound traffic: {str(e)}")

    logger.info("ClickHouse tools registered")
else:
    logger.info("ClickHouse is not configured, skipping ClickHouse tools")


if __name__ == "__main__":
    host = os.getenv("MCP_HOST", "0.0.0.0")
    port = int(os.getenv("MCP_PORT", "8000"))
    path = os.getenv("MCP_PATH", "/mcp")

    print(f"Starting S3 Butler MCP server...")
    print(f"Server will be available at http://{host}:{port}{path}")
    print(f"Using HTTP streaming transport (FastMCP 2.3+)")

    mcp.run(
        transport="http",
        host=host,
        port=port,
        path=path
    )
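
For reference, the sketch below shows how the tools registered above could be exercised from an MCP client once the server is running. It is a minimal example, assuming the FastMCP 2.x Client API and the default MCP_HOST/MCP_PORT/MCP_PATH values from the __main__ block; adjust the URL if those environment variables are overridden.

# client_example.py - hypothetical smoke test, not part of server.py
import asyncio

from fastmcp import Client  # assumes FastMCP 2.x is installed


async def main():
    # URL built from the server defaults: port 8000, path /mcp
    async with Client("http://127.0.0.1:8000/mcp") as client:
        tools = await client.list_tools()
        print("Registered tools:", [tool.name for tool in tools])

        # list_buckets returns a JSON string describing the accessible buckets
        result = await client.call_tool("list_buckets", {})
        print(result)


if __name__ == "__main__":
    asyncio.run(main())

Note that the three ClickHouse analytics tools only appear in the tool list when clickhouse_config reports ClickHouse as enabled; otherwise only the team name, IAM policy, and bucket listing tools are registered.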
