Skip to main content
Glama

Chimera MCP Server

by Hank-coder
fastmcp_utils.py (8.97 kB)
"""FastMCP utility functions.

Helper functions for the MCP server, keeping the main server module lean.
"""
import asyncio
import json
from pathlib import Path
from typing import List, Dict, Any

from loguru import logger

from utils.page_content_fetcher import get_page_content_for_intent_search


def get_bearer_token(ctx):
    """Extract the Bearer token from a FastMCP request context.

    Args:
        ctx: FastMCP context object exposing ``get_http_request()``.

    Returns:
        The token string (the part after ``Bearer``).

    Raises:
        ValueError: If the ``Authorization`` header is missing or malformed.
    """
    request = ctx.get_http_request()
    headers = request.headers

    authorization_header = headers.get('Authorization')
    if not authorization_header:
        raise ValueError("Authorization header missing")

    # Expected form: "Bearer <token>".  Per RFC 7235 the auth scheme name is
    # case-insensitive, so accept "bearer", "BEARER", etc. as well.
    parts = authorization_header.split()
    if len(parts) == 2 and parts[0].lower() == 'bearer':
        return parts[1]
    raise ValueError("Invalid Authorization header format")


async def get_path_contents_async(notion_client, path_titles: List[str], path_ids: List[str],
                                  include_files: bool = True,
                                  max_content_length: int = 8000,
                                  max_file_content_length: int = 8000) -> List[Dict[str, Any]]:
    """Concurrently fetch the content of every page on a path.

    Supports embedded-document extraction and length limits; last-edited
    timestamps fall back to the local cache when live data is unavailable.

    Args:
        notion_client: Notion client instance; a new one is created when falsy.
        path_titles: Page titles, parallel to ``path_ids``.
        path_ids: Notion page IDs.
        include_files: Whether to also extract attached document content.
        max_content_length: Max length of a single page's content (<=0 disables).
        max_file_content_length: Max length of a single extracted document.

    Returns:
        One dict per page (title, id, content, timestamps, status), ordered
        to match the input lists.
    """
    if not notion_client:
        # Imported lazily to avoid potential import cycles with core modules.
        from core.notion_client import NotionClient
        notion_client = NotionClient()

    # Load cached page metadata as a fallback source of last-edited times.
    cache_pages = {}
    try:
        cache_file = Path("llm_cache/chimera_cache.json")
        if cache_file.exists():
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            cache_pages = cache_data.get("pages", {})
    except Exception as e:
        logger.warning(f"无法加载缓存文件获取时间信息: {e}")

    # Temporarily override the file extractor's length limit.
    # NOTE(review): this mutates shared module state, so concurrent calls with
    # different limits can interfere with each other — confirm callers.
    from core.file_extractor import file_extractor
    original_max_length = file_extractor.max_content_length
    if max_file_content_length > 0:
        file_extractor.max_content_length = max_file_content_length

    async def fetch_single_page(i: int, title: str, page_id: str) -> Dict[str, Any]:
        """Fetch one page's content and package it as a result dict."""
        try:
            # Cached metadata serves as a timestamp fallback.
            page_cache = cache_pages.get(page_id, {})

            # Use the unified page-content fetcher.
            if include_files:
                content, latest_timestamp, metadata = await get_page_content_for_intent_search(
                    page_id=page_id,
                    is_core_page=True,  # FastMCP is typically used for core pages
                    max_length=max_content_length
                )
            else:
                # Minimal configuration when file extraction is excluded.
                from utils.page_content_fetcher import PageContentFetcher
                fetcher = PageContentFetcher()
                config = {
                    'include_files': False,
                    'include_tables': True,  # keep tables
                    'max_content_length': max_content_length
                }
                content, latest_timestamp, metadata = await fetcher.get_page_content(
                    page_id=page_id,
                    config=config,
                    purpose="fastmcp_minimal"
                )

            # Prefer the live timestamp; fall back to the cached one.
            if latest_timestamp:
                last_edited_time = latest_timestamp
            else:
                last_edited_time = page_cache.get('lastEditedTime', '')

            # Extra length guard in case the fetcher returned too much.
            if max_content_length > 0 and len(content) > max_content_length:
                content = truncate_content_smart(content, max_content_length)

            return {
                "position": i,
                "title": title,
                "notion_id": page_id,
                "content": content,
                "has_files": include_files,
                "content_length": len(content),
                "last_edited_time": last_edited_time,
                "status": "success"
            }

        except Exception as e:
            error_msg = str(e)
            # Known "page deleted / not shared" errors get a friendly placeholder.
            if ("Could not find block with ID" in error_msg or
                    "Make sure the relevant pages and databases are shared" in error_msg or
                    "页面不存在或未授权访问" in error_msg):
                logger.warning(f"页面 {page_id} 无法访问: {error_msg}")
                content = f"⚠️ 页面无法访问: {title}\n原因: 页面已删除或权限不足"
                return {
                    "position": i,
                    "title": title,
                    "notion_id": page_id,
                    "content": content,
                    "has_files": False,
                    # FIX: report the placeholder's real length instead of 0,
                    # keeping content_length consistent with content.
                    "content_length": len(content),
                    "status": "inaccessible"
                }
            # Any other error.
            content = f"获取内容失败: {error_msg}"
            return {
                "position": i,
                "title": title,
                "notion_id": page_id,
                "content": content,
                "has_files": False,
                "content_length": len(content),
                "status": "error"
            }

    try:
        logger.debug(f"开始并发获取 {len(path_titles)} 个页面内容")
        # FIX: asyncio.get_event_loop() is deprecated inside coroutines since
        # Python 3.10 — use the running loop's monotonic clock instead.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Cap in-flight requests to avoid hammering the Notion API.
        max_concurrent = min(5, len(path_titles))  # at most 5 concurrent requests
        semaphore = asyncio.Semaphore(max_concurrent)

        async def rate_limited_fetch(i: int, title: str, page_id: str) -> Dict[str, Any]:
            """Run fetch_single_page under the shared concurrency semaphore."""
            async with semaphore:
                return await fetch_single_page(i, title, page_id)

        # Launch one task per (title, id) pair and wait for all of them.
        tasks = [
            rate_limited_fetch(i, title, page_id)
            for i, (title, page_id) in enumerate(zip(path_titles, path_ids))
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect results, converting raised exceptions into error entries.
        path_contents = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"页面获取异常: {result}")
                path_contents.append({
                    "position": len(path_contents),
                    "title": "Unknown",
                    "notion_id": "unknown",
                    "content": f"获取失败: {str(result)}",
                    "has_files": False,
                    "content_length": 0,
                    "status": "exception"
                })
            else:
                path_contents.append(result)

        # Sort by position so the output order matches the input lists.
        path_contents.sort(key=lambda x: x["position"])

        processing_time = (loop.time() - start_time) * 1000  # milliseconds
        success_count = sum(1 for content in path_contents if content.get("status") == "success")
        logger.info(f"✅ 并发获取页面内容完成: {success_count}/{len(path_contents)} 成功, 并发数: {max_concurrent}, 耗时: {processing_time:.2f}ms")

    finally:
        # Always restore the extractor's original length limit.
        file_extractor.max_content_length = original_max_length

    return path_contents


def truncate_content_smart(content: str, max_length: int) -> str:
    """Truncate content while keeping the most informative parts.

    Keeps roughly the first 80% and the last 10% of the length budget, with
    a marker describing how many characters were omitted.

    Args:
        content: Original text.
        max_length: Length budget for the kept portions.

    Returns:
        The original text if it already fits, otherwise a truncated version.
    """
    if len(content) <= max_length:
        return content

    # Keep the first 80% and the last 10% of the budget.
    front_length = int(max_length * 0.8)
    back_length = int(max_length * 0.1)

    front_part = content[:front_length]
    back_part = content[-back_length:] if back_length > 0 else ""

    truncated = front_part
    if back_part:
        truncated += f"\n\n... [内容已截断,省略 {len(content) - front_length - back_length} 字符] ...\n\n" + back_part
    else:
        truncated += f"\n\n[内容已截断: 显示 {front_length}/{len(content)} 字符]"

    return truncated

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Hank-coder/chimera_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.