Skip to main content
Glama

Magic-API MCP Server

by Dwsy
magic_api_debug_client.py38.9 kB
#!/usr/bin/env python3 """ Magic-API WebSocket调试客户端和断点调试工具 用于连接Magic-API WebSocket控制台,支持断点调试和实时日志监听 功能特性: 1. WebSocket连接和日志监听 2. 断点设置和操作功能 3. 交互式调试会话 4. 异常处理和调试信息 5. 灵活的API调用功能 6. 支持GET/POST/PUT/DELETE方法 7. 支持传递查询参数和请求体 8. 实时日志显示 使用方法: # 启动交互式调试会话 python3 magic_api_debug_client.py # 或者在代码中使用: from magic_api_debug_client import MagicAPIDebugClient client = MagicAPIDebugClient(WS_URL, API_BASE_URL, USERNAME, PASSWORD) # 使用调试功能... """ import asyncio import websockets import json import requests import threading import time import sys try: import readline except ImportError: # Windows 系统使用 pyreadline3 try: import pyreadline3 as readline except ImportError: # 如果都没有 readline 功能,创建一个兼容层 class MockReadline: def get_line_buffer(self): return "" def redisplay(self): pass def set_completer(self, completer): pass def set_completer_delims(self, delims): pass def parse_and_bind(self, binding): pass def read_history_file(self, filename): pass def write_history_file(self, filename): pass readline = MockReadline() import rlcompleter from typing import List, Optional class DebugCompleter: """自定义命令补全器,特别支持test命令的路径补全""" def __init__(self): self.commands = [ 'test', 'call', 'breakpoint', 'bp', 'remove_bp', 'rm_bp', 'resume', 'step', 'list_bp', 'help', 'quit' ] self.http_methods = ['GET', 'POST', 'PUT', 'DELETE'] self.common_paths = [ '/test00/test0001', '/magic/web/resource', '/api/test', '/api/search', '/api/create', '/api/update', '/api/delete' ] def complete(self, text, state): """补全函数""" if state == 0: # 第一次调用,生成补全列表 line = readline.get_line_buffer() self.matches = self._get_matches(line, text) if not self.matches: return None try: result = self.matches[state] return result except IndexError: return None def _get_matches(self, line, text): """获取匹配的补全项""" matches = [] # 如果是空行或只输入了部分命令 if not line.strip() or ' ' not in line: # 补全命令 for cmd in self.commands: if cmd.startswith(text): matches.append(cmd) else: # 解析命令和参数 parts = line.split() command = parts[0].lower() if command == 'test': # test命令的特殊处理 if len(parts) == 1: # 没有参数,补全为test if text and 'test'.startswith(text): matches.append('test ') elif len(parts) == 2: # 补全断点参数或路径 if text.startswith('/') or not text: # 补全路径 for path in self.common_paths: if path.startswith(text): matches.append(path) # 不补全断点数字 elif command in ['call', 'breakpoint', 'bp', 'remove_bp', 'rm_bp']: current_part_index = len(parts) - 1 if command == 'call': if current_part_index == 1: # 补全HTTP方法 for method in self.http_methods: if method.startswith(text.upper()): matches.append(method) elif current_part_index == 2: # 补全路径 for path in self.common_paths: if path.startswith(text): matches.append(path) # 不补全其他命令的参数 return matches def setup_readline(): """设置readline以支持方向键和自动补全""" # 清除任何现有的补全器设置 readline.set_completer(None) readline.set_completer_delims('\t\n ') # 设置补全器 completer = DebugCompleter() readline.set_completer(completer.complete) readline.set_completer_delims('\t\n') # 只用tab和换行符作为分隔符 # 启用Tab补全,覆盖任何现有绑定 readline.parse_and_bind('tab: complete') readline.parse_and_bind('set show-all-if-ambiguous off') # 启用历史记录 readline.parse_and_bind('set enable-keypad on') # 设置历史文件(可选) histfile = '.magic_debug_history' try: readline.read_history_file(histfile) except FileNotFoundError: pass # 保存历史记录 import atexit atexit.register(lambda: readline.write_history_file(histfile)) class MagicAPIDebugClient: def __init__(self, ws_url, api_base_url, username=None, password=None): self.ws_url = ws_url self.api_base_url = api_base_url self.username = username self.password = password self.websocket = None # 生成随机client_id,格式与服务器期望的一致 self.client_id = self._generate_client_id() self.breakpoints = [] # 存储断点行号 self.debug_context = None self.is_connected = asyncio.Event() # 用于同步连接状态 self.connected = False # 断点调试状态管理 self.debug_mode = False # 是否处于调试模式 self.breakpoint_hit = asyncio.Event() # 断点触发事件 self.breakpoint_data = None # 当前断点信息 self.waiting_for_resume = False # 是否等待恢复命令 def _generate_client_id(self): """生成随机client_id,格式与服务器期望的一致(16字符十六进制)""" import random import string # 生成16字符的随机十六进制字符串(小写字母+数字) chars = string.hexdigits.lower() # '0123456789abcdef' return ''.join(random.choice(chars) for _ in range(16)) async def connect(self): """连接到 WebSocket""" try: self.websocket = await websockets.connect(self.ws_url) self.connected = True print(f"✅ 已连接到 WebSocket: {self.ws_url}") self.is_connected.set() # 设置连接成功事件 await self.login() await self.listen_messages() except websockets.exceptions.ConnectionClosedOK: print("🔌 WebSocket 连接已正常关闭") self.connected = False except websockets.exceptions.ConnectionClosed as e: print(f"❌ WebSocket 连接异常关闭: {e}") self.connected = False except Exception as e: print(f"❌ WebSocket 连接失败: {e}") self.connected = False raise # 重新抛出异常以便外部捕获 async def login(self): """发送登录消息""" login_message = f"login,{self.username or 'guest'},{self.client_id}" await self.websocket.send(login_message) print(f"📤 已发送登录消息: {login_message}") async def set_breakpoint(self, line_number: int): """设置断点 - 断点通过HTTP请求头设置,不需要WebSocket消息""" if line_number not in self.breakpoints: self.breakpoints.append(line_number) print(f"🔴 设置断点在第 {line_number} 行") async def remove_breakpoint(self, line_number: int): """移除断点 - 断点通过HTTP请求头设置,不需要WebSocket消息""" if line_number in self.breakpoints: self.breakpoints.remove(line_number) print(f"🔵 移除断点在第 {line_number} 行") async def resume_breakpoint(self): """恢复断点执行""" await self._send_step_command(0) # 0表示resume print("▶️ 恢复执行") async def step_over(self): """单步执行(越过)""" await self._send_step_command(1) # 1表示step over print("⏭️ 单步执行(越过)") async def step_into(self): """单步执行(进入)""" await self._send_step_command(2) # 2表示step into print("⏬ 单步执行(进入)") async def step_out(self): """单步执行(跳出)""" await self._send_step_command(3) # 3表示step out print("⏫ 单步执行(跳出)") async def _send_step_command(self, step_type: int): """ 发送步进命令 Args: step_type: 步进类型 (0=resume, 1=step_over, 2=step_into, 3=step_out) """ try: # 获取当前断点信息中的script_id script_id = "24646387e5654d78b4898ac7ed2eb560" # 默认值 if hasattr(self, 'current_api_path') and self.current_api_path: script_id = self._get_script_id_by_path(self.current_api_path) # 获取当前断点信息 breakpoints_str = "" if self.breakpoints: breakpoints_str = "|".join(map(str, sorted(self.breakpoints))) # 构建消息: resume_breakpoint,script_id,step_type,breakpoints message = f"resume_breakpoint,{script_id},{step_type},{breakpoints_str}" await self.websocket.send(message) # 清除断点暂停状态 self.waiting_for_resume = False self.breakpoint_data = None self.breakpoint_hit.clear() except Exception as e: print(f"❌ 发送步进命令失败: {e}") async def listen_messages(self): """监听 WebSocket 消息""" try: async for message in self.websocket: await self.handle_message(message) except websockets.exceptions.ConnectionClosed: print("🔌 WebSocket 连接已关闭") self.connected = False async def handle_message(self, message): """处理接收到的消息 - 实时高效处理""" # 性能监控开始 start_time = time.time() parts = message.split(',', 1) if len(parts) < 1: return message_type = parts[0].upper() content = parts[1] if len(parts) > 1 else "" # 特殊处理PING消息:回复pong但不显示 if message_type == "PING": await self.websocket.send("pong") return # 忽略登录类型和状态消息 # if message_type in ["USER_LOGIN", "LOGIN", "LOGOUT", "USER_LOGOUT", "ONLINE_USERS","INTO_FILE_ID"]: # return # 根据消息类型进行相应处理和显示 if message_type == "LOG": # 单个日志消息 - 只显示内容 print(f"📝 {content}") elif message_type == "LOGS": # 多条日志消息 - 优化输出性能 try: logs = json.loads(content) if len(logs) > 100: # 大量日志时只显示前100条和总数 for log in logs[:100]: print(f"📝 {log}") print(f"📝 ...还有{len(logs)-100}条日志") else: for log in logs: print(f"📝 {log}") except json.JSONDecodeError: print(f"📝 {content}") elif message_type == "BREAKPOINT": # 进入断点 - 消息格式: BREAKPOINT,script_id,{json_data} try: # 解析消息格式: script_id,{json_data} if ',' in content: script_id, json_str = content.split(',', 1) else: script_id = '未知' json_str = content # 解析JSON数据 breakpoint_data = json.loads(json_str) # 提取断点信息 variables = breakpoint_data.get('variables', []) range_info = breakpoint_data.get('range', []) # 从range信息提取行号 [start_line, start_col, end_line, end_col] if len(range_info) >= 3: line_number = range_info[0] # 开始行号 else: line_number = '未知' # 高效的断点信息显示 print(f"🔴 [断点] 脚本 '{script_id}' 在第 {line_number} 行暂停") # 快速显示变量摘要 if variables: var_count = len(variables) print(f"📊 变量: {var_count} 个") # 只显示前10个重要变量,避免输出过多影响实时性 for var_info in variables[:10]: var_name = var_info.get('name', '未知') var_type = var_info.get('type', '未知').split('.')[-1] # 简化类型名 var_value = str(var_info.get('value', '未知')) # 截断过长的值 if len(var_value) > 50: var_value = var_value[:1000] + "..." print(f" {var_name} ({var_type}) = {var_value}") if var_count > 10: print(f" ...还有{var_count-10}个变量") # 简化断点范围信息 if range_info and len(range_info) >= 6: start_line, start_col, end_line, end_col = range_info[:6] print(f"📍 位置: 第{start_line}行第{start_col}列") # 设置断点状态,等待用户恢复命令 self.breakpoint_data = { 'script_id': script_id, 'line_number': line_number, 'variables': variables, 'range': range_info, 'raw_data': breakpoint_data } self.waiting_for_resume = True self.breakpoint_hit.set() print("💡 resume/step/quit") except (json.JSONDecodeError, ValueError) as e: print(f"🔴 [断点] 解析断点消息失败: {e}") print(f" 原始消息: {content}") self.breakpoint_hit.set() elif message_type == "EXCEPTION": # 请求接口发生异常 - 优化显示性能 try: exception_data = json.loads(content) exception_type = exception_data.get('type', '未知') message = exception_data.get('message', '无详细信息') # 简化异常显示,避免输出过多堆栈信息影响实时性 print(f"❌ 异常: {exception_type} - {message}") if 'stackTrace' in exception_data: stack = exception_data['stackTrace'] if len(stack) > 100: print(f" 堆栈: {stack[:97]}...") else: print(f" 堆栈: {stack}") except json.JSONDecodeError: print(f"❌ 异常: {content}") else: print(f"[{message_type}] {content}") # 性能监控结束 - 只在慢消息时警告 end_time = time.time() processing_time = end_time - start_time if processing_time > 0.1: # 处理时间超过100ms时警告 print(f"⚠️ 消息处理较慢: {message_type} 耗时 {processing_time:.3f}秒") # 强制刷新输出缓冲区和readline状态,确保debug>提示符重新显示 try: # 刷新stdout缓冲区 sys.stdout.flush() # 强制重绘readline输入行 readline.redisplay() except: # readline不可用时至少刷新stdout try: sys.stdout.flush() except: pass async def call_api_with_debug(self, api_path, method="GET", data=None, params=None, breakpoints: List[int] = None, script_id: str = "debug_script"): """调用 API 并支持断点调试""" # 保存当前API路径,用于后续step命令获取script_id self.current_api_path = api_path # 构建请求URL和请求头(在所有分支中都需要) url = f"{self.api_base_url.rstrip('/')}{api_path}" headers = { "Magic-Request-Client-Id": self.client_id, "Magic-Request-Script-Id": script_id, "magic-token": "unauthorization", "Accept": "application/json, text/plain, */*", "Content-Type": "application/x-www-form-urlencoded", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "Referer": f"{self.api_base_url}/magic/web/index.html" } if not self.connected: print("⚠️ WebSocket未连接,使用普通API调用") # 在后台线程中执行HTTP请求,避免阻塞 future = self._execute_http_request_async(method, url, headers, params, data, timeout=30) # 创建异步处理结果的任务 async def handle_response(): try: response = await asyncio.wrap_future(future) print(f"📊 响应状态: {response.status_code}") print(f"📄 响应内容: {response.text}") return response except Exception as e: print(f"❌ API调用异常: {e}") return None # 启动异步任务并等待结果(但不阻塞WebSocket) asyncio.create_task(handle_response()) return None # 如果设置了断点,进入调试模式 if breakpoints: self.debug_mode = True print(f"🐛 进入调试模式,断点: {breakpoints}") # 设置断点信息,通过HTTP请求头发送 headers["Magic-Request-Breakpoints"] = ",".join(map(str, breakpoints)) print(f"🔴 发送断点信息: {headers['Magic-Request-Breakpoints']}") print(f"🐛 调用API (调试模式): {method} {url}") if params: print(f" 查询参数: {params}") if data: print(f" 请求体: {data}") # 在后台线程中执行HTTP请求,避免阻塞asyncio事件循环 def execute_debug_request_in_thread(): """在后台线程中执行HTTP请求""" try: print("🔄 发送调试请求...") if method.upper() == "GET": response = requests.get(url, params=params, headers=headers, timeout=300) elif method.upper() == "POST": response = requests.post(url, json=data, params=params, headers=headers, timeout=300) elif method.upper() == "PUT": response = requests.put(url, json=data, params=params, headers=headers, timeout=300) elif method.upper() == "DELETE": response = requests.delete(url, params=params, headers=headers, timeout=300) else: print(f"❌ 不支持的HTTP方法: {method}") return print(f"📊 响应状态: {response.status_code}") if response.status_code == 200: content = response.text print(f"📄 响应内容: {content[:200]}..." if len(content) > 200 else f"📄 响应内容: {content}") else: print(f"📄 错误响应: {response.text}") except requests.exceptions.Timeout: print("⏰ 调试请求超时 (30秒)") except requests.exceptions.ConnectionError: print("🔌 调试请求连接失败") except Exception as e: print(f"❌ 调试请求异常: {e}") finally: # 请求完成后清理调试状态 self.debug_mode = False # 在后台线程中执行HTTP请求,不阻塞asyncio事件循环 import threading thread = threading.Thread(target=execute_debug_request_in_thread, daemon=True) thread.start() print("✅ 调试会话已启动,断点将通过WebSocket通知") return None # 立即返回,不阻塞用户界面 def _get_script_id_by_path(self, api_path: str) -> str: """ 根据API路径获取对应的script_id Args: api_path: API路径 Returns: script_id,如果找不到则返回默认值 """ try: # 导入extract_api_paths模块的功能 import sys import os script_dir = os.path.dirname(os.path.abspath(__file__)) extract_script = os.path.join(script_dir, 'extract_api_paths.py') # 使用subprocess调用extract_api_paths.py来获取ID import subprocess result = subprocess.run([ sys.executable, extract_script, '--url', 'http://127.0.0.1:10712/magic/web/resource', '--path-to-id', api_path ], capture_output=True, text=True, timeout=10) if result.returncode == 0 and result.stdout.strip(): # 解析输出,第一行为ID lines = result.stdout.strip().split('\n') if lines: # 格式: id,path,method,name,groupId first_line = lines[0].strip() if ',' in first_line: script_id = first_line.split(',')[0].strip() if script_id: return script_id except Exception as e: print(f"⚠️ 获取script_id失败: {e}") # 返回默认的script_id(如果获取失败) return "24646387e5654d78b4898ac7ed2eb560" def _execute_http_request_async(self, method, url, headers, params=None, data=None, timeout=30): """异步执行HTTP请求(在后台线程中),返回Future对象""" import concurrent.futures import threading def http_request(): """在后台线程中执行HTTP请求""" try: if method.upper() == "GET": return requests.get(url, params=params, headers=headers, timeout=timeout) elif method.upper() == "POST": return requests.post(url, json=data, params=params, headers=headers, timeout=timeout) elif method.upper() == "PUT": return requests.put(url, json=data, params=params, headers=headers, timeout=timeout) elif method.upper() == "DELETE": return requests.delete(url, params=params, headers=headers, timeout=timeout) else: raise ValueError(f"不支持的HTTP方法: {method}") except Exception as e: # 重新抛出异常,让调用方处理 raise e # 使用线程池执行器来执行HTTP请求 executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="http-request") future = executor.submit(http_request) # 确保executor在future完成后被清理 def cleanup_executor(fut): executor.shutdown(wait=False) future.add_done_callback(cleanup_executor) return future def call_api(self, api_path, method="GET", data=None, params=None, headers=None): """调用 API(普通模式)""" url = f"{self.api_base_url.rstrip('/')}{api_path}" # 默认请求头,与调试API调用保持一致 default_headers = { "Magic-Request-Client-Id": self.client_id, "Magic-Request-Script-Id": "python_client_call", "magic-token": "unauthorization", "Accept": "application/json, text/plain, */*", "Content-Type": "application/x-www-form-urlencoded", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "sec-ch-ua": '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "Referer": f"{self.api_base_url}/magic/web/index.html" } # 合并请求头 if headers: default_headers.update(headers) print(f"🌐 调用API: {method} {url}") if params: print(f" 查询参数: {params}") if data: print(f" 请求体: {data}") try: if method.upper() == "GET": response = requests.get(url, params=params, headers=default_headers, timeout=10) elif method.upper() == "POST": response = requests.post(url, json=data, params=params, headers=default_headers, timeout=10) elif method.upper() == "PUT": response = requests.put(url, json=data, params=params, headers=default_headers, timeout=10) elif method.upper() == "DELETE": response = requests.delete(url, params=params, headers=default_headers, timeout=10) else: print(f"❌ 不支持的HTTP方法: {method}") return None print(f"📊 响应状态: {response.status_code}") print(f"📄 响应内容: {response.text}") return response except requests.exceptions.Timeout: print("⏰ API调用超时 (10秒)") return None except requests.exceptions.ConnectionError: print("🔌 API连接失败") return None except Exception as e: print(f"❌ API调用异常: {e}") return None async def close(self): """关闭连接""" if self.websocket: await self.websocket.close() print("🔌 连接已关闭") self.connected = False def print_usage(): """打印使用说明""" print("Magic-API WebSocket调试客户端") print("=" * 50) print("功能: 连接Magic-API WebSocket控制台,支持断点调试和实时日志监听") print("特性: 方向键导航历史命令,Tab自动补全,test命令路径自动添加'/'前缀") print("依赖: pip install websockets requests") print("") print("使用方法:") print(" python3 magic_api_debug_client.py # 启动交互式调试会话") print("") print("交互命令:") print(" test [path] [breakpoints] - 执行测试API(可选路径和断点,如: test /api/test 5,10)") print(" call <METHOD> <PATH> [data] - 调用指定API") print(" breakpoint <line> - 设置断点") print(" remove_bp <line> - 移除断点") print(" resume - 恢复断点执行") print(" step - 单步执行") print(" list_bp - 列出所有断点") print(" help - 显示帮助") print(" quit - 退出程序") print("") print("快捷键:") print(" ↑↓ - 浏览命令历史") print(" ←→ - 编辑当前命令") print(" Tab - 自动补全命令和路径") print("") print("自动补全:") print(" 命令: test, call, breakpoint等") print(" HTTP方法: GET, POST, PUT, DELETE") print(" 路径: /test00/test0001, /magic/web/resource等") print(" test命令路径自动添加'/'前缀") print("") print("配置:") print(" WebSocket URL: ws://127.0.0.1:10712/magic/web/console") print(" API Base URL: http://127.0.0.1:10712") def preprocess_command(command_line): """预处理命令行,自动为test命令的路径添加前缀'/'""" if not command_line.strip(): return command_line parts = command_line.split() if len(parts) >= 2 and parts[0].lower() == 'test': # 检查第二个参数是否是路径(不以数字开头,且不包含逗号) path_arg = parts[1] if not path_arg.isdigit() and ',' not in path_arg and not path_arg.startswith('/'): # 这看起来是路径,自动添加'/' parts[1] = '/' + path_arg return ' '.join(parts) return command_line async def interactive_debug_session(): """交互式调试会话""" # 配置连接信息 WS_URL = "ws://127.0.0.1:10712/magic/web/console" API_BASE_URL = "http://127.0.0.1:10712" USERNAME = "unauthorization" PASSWORD = "123456" print("🚀 Magic-API 调试客户端启动") print(f"📡 WebSocket URL: {WS_URL}") print(f"🌐 API Base URL: {API_BASE_URL}") print(f"👤 用户名: {USERNAME}") print("-" * 50) # 设置readline支持方向键和自动补全 setup_readline() # 创建调试客户端 client = MagicAPIDebugClient(WS_URL, API_BASE_URL, USERNAME, PASSWORD) # 获取当前事件循环,用于在线程间安全调度协程 loop = asyncio.get_running_loop() # 在后台线程中处理用户输入 def user_input_handler(): # 快速显示界面,WebSocket连接异步建立 print("\n=== Magic-API 断点调试客户端 ===") print("💡 支持方向键导航和Tab自动补全,test命令路径会自动添加'/'前缀") print("输入 'help' 查看可用命令") # 短暂等待连接状态确认,但不阻塞UI time.sleep(0.1) # 减少等待时间 while True: try: # 确保输出缓冲区已刷新,readline状态正确 sys.stdout.flush() readline.redisplay() command_line = input("\ndebug> ").strip() # 预处理命令 command_line = preprocess_command(command_line) if not command_line: continue parts = command_line.split() command = parts[0].lower() if command == "help": print_usage() elif command == "test": # 执行测试API,支持自定义路径和断点 path = "/test00/test0001" # 默认路径 breakpoints = [] if len(parts) > 1: # 检查第一个参数是否是路径(不是纯数字且看起来像路径) first_arg = parts[1] # 如果是纯数字或数字逗号组合,认为是断点 if first_arg.isdigit() or (',' in first_arg and all(x.strip().isdigit() for x in first_arg.split(','))): try: breakpoints = [int(x.strip()) for x in first_arg.split(',')] except ValueError: print("❌ 断点格式错误,请使用逗号分隔的数字,如: 5,10") continue else: # 这是一个路径 path = first_arg # 检查是否有断点参数 if len(parts) > 2: try: breakpoints = [int(x.strip()) for x in parts[2].split(',')] except ValueError: print("❌ 断点格式错误,请使用逗号分隔的数字,如: 5,10") continue print(f"🧪 执行测试API: {path}") if breakpoints: print(f" 断点: {breakpoints}") # 使用 run_coroutine_threadsafe 在主线程的事件循环中执行异步调试调用 future = asyncio.run_coroutine_threadsafe( client.call_api_with_debug( path, "GET", params={"debug": "true", "test_mode": "interactive"}, breakpoints=breakpoints, script_id="e411103cbd334af9b264fe3fe55d1a42" ), loop ) # 等待异步调用完成 result = future.result(timeout=60.0) # 最多等待60秒,包括断点等待时间 if result: print("✅ 测试完成") else: print("❌ 测试失败") elif command == "call": if len(parts) < 3: print("❌ 用法: call <METHOD> <PATH> [json_data]") continue method = parts[1].upper() path = parts[2] data = None if len(parts) > 3: data_str = ' '.join(parts[3:]) try: data = json.loads(data_str) except json.JSONDecodeError: print("❌ JSON数据格式错误") continue # call命令不支持断点调试,使用普通同步调用 result = client.call_api(path, method, data=data) if result: print("✅ API调用完成") else: print("❌ API调用失败") elif command == "breakpoint" or command == "bp": if len(parts) < 2: print("❌ 用法: breakpoint <line_number>") continue try: line_number = int(parts[1]) # 使用 run_coroutine_threadsafe 在主线程的事件循环中执行协程 future = asyncio.run_coroutine_threadsafe( client.set_breakpoint(line_number), loop ) # 等待断点操作完成,确保UI正确刷新 future.result(timeout=5.0) except ValueError: print("❌ 行号必须是数字") except Exception as e: print(f"❌ 设置断点失败: {e}") elif command == "remove_bp" or command == "rm_bp": if len(parts) < 2: print("❌ 用法: remove_bp <line_number>") continue try: line_number = int(parts[1]) # 使用 run_coroutine_threadsafe 在主线程的事件循环中执行协程 future = asyncio.run_coroutine_threadsafe( client.remove_breakpoint(line_number), loop ) # 等待断点操作完成,确保UI正确刷新 future.result(timeout=5.0) except ValueError: print("❌ 行号必须是数字") except Exception as e: print(f"❌ 移除断点失败: {e}") elif command == "resume": # 使用 run_coroutine_threadsafe 在主线程的事件循环中执行协程 future = asyncio.run_coroutine_threadsafe( client.resume_breakpoint(), loop ) # 等待恢复操作完成 try: future.result(timeout=5.0) except Exception as e: print(f"❌ 恢复断点失败: {e}") elif command == "step": # 使用 run_coroutine_threadsafe 在主线程的事件循环中执行协程 future = asyncio.run_coroutine_threadsafe( client.step_over(), loop ) # 等待单步操作完成 try: future.result(timeout=5.0) except Exception as e: print(f"❌ 单步执行失败: {e}") elif command == "list_bp": if client.breakpoints: print("🔴 当前断点:") for bp in sorted(client.breakpoints): print(f" 第 {bp} 行") else: print("📝 当前没有设置断点") elif command == "quit": print("👋 退出调试客户端...") break else: print(f"❌ 未知命令: {command},输入 'help' 查看可用命令") except KeyboardInterrupt: print("\n👋 程序被用户中断") break except Exception as e: print(f"❌ 处理命令时出错: {e}") # 启动用户输入处理线程 input_thread = threading.Thread(target=user_input_handler) input_thread.daemon = True input_thread.start() # 连接 WebSocket 并开始监听 try: await client.connect() except KeyboardInterrupt: print("\n⏹️ 程序被用户中断") except Exception as e: print(f"❌ 连接失败: {e}") finally: await client.close() async def main(): """主函数""" if len(sys.argv) > 1 and sys.argv[1] in ['--help', '-h']: print_usage() sys.exit(0) # 启动交互式调试会话 await interactive_debug_session() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n👋 程序已退出") except Exception as e: print(f"❌ 程序异常: {e}") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server