"""
FastMCP 工具函数
包含MCP服务器的辅助函数,保持主服务器文件简洁
"""
from typing import List, Dict, Any
from loguru import logger
from utils.page_content_fetcher import get_page_content_for_intent_search
def get_bearer_token(ctx):
    """Extract the Bearer token from a FastMCP request context.

    Args:
        ctx: FastMCP context exposing ``get_http_request()``.

    Returns:
        The token portion of an ``Authorization: Bearer <token>`` header.

    Raises:
        ValueError: If the Authorization header is missing or malformed.
    """
    request = ctx.get_http_request()
    headers = request.headers
    authorization_header = headers.get('Authorization')
    if not authorization_header:
        raise ValueError("Authorization header missing")
    # Expected shape: "Bearer <token>".
    parts = authorization_header.split()
    # RFC 6750: the auth-scheme name ("Bearer") is case-insensitive.
    if len(parts) == 2 and parts[0].lower() == 'bearer':
        return parts[1]
    raise ValueError("Invalid Authorization header format")
async def get_path_contents_async(notion_client, path_titles: List[str], path_ids: List[str],
                               include_files: bool = True, max_content_length: int = 8000,
                               max_file_content_length: int = 8000) -> List[Dict[str, Any]]:
    """Concurrently fetch the content of every page along a path.

    Supports document extraction and per-page length control, and uses a
    local cache file as a fallback source for page timestamps.

    Args:
        notion_client: Notion client instance; a default one is created when falsy.
        path_titles: Page titles, parallel to ``path_ids``.
        path_ids: Page IDs, parallel to ``path_titles``.
        include_files: Whether to extract attached document content.
        max_content_length: Maximum length of a single page's content.
        max_file_content_length: Maximum length of a single document's content.

    Returns:
        One dict per page, in input order, each with keys: position, title,
        notion_id, content, has_files, content_length, last_edited_time and
        status ("success" / "inaccessible" / "error" / "exception").
    """
    if not notion_client:
        from core.notion_client import NotionClient
        notion_client = NotionClient()
    # Load cached page timestamps; used as a fallback when no live
    # timestamp can be obtained for a page.
    cache_pages = {}
    try:
        import json
        from pathlib import Path
        cache_file = Path("llm_cache/chimera_cache.json")
        if cache_file.exists():
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            cache_pages = cache_data.get("pages", {})
    except Exception as e:
        logger.warning(f"无法加载缓存文件获取时间信息: {e}")
    # Temporarily override the file extractor's length limit; restored in
    # the finally block below.
    from core.file_extractor import file_extractor
    original_max_length = file_extractor.max_content_length
    if max_file_content_length > 0:
        file_extractor.max_content_length = max_file_content_length
    # 🚀 Fetch all page contents concurrently.
    import asyncio
    async def fetch_single_page(i: int, title: str, page_id: str) -> Dict[str, Any]:
        """Fetch one page's content; never raises — failures become result dicts."""
        try:
            # Cached timestamp info for this page (fallback only).
            page_cache = cache_pages.get(page_id, {})
            if include_files:
                # Unified page-content fetcher with full configuration.
                content, latest_timestamp, metadata = await get_page_content_for_intent_search(
                    page_id=page_id,
                    is_core_page=True,  # FastMCP is typically used for core pages
                    max_length=max_content_length
                )
            else:
                # Minimal configuration when attached files are excluded.
                from utils.page_content_fetcher import PageContentFetcher
                fetcher = PageContentFetcher()
                config = {
                    'include_files': False,
                    'include_tables': True,  # keep tables
                    'max_content_length': max_content_length
                }
                content, latest_timestamp, metadata = await fetcher.get_page_content(
                    page_id=page_id,
                    config=config,
                    purpose="fastmcp_minimal"
                )
            # Prefer the live timestamp; fall back to the cached one.
            if latest_timestamp:
                last_edited_time = latest_timestamp
            else:
                last_edited_time = page_cache.get('lastEditedTime', '')
            # Extra length control in case a single page is still too long.
            if max_content_length > 0 and len(content) > max_content_length:
                content = truncate_content_smart(content, max_content_length)
            return {
                "position": i,
                "title": title,
                "notion_id": page_id,
                "content": content,
                "has_files": include_files,
                "content_length": len(content),
                "last_edited_time": last_edited_time,
                "status": "success"
            }
        except Exception as e:
            error_msg = str(e)
            # Fallback timestamp so all result dicts share a consistent shape.
            cached_time = cache_pages.get(page_id, {}).get('lastEditedTime', '')
            if ("Could not find block with ID" in error_msg or
                "Make sure the relevant pages and databases are shared" in error_msg or
                "页面不存在或未授权访问" in error_msg):
                # Page was deleted or is not shared with the integration.
                logger.warning(f"页面 {page_id} 无法访问: {error_msg}")
                return {
                    "position": i,
                    "title": title,
                    "notion_id": page_id,
                    "content": f"⚠️ 页面无法访问: {title}\n原因: 页面已删除或权限不足",
                    "has_files": False,
                    "content_length": 0,
                    "last_edited_time": cached_time,
                    "status": "inaccessible"
                }
            else:
                # Any other failure.
                return {
                    "position": i,
                    "title": title,
                    "notion_id": page_id,
                    "content": f"获取内容失败: {error_msg}",
                    "has_files": False,
                    "content_length": 0,
                    "last_edited_time": cached_time,
                    "status": "error"
                }
    try:
        # 🚀 Fetch all pages concurrently, with a concurrency cap.
        logger.debug(f"开始并发获取 {len(path_titles)} 个页面内容")
        start_time = asyncio.get_event_loop().time()
        # Cap concurrency to avoid hammering the Notion API (at most 5
        # in-flight requests); max(1, ...) avoids creating a Semaphore(0)
        # when the input list is empty.
        max_concurrent = max(1, min(5, len(path_titles)))
        semaphore = asyncio.Semaphore(max_concurrent)
        async def rate_limited_fetch(i: int, title: str, page_id: str) -> Dict[str, Any]:
            """Run fetch_single_page under the concurrency semaphore."""
            async with semaphore:
                return await fetch_single_page(i, title, page_id)
        # Create the concurrent tasks.
        tasks = [
            rate_limited_fetch(i, title, page_id)
            for i, (title, page_id) in enumerate(zip(path_titles, path_ids))
        ]
        # Wait for all of them to finish.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results.
        path_contents = []
        for result in results:
            if isinstance(result, Exception):
                # Defensive: fetch_single_page catches Exception itself, so
                # this mostly covers cancellation-like failures.
                logger.error(f"页面获取异常: {result}")
                path_contents.append({
                    "position": len(path_contents),
                    "title": "Unknown",
                    "notion_id": "unknown",
                    "content": f"获取失败: {str(result)}",
                    "has_files": False,
                    "content_length": 0,
                    "last_edited_time": "",
                    "status": "exception"
                })
            else:
                path_contents.append(result)
        # Restore input order (gather preserves order, but be explicit).
        path_contents.sort(key=lambda x: x["position"])
        end_time = asyncio.get_event_loop().time()
        processing_time = (end_time - start_time) * 1000  # milliseconds
        success_count = sum(1 for content in path_contents if content.get("status") == "success")
        logger.info(f"✅ 并发获取页面内容完成: {success_count}/{len(path_contents)} 成功, 并发数: {max_concurrent}, 耗时: {processing_time:.2f}ms")
    finally:
        # Restore the file extractor's original length limit.
        file_extractor.max_content_length = original_max_length
    return path_contents
def truncate_content_smart(content: str, max_length: int) -> str:
    """Truncate content while keeping the most informative parts.

    Keeps roughly the first 80% and the last 10% of ``max_length`` and
    inserts a marker describing how much text was omitted.

    Args:
        content: Original text.
        max_length: Maximum allowed length.

    Returns:
        The original text if it fits, otherwise a truncated version.
    """
    total = len(content)
    if total <= max_length:
        return content
    head_len = int(max_length * 0.8)
    tail_len = int(max_length * 0.1)
    head = content[:head_len]
    if tail_len > 0:
        # Keep a small tail and report how many characters were dropped.
        tail = content[-tail_len:]
        omitted = total - head_len - tail_len
        return head + f"\n\n... [内容已截断,省略 {omitted} 字符] ...\n\n" + tail
    # max_length is too small for a tail slice; report shown/total instead.
    return head + f"\n\n[内容已截断: 显示 {head_len}/{total} 字符]"