Skip to main content
Glama
utils.py (6.37 kB)
import asyncio
import json
import os
import sys
from contextlib import asynccontextmanager
from typing import Optional, Tuple, Dict, Any

import requests
from fastapi import FastAPI
from flask import session
from pydantic import BaseModel
from pyext import RuntimeModule, _RuntimeModule

from mcp_manager import MCPManager

# Load the MCP configuration once at import time; the path is resolved
# relative to this file so the working directory does not matter.
current_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_dir, "../configs/mcp_config.json"), "r") as f:
    config = json.load(f)


# Request body for executing a piece of code inside a session.
class CodeRequest(BaseModel):
    code: str
    timeout: Optional[int] = 180  # default timeout: 180 seconds
    session_id: str = "test_id"


# Result of a code execution.
class CodeResponse(BaseModel):
    output: str
    error: Optional[str]
    execution_time: float
    session_id: str


# Request body for submitting code (same fields as CodeRequest).
class CodeSubmitRequest(BaseModel):
    code: str
    timeout: Optional[int] = 180
    session_id: str = "test_id"


# Acknowledgement returned for a code submission.
class CodeSubmitResponse(BaseModel):
    status: str
    session_id: str


# A single stream item pushed for a session.
class SandboxStreamRequest(BaseModel):
    session_id: str
    item: Dict[str, Any]


# Acknowledgement for a pushed stream item.
class SandboxStreamResponse(BaseModel):
    session_id: str
    flag: bool


def create_lifespan(manager: MCPManager, temp_dir: str):
    """Build a FastAPI lifespan context manager bound to *manager*.

    On startup the lifespan awaits ``manager.ready()``; on shutdown it awaits
    ``cleanup()`` on every client in ``manager.client_list``.

    Args:
        manager: The MCP manager whose clients are started and cleaned up.
        temp_dir: Currently unused; kept so existing callers keep working.

    Returns:
        An async context manager factory suitable for ``FastAPI(lifespan=...)``.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        print("Lifespan startup")
        await manager.ready()
        yield
        print("Lifespan shutdown")
        for client in manager.client_list:
            await client.cleanup()

    return lifespan


# Tools that additionally receive the session id as ``stream_id``.
agent_tools = ["browse_master", "info_master", "intern_s1"]
# Tools that additionally receive the session id as ``session_id``.
browse_comp_tools = ["batch_search_and_filter"]

# Source prepended to the generated tool library: it extends ``sys.path`` and
# imports ``call_tool``, which every generated wrapper calls.
_TOOL_LIB_PRELUDE = (
    "import sys, os\n"
    "current_dir = os.path.dirname(os.path.abspath(__file__))\n"
    "sys.path.append(os.path.dirname(os.path.dirname(current_dir)))\n"
    "from tool_caller import call_tool\n"
)

# Indentation used for the bodies of the generated wrapper functions.
_BODY_INDENT = "    "


def _format_tool_signature(schema: Optional[Dict[str, Any]]) -> Tuple[str, str]:
    """Return ``(parameter_list, tool_args_literal)`` source text for a schema.

    Required properties become plain parameters; all other properties become
    keyword parameters whose default is the schema's ``default`` (``None``
    when absent).
    """
    if not schema:
        return "", "{}"

    properties = schema.get("properties") or {}
    required = set(schema.get("required") or [])

    required_params = []
    optional_params = []
    dict_entries = []
    for prop_name, prop_spec in properties.items():
        dict_entries.append(f"'{prop_name}':{prop_name}")
        if prop_name in required:
            required_params.append(prop_name)
        else:
            # repr() yields a valid Python literal even when a string default
            # contains quotes, backslashes or newlines.
            default = (prop_spec or {}).get("default")
            optional_params.append(f"{prop_name}={default!r}")

    # Parameters without defaults must precede those with defaults, otherwise
    # the generated ``def`` is a SyntaxError. Relative order within each group
    # follows the schema.
    parameter_list = ",".join(required_params + optional_params)
    return parameter_list, "{" + ",".join(dict_entries) + "}"


def _render_tool_function(tool: Dict[str, Any]) -> str:
    """Return the source of one wrapper function for *tool*."""
    name = tool["name"]
    parameter_list, args_literal = _format_tool_signature(tool.get("input_schema"))

    lines = [
        f"def {name}({parameter_list}):",
        f"{_BODY_INDENT}tool_args = {args_literal}",
    ]
    # Some tools need the session id forwarded inside their arguments.
    if name in browse_comp_tools:
        lines.append(f"{_BODY_INDENT}tool_args['session_id']=inform_handler.session_id")
    elif name in agent_tools:
        lines.append(f"{_BODY_INDENT}tool_args['stream_id']=inform_handler.session_id")
    lines.extend(
        [
            f"{_BODY_INDENT}inform_handler.post_tool_start('{name}')",
            f"{_BODY_INDENT}result = call_tool('{name}', tool_args, inform_handler.session_id)",
            f"{_BODY_INDENT}inform_handler.post_tool_result('{name}', result)",
            f"{_BODY_INDENT}return result",
        ]
    )
    return "\n".join(lines) + "\n"


def build_tools_functions(manager: MCPManager):
    """Generate Python source defining one wrapper function per MCP tool.

    Each wrapper collects its arguments into ``tool_args``, reports start and
    result through a global ``inform_handler`` and delegates to ``call_tool``.
    ``inform_handler`` is not defined by the generated source; the namespace
    the source is executed in must provide it.

    Args:
        manager: Provides the tool descriptions via ``manager.get_tools()``;
            each description is read for ``name`` and ``input_schema``.

    Returns:
        The generated source code as a single string.
    """
    functions = [_render_tool_function(tool) for tool in manager.get_tools()]
    return _TOOL_LIB_PRELUDE + "".join(functions)


def form_item(main_stream_type: str, content: str, stream_state: str):
    """Build a stream item dict flagged as coming from the sandbox."""
    return {
        "main_stream_type": main_stream_type,
        "sub_stream_type": "",
        "content": content,
        "from_sandbox": True,
        "stream_state": stream_state,
        "other_info": {},
    }


def post_item_info(session_id: str, item: Dict[str, Any]):
    """POST *item* for *session_id* to ``<mcp_server_url>/put_item``.

    Returns:
        The decoded JSON body of the server's response.
    """
    url = config['mcp_server_url']
    headers = {"Content-Type": "application/json"}
    payload = {"session_id": session_id, "item": item}
    # NOTE(review): no timeout is passed, so a stalled server blocks the
    # caller indefinitely - consider adding one.
    resp = requests.post(f"{url}/put_item", headers=headers, json=payload)
    return resp.json()


class SessionInformHandler:
    """Reports tool activity of one session to the MCP server."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.async_inform_queue = asyncio.Queue()

    def post_tool_start(self, tool_name: str,):
        """Post a "running" tool_result item carrying the tool's name."""
        print(f'-----------{self.session_id}---------------')
        formated_item = form_item("tool_result", "", "running")
        formated_item["other_info"]["tool_name"] = tool_name
        post_item_info(self.session_id, formated_item)

    def post_tool_result(self, tool_name: str, item):
        """Post a "running" tool_result item with *item* stored under the
        tool's name in ``other_info``."""
        print(f'-----------{self.session_id}---------------')
        formated_item = form_item("tool_result", "", "running")
        formated_item['other_info'][tool_name] = item
        post_item_info(self.session_id, formated_item)


class SessionManager:
    """Keeps one runtime module per session id, pre-loaded with tool wrappers."""

    def __init__(self, mcp_manager: MCPManager):
        self.sessions: Dict[str, _RuntimeModule] = {}
        self.mcp_manager = mcp_manager

    def build_lib(self):
        """Return the generated tool-wrapper source for the current tools."""
        return build_tools_functions(self.mcp_manager)

    def get_session(self, session_id: str) -> _RuntimeModule:
        """Return the runtime module for *session_id*, creating it on first use."""
        module = self.sessions.get(session_id)
        if module is None:
            code_string = self.build_lib()
            # Use pyext.RuntimeModule to create a module that code can be
            # executed in repeatedly.
            module = RuntimeModule.from_string(f"session_{session_id}", "", "")
            # Initialise the names this session's module manages; the handler
            # (with its async queue) is what the generated wrappers report to.
            module.__dict__.update(
                {"inform_handler": SessionInformHandler(session_id=session_id)}
            )
            # NOTE(review): the generated prelude reads ``__file__`` from this
            # namespace - confirm RuntimeModule.from_string provides it.
            exec(code_string, module.__dict__)
            # Register only after the library loaded successfully, so a failed
            # build is retried on the next call instead of caching a module
            # that lacks the tool wrappers.
            self.sessions[session_id] = module
        return module

    def clear_session(self, session_id: str):
        """Forget the module of *session_id*; unknown ids are ignored."""
        self.sessions.pop(session_id, None)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sjtu-sai-agents/mcp_sandbox'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.