Skip to main content
Glama
main.py (20.1 kB)
"""MCP server exposing Uniswap V2, V3, and V4 pool data from The Graph.

Each tool queries one or more Uniswap subgraphs through The Graph gateway
and returns the result as markdown (a table for listings, a bullet list for
a single pool/pair).
"""

import json
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import httpx
import pandas as pd
from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP

# Load environment variables from .env file
load_dotenv()

# Initialize MCP server.
# "tabulate" is listed because pandas' DataFrame.to_markdown() requires it.
mcp = FastMCP(
    "Uniswap Pools",
    dependencies=["httpx", "python-dotenv", "pandas", "tabulate"],
)

# Retrieve API key from .env
API_KEY = os.getenv("THEGRAPH_API_KEY")
if not API_KEY:
    raise ValueError("THEGRAPH_API_KEY not found in .env file")

BASE_URL = "https://gateway.thegraph.com/api/subgraphs/id"

# Uniswap V2 Subgraph endpoint
UNISWAP_V2_SUBGRAPH = f"{BASE_URL}/A3Np3RQbaBA6oKJgiwDJeo5T3zrYfGHPWFYayMwtNDum"

# Uniswap V3 Subgraph endpoint
UNISWAP_V3_SUBGRAPH = f"{BASE_URL}/5zvR82QoaXYFyDEKLZ9t6v9adgnptxYpKpSbxtgVENFV"

# Uniswap V4 Subgraph endpoint
UNISWAP_V4_SUBGRAPH = f"{BASE_URL}/DiYPVdygkfjDWhbxGSqAQxwBKmfKnkWQojqeM2rkLb3G"


# Define data structure for V2 pair
@dataclass
class Pair:
    id: str
    token0: str  # token0 entity id as returned by the subgraph
    token1: str  # token1 entity id as returned by the subgraph
    reserveUSD: str
    volumeUSD: str
    pair: str  # Format: token0.symbol/token1.symbol


# Define data structure for V3 pool
@dataclass
class Pool:
    id: str
    token0: str
    token1: str
    feeTier: int  # NOTE(review): stored exactly as the subgraph returns it; confirm it is not a string
    liquidity: str
    volumeUSD: str
    feesUSD: str
    pair: str  # Format: token0.symbol/token1.symbol


# Define data structure for V4 pool
@dataclass
class PoolV4:
    id: str
    token0: str
    token1: str
    feeTier: str
    liquidity: str
    volumeUSD: str
    feesUSD: str
    pair: str  # Format: token0.symbol/token1.symbol


async def _run_query(endpoint: str, query: str, variables: Dict[str, str]) -> Dict[str, Any]:
    """POST a GraphQL query to a subgraph endpoint and return its ``data`` payload.

    Raises:
        httpx.HTTPStatusError: If the gateway responds with an error status.
        ValueError: If the response body contains GraphQL ``errors``.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        payload = response.json()
    if "errors" in payload:
        raise ValueError(f"GraphQL errors: {payload['errors']}")
    return payload["data"]


def _symbol_pair(node: Dict[str, Any]) -> str:
    """Return the ``token0.symbol/token1.symbol`` label for a pool/pair node."""
    return f"{node['token0']['symbol']}/{node['token1']['symbol']}"


def _pair_from_node(node: Dict[str, Any]) -> Pair:
    """Build a V2 ``Pair`` from a subgraph ``pairs`` result node."""
    return Pair(
        id=node["id"],
        token0=node["token0"]["id"],
        token1=node["token1"]["id"],
        reserveUSD=node["reserveUSD"],
        volumeUSD=node["volumeUSD"],
        pair=_symbol_pair(node),
    )


def _pool_v3_from_node(node: Dict[str, Any]) -> Pool:
    """Build a V3 ``Pool`` from a subgraph ``pools`` result node."""
    return Pool(
        id=node["id"],
        token0=node["token0"]["id"],
        token1=node["token1"]["id"],
        feeTier=node["feeTier"],
        liquidity=node["liquidity"],
        volumeUSD=node["volumeUSD"],
        feesUSD=node["feesUSD"],
        pair=_symbol_pair(node),
    )


def _pool_v4_from_node(node: Dict[str, Any]) -> PoolV4:
    """Build a ``PoolV4`` from a subgraph ``pools`` result node."""
    return PoolV4(
        id=node["id"],
        token0=node["token0"]["id"],
        token1=node["token1"]["id"],
        feeTier=str(node["feeTier"]),
        liquidity=str(node["liquidity"]),
        volumeUSD=node["volumeUSD"],
        feesUSD=node["feesUSD"],
        pair=_symbol_pair(node),
    )


def _to_markdown_table(
    rows: List[Dict[str, Any]],
    columns: List[str],
    sort_by: Optional[str] = None,
) -> str:
    """Render rows as a markdown table, optionally sorted descending by a numeric column.

    ``columns`` is passed explicitly so an empty result still has the expected
    columns (otherwise sorting an empty frame raises ``KeyError``).
    """
    frame = pd.DataFrame(rows, columns=columns)
    if sort_by is not None:
        # The subgraph returns decimals as strings; sort on their numeric value,
        # not lexicographically. Unparseable values become NaN and sort last.
        frame = frame.sort_values(
            by=sort_by,
            ascending=False,
            key=lambda column: pd.to_numeric(column, errors="coerce"),
        )
    return frame.to_markdown(index=False)


def _format_details(title: str, fields: List[tuple]) -> str:
    """Render a bold title followed by one ``- **label**: value`` line per field."""
    lines = [f"**{title}**"]
    lines.extend(f"- **{label}**: {value}" for label, value in fields)
    return "\n".join(lines)


# Query Subgraph to fetch V2 pairs for a given token
async def query_pairs_v2(token_address: str) -> List[Pair]:
    """Return the V2 pairs that have ``token_address`` as token0 or token1.

    The query orders results by ``volumeUSD`` descending. The address is
    lower-cased before being sent.
    """
    query = """
    query($token: ID!) {
        pairs(
            where: { or: [ {token0: $token}, {token1: $token} ] }
            orderBy: volumeUSD
            orderDirection: desc
        ) {
            id
            token0 { id symbol }
            token1 { id symbol }
            reserveUSD
            volumeUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V2_SUBGRAPH, query, {"token": token_address.lower()})
    return [_pair_from_node(node) for node in data["pairs"]]


# Query Subgraph to fetch a specific V2 pair by ID
async def query_pair_v2_by_id(pair_id: str) -> Pair:
    """Return the V2 pair whose id equals ``pair_id`` (lower-cased).

    Raises:
        ValueError: If the subgraph returns no matching pair.
    """
    query = """
    query($id: ID!) {
        pairs(where: { id: $id }) {
            id
            token0 { id symbol }
            token1 { id symbol }
            reserveUSD
            volumeUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V2_SUBGRAPH, query, {"id": pair_id.lower()})
    nodes = data["pairs"]
    if not nodes:
        raise ValueError(f"No V2 pair found with ID: {pair_id}")
    return _pair_from_node(nodes[0])


# Query Subgraph to fetch V3 pools for a given token
async def query_pools_v3(token_address: str) -> List[Pool]:
    """Return the V3 pools that have ``token_address`` as token0 or token1.

    The query orders results by ``volumeUSD`` descending. The address is
    lower-cased before being sent.
    """
    query = """
    query($token: Bytes!) {
        pools(
            where: { or: [ {token0: $token}, {token1: $token} ] }
            orderBy: volumeUSD
            orderDirection: desc
        ) {
            id
            token0 { id symbol }
            token1 { id symbol }
            feeTier
            liquidity
            volumeUSD
            feesUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V3_SUBGRAPH, query, {"token": token_address.lower()})
    return [_pool_v3_from_node(node) for node in data["pools"]]


# Query Subgraph to fetch a specific V3 pool by ID
async def query_pool_v3_by_id(pool_id: str) -> Pool:
    """Return the V3 pool whose id equals ``pool_id`` (lower-cased).

    Raises:
        ValueError: If the subgraph returns no matching pool.
    """
    query = """
    query($id: ID!) {
        pools(where: { id: $id }) {
            id
            token0 { id symbol }
            token1 { id symbol }
            feeTier
            liquidity
            volumeUSD
            feesUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V3_SUBGRAPH, query, {"id": pool_id.lower()})
    nodes = data["pools"]
    if not nodes:
        raise ValueError(f"No V3 pool found with ID: {pool_id}")
    return _pool_v3_from_node(nodes[0])


# Query Subgraph to fetch V4 pools for a given token
async def query_pools_v4(token_address: str) -> List[PoolV4]:
    """Return the V4 pools that have ``token_address`` as token0 or token1.

    The query orders results by ``volumeUSD`` descending. The address is
    lower-cased before being sent.
    """
    query = """
    query($token: ID!) {
        pools(
            where: { or: [ {token0: $token}, {token1: $token} ] }
            orderBy: volumeUSD
            orderDirection: desc
        ) {
            id
            token0 { id symbol }
            token1 { id symbol }
            feeTier
            liquidity
            volumeUSD
            feesUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V4_SUBGRAPH, query, {"token": token_address.lower()})
    return [_pool_v4_from_node(node) for node in data["pools"]]


# Query Subgraph to fetch a specific V4 pool by ID
async def query_pool_v4_by_id(pool_id: str) -> PoolV4:
    """Return the V4 pool whose id equals ``pool_id`` (lower-cased).

    Raises:
        ValueError: If the subgraph returns no matching pool.
    """
    query = """
    query($id: ID!) {
        pools(where: { id: $id }) {
            id
            token0 { id symbol }
            token1 { id symbol }
            feeTier
            liquidity
            volumeUSD
            feesUSD
        }
    }
    """
    data = await _run_query(UNISWAP_V4_SUBGRAPH, query, {"id": pool_id.lower()})
    nodes = data["pools"]
    if not nodes:
        raise ValueError(f"No V4 pool found with ID: {pool_id}")
    return _pool_v4_from_node(nodes[0])


# Tool: Query all Uniswap V2, V3, and V4 pools/pairs for a specific token and return as a formatted table
@mcp.tool()
async def get_token_pools(token_address: str, ctx: Context) -> str:
    """
    Query all Uniswap V2, V3, and V4 pools/pairs for a specific token and return as a formatted markdown table.

    Parameters:
        token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
        ctx (Context): The API context for logging and error handling.

    Returns:
        A markdown-formatted string containing a table with columns: Version, ID, Pair, Fee Tier, Volume USD, Liquidity/ReserveUSD, Fees USD,
        sorted by Volume USD in descending order.
    """
    # Context logging methods are coroutines and must be awaited.
    await ctx.info(f"Querying V2, V3, and V4 pools/pairs for token: {token_address}")
    try:
        # Query V2 pairs
        v2_pairs = await query_pairs_v2(token_address)
        await ctx.info(f"Found {len(v2_pairs)} V2 pairs")

        # Query V3 pools
        v3_pools = await query_pools_v3(token_address)
        await ctx.info(f"Found {len(v3_pools)} V3 pools")

        # Query V4 pools
        v4_pools = await query_pools_v4(token_address)
        await ctx.info(f"Found {len(v4_pools)} V4 pools")

        # Combine V2, V3, and V4 data into a single list of table rows
        rows: List[Dict[str, Any]] = []
        for pair in v2_pairs:
            rows.append({
                "Version": "v2",
                "ID": pair.id,
                "Pair": pair.pair,
                # V2 fixed 0.3% fee, expressed in the same units as V3 fee
                # tiers (hundredths of a basis point), not in basis points.
                "Fee Tier": "3000",
                "Volume USD": pair.volumeUSD,
                "Liquidity/ReserveUSD": pair.reserveUSD,
                "Fees USD": "N/A",  # feesUSD is not queried for V2 pairs
            })
        for pool in v3_pools:
            rows.append({
                "Version": "v3",
                "ID": pool.id,
                "Pair": pool.pair,
                "Fee Tier": str(pool.feeTier),
                "Volume USD": pool.volumeUSD,
                "Liquidity/ReserveUSD": pool.liquidity,
                "Fees USD": pool.feesUSD,
            })
        for pool in v4_pools:
            rows.append({
                "Version": "v4",
                "ID": pool.id,
                "Pair": pool.pair,
                "Fee Tier": pool.feeTier,
                "Volume USD": pool.volumeUSD,
                "Liquidity/ReserveUSD": pool.liquidity,
                "Fees USD": pool.feesUSD,
            })

        # Sort by Volume USD (descending, numerically) and render
        return _to_markdown_table(
            rows,
            columns=[
                "Version",
                "ID",
                "Pair",
                "Fee Tier",
                "Volume USD",
                "Liquidity/ReserveUSD",
                "Fees USD",
            ],
            sort_by="Volume USD",
        )
    except Exception as e:
        await ctx.error(f"Failed to query V2/V3/V4 pools: {str(e)}")
        raise


# Tool: Query all Uniswap V2 pairs for a specific token and return as a formatted table
@mcp.tool()
async def get_token_pools_v2(token_address: str, ctx: Context) -> str:
    """
    Query all Uniswap V2 pairs for a specific token and return as a formatted markdown table.

    Parameters:
        token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
        ctx (Context): The API context for logging and error handling.

    Returns:
        A markdown-formatted string containing a table with columns: Version, ID, Pair, Volume USD, ReserveUSD.
    """
    await ctx.info(f"Querying V2 pairs for token: {token_address}")
    try:
        pairs = await query_pairs_v2(token_address)
        await ctx.info(f"Found {len(pairs)} V2 pairs")

        rows = [
            {
                "Version": "v2",
                "ID": pair.id,
                "Pair": pair.pair,
                "Volume USD": pair.volumeUSD,
                "ReserveUSD": pair.reserveUSD,
            }
            for pair in pairs
        ]
        return _to_markdown_table(
            rows,
            columns=["Version", "ID", "Pair", "Volume USD", "ReserveUSD"],
        )
    except Exception as e:
        await ctx.error(f"Failed to query V2 pairs: {str(e)}")
        raise


# Tool: Query all Uniswap V3 pools for a specific token and return as a formatted table
@mcp.tool()
async def get_token_pools_v3(token_address: str, ctx: Context) -> str:
    """
    Query all Uniswap V3 pools for a specific token and return as a formatted markdown table.

    Parameters:
        token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
        ctx (Context): The API context for logging and error handling.

    Returns:
        A markdown-formatted string containing a table with columns: Version, ID, Pair, Fee Tier, Volume USD, Liquidity.
    """
    await ctx.info(f"Querying V3 pools for token: {token_address}")
    try:
        pools = await query_pools_v3(token_address)
        await ctx.info(f"Found {len(pools)} V3 pools")

        rows = [
            {
                "Version": "v3",
                "ID": pool.id,
                "Pair": pool.pair,
                "Fee Tier": pool.feeTier,
                "Volume USD": pool.volumeUSD,
                "Liquidity": pool.liquidity,
            }
            for pool in pools
        ]
        return _to_markdown_table(
            rows,
            columns=["Version", "ID", "Pair", "Fee Tier", "Volume USD", "Liquidity"],
        )
    except Exception as e:
        await ctx.error(f"Failed to query V3 pools: {str(e)}")
        raise


# Tool: Query all Uniswap V4 pools for a specific token and return as a formatted table
@mcp.tool()
async def get_token_pools_v4(token_address: str, ctx: Context) -> str:
    """
    Query all Uniswap V4 pools for a specific token and return as a formatted markdown table.

    Parameters:
        token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
        ctx (Context): The API context for logging and error handling.

    Returns:
        A markdown-formatted string containing a table with columns: Version, ID, Pair, Fee Tier, Volume USD, Liquidity.
    """
    await ctx.info(f"Querying V4 pools for token: {token_address}")
    try:
        pools = await query_pools_v4(token_address)
        await ctx.info(f"Found {len(pools)} V4 pools")

        rows = [
            {
                "Version": "v4",
                "ID": pool.id,
                "Pair": pool.pair,
                "Fee Tier": pool.feeTier,
                "Volume USD": pool.volumeUSD,
                "Liquidity": pool.liquidity,
            }
            for pool in pools
        ]
        return _to_markdown_table(
            rows,
            columns=["Version", "ID", "Pair", "Fee Tier", "Volume USD", "Liquidity"],
        )
    except Exception as e:
        await ctx.error(f"Failed to query V4 pools: {str(e)}")
        raise


# Tool: Query a specific Uniswap pool/pair by version and ID and return as markdown text
@mcp.tool()
async def get_pool_data(version: str, pool_id: str, ctx: Context) -> str:
    """
    Query a specific Uniswap pool/pair by version (v2, v3, v4) and ID and return as markdown text.

    Parameters:
        version (str): The Uniswap version to query ('v2', 'v3', or 'v4').
        pool_id (str): The Ethereum address of the pool or pair to query (e.g., '0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc').
        ctx (Context): The API context for logging and error handling.

    Returns:
        A markdown-formatted string containing details of the pool/pair, including Version, ID, Pair, Token0 Address, Token1 Address,
        Fee Tier, Volume USD, Liquidity/ReserveUSD, and Fees USD.
    """
    await ctx.info(f"Querying {version} pool/pair with ID: {pool_id}")
    try:
        # Normalize once so the version match is case-insensitive.
        normalized = version.lower()
        if normalized == "v2":
            pair = await query_pair_v2_by_id(pool_id)
            result = _format_details(
                "Uniswap V2 Pair Details",
                [
                    ("Version", "v2"),
                    ("ID", pair.id),
                    ("Pair", pair.pair),
                    ("Token0 Address", pair.token0),
                    ("Token1 Address", pair.token1),
                    ("Fee Tier", "3000 (0.3%)"),
                    ("Volume USD", pair.volumeUSD),
                    ("Liquidity/ReserveUSD", pair.reserveUSD),
                    ("Fees USD", "N/A"),
                ],
            )
        elif normalized in ("v3", "v4"):
            if normalized == "v3":
                pool = await query_pool_v3_by_id(pool_id)
                title = "Uniswap V3 Pool Details"
            else:
                pool = await query_pool_v4_by_id(pool_id)
                title = "Uniswap V4 Pool Details"
            result = _format_details(
                title,
                [
                    ("Version", normalized),
                    ("ID", pool.id),
                    ("Pair", pool.pair),
                    ("Token0 Address", pool.token0),
                    ("Token1 Address", pool.token1),
                    ("Fee Tier", pool.feeTier),
                    ("Volume USD", pool.volumeUSD),
                    ("Liquidity/ReserveUSD", pool.liquidity),
                    ("Fees USD", pool.feesUSD),
                ],
            )
        else:
            raise ValueError(f"Invalid version: {version}. Must be 'v2', 'v3', or 'v4'")
        return result.strip()
    except Exception as e:
        await ctx.error(f"Failed to query {version} pool/pair: {str(e)}")
        raise


# Main entry point
if __name__ == "__main__":
    mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/uniswap-pools-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.