Skip to main content
Glama

Magic-API MCP Server

by Dwsy
resource.py45 kB
"""Magic-API 资源管理工具模块。 此模块提供完整的Magic-API资源管理系统,包括: - 资源树浏览和查询 - 资源创建、更新、删除操作 - 分组管理和组织 - 资源导入导出功能 - 资源统计和分析 主要工具: - get_resource_tree: 获取资源树,支持多种过滤和导出格式 - get_resource_detail: 获取特定资源的详细信息 - save_group: 保存资源分组,支持创建和更新 - create_api_resource: 创建新的API资源 - copy_resource: 复制现有资源 - move_resource: 移动资源到其他分组 - delete_resource: 删除资源(支持软删除) - read_set_lock_status: 读取或设置资源的锁定状态(支持读取、锁定、解锁) - list_resource_groups: 列出所有资源分组 - export_resource_tree: 导出完整的资源树结构 - get_resource_stats: 获取资源统计信息 """ from __future__ import annotations from typing import TYPE_CHECKING, Annotated, Any, Dict, List, Optional try: from typing import Literal except ImportError: from typing_extensions import Literal import re from pydantic import Field from magicapi_tools.logging_config import get_logger from magicapi_tools.utils.extractor import ( MagicAPIExtractorError, filter_endpoints, _filter_nodes, _flatten_tree, _nodes_to_csv, ) from magicapi_tools.utils.resource_manager import build_api_save_kwargs_from_detail from magicapi_tools.utils import error_response if TYPE_CHECKING: from fastmcp import FastMCP from magicapi_mcp.tool_registry import ToolContext # 获取资源管理工具的logger logger = get_logger('tools.resource') class ResourceManagementTools: """资源管理工具模块。""" def register_tools(self, mcp_app: "FastMCP", context: "ToolContext") -> None: """注册资源管理相关工具。""" @mcp_app.tool( name="get_resource_tree", description="获取 Magic-API 资源树,支持多种过滤和导出格式。", tags={"resource", "tree", "api", "filtering"}, meta={"version": "2.0", "category": "resource-management"} ) def resource_tree( kind: Annotated[ str, Field( description="资源类型过滤器:api(API接口)、function(函数)、task(任务)、datasource(数据源)或all(全部)") ] = "api", format: Annotated[ str, Field(description="输出格式:json(扁平化JSON数组)、csv(CSV格式)、tree(树形JSON结构)") ] = "tree", depth: Annotated[ Optional[int], Field(description="限制显示的资源树深度,正整数", ge=1, le=10) ] = None, method_filter: Annotated[ Optional[str], Field(description="HTTP方法过滤器,如'GET'、'POST'、'PUT'、'DELETE'") ] = None, path_filter: Annotated[ Optional[str], Field(description="路径正则表达式过滤器,用于匹配API路径") ] = None, name_filter: Annotated[ Optional[str], Field(description="名称正则表达式过滤器,用于匹配资源名称") ] = None, query_filter: Annotated[ Optional[str], Field(description="通用查询过滤器,支持复杂的搜索条件") ] = None, ) -> Dict[str, Any]: """获取 Magic-API 资源树。""" try: # 获取资源树数据 ok, payload = context.http_client.resource_tree() if not ok: return error_response(payload.get("code"), payload.get("message", "无法获取资源树"), payload.get("detail")) # 过滤资源类型 kind_normalized = kind if kind in { "api", "function", "task", "datasource", "all"} else "api" allowed = [ kind_normalized] if kind_normalized != "all" else ["all"] # 根据format参数返回不同格式 if format == "tree": # 返回树形结构 def filter_tree_node(node: Dict[str, Any]) -> Dict[str, Any]: """过滤树节点""" node_copy = dict(node) # 过滤node信息 if "node" in node_copy: node_info = node_copy["node"] node_type = node_info.get("type") method = node_info.get("method") node_name = node_info.get("name") # 应用分组识别逻辑:如果有子节点,则设置为分组类型 has_children = "children" in node_copy and node_copy["children"] if has_children and node_type: # 确保分组类型以 "-group" 结尾 if not node_type.endswith("-group"): node_info["type"] = f"{node_type}-group" # 检查是否应该包含此节点 should_include = True if allowed != ["all"]: if node_type and node_type not in allowed: should_include = False elif method and "api" in allowed: should_include = True elif not node_type and not method: # 根节点或其他无类型节点:如果是根节点(name='root')或者有子节点,则保留 if node_name == "root" or ("children" in node_copy and node_copy["children"]): should_include = True else: should_include = False if not should_include: return None # 应用深度限制 if depth is not None and "children" in node_copy: def limit_depth(children: List[Dict], current_depth: int): if current_depth >= depth: # 移除子节点 for child in children: if "children" in child: child["children"] = [] else: for child in children: if "children" in child: limit_depth( child["children"], current_depth + 1) limit_depth(node_copy["children"], 0) # 递归过滤子节点 if "children" in node_copy: filtered_children = [] for child in node_copy["children"]: filtered_child = filter_tree_node(child) if filtered_child is not None: filtered_children.append(filtered_child) node_copy["children"] = filtered_children return node_copy # 获取指定类型的树 tree_data = payload.get(kind_normalized, {}) if kind_normalized != "all": filtered_tree = filter_tree_node(tree_data) result_tree = filtered_tree if filtered_tree else { "node": {}, "children": []} else: # 对于"all",需要处理所有类型的树 result_tree = {} for tree_type in ["api", "function", "task", "datasource"]: if tree_type in payload: filtered = filter_tree_node(payload[tree_type]) if filtered: result_tree[tree_type] = filtered return { "format": "tree", "kind": kind_normalized, "tree": result_tree, "filters_applied": { "method": method_filter, "path": path_filter, "name": name_filter, "query": query_filter, "depth": depth, } } else: # json 或 csv 格式:使用扁平化结构 nodes = _flatten_tree(payload, allowed, depth) # 如果有高级过滤器,转换为端点列表进行过滤 if method_filter or path_filter or name_filter or query_filter: # 转换为端点字符串格式进行过滤 endpoints = [] for node in nodes: method = node.get("method", "") path = node.get("path", "") name = node.get("name", "") if method and path: endpoint_str = f"{method} {path}" if name: endpoint_str += f" [{name}]" endpoints.append(endpoint_str) # 应用高级过滤器 filtered_endpoints = filter_endpoints( endpoints, path_filter=path_filter, name_filter=name_filter, method_filter=method_filter, query_filter=query_filter, ) # 转换回节点格式 filtered_nodes = [] for endpoint in filtered_endpoints: if "[" in endpoint and "]" in endpoint: method_path, name = endpoint.split(" [", 1) name = name.rstrip("]") else: method_path, name = endpoint, "" method, path = method_path.split(" ", 1) # 从原始节点中找到匹配的节点(保留ID等信息) for original_node in nodes: if (original_node.get("method") == method and original_node.get("path") == path and original_node.get("name") == name): filtered_nodes.append(original_node) break nodes = filtered_nodes else: # 使用原有搜索逻辑保持兼容性 nodes = _filter_nodes(nodes, query_filter) if format == "json": # 返回扁平化的JSON数组 return { "format": "json", "kind": kind_normalized, "count": len(nodes), "nodes": nodes, "filters_applied": { "method": method_filter, "path": path_filter, "name": name_filter, "query": query_filter, "depth": depth, } } elif format == "csv": # 返回CSV格式 return { "format": "csv", "kind": kind_normalized, "count": len(nodes), "csv": _nodes_to_csv(nodes), "filters_applied": { "method": method_filter, "path": path_filter, "name": name_filter, "query": query_filter, "depth": depth, } } except MagicAPIExtractorError as e: return error_response("extraction_error", f"资源树提取失败: {str(e)}") except Exception as e: return error_response("unexpected_error", f"意外错误: {str(e)}") @mcp_app.tool( name="save_group", description="保存资源分组,支持单个分组创建或更新,包含完整的分组配置选项。", tags={"resource", "group", "save", "create", "update", "management", "full-config"}, meta={"version": "2.1", "category": "resource-management"} ) def save_group( # 创建操作必需参数 name: Annotated[ Optional[str], Field(description="分组名称(创建新分组时必需)") ], parent_id: Annotated[ str, Field(description="父分组ID (必须提供)") ], # 更新操作必需参数 id: Annotated[ Optional[str], Field(description="分组ID(更新现有分组时必需),用于标识要更新的分组") ] = None, # 通用参数 type: Annotated[ str, Field( description="分组类型:api(API接口组)、function(函数组)、task(任务组)、datasource(数据源组)") ] = "api", path: Annotated[ Optional[str], Field(description="分组路径,可选的URL路径前缀") ] = None, options: Annotated[ Optional[str], Field(description="分组选项配置,JSON格式字符串") ] = None, groups_data: Annotated[ Optional[str], Field(description="批量分组数据,JSON数组格式,每个对象包含name,id等字段(批量操作时使用)") ] = None, ) -> Dict[str, Any]: """保存分组(支持单个创建/更新和批量操作)。 - 创建操作:需要提供 name 等必需参数,不提供 id - 更新操作:只需要提供 id,其他参数都是可选的,只更新提供的参数 """ if id == "null": id = None import json is_update = id is not None if is_update: # 更新操作:只必需id,其他参数都是可选的 if not id: return error_response("invalid_params", "更新操作需要提供id") else: # 创建操作:必需name if not name: return error_response("invalid_params", "创建操作需要提供name") groups_list = None if groups_data: try: groups_list = json.loads(groups_data) except json.JSONDecodeError: return error_response("invalid_json", f"groups_data 格式错误: {groups_data}") result = context.resource_tools.save_group_tool( name=name, id=id, parent_id=parent_id, type=type, path=path, options=options, groups_data=groups_list, ) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "save_group_failed"), error_info.get("message", "保存分组失败"), result # 包含完整的原始错误信息 ) @mcp_app.tool( name="save_api_endpoint", description="保存API接口,支持单个接口创建或更新,包含完整的API配置选项。", tags={"api", "endpoint", "save", "create", "update", "management", "full-config"}, meta={"version": "2.2", "category": "resource-management"} ) def save_api_endpoint( # 创建操作必需参数 group_id: Annotated[ Optional[str], Field(description="分组ID(创建新API时必需),指定API所属的分组") ], name: Annotated[ Optional[str], Field(description="API接口名称(创建新API时必需)") ], method: Annotated[ Optional[str], Field(description="HTTP请求方法(创建新API时必需),默认为GET") ], path: Annotated[ Optional[str], Field(description="API路径,如'/api/users'(创建新API时必需)") ], script: Annotated[ Optional[str], Field(description="API执行脚本,Magic-Script代码(创建新API时必需)") ], # 更新操作必需参数 id: Annotated[ Optional[str], Field(description="文件ID(更新现有API时必需),用于标识要更新的API接口") ] = None, # 扩展参数(创建和更新都可选) description: Annotated[ Optional[str], Field(description="API接口描述") ] = None, parameters: Annotated[ Optional[str], Field(description="查询参数列表,JSON数组格式,每个参数包含name,type,value等字段") ] = None, headers: Annotated[ Optional[str], Field(description="请求头列表,JSON数组格式,每个请求头包含name,value等字段") ] = None, paths: Annotated[ Optional[str], Field(description="路径变量列表,JSON数组格式,每个路径变量包含name,value等字段") ] = None, request_body: Annotated[ Optional[str], Field(description="请求体示例内容") ] = None, request_body_definition: Annotated[ Optional[str], Field(description="请求体结构定义,JSON格式") ] = None, response_body: Annotated[ Optional[str], Field(description="响应体示例内容") ] = None, response_body_definition: Annotated[ Optional[str], Field(description="响应体结构定义,JSON格式") ] = None, options: Annotated[ Optional[str], Field(description="接口选项配置,JSON数组格式,每个选项包含name,value等字段") ] = None, ) -> Dict[str, Any]: """保存API接口(支持单个创建或更新操作)。 - 创建操作:需要提供 group_id, name, method, path, script 等必需参数 - 更新操作:只需要提供 id,其他参数都是可选的,只更新提供的参数 """ if id == "null": id = None # MCP工具调用日志 operation_type = "更新" if id else "创建" logger.info(f"MCP工具调用: save_api_endpoint ({operation_type})") logger.info(f" API ID: {id if id else 'N/A'}") logger.info(f" API名称: {name}") logger.info(f" API路径: {path}") logger.info(f" HTTP方法: {method}") logger.info(f" 分组ID: {group_id}") if description: logger.debug(f" 描述: {description[:100]}...") if script: logger.debug(f" 脚本长度: {len(script)} 字符") import json is_update = id is not None if is_update: # 更新操作:只必需id,其他参数都是可选的 if not id: return error_response("invalid_params", "更新操作需要提供id") else: # 创建操作:必需group_id, name, method, path, script required_fields = { "group_id": group_id, "name": name, "method": method, "path": path, "script": script } missing_fields = [ k for k, v in required_fields.items() if v is None] if missing_fields: return error_response("invalid_params", f"创建操作需要提供以下必需参数: {', '.join(missing_fields)}") # 对于创建操作,如果没有提供method,默认设置为GET if method is None: method = "GET" # 解析JSON参数 parsed_parameters = None if parameters: try: parsed_parameters = json.loads(parameters) except json.JSONDecodeError: return error_response("invalid_json", f"parameters 格式错误: {parameters}") parsed_headers = None if headers: try: parsed_headers = json.loads(headers) except json.JSONDecodeError: return error_response("invalid_json", f"headers 格式错误: {headers}") parsed_paths = None if paths: try: parsed_paths = json.loads(paths) except json.JSONDecodeError: return error_response("invalid_json", f"paths 格式错误: {paths}") parsed_options = None if options: try: parsed_options = json.loads(options) except json.JSONDecodeError: return error_response("invalid_json", f"options 格式错误: {options}") parsed_request_body_definition = None if request_body_definition: try: parsed_request_body_definition = json.loads( request_body_definition) except json.JSONDecodeError: return error_response("invalid_json", f"request_body_definition 格式错误: {request_body_definition}") parsed_response_body_definition = None if response_body_definition: try: parsed_response_body_definition = json.loads( response_body_definition) except json.JSONDecodeError: return error_response("invalid_json", f"response_body_definition 格式错误: {response_body_definition}") # 调用工具方法 result = context.resource_tools.create_api_tool( group_id=group_id, name=name, method=method, path=path, script=script, description=description, parameters=parsed_parameters, headers=parsed_headers, paths=parsed_paths, request_body=request_body, request_body_definition=parsed_request_body_definition, response_body=response_body, response_body_definition=parsed_response_body_definition, options=parsed_options, id=id, # 更新操作时传入id,创建操作时为None ) if "success" not in result: # 记录详细的原始错误信息 error_info = result.get("error", {}) operation_type = "更新" if id else "创建" logger.error( f"{operation_type}API失败: {error_info.get('message', '未知错误')}") logger.error(f" API ID: {id if id else 'N/A'}") logger.error(f" API名称: {name}") logger.error(f" API路径: {path}") logger.error(f" HTTP方法: {method}") logger.error(f" 错误代码: {error_info.get('code', 'unknown')}") logger.debug(f" 原始错误信息: {result}") return error_response( error_info.get("code", "api_save_failed"), error_info.get("message", f"{operation_type}API失败"), result # 包含完整的原始错误信息 ) return result @mcp_app.tool( name="copy_resource", description="复制资源到指定的目标位置。", tags={"resource", "copy", "management"}, meta={"version": "1.0", "category": "resource-management"} ) def copy_resource(src_id: str, target_id: str) -> Dict[str, Any]: """复制资源到指定位置。""" try: # 清理参数 clean_src_id = str(src_id).strip() clean_target_id = str(target_id).strip() if not clean_src_id or not clean_target_id: return error_response("invalid_params", "src_id和target_id不能为空") # 调用工具方法,获取结构化的错误信息 result = context.resource_tools.copy_resource_tool( clean_src_id, clean_target_id) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "copy_failed"), error_info.get("message", f"复制资源 {clean_src_id} 失败"), result # 包含完整的原始错误信息 ) except Exception as e: return error_response("unexpected_error", f"复制资源时发生异常: {str(e)}") @mcp_app.tool( name="move_resource", description="移动资源到指定的目标位置。", tags={"resource", "move", "management"}, meta={"version": "1.0", "category": "resource-management"} ) def move_resource(src_id: str, target_id: str) -> Dict[str, Any]: """移动资源到指定位置。""" try: # 清理参数 clean_src_id = str(src_id).strip() clean_target_id = str(target_id).strip() if not clean_src_id or not clean_target_id: return error_response("invalid_params", "src_id和target_id不能为空") # 调用工具方法,获取结构化的错误信息 result = context.resource_tools.move_resource_tool( clean_src_id, clean_target_id) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "move_failed"), error_info.get("message", f"移动资源 {clean_src_id} 失败"), result # 包含完整的原始错误信息 ) except Exception as e: return error_response("unexpected_error", f"移动资源时发生异常: {str(e)}") @mcp_app.tool( name="delete_resource", description="删除资源,支持单个资源删除或批量资源删除。", tags={"resource", "delete", "management"}, meta={"version": "2.0", "category": "resource-management"} ) def delete_resource( resource_id: Annotated[ Optional[str], Field(description="单个资源ID(单个删除操作时使用)") ] = None, resource_ids: Annotated[ Optional[str], Field( description="资源ID列表,JSON数组格式如['id1','id2','id3'](批量删除操作时使用)") ] = None, ) -> Dict[str, Any]: """删除资源(支持单个和批量操作)。""" import json try: # 处理批量删除 if resource_ids: try: ids_list = json.loads(resource_ids) except json.JSONDecodeError: return error_response("invalid_json", f"resource_ids 格式错误: {resource_ids}") # 调用工具方法进行批量删除 result = context.resource_tools.delete_resource_tool( resource_ids=ids_list) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "batch_delete_failed"), error_info.get("message", "批量删除资源失败"), result # 包含完整的原始错误信息 ) # 单个删除 elif resource_id: # 清理resource_id(去除前后空格) clean_resource_id = str(resource_id).strip() if not clean_resource_id: return error_response("invalid_params", "resource_id不能为空") # 调用工具方法进行单个删除 result = context.resource_tools.delete_resource_tool( resource_id=clean_resource_id) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "delete_failed"), error_info.get( "message", f"删除资源 {clean_resource_id} 失败"), result # 包含完整的原始错误信息 ) else: return error_response("invalid_params", "必须提供 resource_id 或 resource_ids 参数") except Exception as e: return error_response("unexpected_error", f"删除资源时发生异常: {str(e)}") @mcp_app.tool( name="list_resource_groups", description="列出所有资源分组及其基本信息,支持搜索和数量限制。", tags={"resource", "group", "list", "search", "filter"}, meta={"version": "2.0", "category": "resource-management"} ) def list_groups( search: Annotated[ Optional[str], Field(description="搜索关键词,支持分组名称、路径、类型的模糊匹配") ] = None, limit: Annotated[ int, Field(description="返回结果的最大数量,默认50条") ] = 50, ) -> Dict[str, Any]: """列出所有分组,支持搜索和数量限制。""" result = context.resource_tools.list_groups_tool() if "error" in result: error_info = result["error"] return error_response( error_info.get("code", "list_groups_failed"), error_info.get("message", "获取分组列表失败"), result # 包含完整的原始错误信息 ) groups = result.get("groups", []) # 应用搜索过滤(在Python端完成) if search: search_lower = search.lower() filtered_groups = [] for group in groups: # 搜索多个字段 searchable_fields = [ group.get('name', ''), group.get('path', ''), group.get('type', ''), group.get('comment', ''), ] # 检查是否匹配搜索关键词 if any(search_lower in str(field).lower() for field in searchable_fields if field): filtered_groups.append(group) groups = filtered_groups # 应用数量限制 total_count = len(groups) if limit > 0: groups = groups[:limit] return { "total_count": total_count, "returned_count": len(groups), "limit": limit, "search_applied": search, "groups": groups, } @mcp_app.tool( name="export_resource_tree", description="导出资源树结构,支持JSON和CSV格式。", tags={"resource", "export", "tree"}, meta={"version": "1.0", "category": "resource-management"}, enabled=False ) def export_resource_tree(kind: str = "api", format: str = "json") -> Dict[str, Any]: """导出资源树。""" try: result = context.resource_tools.export_resource_tree_tool( kind=kind, format=format) if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "export_failed"), error_info.get("message", "导出资源树失败"), result # 包含完整的原始错误信息 ) except Exception as e: print(f"DEBUG MCP: export_resource_tree error: {e}") import traceback traceback.print_exc() return error_response("unexpected_error", f"意外错误: {str(e)}") @mcp_app.tool( name="read_set_lock_status", description="读取或设置资源的锁定状态,支持读取当前锁定状态、锁定和解锁操作。", tags={"resource", "lock", "unlock", "status", "management"}, meta={"version": "2.0", "category": "resource-management"} ) def read_set_lock_status( resource_id: Annotated[ str, Field(description="资源ID,用于标识要操作的资源") ], action: Annotated[ str, Field(description="操作类型:read(读取锁定状态)、lock(锁定资源)、unlock(解锁资源)") ], ) -> Dict[str, Any]: """读取或设置资源的锁定状态。""" try: # 清理参数 clean_resource_id = str(resource_id).strip() if not clean_resource_id: return error_response("invalid_params", "resource_id不能为空") if action == "read": # 读取锁定状态 - 使用 GET /resource/file/{id} 接口 ok, payload = context.http_client.api_detail( clean_resource_id) if not ok: return error_response(payload.get("code"), payload.get("message", "无法获取资源信息"), payload.get("detail")) # 从返回的数据中提取锁定状态 lock_status = payload.get("lock", "0") # 默认解锁状态 is_locked = lock_status == "1" return { "success": True, "resource_id": clean_resource_id, "action": "read", "is_locked": is_locked, "lock_status": lock_status, "lock_status_description": "已锁定" if is_locked else "未锁定", "resource_info": payload # 返回完整的资源信息 } elif action == "lock": # 锁定资源 result = context.resource_tools.lock_resource_tool( resource_id=clean_resource_id) if "success" in result: return { "success": True, "resource_id": clean_resource_id, "action": "lock", "message": f"资源 {clean_resource_id} 已成功锁定" } else: error_info = result.get("error", {}) return error_response( error_info.get("code", "lock_failed"), error_info.get( "message", f"锁定资源 {clean_resource_id} 失败"), result # 包含完整的原始错误信息 ) elif action == "unlock": # 解锁资源 result = context.resource_tools.unlock_resource_tool( resource_id=clean_resource_id) if "success" in result: return { "success": True, "resource_id": clean_resource_id, "action": "unlock", "message": f"资源 {clean_resource_id} 已成功解锁" } else: error_info = result.get("error", {}) return error_response( error_info.get("code", "unlock_failed"), error_info.get( "message", f"解锁资源 {clean_resource_id} 失败"), result # 包含完整的原始错误信息 ) else: return error_response("invalid_action", f"不支持的操作类型: {action}") except Exception as e: return error_response("unexpected_error", f"操作资源锁定状态时发生异常: {str(e)}") @mcp_app.tool( name="get_resource_statistics", description="获取资源统计信息,包括各类资源数量和分布。", tags={"resource", "statistics", "analytics"}, meta={"version": "1.0", "category": "resource-management"} ) def get_resource_stats() -> Dict[str, Any]: """获取资源统计信息。""" result = context.resource_tools.get_resource_stats_tool() if "success" in result: return result else: error_info = result.get("error", {}) return error_response( error_info.get("code", "stats_failed"), error_info.get("message", "获取资源统计信息失败"), result # 包含完整的原始错误信息 ) @mcp_app.tool( name="replace_api_script", description="按ID替换指定 Magic-Script 片段并保存接口,支持一次或全局替换。", tags={"api", "update", "script", "replace"}, meta={"version": "1.0", "category": "resource-management"} ) def replace_api_script( id: Annotated[ str, Field(description="API 文件ID") ], search: Annotated[ str, Field(description="待查找的脚本内容片段,大小写不敏感") ], replacement: Annotated[ str, Field(description="用于替换的脚本内容片段") ], mode: Annotated[ str, Field(description="替换模式:once为替换首次匹配;all为替换所有匹配项") ] = "once", ) -> Dict[str, Any]: """替换 Magic-API 接口脚本中的指定内容并保存。""" try: clean_id = str(id).strip() if not clean_id: return error_response("invalid_params", "id 不能为空") if not search: return error_response("invalid_params", "search 不能为空") # 获取接口详情 ok_detail, payload = context.http_client.api_detail(clean_id) if not ok_detail or not payload: detail_error = payload if isinstance(payload, dict) else {} print( f"❌ 获取API详情失败: {detail_error.get('message', '无法获取接口详情')}") print(f" API ID: {clean_id}") print(f" 操作: 脚本替换") print(f" HTTP状态: {ok_detail}") print(f" 响应数据: {payload}") print(f" 原始错误: {detail_error}") return error_response( detail_error.get("code", "detail_error"), detail_error.get("message", "无法获取接口详情"), detail_error.get("detail"), ) script_content = payload.get("script", "") if script_content is None: return error_response("invalid_state", "接口脚本为空,无法执行替换") # 执行替换 count = 1 if mode == "once" else 0 replaced_script, replaced_times = re.subn( pattern=re.escape(search), repl=replacement, string=script_content, count=count, flags=re.IGNORECASE, ) if replaced_times == 0: return error_response("not_found", "未在脚本中找到匹配内容,未执行替换") # 构建保存参数 try: save_kwargs = build_api_save_kwargs_from_detail(payload) except ValueError as exc: return error_response("invalid_detail", f"接口详情数据格式异常: {exc}") save_kwargs["script"] = replaced_script result = context.resource_tools.create_api_tool(**save_kwargs) if "success" not in result: error_info = result.get("error", {}) return error_response( error_info.get("code", "save_failed"), error_info.get("message", "保存接口失败"), result # 包含完整的原始错误信息 ) return { "success": True, "id": result.get("id", result.get("file_id", clean_id)), "file_id": result.get("id", result.get("file_id", clean_id)), "replaced_times": replaced_times, "mode": mode, } except Exception as exc: return error_response("unexpected_error", f"替换脚本时发生异常: {exc}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server