Skip to main content
Glama
cli.py (21.9 kB)
#!/usr/bin/env python3
"""
Command-line interface for dbt tools.

Each subcommand (run, test, ls, compile, debug, deps, seed, show, build,
configure) maps to an async ``run_*`` coroutine that builds a dbt argument
list, hands it to ``src.command.execute_dbt_command`` and returns a string
for printing.
"""

import os
import sys
import json
import asyncio
import argparse
from pathlib import Path
from typing import Dict, Any, Optional, List

# No need for these imports anymore
from src.config import initialize as initialize_config


# Names of the parsed-argument attributes forwarded to each command function.
_COMMAND_ARG_NAMES = {
    "run": ("models", "selector", "exclude", "project_dir", "profiles_dir", "full_refresh"),
    "test": ("models", "selector", "exclude", "project_dir", "profiles_dir"),
    "ls": (
        "models", "selector", "exclude", "resource_type",
        "project_dir", "profiles_dir", "output_format", "verbose",
    ),
    "compile": ("models", "selector", "exclude", "project_dir", "profiles_dir"),
    "debug": ("project_dir", "profiles_dir"),
    "deps": ("project_dir", "profiles_dir"),
    "seed": ("selector", "exclude", "project_dir", "profiles_dir"),
    "show": ("models", "project_dir", "profiles_dir", "limit", "output_format"),
    "build": ("models", "selector", "exclude", "project_dir", "profiles_dir", "full_refresh"),
    "configure": ("path",),
}


def _add_selection_args(subparser: argparse.ArgumentParser, verb: str) -> None:
    """Add the --models / --selector / --exclude options to *subparser*."""
    subparser.add_argument("--models", help=f"Specific models to {verb}")
    subparser.add_argument("--selector", help="Named selector to use")
    subparser.add_argument("--exclude", help="Models to exclude")


def _add_project_args(subparser: argparse.ArgumentParser) -> None:
    """Add the --project-dir / --profiles-dir options to *subparser*."""
    subparser.add_argument(
        "--project-dir",
        help="Directory containing the dbt project",
        default="."
    )
    subparser.add_argument(
        "--profiles-dir",
        help="Directory containing the profiles.yml file (defaults to project-dir if not specified)"
    )


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The parsed namespace; ``command`` holds the chosen subcommand name,
        or ``None`` when no subcommand was given.
    """
    parser = argparse.ArgumentParser(description="DBT CLI MCP Command Line Interface")

    # Global options
    parser.add_argument(
        "--dbt-path",
        help="Path to dbt executable",
        default=os.environ.get("DBT_PATH", "dbt")
    )
    parser.add_argument(
        "--env-file",
        help="Path to environment file",
        default=os.environ.get("ENV_FILE", ".env")
    )
    parser.add_argument(
        "--log-level",
        help="Logging level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=os.environ.get("LOG_LEVEL", "INFO")
    )
    parser.add_argument(
        "--format",
        help="Output format",
        choices=["text", "json"],
        default="text"
    )

    # Set up subparsers for each command
    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # dbt_run command
    run_parser = subparsers.add_parser("run", help="Run dbt models")
    _add_selection_args(run_parser, "run")
    _add_project_args(run_parser)
    run_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")

    # dbt_test command
    test_parser = subparsers.add_parser("test", help="Run dbt tests")
    _add_selection_args(test_parser, "test")
    _add_project_args(test_parser)

    # dbt_ls command
    ls_parser = subparsers.add_parser("ls", help="List dbt resources")
    _add_selection_args(ls_parser, "list")
    ls_parser.add_argument("--resource-type", help="Type of resource to list")
    _add_project_args(ls_parser)
    ls_parser.add_argument(
        "--output-format",
        help="Output format",
        choices=["json", "name", "path", "selector"],
        default="json"
    )
    ls_parser.add_argument(
        "--verbose",
        help="Return full JSON output instead of simplified version",
        action="store_true"
    )

    # dbt_compile command
    compile_parser = subparsers.add_parser("compile", help="Compile dbt models")
    _add_selection_args(compile_parser, "compile")
    _add_project_args(compile_parser)

    # dbt_debug command
    debug_parser = subparsers.add_parser("debug", help="Debug dbt project")
    _add_project_args(debug_parser)

    # dbt_deps command
    deps_parser = subparsers.add_parser("deps", help="Install dbt package dependencies")
    _add_project_args(deps_parser)

    # dbt_seed command
    seed_parser = subparsers.add_parser("seed", help="Load CSV files as seed data")
    seed_parser.add_argument("--selector", help="Named selector to use")
    seed_parser.add_argument("--exclude", help="Seeds to exclude")
    _add_project_args(seed_parser)

    # dbt_show command
    show_parser = subparsers.add_parser("show", help="Preview model results")
    show_parser.add_argument("--models", help="Specific model to show", required=True)
    _add_project_args(show_parser)
    show_parser.add_argument("--limit", help="Limit the number of rows returned", type=int)
    show_parser.add_argument("--output-format", help="Output format (json, table, etc.)", default="json")

    # dbt_build command
    build_parser = subparsers.add_parser("build", help="Run build command")
    _add_selection_args(build_parser, "build")
    _add_project_args(build_parser)
    build_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")

    # configure command
    configure_parser = subparsers.add_parser("configure", help="Configure dbt path")
    configure_parser.add_argument("path", help="Path to dbt executable")

    return parser.parse_args(argv)


def _selection_flags(
    models: Optional[str] = None,
    selector: Optional[str] = None,
    exclude: Optional[str] = None,
) -> List[str]:
    """Translate the node-selection options into dbt command-line flags."""
    flags: List[str] = []
    if models:
        flags.extend(["-s", models])
    if selector:
        flags.extend(["--selector", selector])
    if exclude:
        flags.extend(["--exclude", exclude])
    return flags


def _format_output(output: Any) -> str:
    """Serialise dict/list output as JSON; stringify anything else."""
    if isinstance(output, (dict, list)):
        return json.dumps(output)
    return str(output)


def _format_error(command_name: str, result: Dict[str, Any]) -> str:
    """Build the "Error executing dbt <command>" message for a failed result.

    Uses ``.get`` so a result dict that lacks "output" or "error" yields a
    message instead of a ``KeyError``.
    """
    error_msg = f"Error executing dbt {command_name}: {result.get('error')}"
    output = result.get("output")
    if output:
        error_msg += f"\nOutput: {output}"
    return error_msg


def _describe_failure(prefix: str, separator: str, result: Dict[str, Any], fallback: str) -> str:
    """Return *prefix* followed by the result's output, else its error, else *fallback*."""
    for key in ("output", "error"):
        detail = result.get(key)
        if detail:
            return prefix + separator + str(detail)
    return fallback


async def _run_simple_command(command: List[str], project_dir: str, profiles_dir: Optional[str]) -> str:
    """Execute *command* through dbt and format the outcome as a string.

    ``command[0]`` is the dbt subcommand and is used in the error message.
    The result of ``execute_dbt_command`` is treated as a dict with a
    "success" flag plus "output" and "error" entries, as the original code did.
    """
    # Imported lazily, matching the rest of this module.
    from src.command import execute_dbt_command

    result = await execute_dbt_command(command, project_dir, profiles_dir)
    if not result["success"]:
        return _format_error(command[0], result)
    return _format_output(result.get("output"))


# Define tool functions directly
async def run_dbt_run(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False) -> str:
    """Run dbt models.

    Returns the command output as a string (JSON for dict/list output), or
    an "Error executing dbt run: ..." message when the command fails.
    """
    command = ["run", *_selection_flags(models, selector, exclude)]
    if full_refresh:
        command.append("--full-refresh")
    return await _run_simple_command(command, project_dir, profiles_dir)


async def run_dbt_test(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None) -> str:
    """Run dbt tests.

    Returns the command output as a string, or an
    "Error executing dbt test: ..." message when the command fails.
    """
    command = ["test", *_selection_flags(models, selector, exclude)]
    return await _run_simple_command(command, project_dir, profiles_dir)


async def run_dbt_ls(models=None, selector=None, exclude=None, resource_type=None, project_dir=".", profiles_dir=None, output_format="json", verbose=False) -> str:
    """List dbt resources.

    For ``output_format="json"`` the output is parsed (unless it is already
    a list) and returned as indented JSON. Unless *verbose* is set, each
    entry is reduced to its name, resource_type and depends_on.nodes, and
    entries that are not dicts are dropped. Other formats return the raw
    output as a string.
    """
    command = ["ls", *_selection_flags(models, selector, exclude)]
    if resource_type:
        command.extend(["--resource-type", resource_type])
    command.extend(["--output", output_format])

    from src.command import execute_dbt_command, parse_dbt_list_output

    result = await execute_dbt_command(command, project_dir, profiles_dir)
    if not result["success"]:
        return _format_error("ls", result)

    output = result.get("output")

    # For other formats, return the raw output
    if output_format != "json":
        return str(output)

    # Empty string or None means nothing was listed
    if not output:
        return "[]"

    # The output may already be a list; otherwise parse it
    parsed = output if isinstance(output, list) else parse_dbt_list_output(output)

    # If not verbose, simplify the output
    if not verbose and parsed:
        parsed = [
            {
                "name": item.get("name"),
                "resource_type": item.get("resource_type"),
                "depends_on": {
                    # `or {}` tolerates an explicit null depends_on
                    "nodes": (item.get("depends_on") or {}).get("nodes", [])
                },
            }
            for item in parsed
            if isinstance(item, dict)
        ]

    return json.dumps(parsed, indent=2)


async def run_dbt_compile(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None) -> str:
    """Compile dbt models.

    Returns the command output as a string, or an
    "Error executing dbt compile: ..." message when the command fails.
    """
    command = ["compile", *_selection_flags(models, selector, exclude)]
    return await _run_simple_command(command, project_dir, profiles_dir)


async def run_dbt_debug(project_dir=".", profiles_dir=None) -> str:
    """Debug dbt project.

    Returns the command output as a string, or an
    "Error executing dbt debug: ..." message when the command fails.
    """
    return await _run_simple_command(["debug"], project_dir, profiles_dir)


async def run_dbt_deps(project_dir=".", profiles_dir=None) -> str:
    """Install dbt package dependencies.

    Returns the command output as a string, or an
    "Error executing dbt deps: ..." message when the command fails.
    """
    return await _run_simple_command(["deps"], project_dir, profiles_dir)


async def run_dbt_seed(selector=None, exclude=None, project_dir=".", profiles_dir=None) -> str:
    """Load CSV files as seed data.

    Returns the command output as a string, or an
    "Error executing dbt seed: ..." message when the command fails.
    """
    command = ["seed", *_selection_flags(selector=selector, exclude=exclude)]
    return await _run_simple_command(command, project_dir, profiles_dir)


async def run_dbt_show(models, project_dir=".", profiles_dir=None, limit=None, output_format="json") -> str:
    """Preview model results.

    *models* is either a model reference or inline SQL. A value whose first
    token is ``select`` (case-insensitive, followed by any whitespace) is
    treated as inline SQL and passed via ``--inline``; anything else is
    first checked with ``dbt ls`` and then shown with ``--quiet``.

    Returns the command output as a string, or a message starting with
    "Error executing dbt show" on failure.
    """
    from src.command import execute_dbt_command
    import re

    # Accept any whitespace after SELECT so multi-line SQL is recognised too
    is_inline_sql = re.match(r"select\s", models.strip(), flags=re.IGNORECASE) is not None

    if is_inline_sql:
        # Strip any LIMIT clause from the SQL; row limiting is left to --limit
        models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)

        # For inline SQL, use the --inline flag with the SQL as its value
        command = ["show", f"--inline={models}", "--output", output_format]
        if limit:
            command.extend(["--limit", str(limit)])

        result = await execute_dbt_command(command, project_dir, profiles_dir)
        output = result.get("output")

        # Heuristic: a successful run whose text output mentions an error
        # keyword is also reported as a failure.
        looks_like_error = isinstance(output, str) and any(
            marker in output.lower() for marker in ("error", "failed", "syntax", "exception")
        )
        if not result["success"] or looks_like_error:
            error_msg = "Error executing dbt show with inline SQL"
            return _describe_failure(
                error_msg,
                "\n",
                result,
                f"{error_msg}: Command failed with exit code {result.get('returncode', 'unknown')}",
            )
    else:
        # For regular model references, check if the model exists first
        check_result = await execute_dbt_command(["ls", "-s", models], project_dir, profiles_dir)

        # If the model doesn't exist, return the error message
        model_missing = "does not match any enabled nodes" in str(check_result.get("output"))
        if not check_result["success"] or model_missing:
            error_msg = "Error executing dbt show: Model does not exist or is not enabled"
            return _describe_failure(error_msg, "\n", check_result, error_msg)

        # If the model exists, run the show command with --quiet for clean output
        command = ["show", "-s", models, "--quiet", "--output", output_format]
        if limit:
            command.extend(["--limit", str(limit)])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

    # If the command succeeded, return the (JSON) output
    if result["success"]:
        return _format_output(result.get("output"))

    # If the command failed, return the error message
    error_msg = "Error executing dbt show: "
    return _describe_failure(
        error_msg,
        "",
        result,
        f"{error_msg}Command failed with exit code {result.get('returncode', 'unknown')}",
    )


async def run_dbt_build(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False) -> str:
    """Run build command.

    Returns the command output as a string, or an
    "Error executing dbt build: ..." message when the command fails.
    """
    command = ["build", *_selection_flags(models, selector, exclude)]
    if full_refresh:
        command.append("--full-refresh")
    return await _run_simple_command(command, project_dir, profiles_dir)


async def run_configure_dbt_path(path) -> str:
    """Configure dbt path.

    Stores *path* under the "dbt_path" config key when it names an existing
    file; otherwise returns an error message and changes nothing.
    """
    from src.config import set_config

    if not os.path.isfile(path):
        return f"Error: File not found at {path}"

    set_config("dbt_path", path)
    return f"dbt path configured to: {path}"


async def main_async() -> None:
    """Main entry point for the CLI.

    Parses the arguments, exports the global options as environment
    variables, initialises configuration, runs the selected command and
    prints its result as text or JSON. Exits with status 1 when no known
    command was selected.
    """
    args = parse_args()

    # Set environment variables from arguments
    os.environ["DBT_PATH"] = args.dbt_path
    os.environ["ENV_FILE"] = args.env_file
    os.environ["LOG_LEVEL"] = args.log_level

    # Initialize configuration
    initialize_config()

    # Map commands to functions
    command_map = {
        "run": run_dbt_run,
        "test": run_dbt_test,
        "ls": run_dbt_ls,
        "compile": run_dbt_compile,
        "debug": run_dbt_debug,
        "deps": run_dbt_deps,
        "seed": run_dbt_seed,
        "show": run_dbt_show,
        "build": run_dbt_build,
        "configure": run_configure_dbt_path
    }

    if args.command not in command_map:
        print(f"Command '{args.command}' not found. Use --help for usage information.")
        sys.exit(1)

    # Forward only the options that were given, so the command functions'
    # own defaults apply to everything else.
    func_args = {
        name: value
        for name in _COMMAND_ARG_NAMES[args.command]
        if (value := getattr(args, name)) is not None
    }

    # Execute the function
    result = await command_map[args.command](**func_args)

    # Print the result
    # dbt show errors are printed raw regardless of format. The prefix has no
    # trailing colon so the inline-SQL error message is covered as well.
    is_show_error = (
        args.command == "show"
        and isinstance(result, str)
        and result.startswith("Error executing dbt show")
    )
    if is_show_error or args.format != "json":
        print(result)
        return

    # A result that is already a JSON document is re-indented; anything else
    # is wrapped as {"output": ...}.
    payload: Any = {"output": result}
    if isinstance(result, str) and result.startswith(("{", "[")):
        try:
            payload = json.loads(result)
        except json.JSONDecodeError:
            pass  # only looked like JSON; keep the wrapped form
    print(json.dumps(payload, indent=2))


def main_entry() -> None:
    """Entry point for setuptools."""
    asyncio.run(main_async())


if __name__ == "__main__":
    main_entry()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MammothGrowth/dbt-cli-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.