"""
FastAPI MCP 演示应用
提供 MCP 协议的 RESTful API 接口
"""
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional, List
import logging
import traceback
from datetime import datetime
from config import config
from mcp_server import mcp_server
# Configure logging.
# The logging module only exposes upper-case level constants (logging.INFO,
# logging.DEBUG, ...), so the configured name is normalized before the lookup;
# a lower-case setting such as "info" would otherwise raise AttributeError at
# import time.
logging.basicConfig(level=getattr(logging, config.LOG_LEVEL.upper()))
logger = logging.getLogger(__name__)
# Create the FastAPI application.
# The settings are collected first and then handed to the constructor in one
# go; the version string comes from the project configuration.
_app_settings = {
    "title": "MCP FastAPI Demo",
    "description": "Model Context Protocol (MCP) 演示服务器,基于 FastAPI 实现",
    "version": config.MCP_SERVER_VERSION,
    "docs_url": "/docs",
    "redoc_url": "/redoc",
}
app = FastAPI(**_app_settings)
# Add the CORS middleware.
# NOTE: "*" accepts requests from any origin; restrict this to specific
# domains in production.
# Credentials are deliberately disabled: combining allow_credentials=True with
# wildcard origins would let any website issue credentialed cross-origin
# requests, and the FastAPI documentation states that wildcards must not be
# used together with credentials. Re-enable credentials only alongside an
# explicit list of origins, methods and headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic model definitions
class ToolCallRequest(BaseModel):
    """Request body describing a tool invocation."""

    # Name of the tool to invoke (required).
    name: str = Field(default=..., description="工具名称")
    # Arguments passed to the tool; a fresh empty dict when omitted.
    arguments: Dict[str, Any] = Field(description="工具参数", default_factory=dict)
class ResourceReadRequest(BaseModel):
    """Request body describing a resource read."""

    # URI of the resource to read (required).
    uri: str = Field(default=..., description="资源URI")
class ErrorResponse(BaseModel):
    """Body of an error response."""

    # Short error label.
    error: str
    # Optional additional detail; None when not provided.
    detail: Optional[str] = Field(default=None)
    # Timestamp string supplied by the caller.
    timestamp: str
# Exception handlers
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn an otherwise unhandled exception into a JSON 500 response.

    Args:
        request: The request being processed when the exception surfaced
            (not inspected here).
        exc: The exception passed to this handler.

    Returns:
        A ``JSONResponse`` with status 500 whose body is a serialized
        ``ErrorResponse`` carrying the exception text and the current time.
    """
    # Hand the exception object to the logger so the recorded traceback always
    # belongs to ``exc``; ``traceback.format_exc()`` reports whichever
    # exception is being handled at call time, which is not guaranteed to be
    # this one. This also emits one log record instead of two.
    logger.error("Unhandled exception: %s", exc, exc_info=exc)
    # NOTE(review): ``detail`` exposes the raw exception text to the client;
    # consider hiding it outside debug mode.
    body = ErrorResponse(
        error="Internal Server Error",
        detail=str(exc),
        timestamp=datetime.now().isoformat(),
    )
    return JSONResponse(status_code=500, content=body.dict())
# API route definitions
@app.get("/", response_model=Dict[str, Any])
async def root():
    """Root path: report server status, server info and an endpoint index."""
    # Static map of the paths this service advertises.
    endpoint_index = {
        "health": "/health",
        "tools_list": "/mcp/tools/list",
        "tools_call": "/mcp/tools/call",
        "resources_list": "/mcp/resources/list",
        "resources_read": "/mcp/resources/read",
        "docs": "/docs",
    }
    payload = {
        "message": "MCP FastAPI Demo Server",
        "status": "running",
        "timestamp": datetime.now().isoformat(),
        "server_info": mcp_server.get_server_info(),
    }
    payload["endpoints"] = endpoint_index
    return payload
@app.get("/health", response_model=Dict[str, Any])
async def health_check():
    """Health check endpoint: return whatever ``mcp_server.ping()`` reports."""
    ping_result = mcp_server.ping()
    return ping_result
# MCP tool endpoints
@app.post("/mcp/tools/list", response_model=Dict[str, Any])
async def list_tools():
    """Return the list of available tools.

    Returns:
        The result of ``mcp_server.list_tools()``, unchanged.

    Raises:
        HTTPException: 500 with the exception text if listing fails.
    """
    try:
        result = mcp_server.list_tools()
    except Exception as e:
        # logger.exception records the traceback alongside the message, and
        # ``from e`` keeps the original failure attached as the cause.
        logger.exception(f"Error listing tools: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    logger.info("Tools list requested")
    return result
@app.post("/mcp/tools/call", response_model=Dict[str, Any])
async def call_tool(request: ToolCallRequest):
    """Invoke the named tool with the supplied arguments.

    Args:
        request: Tool name and argument mapping.

    Returns:
        The result of ``mcp_server.call_tool(...)`` when it has no "error" key.

    Raises:
        HTTPException: 400 when the result contains an "error" key (its value
            becomes the detail); 500 with the exception text on any other
            failure.
    """
    try:
        logger.info(f"Tool call requested: {request.name}")
        result = mcp_server.call_tool(request.name, request.arguments)
        # A result carrying an "error" key is reported as a 400.
        if "error" in result:
            logger.warning(f"Tool call error: {result['error']}")
            raise HTTPException(status_code=400, detail=result["error"])
        return result
    except HTTPException:
        # Let the deliberate 400 above pass through untouched.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message, and
        # ``from e`` keeps the original failure attached as the cause.
        logger.exception(f"Error calling tool: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# MCP resource endpoints
@app.post("/mcp/resources/list", response_model=Dict[str, Any])
async def list_resources():
    """Return the list of available resources.

    Returns:
        The result of ``mcp_server.list_resources()``, unchanged.

    Raises:
        HTTPException: 500 with the exception text if listing fails.
    """
    try:
        result = mcp_server.list_resources()
    except Exception as e:
        # logger.exception records the traceback alongside the message, and
        # ``from e`` keeps the original failure attached as the cause.
        logger.exception(f"Error listing resources: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    logger.info("Resources list requested")
    return result
@app.post("/mcp/resources/read", response_model=Dict[str, Any])
async def read_resource(request: ResourceReadRequest):
    """Read the resource identified by the given URI.

    Args:
        request: Carries the URI of the resource to read.

    Returns:
        The result of ``mcp_server.read_resource(...)`` when it has no
        "error" key.

    Raises:
        HTTPException: 404 when the result contains an "error" key (its value
            becomes the detail); 500 with the exception text on any other
            failure.
    """
    try:
        logger.info(f"Resource read requested: {request.uri}")
        result = mcp_server.read_resource(request.uri)
        # A result carrying an "error" key is reported as a 404.
        if "error" in result:
            logger.warning(f"Resource read error: {result['error']}")
            raise HTTPException(status_code=404, detail=result["error"])
        return result
    except HTTPException:
        # Let the deliberate 404 above pass through untouched.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message, and
        # ``from e`` keeps the original failure attached as the cause.
        logger.exception(f"Error reading resource: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Additional convenience endpoints
@app.get("/mcp/server/info", response_model=Dict[str, Any])
async def get_server_info():
    """Return the server info extended with config info and a timestamp."""
    # Copy the server info, then add (or overwrite) the two extra keys.
    info = dict(mcp_server.get_server_info())
    info["config"] = config.get_mcp_server_info()
    info["timestamp"] = datetime.now().isoformat()
    return info
@app.get("/mcp/tools/{tool_name}/schema", response_model=Dict[str, Any])
async def get_tool_schema(tool_name: str):
    """Return the tool-list entry whose "name" equals ``tool_name``.

    Raises:
        HTTPException: 500 when the tool listing has no "tools" key; 404 when
            no entry carries the requested name.
    """
    listing = mcp_server.list_tools()
    if "tools" not in listing:
        raise HTTPException(status_code=500, detail="Failed to get tools list")
    # Lazily scan the entries and stop at the first name match.
    candidates = (entry for entry in listing["tools"] if entry["name"] == tool_name)
    found = next(candidates, None)
    if found is None:
        raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
    return found
# Startup information
@app.on_event("startup")
async def startup_event():
    """Startup hook: log server name, version, host/port and debug flag."""
    banner = f"Starting {config.MCP_SERVER_NAME} v{config.MCP_SERVER_VERSION}"
    logger.info(banner)
    address_line = f"Server running on {config.HOST}:{config.PORT}"
    logger.info(address_line)
    debug_line = f"Debug mode: {config.DEBUG}"
    logger.info(debug_line)
@app.on_event("shutdown")
async def shutdown_event():
    """Shutdown hook: log that the server is going down."""
    farewell = "Shutting down MCP FastAPI Demo Server"
    logger.info(farewell)
# Development launcher
if __name__ == "__main__":
    import uvicorn

    # Host, port, reload flag and log level all come from the project config;
    # reload follows the DEBUG flag and the log level is passed lower-cased.
    server_options = {
        "host": config.HOST,
        "port": config.PORT,
        "reload": config.DEBUG,
        "log_level": config.LOG_LEVEL.lower(),
    }
    # NOTE(review): "main:app" presumes this module is importable as ``main``
    # (i.e. the file is main.py); confirm against the project layout.
    uvicorn.run("main:app", **server_options)