Skip to main content
Glama

Chimera MCP Server

by Hank-coder
page_content_fetcher.py13.2 kB
#!/usr/bin/env python3
"""
Unified Notion page content fetcher.

Consolidates the handling of tables, documents, images and other content
types into a single entry point, used by the Deep Research and Intent
Search modules.
"""

import asyncio  # FIX: hoisted from a function-local import to the top of the file
from typing import Dict, Any, Optional, Tuple, List

from loguru import logger

from core.notion_client import NotionClient
from core.file_extractor import FileContentExtractor


class PageContentFetcher:
    """
    Unified page content fetcher.

    Capabilities:
        1. Fetch the complete content of a Notion page (tables, child blocks, ...).
        2. Extract and process attached documents (PDF, Word, Excel, ...).
        3. Handle images and other media content.
        4. Enforce content-length limits with configurable truncation strategies.
        5. Provide unified error handling with graceful degradation.
    """

    def __init__(self, notion_client: Optional[NotionClient] = None):
        """Initialize the fetcher, creating a NotionClient if none is supplied."""
        self.notion_client = notion_client or NotionClient()
        self.file_extractor = FileContentExtractor()

        # Default configuration; callers may override any subset per call.
        self.default_config = {
            'include_files': True,          # include attached document files
            'include_tables': True,         # include table content
            'include_linked_pages': False,  # include linked pages (rarely needed)
            'max_content_length': 8000,     # maximum total content length
            'max_file_content': 6000,       # maximum length per extracted file
            'table_format': 'markdown',     # table format: 'markdown' | 'plain'
            'truncate_strategy': 'smart'    # truncation strategy: 'smart' | 'simple'
        }

    async def get_page_content(
        self,
        page_id: str,
        config: Optional[Dict[str, Any]] = None,
        purpose: str = ""
    ) -> Tuple[str, str, Dict[str, Any]]:
        """
        Fetch the complete content of a single page.

        Args:
            page_id: Notion page ID.
            config: Options merged over ``self.default_config``.
            purpose: Free-form caller tag, recorded in the metadata.

        Returns:
            Tuple[content, timestamp, metadata]:
                - content: full page content ("" only when everything failed)
                - timestamp: last-edited time of the page
                - metadata: statistics, detection flags and the config used
        """
        try:
            # Merge per-call options over the defaults.
            final_config = {**self.default_config, **(config or {})}
            logger.debug(f"开始获取页面内容: {page_id}, purpose: {purpose}")

            # Fetch base page content through the NotionExtractor helpers.
            normalized_id = self.notion_client._normalize_page_id(page_id)

            # Basic page info carries the last-edited timestamp (and title).
            page_info = await self.notion_client.extractor.get_page_basic_info(normalized_id)
            timestamp = page_info.get('last_edited_time', '') if page_info else ''

            if final_config['include_files']:
                # Also parse attached files (PDF / Word / Excel).
                content = await self.notion_client.extractor.get_page_content_with_files(normalized_id)
            else:
                content = await self.notion_client.extractor.get_page_content(normalized_id)

            # Empty pages still serve as navigation anchors, so keep a placeholder.
            if not content or not content.strip():
                page_title = page_info.get('title', 'Unknown') if page_info else 'Unknown'
                content = f"页面 '{page_title}' 没有内容, 作为路径参考。"
            else:
                content = content.strip()

            metadata = {
                'page_id': page_id,
                'timestamp': timestamp,
                'content_length': len(content) if content else 0,
                'has_tables': self._detect_tables(content) if content else False,
                'has_files': self._detect_files(content) if content else False,
                'config_used': final_config,
                'purpose': purpose
            }

            # Post-processing: table formatting, truncation, whitespace cleanup.
            if content:
                content = self._post_process_content(content, final_config, metadata)
                # FIX: keep the reported length in sync after truncation/cleanup;
                # previously the pre-truncation length was returned.
                metadata['content_length'] = len(content)

            logger.debug(f"页面内容获取完成: {len(content) if content else 0} 字符")
            return content or "", timestamp or "", metadata

        except Exception as e:
            error_msg = str(e)
            logger.error(f"获取页面内容失败 {page_id}: {e}")

            # Map the most common Notion API failures to actionable messages.
            if "Could not find block with ID" in error_msg:
                error_content = f"无法访问页面 {page_id}: 页面不存在或未授权访问。请确保:\n1. 页面ID正确\n2. 页面已与Notion integration分享\n3. 检查页面权限设置"
            elif "Make sure the relevant pages and databases are shared" in error_msg:
                error_content = f"权限错误: 页面 {page_id} 未与integration分享。请在Notion中将此页面分享给你的integration。"
            else:
                error_content = f"无法获取页面内容: {error_msg}"

            return error_content, "", {
                'page_id': page_id,
                # FIX: include 'timestamp' so the metadata shape matches the
                # success path and downstream consumers need no special-casing.
                'timestamp': "",
                'error': str(e),
                'content_length': len(error_content),
                'has_tables': False,
                'has_files': False
            }

    async def get_multiple_pages_content(
        self,
        page_ids: List[str],
        config: Optional[Dict[str, Any]] = None,
        purpose: str = ""
    ) -> List[Dict[str, Any]]:
        """
        Fetch the content of several pages concurrently.

        Args:
            page_ids: List of Notion page IDs.
            config: Options forwarded to :meth:`get_page_content`.
            purpose: Free-form caller tag.

        Returns:
            One result dict per page id, each with keys
            'page_id', 'content', 'timestamp', 'metadata', 'success'.
            An empty list is returned if the batch itself fails.
        """
        try:
            # Fan out one task per page and gather them concurrently;
            # return_exceptions=True keeps one failure from sinking the batch.
            tasks = [
                self._get_single_page_with_error_handling(page_id, config, purpose)
                for page_id in page_ids
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    logger.warning(f"页面 {page_ids[i]} 获取失败: {result}")
                    processed_results.append({
                        'page_id': page_ids[i],
                        'content': "",
                        'timestamp': "",
                        'metadata': {'error': str(result)},
                        'success': False
                    })
                else:
                    processed_results.append({
                        'page_id': page_ids[i],
                        'content': result[0],
                        'timestamp': result[1],
                        'metadata': result[2],
                        'success': True
                    })

            return processed_results

        except Exception as e:
            logger.error(f"批量获取页面内容失败: {e}")
            return []

    async def _get_single_page_with_error_handling(
        self,
        page_id: str,
        config: Optional[Dict[str, Any]],
        purpose: str
    ) -> Tuple[str, str, Dict[str, Any]]:
        """Fetch one page, converting any exception into an empty result."""
        try:
            return await self.get_page_content(page_id, config, purpose)
        except Exception as e:
            logger.warning(f"页面 {page_id} 内容获取失败: {e}")
            return "", "", {'error': str(e)}

    def _post_process_content(
        self,
        content: str,
        config: Dict[str, Any],
        metadata: Dict[str, Any]
    ) -> str:
        """Post-process content: table formatting, truncation and cleanup."""
        try:
            # Table formatting (currently a pass-through, see helper).
            if config.get('table_format') == 'markdown' and metadata.get('has_tables'):
                content = self._format_tables_as_markdown(content)

            # Length limiting.
            max_length = config['max_content_length']
            if len(content) > max_length:
                if config.get('truncate_strategy') == 'smart':
                    content = self._smart_truncate(content, max_length)
                elif config.get('truncate_strategy') == 'simple':
                    # FIX: the documented 'simple' strategy previously did
                    # nothing; it now performs a hard cut at the limit.
                    content = content[:max_length] + "..."

            # Whitespace cleanup.
            content = self._clean_content(content)

            return content

        except Exception as e:
            logger.warning(f"内容后处理失败: {e}")
            return content

    def _detect_tables(self, content: str) -> bool:
        """Return True if the content appears to contain table markup."""
        table_indicators = [
            "--- 表格 ---",
            " | ",
            "\n|",
            "table_row",
            "[Table with"
        ]
        return any(indicator in content for indicator in table_indicators)

    def _detect_files(self, content: str) -> bool:
        """Return True if the content appears to contain extracted file sections."""
        file_indicators = [
            "[PDF文档]",
            "[Word文档]",
            "[Excel文档]",
            "[文档内容]",
            "文件名:"
        ]
        return any(indicator in content for indicator in file_indicators)

    def _format_tables_as_markdown(self, content: str) -> str:
        """Format embedded tables as Markdown (currently a pass-through)."""
        try:
            # More sophisticated table formatting can be added here;
            # for now the content is returned unchanged.
            return content
        except Exception as e:
            logger.warning(f"表格格式化失败: {e}")
            return content

    def _smart_truncate(self, content: str, max_length: int) -> str:
        """Smart truncation: prefer cutting at paragraph/sentence boundaries."""
        try:
            if len(content) <= max_length:
                return content

            truncated = content[:max_length]

            # Prefer the last paragraph boundary, if it keeps >=70% of the budget.
            last_paragraph = truncated.rfind('\n\n')
            if last_paragraph > max_length * 0.7:
                truncated = truncated[:last_paragraph]

            # Then try to end on a complete sentence (>=80% of the budget).
            last_sentence = truncated.rfind('。')
            if last_sentence > max_length * 0.8:
                truncated = truncated[:last_sentence + 1]

            return truncated + "\n\n... (内容已截断,显示前部分)"

        except Exception as e:
            logger.warning(f"智能截断失败: {e}")
            return content[:max_length] + "..."

    def _clean_content(self, content: str) -> str:
        """Clean content: strip each line and collapse runs of blank lines."""
        try:
            lines = content.split('\n')
            cleaned_lines = []
            prev_empty = False

            for line in lines:
                line = line.strip()
                if not line:
                    # Keep at most one consecutive blank line.
                    if not prev_empty:
                        cleaned_lines.append('')
                    prev_empty = True
                else:
                    cleaned_lines.append(line)
                    prev_empty = False

            return '\n'.join(cleaned_lines).strip()

        except Exception as e:
            logger.warning(f"内容清理失败: {e}")
            return content


# Convenience helpers


async def get_page_content_for_deep_research(
    page_id: str,
    complexity: str = "standard",
    max_length: int = 7000
) -> Tuple[str, str, Dict[str, Any]]:
    """Fetch page content with a configuration tuned for Deep Research."""
    fetcher = PageContentFetcher()
    config = {
        'include_files': True,
        'include_tables': True,
        'max_content_length': max_length,
        'truncate_strategy': 'smart',
        'table_format': 'markdown'
    }

    return await fetcher.get_page_content(
        page_id=page_id,
        config=config,
        purpose=f"deep_research_{complexity}"
    )


async def get_page_content_for_intent_search(
    page_id: str,
    is_core_page: bool = True,
    max_length: int = 8000
) -> Tuple[str, str, Dict[str, Any]]:
    """Fetch page content with a configuration tuned for Intent Search."""
    fetcher = PageContentFetcher()
    config = {
        'include_files': True,
        'include_tables': True,
        # Related (non-core) pages get a smaller budget.
        'max_content_length': max_length if is_core_page else 6000,
        'truncate_strategy': 'smart',
        'table_format': 'plain'  # plain text tends to suit Intent Search better
    }

    return await fetcher.get_page_content(
        page_id=page_id,
        config=config,
        purpose=f"intent_search_{'core' if is_core_page else 'related'}"
    )


# Optional module-level singleton.
_global_fetcher: Optional[PageContentFetcher] = None


def get_global_page_fetcher() -> PageContentFetcher:
    """Return the lazily-created global PageContentFetcher instance."""
    global _global_fetcher
    if _global_fetcher is None:
        _global_fetcher = PageContentFetcher()
    return _global_fetcher

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Hank-coder/chimera_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server