tools.py
""" MCP tool implementations for the DBT CLI MCP Server. This module defines all the MCP tools that map to dbt CLI commands. Each tool is a function decorated with @mcp.tool() that handles a specific dbt command. """ import logging import json import re from typing import Optional, Dict, Any, List from functools import partial from mcp.server.fastmcp import FastMCP from pydantic import Field from src.command import execute_dbt_command, parse_dbt_list_output, process_command_result from src.config import get_config, set_config from src.formatters import default_formatter, ls_formatter, show_formatter # Logger for this module logger = logging.getLogger(__name__) def register_tools(mcp: FastMCP) -> None: """ Register all tools with the MCP server. Args: mcp: The FastMCP server instance """ @mcp.tool() async def dbt_run( models: Optional[str] = Field( default=None, description="Specific models to run, using the dbt selection syntax (e.g., \"model_name+\")" ), selector: Optional[str] = Field( default=None, description="Named selector to use" ), exclude: Optional[str] = Field( default=None, description="Models to exclude" ), project_dir: str = Field( default=".", description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')" ), profiles_dir: Optional[str] = Field( default=None, description="Directory containing the profiles.yml file (defaults to project_dir if not specified)" ), full_refresh: bool = Field( default=False, description="Whether to perform a full refresh" ) ) -> str: """Run dbt models. An AI agent should use this tool when it needs to execute dbt models to transform data and build analytical tables in the data warehouse. This is essential for refreshing data or implementing new data transformations in a project. Returns: Output from the dbt run command as text (this command does not support JSON output format) """ command = ["run"] if models: command.extend(["-s", models]) if selector: command.extend(["--selector", selector]) if exclude: command.extend(["--exclude", exclude]) if full_refresh: command.append("--full-refresh") # The --no-print flag is not supported by dbt Cloud CLI # We'll rely on proper parsing to handle any print macros result = await execute_dbt_command(command, project_dir, profiles_dir) # Use the centralized result processor return await process_command_result(result, command_name="run") @mcp.tool() async def dbt_test( models: Optional[str] = Field( default=None, description="Specific models to test, using the dbt selection syntax" ), selector: Optional[str] = Field( default=None, description="Named selector to use" ), exclude: Optional[str] = Field( default=None, description="Models to exclude" ), project_dir: str = Field( default=".", description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')" ), profiles_dir: Optional[str] = Field( default=None, description="Directory containing the profiles.yml file (defaults to project_dir if not specified)" ) ) -> str: """Run dbt tests. An AI agent should use this tool when it needs to validate data quality and integrity by running tests defined in a dbt project. This helps ensure that data transformations meet expected business rules and constraints before being used for analysis or reporting. 

        Returns:
            Output from the dbt test command as text (this command does not support JSON output format)
        """
        command = ["test"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="test")

    @mcp.tool()
    async def dbt_ls(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to list, using the dbt selection syntax. Note that you probably want to specify your selection here e.g. silver.fact"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        resource_type: Optional[str] = Field(
            default=None,
            description="Type of resource to list (model, test, source, etc.)"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        output_format: str = Field(
            default="json",
            description="Output format (json, name, path, or selector)"
        ),
        verbose: bool = Field(
            default=False,
            description="Return full JSON output instead of simplified version"
        )
    ) -> str:
        """List dbt resources.

        An AI agent should use this tool when it needs to discover available models, tests,
        sources, and other resources within a dbt project. This helps the agent understand the
        project structure, identify dependencies, and select specific resources for other
        operations like running or testing.

        Returns:
            When output_format is 'json' (default):
            - With verbose=False (default): returns a simplified JSON with only name, resource_type, and depends_on.nodes
            - With verbose=True: returns a full JSON with all resource details
            When output_format is 'name', 'path', or 'selector', returns plain text with the respective format.
""" # Log diagnostic information logger.info(f"Starting dbt_ls with project_dir={project_dir}, output_format={output_format}") command = ["ls"] if models: command.extend(["-s", models]) if selector: command.extend(["--selector", selector]) if exclude: command.extend(["--exclude", exclude]) if resource_type: command.extend(["--resource-type", resource_type]) command.extend(["--output", output_format]) command.extend(["--quiet"]) logger.info(f"Executing dbt command: dbt {' '.join(command)}") result = await execute_dbt_command(command, project_dir, profiles_dir) logger.info(f"dbt command result: success={result['success']}, returncode={result.get('returncode')}") # Use the centralized result processor with ls_formatter formatter = partial(ls_formatter, output_format=output_format, verbose=verbose) return await process_command_result( result, command_name="ls", output_formatter=formatter, include_debug_info=True # Include extra debug info for this command ) @mcp.tool() async def dbt_compile( models: Optional[str] = Field( default=None, description="Specific models to compile, using the dbt selection syntax" ), selector: Optional[str] = Field( default=None, description="Named selector to use" ), exclude: Optional[str] = Field( default=None, description="Models to exclude" ), project_dir: str = Field( default=".", description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')" ), profiles_dir: Optional[str] = Field( default=None, description="Directory containing the profiles.yml file (defaults to project_dir if not specified)" ) ) -> str: """Compile dbt models. An AI agent should use this tool when it needs to generate the SQL that will be executed without actually running it against the database. This is valuable for validating SQL syntax, previewing transformations, or investigating how dbt interprets models before committing to execution. Returns: Output from the dbt compile command as text (this command does not support JSON output format) """ command = ["compile"] if models: command.extend(["-s", models]) if selector: command.extend(["--selector", selector]) if exclude: command.extend(["--exclude", exclude]) # The --no-print flag is not supported by dbt Cloud CLI # We'll rely on proper parsing to handle any print macros result = await execute_dbt_command(command, project_dir, profiles_dir) # Use the centralized result processor return await process_command_result(result, command_name="compile") @mcp.tool() async def dbt_debug( project_dir: str = Field( default=".", description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')" ), profiles_dir: Optional[str] = Field( default=None, description="Directory containing the profiles.yml file (defaults to project_dir if not specified)" ) ) -> str: """Run dbt debug to validate the project setup. An AI agent should use this tool when it needs to troubleshoot configuration issues, check database connectivity, or verify that all project dependencies are properly installed. This is essential for diagnosing problems before attempting to run models or tests. 

        Returns:
            Output from the dbt debug command as text (this command does not support JSON output format)
        """
        command = ["debug"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="debug")

    @mcp.tool()
    async def dbt_deps(
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Install dbt package dependencies.

        An AI agent should use this tool when it needs to install or update external packages
        that the dbt project depends on. This ensures that all required modules, macros, and
        models from other packages are available before running the project.

        Returns:
            Output from the dbt deps command as text (this command does not support JSON output format)
        """
        command = ["deps"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="deps")

    @mcp.tool()
    async def dbt_seed(
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Seeds to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Load CSV files as seed data.

        An AI agent should use this tool when it needs to load initial data from CSV files into
        the database. This is essential for creating reference tables, test datasets, or any
        static data that models will depend on.

        Returns:
            Output from the dbt seed command as text (this command does not support JSON output format)
        """
        command = ["seed"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="seed")

    @mcp.tool()
    async def dbt_show(
        models: str = Field(
            description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        limit: Optional[int] = Field(
            default=None,
            description="Limit the number of rows returned"
        ),
        output: Optional[str] = Field(
            default="json",
            description="Output format (json, table, etc.)"
        )
    ) -> str:
        """Preview the results of a model.

        An AI agent should use this tool when it needs to preview data from a specific model
        without materializing it. This helps inspect transformation results, debug issues, or
        demonstrate how data looks after processing without modifying the target database.

        Returns:
            Output from the dbt show command, defaulting to JSON format if not specified
        """
        # Use enhanced SQL detection
        is_inline_sql, sql_type = is_inline_sql_query(models)

        # If it's SQL, check for security risks
        if is_inline_sql:
            has_risk, risk_reason = contains_mutation_risk(models)
            if has_risk:
                logger.warning(f"Security risk detected in SQL: {risk_reason}")
                error_result = {
                    "success": False,
                    "output": f"Security validation failed: {risk_reason}. For security reasons, mutation operations are not allowed.",
                    "error": "SecurityValidationError",
                    "returncode": 1
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

        logger.info(f"dbt_show called with models={models}, is_inline_sql={is_inline_sql}")

        # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
        if is_inline_sql:
            # Use regex to remove LIMIT clause from the SQL
            original_models = models
            models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
            logger.info(f"Stripped LIMIT clause: {original_models} -> {models}")

            # For inline SQL, use the --inline flag with the SQL as its value
            command = ["show", f"--inline={models}", "--output", output or "json"]

            # Only add --limit if the inline type is WITH or SELECT (select_inline vs meta_inline)
            if limit and sql_type in ["WITH", "SELECT"]:
                command.extend(["--limit", str(limit)])

            logger.info(f"Executing dbt command: {' '.join(command)}")

            # Don't use --quiet for inline SQL to ensure we get error messages
            result = await execute_dbt_command(command, project_dir, profiles_dir)

            logger.info(f"Command result: success={result['success']}, returncode={result.get('returncode')}")
            if isinstance(result["output"], str):
                logger.info(f"Output (first 100 chars): {result['output'][:100]}")
            elif isinstance(result["output"], (dict, list)):
                logger.info(f"Output structure: {json.dumps(result['output'])[:100]}")

            # Check for specific error patterns in the output
            if not result["success"] or (
                isinstance(result["output"], str) and
                any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
            ):
                logger.warning(f"Error detected in output: {result['output'][:200]}")
                error_result = {
                    "success": False,
                    "output": f"Error executing inline SQL\n{result['output']}",
                    "error": result["error"],
                    "returncode": result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )
        else:
            # For regular model references, check if the model exists first
            check_command = ["ls", "-s", models]
            check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)

            # If the model doesn't exist, return the error message
            if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
                error_result = {
                    "success": False,
                    "output": f"Model does not exist or is not enabled\n{check_result['output']}",
                    "error": check_result["error"],
                    "returncode": check_result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

            # If the model exists, run the show command with --quiet and --output json
            command = ["show", "-s", models, "--quiet", "--output", output or "json"]

            if limit:
                command.extend(["--limit", str(limit)])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(
            result,
            command_name="show",
            output_formatter=show_formatter,
            include_debug_info=True
        )

    @mcp.tool()
    async def dbt_build(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to build, using the dbt selection syntax"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        full_refresh: bool = Field(
            default=False,
            description="Whether to perform a full refresh"
        )
    ) -> str:
        """Run build command (seeds, tests, snapshots, and models).

        An AI agent should use this tool when it needs to execute a comprehensive build process
        that runs seeds, snapshots, models, and tests in the correct order. This is ideal for
        complete project deployment or ensuring all components work together.

        Returns:
            Output from the dbt build command as text (this command does not support JSON output format)
        """
        command = ["build"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        if full_refresh:
            command.append("--full-refresh")

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros
        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="build")

    logger.info("Registered all dbt tools")
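
# Example tool arguments an MCP client might send (illustrative values only, drawn from the
# parameter descriptions above):
#   dbt_run:  {"models": "model_name+", "project_dir": "/abs/path/to/dbt_project", "full_refresh": false}
#   dbt_ls:   {"models": "silver.fact", "resource_type": "model", "output_format": "json"}
#   dbt_show: {"models": "select * from {{ ref(\"model_name\") }}", "limit": 10}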
""" # Normalize the query by trimming whitespace normalized_query = query.strip() # Skip empty queries if not normalized_query: return False, None # Check if the query contains SQL comments has_single_line_comment = '--' in normalized_query has_multi_line_comment = '/*' in normalized_query and '*/' in normalized_query # If the query only contains comments, it's still SQL if has_single_line_comment or has_multi_line_comment: # Check if it's only comments by removing them and seeing if anything remains # Remove /* */ style comments sql_no_comments = re.sub(r'/\*.*?\*/', ' ', normalized_query, flags=re.DOTALL) # Remove -- style comments sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE) # Normalize whitespace sql_no_comments = ' '.join(sql_no_comments.split()).strip() if not sql_no_comments: # If nothing remains after removing comments, it's only comments return True, "COMMENT" # Convert to lowercase for case-insensitive matching normalized_query_lower = normalized_query.lower() # Check for SQL comments at the beginning and skip them for detection # This handles both single-line (--) and multi-line (/* */) comments comment_pattern = r'^(\s*(--[^\n]*\n|\s*/\*.*?\*/\s*)*\s*)' match = re.match(comment_pattern, normalized_query_lower, re.DOTALL) if match: # Skip past the comments for keyword detection start_pos = match.end() if start_pos >= len(normalized_query_lower): # If the query is only comments, it's still SQL return True, "COMMENT" normalized_query_lower = normalized_query_lower[start_pos:] # Common SQL statement starting keywords sql_starters = { 'select': 'SELECT', 'with': 'WITH', 'show': 'SHOW', 'describe': 'DESCRIBE', 'explain': 'EXPLAIN', 'analyze': 'ANALYZE', 'use': 'USE', 'set': 'SET' } # Check if the query starts with a common SQL keyword for keyword, sql_type in sql_starters.items(): if normalized_query_lower.startswith(keyword + ' '): return True, sql_type # Check for more complex patterns like CTEs # WITH clause followed by identifier and AS cte_pattern = r'^\s*with\s+[a-z0-9_]+\s+as\s*\(' if re.search(cte_pattern, normalized_query_lower, re.IGNORECASE): return True, "WITH" # Check for Jinja templating with SQL inside jinja_sql_pattern = r'{{\s*sql\s*}}' if re.search(jinja_sql_pattern, normalized_query_lower): return True, "JINJA" # Check for dbt ref or source macros which indicate SQL dbt_macro_pattern = r'{{\s*(ref|source)\s*\(\s*[\'"]' if re.search(dbt_macro_pattern, normalized_query_lower): return True, "DBT_MACRO" # If the query contains certain SQL syntax elements, it's likely SQL sql_syntax_elements = [ r'\bfrom\s+[a-z0-9_]+', # FROM clause r'\bjoin\s+[a-z0-9_]+', # JOIN clause r'\bwhere\s+', # WHERE clause r'\bgroup\s+by\s+', # GROUP BY clause r'\border\s+by\s+', # ORDER BY clause r'\bhaving\s+', # HAVING clause r'\bunion\s+', # UNION operator r'\bcase\s+when\s+' # CASE expression ] for pattern in sql_syntax_elements: if re.search(pattern, normalized_query_lower, re.IGNORECASE): return True, "SQL_SYNTAX" # If the query is long and contains spaces, it's more likely to be SQL than a model name if len(normalized_query_lower) > 30 and ' ' in normalized_query_lower: return True, "COMPLEX" # If none of the above conditions are met, it's likely a model name return False, None def contains_mutation_risk(sql: str) -> tuple[bool, str]: """ Check if the SQL query contains potentially dangerous operations. This function scans SQL for operations that could modify or delete data, which should be prohibited in a read-only context like dbt show. 

def contains_mutation_risk(sql: str) -> tuple[bool, str]:
    """
    Check if the SQL query contains potentially dangerous operations.

    This function scans SQL for operations that could modify or delete data,
    which should be prohibited in a read-only context like dbt show.

    Args:
        sql: The SQL query to check

    Returns:
        A tuple of (has_risk, reason) where:
        - has_risk: True if the query contains risky operations, False otherwise
        - reason: A description of the risk if has_risk is True, empty string otherwise
    """
    # Normalize the SQL by removing comments and extra whitespace
    # This helps prevent comment-based evasion techniques
    # Remove /* */ style comments
    sql_no_comments = re.sub(r'/\*.*?\*/', ' ', sql, flags=re.DOTALL)
    # Remove -- style comments
    sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
    # Normalize whitespace
    normalized_sql = ' '.join(sql_no_comments.split()).lower()

    # Check for multiple SQL statements (potential SQL injection)
    # This needs to be checked first to ensure proper error message
    if ';' in normalized_sql:
        # Check if there's actual SQL after the semicolon
        statements = normalized_sql.split(';')
        if len(statements) > 1:
            for stmt in statements[1:]:
                if stmt.strip():
                    return True, "Multiple SQL statements detected - potential SQL injection risk"

    # Dangerous SQL operations patterns
    dangerous_patterns = [
        # Data modification operations
        (r'\bdelete\s+from\b', "DELETE operation detected"),
        (r'\btruncate\s+table\b', "TRUNCATE operation detected"),
        (r'\bdrop\s+table\b', "DROP TABLE operation detected"),
        (r'\bdrop\s+database\b', "DROP DATABASE operation detected"),
        (r'\bdrop\s+schema\b', "DROP SCHEMA operation detected"),
        (r'\balter\s+table\b', "ALTER TABLE operation detected"),
        (r'\bcreate\s+table\b', "CREATE TABLE operation detected"),
        (r'\bcreate\s+or\s+replace\b', "CREATE OR REPLACE operation detected"),
        (r'\binsert\s+into\b', "INSERT operation detected"),
        (r'\bupdate\s+.*?\bset\b', "UPDATE operation detected"),
        (r'\bmerge\s+into\b', "MERGE operation detected"),

        # Database administration operations
        (r'\bgrant\b', "GRANT operation detected"),
        (r'\brevoke\b', "REVOKE operation detected"),
        (r'\bcreate\s+user\b', "CREATE USER operation detected"),
        (r'\balter\s+user\b', "ALTER USER operation detected"),
        (r'\bdrop\s+user\b', "DROP USER operation detected"),

        # Execution of arbitrary code
        (r'\bexec\b', "EXEC operation detected"),
        (r'\bexecute\s+immediate\b', "EXECUTE IMMEDIATE detected"),
        (r'\bcall\b', "CALL procedure detected")
    ]

    # Check for each dangerous pattern
    for pattern, reason in dangerous_patterns:
        if re.search(pattern, normalized_sql, re.IGNORECASE):
            return True, reason

    # Check for specific Snowflake commands that could be risky
    snowflake_patterns = [
        (r'\bcopy\s+into\b', "Snowflake COPY INTO operation detected"),
        (r'\bunload\s+to\b', "Snowflake UNLOAD operation detected"),
        (r'\bput\b', "Snowflake PUT operation detected"),
        (r'\bremove\b', "Snowflake REMOVE operation detected"),
        (r'\bmodify\b', "Snowflake MODIFY operation detected")
    ]

    for pattern, reason in snowflake_patterns:
        if re.search(pattern, normalized_sql, re.IGNORECASE):
            return True, reason

    # No risks detected
    return False, ""
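
For reference, a minimal usage sketch is shown below. It assumes this file is importable as src.tools, that the FastMCP class imported above accepts a server name and exposes a run() entry point, and that the server name and sample strings are purely illustrative.

from mcp.server.fastmcp import FastMCP

from src.tools import register_tools, is_inline_sql_query, contains_mutation_risk

# Create a server instance and attach the dbt tools defined in this module.
mcp = FastMCP("dbt-cli-mcp")  # server name is hypothetical
register_tools(mcp)

# The detection helpers can also be exercised directly:
is_sql, sql_type = is_inline_sql_query("select * from {{ ref('customers') }}")  # (True, "SELECT")
has_risk, reason = contains_mutation_risk("delete from customers")              # (True, "DELETE operation detected")

if __name__ == "__main__":
    mcp.run()  # assumes FastMCP's default entry point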
