Skip to main content
Glama
cli.py9.36 kB
"""CLI for OpenAccess MCP.""" import asyncio import sys from pathlib import Path from typing import Optional import typer from rich.console import Console from rich.table import Table from rich.panel import Panel from .server import OpenAccessMCPServer from .audit import get_audit_logger, create_audit_signer from .secrets import initialize_secret_store app = typer.Typer(help="OpenAccess MCP - Secure remote access server") console = Console() @app.command() def start( profiles: Path = typer.Option( "./profiles", "--profiles", "-p", help="Directory containing profile JSON files" ), vault_addr: Optional[str] = typer.Option( None, "--vault-addr", help="HashiCorp Vault address" ), vault_token: Optional[str] = typer.Option( None, "--vault-token", help="HashiCorp Vault token" ), secrets_dir: Optional[Path] = typer.Option( None, "--secrets-dir", help="Directory for file-based secrets" ), audit_log: Optional[Path] = typer.Option( None, "--audit-log", help="Audit log file path" ), audit_key: Optional[Path] = typer.Option( None, "--audit-key", help="Audit signing key path" ), verbose: bool = typer.Option( False, "--verbose", "-v", help="Enable verbose logging" ) ): """Start the OpenAccess MCP server.""" try: # Validate profiles directory if not profiles.exists(): console.print(f"[red]Error: Profiles directory not found: {profiles}[/red]") sys.exit(1) # Initialize secret store if vault_addr and vault_token: initialize_secret_store( vault_addr=vault_addr, vault_token=vault_token, secrets_dir=secrets_dir ) console.print(f"[green]✓[/green] Vault integration enabled") elif secrets_dir: initialize_secret_store(secrets_dir=secrets_dir) console.print(f"[green]✓[/green] File-based secrets enabled") else: console.print("[yellow]⚠[/yellow] No secret store configured, using defaults") # Initialize audit system if audit_key: signer = create_audit_signer(audit_key) console.print(f"[green]✓[/green] Audit signing enabled with key: {audit_key}") else: signer = create_audit_signer() console.print(f"[green]✓[/green] Audit signing enabled with generated key") if audit_log: audit_logger = get_audit_logger(audit_log) console.print(f"[green]✓[/green] Audit logging enabled: {audit_log}") # Display startup information console.print(Panel.fit( "[bold blue]OpenAccess MCP Server[/bold blue]\n" f"Profiles: {profiles.absolute()}\n" f"Audit: {'enabled' if audit_log else 'default'}\n" f"Secrets: {'vault' if vault_addr else 'file' if secrets_dir else 'default'}", title="Server Configuration" )) # Create and run server server = OpenAccessMCPServer(profiles) if verbose: console.print("[dim]Starting server in verbose mode...[/dim]") asyncio.run(server.run()) except KeyboardInterrupt: console.print("\n[yellow]Server stopped by user[/yellow]") except Exception as e: console.print(f"[red]Server error: {e}[/red]") if verbose: import traceback console.print(traceback.format_exc()) sys.exit(1) @app.command() def profiles( profiles_dir: Path = typer.Option( "./profiles", "--profiles", "-p", help="Directory containing profile JSON files" ) ): """List available profiles.""" try: if not profiles_dir.exists(): console.print(f"[red]Error: Profiles directory not found: {profiles_dir}[/red]") sys.exit(1) profile_files = list(profiles_dir.glob("*.json")) if not profile_files: console.print(f"[yellow]No profile files found in {profiles_dir}[/yellow]") return table = Table(title=f"Profiles in {profiles_dir}") table.add_column("ID", style="cyan") table.add_column("Host", style="green") table.add_column("Port", style="blue") table.add_column("Protocols", style="yellow") table.add_column("Description", style="white") for profile_file in profile_files: try: import json with open(profile_file, "r") as f: profile_data = json.load(f) table.add_row( profile_data.get("id", "unknown"), profile_data.get("host", "unknown"), str(profile_data.get("port", 22)), ", ".join(profile_data.get("protocols", [])), profile_data.get("description", "") ) except Exception as e: table.add_row( profile_file.stem, "[red]Error[/red]", "", "", str(e) ) console.print(table) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def audit( log_file: Path = typer.Option( None, "--log-file", "-l", help="Audit log file path" ) ): """Show audit log statistics.""" try: audit_logger = get_audit_logger(log_file) stats = audit_logger.get_log_stats() if "error" in stats: console.print(f"[red]Error reading audit log: {stats['error']}[/red]") return table = Table(title="Audit Log Statistics") table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Total Lines", str(stats["total_lines"])) table.add_row("File Size", f"{stats['file_size_bytes']} bytes") table.add_row("Last Modified", stats["last_modified"]) console.print(table) # Show record counts by tool if stats["record_counts"]: tool_table = Table(title="Records by Tool") tool_table.add_column("Tool", style="cyan") tool_table.add_column("Result", style="yellow") tool_table.add_column("Count", style="green") for tool, results in stats["record_counts"].items(): for result, count in results.items(): tool_table.add_row(tool, result, str(count)) console.print(tool_table) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def verify( log_file: Path = typer.Option( None, "--log-file", "-l", help="Audit log file path" ) ): """Verify audit log integrity.""" try: audit_logger = get_audit_logger(log_file) verification = audit_logger.verify_log_integrity() if "error" in verification: console.print(f"[red]Verification failed: {verification['error']}[/red]") return table = Table(title="Audit Log Verification") table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Total Records", str(verification["total_records"])) table.add_row("Chain Valid", "✓" if verification["chain_valid"] else "✗") table.add_row("First Record", verification["first_record"] or "N/A") table.add_row("Last Record", verification["last_record"] or "N/A") console.print(table) if verification["chain_valid"]: console.print("[green]✓ Audit log integrity verified[/green]") else: console.print("[red]✗ Audit log integrity check failed[/red]") sys.exit(1) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def generate_keys( output_dir: Path = typer.Option( "./keys", "--output-dir", "-o", help="Output directory for generated keys" ) ): """Generate new audit signing keys.""" try: signer = create_audit_signer() private_key, public_key = signer.generate_keypair(output_dir) console.print(f"[green]✓[/green] Generated new audit keypair:") console.print(f" Private key: {private_key}") console.print(f" Public key: {public_key}") console.print(f"\n[yellow]Warning: Keep the private key secure![/yellow]") except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def version(): """Show version information.""" from . import __version__ console.print(f"[bold blue]OpenAccess MCP[/bold blue] version {__version__}") if __name__ == "__main__": app()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keepithuman/openaccess-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server