Skip to main content
Glama

Magic-API MCP Server

by Dwsy
extractor.py14.2 kB
"""Magic-API 资源树提取与查询工具函数。""" from __future__ import annotations import json import re from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Mapping, Optional from .http_client import MagicAPIHTTPClient class MagicAPIExtractorError(RuntimeError): """资源树提取或解析失败时抛出的异常。""" @dataclass(slots=True) class ResourceTree: raw: Dict[str, Any] @property def api_nodes(self) -> Iterable[Dict[str, Any]]: api_data = self.raw.get("api", {}) return api_data.get("children", []) or [] def _clean_path(path: str) -> str: if not path: return "" path = path.strip("/") while "//" in path: path = path.replace("//", "/") return path def _traverse_api_tree(node: Dict[str, Any], parent_path: str, results: List[str]) -> None: node_info = node.get("node", {}) current_path = node_info.get("path", "") method = node_info.get("method") name = node_info.get("name", "") if current_path and parent_path: full_path = f"{parent_path}/{current_path}" elif current_path: full_path = current_path else: full_path = parent_path full_path = _clean_path(full_path) if method and full_path: display = f"{method} {full_path}" if name and name != current_path: display += f" [{name}]" results.append(display) for child in node.get("children", []) or []: _traverse_api_tree(child, full_path, results) def _normalize_path(path: str) -> str: return _clean_path(path) def _find_api_by_path(node: Dict[str, Any], target_path: str, parent_path: str, results: List[Dict[str, Any]]) -> None: node_info = node.get("node", {}) current_path = node_info.get("path", "") method = node_info.get("method") api_id = node_info.get("id") name = node_info.get("name", "") if current_path and parent_path: full_path = f"{parent_path}/{current_path}" elif current_path: full_path = current_path else: full_path = parent_path full_path = _clean_path(full_path) normalized_target = _normalize_path(target_path) if method and full_path and api_id: normalized_full_path = _normalize_path(full_path) if ( normalized_full_path == normalized_target or normalized_full_path.startswith(normalized_target + "/") or normalized_target.startswith(normalized_full_path + "/") ): results.append({ "id": api_id, "path": full_path, "method": method, "name": name, "groupId": node_info.get("groupId"), }) for child in node.get("children", []) or []: _find_api_by_path(child, target_path, full_path, results) def load_resource_tree( source: Optional[str] = None, *, client: Optional[MagicAPIHTTPClient] = None, ) -> ResourceTree: """加载资源树数据。""" if client is not None: ok, payload = client.resource_tree() if not ok: raise MagicAPIExtractorError(payload) return ResourceTree(raw=payload or {}) if not source: raise MagicAPIExtractorError("必须提供资源树数据源或 HTTP 客户端") path = Path(source) if not path.exists(): raise MagicAPIExtractorError(f"找不到资源文件: {source}") try: data = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: raise MagicAPIExtractorError(f"JSON 解析失败: {exc}") from exc return ResourceTree(raw=data.get("data", {})) def extract_api_endpoints(tree: ResourceTree) -> List[str]: results: List[str] = [] for child in tree.api_nodes: _traverse_api_tree(child, "", results) return sorted(results) def find_api_id_by_path(tree: ResourceTree, target_path: str) -> List[Dict[str, Any]]: matches: List[Dict[str, Any]] = [] for child in tree.api_nodes: _find_api_by_path(child, target_path, "", matches) return matches def find_api_detail_by_path( path: str, *, client: MagicAPIHTTPClient, fuzzy: bool = True, ) -> List[Dict[str, Any]]: """通过路径获取接口详情列表。""" tree = load_resource_tree(client=client) matches = find_api_id_by_path(tree, path) if not matches: return [] details: List[Dict[str, Any]] = [] for match in matches: file_id = match.get("id") if not file_id: continue ok, payload = client.api_detail(file_id) if ok: details.append({"meta": match, "detail": payload}) else: details.append({"meta": match, "error": payload}) if not fuzzy: break return details def filter_endpoints( endpoints: List[str], *, path_filter: Optional[str] = None, name_filter: Optional[str] = None, method_filter: Optional[str] = None, query_filter: Optional[str] = None, ) -> List[str]: filtered = endpoints if method_filter: method_filter = method_filter.upper() filtered = [ep for ep in filtered if ep.startswith(f"{method_filter} ")] if query_filter: try: pattern = re.compile(query_filter, re.IGNORECASE) except re.error as exc: raise MagicAPIExtractorError(f"查询过滤器正则错误: {exc}") from exc filtered = [ ep for ep in filtered if pattern.search(ep.split(" ", 1)[1]) or (pattern.search(ep) if "[" in ep else False) ] if path_filter: try: pattern = re.compile(path_filter, re.IGNORECASE) except re.error as exc: raise MagicAPIExtractorError(f"路径过滤器正则错误: {exc}") from exc filtered = [ep for ep in filtered if pattern.search(ep.split(" ", 1)[1])] if name_filter: try: pattern = re.compile(name_filter, re.IGNORECASE) except re.error as exc: raise MagicAPIExtractorError(f"名称过滤器正则错误: {exc}") from exc filtered = [ep for ep in filtered if "[" in ep and pattern.search(ep)] return filtered def _flatten_tree( tree_data: Mapping[str, Any], allowed_types: List[str], max_depth: Optional[int] = None, ) -> List[Dict[str, Any]]: """展平资源树结构为节点列表。""" nodes: List[Dict[str, Any]] = [] def walk(children: List[Mapping[str, Any]], depth: int) -> None: if max_depth is not None and depth > max_depth: return for child in children or []: node_info = child.get("node", {}) node_type = node_info.get("type") method = node_info.get("method") path = node_info.get("path") full_path = node_info.get("full_path", path) # 优先使用full_path name = node_info.get("name") file_id = node_info.get("id") # 确定节点类型:优先使用原始类型,但要根据是否有子节点调整为分组类型 has_children = child.get("children") and len(child["children"]) > 0 if has_children: # 有子节点的一定是分组 final_type = f"{folder_type}-group" if folder_type else "unknown-group" elif node_type: # 有原始类型但没有子节点,使用原始类型 final_type = node_type elif method: # 有method的是API端点 final_type = "api" else: # 既没有type也没有method也没有子节点 final_type = folder_type or "unknown" entry = { "name": name, "type": final_type, "path": full_path, # 使用完整路径 "method": method, "id": file_id, "original_path": path, # 保留原始路径以备后用 } # 过滤逻辑:检查节点是否应该被包含 should_include = False if allowed_types == ["all"]: should_include = True elif entry["type"] in allowed_types: should_include = True elif method and "api" in allowed_types: should_include = True # 特殊规则:如果节点既没有method也没有子节点,则不包含(除非它是分组) if should_include and not method: has_children = child.get("children") and len(child["children"]) > 0 if not has_children: should_include = False if should_include: nodes.append(entry) child_children = child.get("children", []) if child_children: walk(child_children, depth + 1) for folder_type, subtree in tree_data.items(): if allowed_types != ["all"] and folder_type not in allowed_types: continue root_children = subtree.get("children", []) if isinstance(subtree, Mapping) else [] walk(root_children, depth=1) return nodes def _filter_nodes( nodes: List[Dict[str, Any]], search: Optional[str] = None, ) -> List[Dict[str, Any]]: """过滤节点列表。""" if not search: return nodes try: pattern = re.compile(search, re.IGNORECASE) return [ node for node in nodes if (node.get("name") and pattern.search(node["name"])) or (node.get("path") and pattern.search(node["path"])) ] except re.error: term = search.lower() return [ node for node in nodes if term in (node.get("name") or "").lower() or term in (node.get("path") or "").lower() ] def _nodes_to_csv(nodes: List[Dict[str, Any]]) -> str: """将节点列表转换为CSV格式。""" if not nodes: return "" headers = ["name", "path", "method", "type", "id", "full_path"] rows = [",".join(headers)] for node in nodes: row = [] for key in headers: if key == "full_path": # full_path是新加的字段,可能不存在,使用path作为后备 value = node.get(key, node.get("path")) else: value = node.get(key) text = "" if value is None else str(value) if "," in text or '"' in text: text = '"' + text.replace('"', '""') + '"' row.append(text) rows.append(",".join(row)) return "\n".join(rows) def format_file_detail(file_data: Dict[str, Any]) -> str: lines: List[str] = [] lines.append("=== API接口详情 ===") lines.append(f"ID: {file_data.get('id', 'N/A')}") lines.append(f"名称: {file_data.get('name', 'N/A')}") lines.append(f"路径: {file_data.get('path', 'N/A')}") lines.append(f"方法: {file_data.get('method', 'N/A')}") lines.append(f"分组ID: {file_data.get('groupId', 'N/A')}") lines.append("") script = file_data.get("script", "") lines.append("=== 脚本内容 ===") lines.append(script or "(无脚本内容)") lines.append("") lines.append("=== 元数据信息 ===") lines.append(f"创建时间: {file_data.get('createTime', 'N/A')}") lines.append(f"更新时间: {file_data.get('updateTime', 'N/A')}") lines.append(f"创建者: {file_data.get('createBy', 'N/A')}") lines.append(f"更新者: {file_data.get('updateBy', 'N/A')}") description = file_data.get("description") if description: lines.append("=== 接口描述 ===") lines.append(description) lines.append("") def _format_items(title: str, items: Optional[List[Dict[str, Any]]]) -> None: if not items: return lines.append(title) for item in items: required = "✓" if item.get("required", False) else "○" lines.append( f" {required} {item.get('name', '')}: {item.get('value', '')} " f"({item.get('dataType', 'String')})" ) lines.append("") _format_items("请求头 (Headers):", file_data.get("headers")) _format_items("路径参数 (Path Parameters):", file_data.get("paths")) _format_items("查询参数 (Query Parameters):", file_data.get("parameters")) request_body = file_data.get("requestBody") if request_body: lines.append("请求体 (Request Body):") if isinstance(request_body, str): lines.append(request_body) else: lines.append(json.dumps(request_body, ensure_ascii=False, indent=2)) lines.append("") properties = file_data.get("properties", {}) if properties: lines.append("属性配置 (Properties):") for key, value in properties.items(): lines.append(f" {key}: {value}") lines.append("") options = file_data.get("options", []) if options: lines.append("选项配置 (Options):") for option in options: lines.append(f" {option}") lines.append("") for key, value in file_data.items(): if key in { "id", "name", "path", "method", "groupId", "script", "createTime", "updateTime", "createBy", "updateBy", "description", "headers", "paths", "parameters", "requestBody", "properties", "options", }: continue if isinstance(value, (list, dict)) and not value: continue lines.append(f"{key}: {value}") return "\n".join(lines) __all__ = [ "MagicAPIExtractorError", "ResourceTree", "load_resource_tree", "extract_api_endpoints", "find_api_id_by_path", "find_api_detail_by_path", "filter_endpoints", "format_file_detail", "_flatten_tree", "_filter_nodes", "_nodes_to_csv", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server