Skip to main content
Glama
entity_schema.py15.5 kB
""" 实体类 Schema 分析工具 从 responseOriginal.data 中提取并分析实体类结构,生成嵌套实体类 """ from typing import Dict, Any, List, Optional, Set import re def extract_data_from_response(response: Dict[str, Any]) -> Optional[Any]: """ 从响应数据中提取 data 字段(忽略 ResponseData 包装层) Args: response: API 响应数据,可能包含 responseExample、responseOriginal 等字段 Returns: data 字段的内容,如果不存在则返回 None """ if not response: return None # 优先从 responseExample 中提取(JSON 字符串) if "responseExample" in response: import json try: response_example = response["responseExample"] if isinstance(response_example, str): parsed = json.loads(response_example) if isinstance(parsed, dict): # 如果有 data 字段,返回 data 字段的内容(忽略 ResponseData 包装) if "data" in parsed: return parsed["data"] # 如果没有 data 字段,检查是否是包装格式 {code, msg, data, hasNext} if all(key in parsed for key in ["code", "msg"]): # 可能有 data 字段或 hasNext,返回 data 字段 return parsed.get("data") except Exception as e: # 解析失败,继续尝试其他方式 pass # 其次从 responseOriginal 中提取 if "responseOriginal" in response: response_original = response["responseOriginal"] if isinstance(response_original, dict): # 如果有 data 字段,返回 data 字段的内容 if "data" in response_original: return response_original["data"] # 从 responseText 中解析(JSON 字符串) if "responseText" in response: import json try: response_text = response["responseText"] if isinstance(response_text, str): parsed = json.loads(response_text) if isinstance(parsed, dict) and "data" in parsed: return parsed["data"] except: pass # 从 example 或 body 中提取 example = response.get("example") or response.get("body") if isinstance(example, dict): # 如果有 data 字段,返回 data 字段的内容 if "data" in example: return example["data"] # 如果没有 data 字段,检查是否是包装格式 if all(key in example for key in ["code", "msg"]): return example.get("data") # 否则返回整个 example(可能没有包装) return example # 如果 example 是字符串,尝试解析 JSON if isinstance(example, str): import json try: parsed = json.loads(example) if isinstance(parsed, dict) and "data" in parsed: return parsed["data"] except: pass return None def analyze_entity_schema(data: Any, base_name: str = "Item") -> Dict[str, Any]: """ 分析数据并生成实体类 Schema(支持嵌套对象) Args: data: 要分析的数据(可能是 dict、list、基本类型等) base_name: 基础类名(用于生成嵌套类名) Returns: { "schema": {...}, # 当前实体的 schema "nested": {...} # 嵌套的实体类定义 } """ nested_entities = {} def analyze_value(value: Any, field_name: str = "") -> Dict[str, Any]: """分析值并返回类型信息""" if value is None: return {"type": "Any?", "nullable": True, "nested": None} if isinstance(value, bool): return {"type": "Boolean", "nullable": False, "nested": None} elif isinstance(value, int): return {"type": "Int", "nullable": False, "nested": None} elif isinstance(value, float): return {"type": "Double", "nullable": False, "nested": None} elif isinstance(value, str): return {"type": "String", "nullable": False, "nested": None} elif isinstance(value, list): if value: # 分析列表元素的类型 first_item = value[0] item_info = analyze_value(first_item, field_name) item_type = item_info["type"] nested_entity = item_info.get("nested") if nested_entity: # 列表元素是嵌套对象 nested_name = nested_entity["name"] nested_entities[nested_name] = nested_entity["schema"] return { "type": f"MutableList<{nested_name}>", "nullable": False, "nested": nested_entity } else: # 确保基本类型也使用 MutableList return { "type": f"MutableList<{item_type}>", "nullable": False, "nested": None } else: return {"type": "MutableList<Any>", "nullable": False, "nested": None} elif isinstance(value, dict): # 这是一个嵌套对象,需要生成单独的实体类 nested_name = generate_nested_entity_name(field_name, base_name) nested_schema = {} nested_nested = {} for key, val in value.items(): field_info = analyze_value(val, key) nested_schema[key] = { "type": field_info["type"], "nullable": field_info["nullable"] } # 如果有嵌套对象,也要收集 if field_info.get("nested"): nested_nested[field_info["nested"]["name"]] = field_info["nested"]["schema"] # 合并嵌套的实体类 nested_entities.update(nested_nested) nested_entities[nested_name] = nested_schema return { "type": nested_name, "nullable": False, "nested": {"name": nested_name, "schema": nested_schema} } else: return {"type": "Any", "nullable": False, "nested": None} # 分析主数据结构 if isinstance(data, dict): schema = {} for key, value in data.items(): field_info = analyze_value(value, key) schema[key] = { "type": field_info["type"], "nullable": field_info["nullable"] } # 如果有嵌套对象,也要收集 if field_info.get("nested"): nested_info = field_info["nested"] nested_entities[nested_info["name"]] = nested_info["schema"] return {"schema": schema, "nested": nested_entities} elif isinstance(data, list) and data: # 如果是列表,分析列表元素的类型 first_item = data[0] if isinstance(first_item, dict): # 列表元素是对象,生成实体类 item_schema = analyze_entity_schema(first_item, base_name) return item_schema else: # 列表元素是基本类型 item_type = analyze_value(first_item, "") return { "schema": {}, "nested": {} } else: return {"schema": {}, "nested": {}} def generate_nested_entity_name(field_name: str, base_name: str) -> str: """ 生成嵌套实体类名称(会自动添加 Bean 后缀) Args: field_name: 字段名 base_name: 基础类名 Returns: 嵌套实体类名称(带 Bean 后缀) """ from .utils import sanitize_class_name # 移除 base_name 的 Bean 后缀(如果存在),因为我们要重新添加 clean_base_name = base_name if clean_base_name.endswith("Bean"): clean_base_name = clean_base_name[:-4] if field_name: # 使用字段名生成类名 class_name = sanitize_class_name(field_name) # 添加 Bean 后缀(用于混淆规则适配) if not class_name.endswith("Bean"): class_name = class_name + "Bean" return class_name else: # 使用基础类名 + Item + Bean return f"{clean_base_name}ItemBean" def _map_showdoc_type_to_kotlin(param_type: str) -> str: """ 将 ShowDoc 参数类型映射到 Kotlin 类型 Args: param_type: ShowDoc 参数类型(如 "string", "int", "boolean" 等) Returns: Kotlin 类型名称 """ param_type = (param_type or "").lower().strip() type_mapping = { "string": "String", "str": "String", "int": "Int", "integer": "Int", "long": "Long", "float": "Float", "double": "Double", "boolean": "Boolean", "bool": "Boolean", "array": "MutableList<Any>", "list": "MutableList<Any>", "object": "Any", "json": "Any", "file": "Any", # 文件上传,通常需要特殊处理 } return type_mapping.get(param_type, "String") def build_request_schema_from_params(param_list: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: """ 从参数列表构建请求实体类 Schema Args: param_list: 参数列表,每个参数包含 name, type, value, require 等字段 Returns: Schema 字典,格式为 {字段名: {type: "类型", nullable: bool}} """ schema = {} for param in param_list: if not isinstance(param, dict): continue name = param.get("name", "").strip() if not name: # 忽略空名称 continue param_type = param.get("type", "string") require = param.get("require", "1") # "1" 表示必需,"0" 表示可选 nullable = (require != "1") kotlin_type = _map_showdoc_type_to_kotlin(param_type) schema[name] = { "type": kotlin_type, "nullable": nullable } return schema def extract_data_from_request(params: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 从 ShowDoc 的请求参数结构中提取实际的请求参数或参数列表 ShowDoc 的 api.body 包含: { "mode": "urlencoded" | "formdata" | "json", "urlencoded": [...], # 参数列表 "formdata": [...], # 参数列表 "json": "...", # JSON 字符串 "jsonDesc": [...] # JSON 参数描述列表 } Args: params: ShowDoc 的 params 对象(api.body) Returns: - 如果是 JSON 模式:返回解析后的 JSON 对象(字典) - 如果是 urlencoded/formdata 模式:返回参数列表(用于后续构建 schema) - 如果无法提取:返回 None """ if not params or not isinstance(params, dict): return None # 检查是否是 ShowDoc 的 params 结构(包含 mode 字段) mode = params.get("mode", "").lower() # 如果包含 mode 字段,说明这是 ShowDoc 的 params 结构,需要提取实际参数 if mode: # 根据 mode 选择参数来源 if mode == "json": # JSON 模式:从 json 字段或 jsonDesc 中提取 json_str = params.get("json", "") json_desc = params.get("jsonDesc", []) # 优先使用 json 字段(如果有有效的 JSON) if json_str and json_str.strip(): import json try: parsed = json.loads(json_str) if isinstance(parsed, dict): return parsed except: pass # 如果没有有效的 json 字符串,尝试从 jsonDesc 构建 # 但 jsonDesc 通常是扁平化的参数列表,需要特殊处理 if json_desc and isinstance(json_desc, list) and json_desc: # 标记为参数列表格式 return {"__param_list__": json_desc, "__mode__": "json"} return None elif mode == "urlencoded": # URL 编码模式:从 urlencoded 列表中提取 urlencoded = params.get("urlencoded", []) if urlencoded and isinstance(urlencoded, list): # 过滤掉空名称和已禁用的参数 valid_params = [] for item in urlencoded: if isinstance(item, dict): name = item.get("name", "").strip() disable = item.get("disable", "0") if name and disable != "1": valid_params.append(item) if valid_params: # 标记为参数列表格式 return {"__param_list__": valid_params, "__mode__": "urlencoded"} return None elif mode == "formdata": # 表单数据模式:从 formdata 列表中提取 formdata = params.get("formdata", []) if formdata and isinstance(formdata, list): # 过滤掉空名称 valid_params = [] for item in formdata: if isinstance(item, dict): name = item.get("name", "").strip() if name: valid_params.append(item) if valid_params: # 标记为参数列表格式 return {"__param_list__": valid_params, "__mode__": "formdata"} return None # 如果没有匹配的模式,返回 None return None # 如果没有 mode 字段,可能是直接的参数字典,直接返回 # 但需要过滤掉 ShowDoc 的元数据字段 showdoc_meta_fields = {"mode", "urlencoded", "formdata", "json", "jsonDesc"} filtered = {k: v for k, v in params.items() if k not in showdoc_meta_fields} return filtered if filtered else None def sanitize_category_name(cat_name: str) -> str: """ 清理分类名称,用于文件夹名称和包名(必须全部是英文) Args: cat_name: 分类名称(可能是中文) Returns: 清理后的文件夹名称(全英文,小写) """ from .utils import translate_chinese_to_english # 先转换为英文 name = translate_chinese_to_english(cat_name) if not name: return "default" # 移除特殊字符,只保留字母、数字、下划线 name = re.sub(r'[^a-zA-Z0-9_]', '', name) # 确保是有效的包名(不能以数字开头) if name and name[0].isdigit(): name = "cat_" + name if not name or not name.isascii(): return "default" return name.lower()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yfcyfc123234/showdoc_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server