Skip to main content
Glama
quick_registry_check.pyโ€ข1.63 kB
#!/usr/bin/env python3 """ Quick test script to verify Schema Registry connections """ import json from typing import Any, Dict import requests def test_registry_connection(url: str) -> Dict[str, Any]: """Test connection to a Schema Registry instance.""" try: response = requests.get(f"{url}/subjects", timeout=5) if response.status_code == 200: subjects = response.json() return { "status": "connected", "url": url, "subjects_count": len(subjects), "subjects": subjects[:5], # Show first 5 subjects } else: return { "status": "error", "url": url, "error": f"HTTP {response.status_code}: {response.text}", } except Exception as e: return {"status": "error", "url": url, "error": str(e)} def main(): # Test DEV registry print("\nTesting DEV Schema Registry (localhost:38081)...") dev_result = test_registry_connection("http://localhost:38081") print(json.dumps(dev_result, indent=2)) # Test PROD registry print("\nTesting PROD Schema Registry (localhost:38082)...") prod_result = test_registry_connection("http://localhost:38082") print(json.dumps(prod_result, indent=2)) # Print summary print("\nConnection Summary:") print(f"DEV Registry: {'โœ… Connected' if dev_result['status'] == 'connected' else 'โŒ Failed'}") print(f"PROD Registry: {'โœ… Connected' if prod_result['status'] == 'connected' else 'โŒ Failed'}") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aywengo/kafka-schema-reg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server