Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
view_debug_logs.py (9.5 kB)
#!/usr/bin/env python3
"""
Debug log viewer for CodeGraph agentic tools

Parses and displays debug logs generated when CODEGRAPH_DEBUG=1

Usage:
    python tools/view_debug_logs.py              # View latest log
    python tools/view_debug_logs.py <logfile>    # View specific log
    python tools/view_debug_logs.py --follow     # Follow latest log (tail -f)
"""

import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path


def find_latest_log():
    """Find the most recent debug log file.

    Search order:
      1) CODEGRAPH_DEBUG_DIR (if set)
      2) ./.codegraph/debug (cwd)
      3) ~/.codegraph/debug (user cache)

    The first directory that contains at least one
    ``agentic_debug_*.jsonl`` file wins; within that directory the file
    with the newest modification time is returned as a ``Path``.

    If no candidate directory holds a matching file, the directories that
    were searched are printed and the process exits with status 1.
    """
    candidates = []
    env_dir = os.environ.get("CODEGRAPH_DEBUG_DIR")
    if env_dir:
        candidates.append(Path(env_dir))
    candidates.append(Path(".codegraph/debug"))
    candidates.append(Path.home() / ".codegraph" / "debug")

    for debug_dir in candidates:
        # A missing directory (or a plain file with that name) has no logs.
        if not debug_dir.is_dir():
            continue
        log_files = list(debug_dir.glob("agentic_debug_*.jsonl"))
        if log_files:
            return max(log_files, key=lambda p: p.stat().st_mtime)

    print("❌ No debug log files found.")
    print(" Looked in:")
    for d in candidates:
        print(f" - {d}")
    print(" Set CODEGRAPH_DEBUG=1 and run agentic tools to generate logs")
    sys.exit(1)


def format_timestamp(ts_str):
    """Format an ISO timestamp as ``HH:MM:SS.mmm``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. If the value
    cannot be parsed (wrong type, not ISO formatted), it is returned
    unchanged so the caller can still display something.
    """
    try:
        dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
        # %f yields microseconds (6 digits); drop the last 3 for milliseconds.
        return dt.strftime("%H:%M:%S.%f")[:-3]
    except (AttributeError, TypeError, ValueError):
        # Only input problems are handled here; KeyboardInterrupt and
        # SystemExit must keep propagating, so no bare ``except``.
        return ts_str


def format_json_compact(data, max_len=80):
    """Serialize ``data`` as compact JSON, truncated for display.

    The JSON text uses no whitespace after separators. When it is longer
    than ``max_len`` characters, the first ``max_len`` characters are
    returned followed by ``...``.
    """
    json_str = json.dumps(data, separators=(',', ':'))
    if len(json_str) <= max_len:
        return json_str
    return json_str[:max_len] + "..."
# How many trailing bytes of an existing log are replayed in follow mode.
_FOLLOW_TAIL_BYTES = 10000
# Seconds to sleep between polls when no new data is available.
_FOLLOW_POLL_SECONDS = 0.1


def _entry_timestamp(entry):
    """Return the entry's formatted timestamp ('' source when it has none)."""
    return format_timestamp(entry.get('timestamp', ''))


def _print_parameters(params):
    """Print one compact ``key: value`` line per tool parameter."""
    if not isinstance(params, dict):
        # Unexpected shape: show it as a single compact value instead of
        # crashing on ``.items()``.
        print(f" {format_json_compact(params, 100)}")
        return
    for key, value in params.items():
        print(f" {key}: {format_json_compact(value, 100)}")


def _print_truncated(text, max_lines):
    """Print the first ``max_lines`` lines of ``text`` and a count of the rest."""
    lines = text.split('\n')
    for line in lines[:max_lines]:
        print(f" {line}")
    hidden = len(lines) - max_lines
    if hidden > 0:
        print(f" ... ({hidden} more lines)")


def print_tool_start(entry):
    """Print tool call start event.

    Shows the tool name followed by each entry of ``parameters``. Missing
    ``timestamp``/``tool`` keys are tolerated, matching ``print_tool_error``.
    """
    ts = _entry_timestamp(entry)
    tool = entry.get('tool', 'unknown')
    params = entry.get('parameters') or {}

    print(f"\n{ts} 🔧 TOOL CALL: {tool}")
    print(" 📥 Parameters:")
    _print_parameters(params)


def print_tool_finish(entry):
    """Print tool call finish event.

    Prints the ``result_summary`` (array count plus a sample, or object
    keys) and then the actual data found under ``result['result']``: up to
    three list items with their key fields, or the first 20 lines of an
    object. An empty result list produces an explicit warning.
    """
    ts = _entry_timestamp(entry)
    tool = entry.get('tool', 'unknown')
    result = entry.get('result', {})
    summary = entry.get('result_summary', {})

    print(f"{ts} ✅ TOOL RESULT: {tool}")

    # Show summary first
    if isinstance(summary, dict) and summary:
        result_type = summary.get('type', 'unknown')
        if result_type == 'array':
            count = summary.get('count', 0)
            print(f" 📊 Returned {count} results")
            if count > 0 and 'sample' in summary:
                print(" 📝 Sample result:")
                _print_truncated(json.dumps(summary['sample'], indent=2), 10)
        elif result_type == 'object':
            keys = summary.get('keys', [])
            print(f" 📊 Object with keys: {', '.join(map(str, keys))}")

    # Show actual result data for verification. Test for key presence, not
    # truthiness: an empty list is falsy and must still reach the warning.
    if not (isinstance(result, dict) and 'result' in result):
        return
    actual_result = result['result']

    if isinstance(actual_result, list):
        if not actual_result:
            print(" ⚠️ WARNING: Empty result array!")
            return
        print(f" ℹ️ Result count: {len(actual_result)} items")
        # Show first 3 results with key fields
        for i, item in enumerate(actual_result[:3]):
            print(f" 📄 Result [{i}]:")
            if not isinstance(item, dict):
                continue
            # Show key fields for code search results
            for key in ('node_id', 'name', 'file_path', 'kind',
                        'vector_score', 'text'):
                if key not in item:
                    continue
                value = item[key]
                # Truncate long text
                if key == 'text' and isinstance(value, str) and len(value) > 200:
                    value = value[:200] + '...'
                print(f" {key}: {value}")
        if len(actual_result) > 3:
            print(f" ... and {len(actual_result) - 3} more results")
    elif isinstance(actual_result, dict) and actual_result:
        print(" 📄 Result object:")
        _print_truncated(json.dumps(actual_result, indent=2), 20)


def print_reasoning_step(entry):
    """Print reasoning step event (thought truncated to 100 characters)."""
    ts = _entry_timestamp(entry)
    step = entry.get('step', '?')
    # ``thought`` may be present but null; treat that as empty text.
    thought = entry.get('thought') or ''
    action = entry.get('action')

    print(f"\n{ts} 💭 STEP {step}: {str(thought)[:100]}")
    if action:
        print(f" 🎬 Action: {action}")


def print_tool_error(entry):
    """Print tool call error event, including parameters when present."""
    ts = _entry_timestamp(entry)
    tool = entry.get('tool', 'unknown')
    error = entry.get('error', 'unknown error')
    params = entry.get('parameters', {})

    print(f"{ts} ❌ TOOL ERROR: {tool}")
    print(f" ⚠️ Error: {error}")
    if params:
        print(" 📥 Parameters:")
        _print_parameters(params)


def print_agent_start(entry):
    """Print agent execution start (query, analysis type, context tier)."""
    ts = _entry_timestamp(entry)
    query = entry.get('query', '')
    analysis_type = entry.get('analysis_type', '')
    tier = entry.get('context_tier', '')

    print(f"\n{'='*80}")
    print(f"{ts} 🚀 AGENT STARTED")
    print(f" Query: {query}")
    print(f" Type: {analysis_type}")
    print(f" Tier: {tier}")
    print(f"{'='*80}")


def print_agent_finish(entry):
    """Print agent execution finish (completed or failed, with error)."""
    ts = _entry_timestamp(entry)
    success = entry.get('success', False)
    error = entry.get('error')
    output = entry.get('output')

    print(f"\n{'='*80}")
    if success:
        print(f"{ts} ✅ AGENT COMPLETED")
        if output:
            print(" 📤 Output available")
    else:
        print(f"{ts} ❌ AGENT FAILED")
        if error:
            print(f" Error: {error}")
    print(f"{'='*80}\n")


def process_entry(entry):
    """Process a single log entry.

    Dispatches on the entry's ``event`` field to the matching printer.
    Unknown events are reported rather than ignored, and JSON values that
    are not objects are skipped with a notice instead of raising.
    """
    if not isinstance(entry, dict):
        print(f"⚠️ Skipping non-object log entry (got {type(entry).__name__})")
        return

    event_type = entry.get('event', 'unknown')

    handlers = {
        'tool_call_start': print_tool_start,
        'tool_call_finish': print_tool_finish,
        'tool_call_error': print_tool_error,
        'reasoning_step': print_reasoning_step,
        'agent_execution_start': print_agent_start,
        'agent_execution_finish': print_agent_finish,
    }

    handler = handlers.get(event_type)
    if handler:
        handler(entry)
    else:
        print(f"Unknown event: {event_type}")


def _process_line(line):
    """Parse one JSONL line and display it; blank lines are ignored."""
    line = line.strip()
    if not line:
        return
    try:
        entry = json.loads(line)
    except json.JSONDecodeError as e:
        print(f"⚠️ Parse error: {e}")
        return
    process_entry(entry)


def view_log(log_file, follow=False):
    """View a debug log file.

    Args:
        log_file: Path of the JSONL log to display.
        follow: When true, replay roughly the last 10 kB of the file and
            then keep polling for appended lines until interrupted
            (``tail -f`` behaviour). When false, print the whole file once.

    Lines that are not valid JSON are reported as parse errors in both
    modes. The file is decoded as UTF-8 with undecodable bytes replaced.
    """
    print(f"📖 Viewing: {log_file}\n")

    if not follow:
        # Regular mode
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                _process_line(line)
        return

    # Tail -f mode. Binary mode is used because seeking a text-mode file to
    # an arbitrary byte offset is undefined and may split a character.
    with open(log_file, 'rb') as f:
        f.seek(0, 2)  # Seek to end
        start = max(f.tell() - _FOLLOW_TAIL_BYTES, 0)
        if start > 0:
            # Step back one byte and discard through the next newline: this
            # drops the line we landed inside, but nothing if ``start``
            # already sits exactly on a line boundary.
            f.seek(start - 1)
            f.readline()
        else:
            f.seek(0)

        pending = b""      # bytes of a line whose newline has not arrived yet
        watching = False   # becomes true once the existing tail is replayed
        while True:
            chunk = f.readline()
            if chunk:
                pending += chunk
                # Only hand complete lines to the parser; a writer may have
                # flushed just the first part of an entry.
                if pending.endswith(b"\n"):
                    _process_line(pending.decode('utf-8', errors='replace'))
                    pending = b""
                continue
            if not watching:
                print("\n👁️ Watching for new entries (Ctrl+C to stop)...\n")
                watching = True
            time.sleep(_FOLLOW_POLL_SECONDS)


def main():
    """Command-line entry point.

    With no argument the latest log is shown; ``--follow``/``-f`` follows
    the latest log; any other argument is treated as a log file path, and
    the process exits with status 1 if that file does not exist.
    """
    if len(sys.argv) > 1:
        arg = sys.argv[1]
        if arg in ('--follow', '-f'):
            log_file = find_latest_log()
            view_log(log_file, follow=True)
        else:
            log_file = Path(arg)
            if not log_file.exists():
                print(f"❌ Log file not found: {log_file}")
                sys.exit(1)
            view_log(log_file)
    else:
        log_file = find_latest_log()
        view_log(log_file)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 Stopped")
        sys.exit(0)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.