#!/usr/bin/env python3
"""
Simple CLI client to interact with the MCP-LLama Bridge Service
This demonstrates how to use the integrated LLama + Kafka Schema Registry MCP system
"""

import argparse
import asyncio
import json
import sys

import httpx
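
# Example invocations (a sketch; assumes the bridge service is already running
# locally -- see the hint printed on ConnectError below for the start script):
#
#   python client-example.py --health
#   python client-example.py --message "List all subjects in the schema registry"
#   python client-example.py                                # interactive chat
#   python client-example.py --no-mcp --message "hello"     # without MCP tools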


class MCPLlamaClient:
    def __init__(self, bridge_url: str = "http://localhost:8080"):
        self.bridge_url = bridge_url

    async def health_check(self) -> dict:
        """Check if the bridge service is healthy"""
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.bridge_url}/health")
            response.raise_for_status()
            return response.json()
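
    # The health payload is assumed to carry at least a "status" field whose
    # value is "healthy", "unhealthy", or some degraded in-between state --
    # that is the only key main() inspects.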

    async def chat(self, message: str, model: str = "llama3.2:3b", use_mcp: bool = True) -> dict:
        """Send a chat message to LLama with optional MCP integration"""
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{self.bridge_url}/chat",
                json={"message": message, "model": model, "use_mcp": use_mcp},
            )
            response.raise_for_status()
            return response.json()
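
    # Shape of chat()'s reply, inferred from how it is consumed below rather
    # than from a documented contract: JSON like
    #   {"response": "<model reply>", "used_tools": ["<tool name>", ...]}
    # where "used_tools" is optional and only present when MCP tools ran.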

    async def interactive_chat(self, model: str = "llama3.2:3b"):
        """Start an interactive chat session"""
        print(f"🤖 Starting interactive chat with {model}")
        print("💡 Type 'help' for Schema Registry commands, 'quit' to exit")
        print("🔧 MCP tools are enabled - you can ask about schemas, subjects, etc.")
        print("-" * 60)

        while True:
            try:
                user_input = input("\n🧑 You: ").strip()

                if user_input.lower() in ["quit", "exit", "q"]:
                    print("👋 Goodbye!")
                    break
                if user_input.lower() == "help":
                    self.show_help()
                    continue
                if not user_input:
                    continue

                print("🤖 LLama: Thinking...", end="", flush=True)
                response = await self.chat(user_input, model=model)
                # "\r" returns the cursor to the start of the line so the real
                # reply overwrites the "Thinking..." placeholder
                print(f"\r🤖 LLama: {response['response']}")

                if response.get("used_tools"):
                    print(f"🔧 Used tools: {', '.join(response['used_tools'])}")
            except KeyboardInterrupt:
                print("\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"\n❌ Error: {e}")

    def show_help(self):
        """Show help information"""
        print(
            """
📋 Schema Registry Commands You Can Try:

📝 Basic Operations:
  • "List all subjects in the schema registry"
  • "Show me the schemas in the user-events subject"
  • "Get the latest version of the order-schema"
  • "Show me all contexts"

🔍 Schema Analysis:
  • "Show me the structure of the user-profile schema"
  • "Compare versions 1 and 2 of the payment-events schema"
  • "Find all schemas that contain a field called 'user_id'"

✅ Compatibility & Validation:
  • "Check if this schema is compatible: {schema_json}"
  • "Validate this schema against AVRO standards"
  • "Show me compatibility requirements for subject X"

📊 Registry Management:
  • "Export all schemas from the production context"
  • "Show me registry statistics and usage"
  • "List all schema versions for subject Y"

🔧 Configuration:
  • "Show global compatibility settings"
  • "What's the current mode of the registry?"
  • "Update compatibility mode for subject Z"

💡 You can ask in natural language - LLama will understand and use the appropriate tools!
"""
        )


async def main():
    parser = argparse.ArgumentParser(description="MCP-LLama Client")
    parser.add_argument("--bridge-url", default="http://localhost:8080", help="Bridge service URL")
    parser.add_argument("--model", default="llama3.2:3b", help="LLama model to use")
    parser.add_argument("--message", help="Single message to send")
    parser.add_argument("--no-mcp", action="store_true", help="Disable MCP tools")
    parser.add_argument("--health", action="store_true", help="Check service health")
    args = parser.parse_args()

    client = MCPLlamaClient(args.bridge_url)

    try:
        if args.health:
            print("🏥 Checking service health...")
            health = await client.health_check()
            print(json.dumps(health, indent=2))
            return

        # Check if service is available
        health = await client.health_check()
        if health["status"] != "healthy":
            print(f"⚠️ Service is {health['status']}")
            if health["status"] == "unhealthy":
                print("❌ Bridge service is not available")
                sys.exit(1)

        if args.message:
            # Single message mode
            print(f"🤖 Sending message to {args.model}...")
            response = await client.chat(args.message, model=args.model, use_mcp=not args.no_mcp)
            print(f"\n🤖 Response: {response['response']}")
            if response.get("used_tools"):
                print(f"🔧 Used tools: {', '.join(response['used_tools'])}")
        else:
            # Interactive mode
            await client.interactive_chat(args.model)
    except httpx.ConnectError:
        print("❌ Cannot connect to bridge service. Make sure it's running at:", args.bridge_url)
        print("💡 Try running: ./run-llama-mcp.sh start")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())