Skip to main content
Glama
source_map.py (16.9 kB)
"""Source Map mechanism.

Records the position mapping between the original Markdown document and SIR
nodes, supporting position lookups in both directions and source-position
tracking for edit operations.

Follows the design of the article "How agents can process Markdown
efficiently: structured parsing and semantic editing".
"""

import logging
import re
import time
import uuid
from collections import Counter
from collections.abc import Mapping
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

from .sir_schema import SIRNode, SourceLocation, SourcePosition

logger = logging.getLogger(__name__)


def _field(obj: Any, name: str, default: Any = None) -> Any:
    """Read ``name`` from ``obj`` by key (mappings) or by attribute (objects).

    The location types come from ``sir_schema``, which is not visible here,
    and this module historically accessed them both as ``loc["start"]`` and
    as ``loc.start``.  Going through this accessor makes every code path work
    with either representation.
    """
    if isinstance(obj, Mapping):
        return obj.get(name, default)
    return getattr(obj, name, default)


def _span(location: Any) -> Tuple[int, int, int, int]:
    """Return ``(start_line, start_col, end_line, end_col)`` of a location."""
    start = _field(location, "start")
    end = _field(location, "end")
    return (
        _field(start, "line"),
        _field(start, "column"),
        _field(end, "line"),
        _field(end, "column"),
    )


def _position_to_dict(position: Any) -> Dict[str, Any]:
    """Serialize a source position to a plain dict."""
    return {
        "line": _field(position, "line"),
        "column": _field(position, "column"),
        "offset": _field(position, "offset", 0),
    }


def _location_to_dict(location: Any) -> Dict[str, Any]:
    """Serialize a source location to a plain, JSON-friendly dict."""
    return {
        "file": _field(location, "file"),
        "start": _position_to_dict(_field(location, "start")),
        "end": _position_to_dict(_field(location, "end")),
    }


def _position_from_dict(data: Dict[str, Any]) -> SourcePosition:
    """Rebuild a ``SourcePosition`` from its dict form (offset defaults to 0)."""
    return SourcePosition(
        line=data["line"],
        column=data["column"],
        offset=data.get("offset") or 0,
    )


def _location_from_dict(data: Dict[str, Any]) -> SourceLocation:
    """Rebuild a ``SourceLocation`` from its dict form."""
    return SourceLocation(
        file=data.get("file"),
        start=_position_from_dict(data["start"]),
        end=_position_from_dict(data["end"]),
    )


class MappingType(str, Enum):
    """Kinds of source mapping."""

    EXACT = "exact"              # exact match
    APPROXIMATE = "approximate"  # approximate match
    GENERATED = "generated"      # generated content
    DELETED = "deleted"          # deleted content


@dataclass
class SourceMapping:
    """One source-map entry tying a SIR node to a span of the original text."""

    sir_node_id: str
    original_text: str
    original_location: SourceLocation
    mapping_type: MappingType
    confidence: float  # mapping confidence (0.0 - 1.0)
    metadata: Dict[str, Any]


@dataclass
class EditOperationMapping:
    """Record of one edit operation and the source locations it touched."""

    operation_id: str
    operation_type: str
    sir_node_ids: List[str]
    original_locations: List[SourceLocation]
    new_content: str
    timestamp: float
    user: Optional[str] = None


class SourceMap:
    """Source Map manager.

    Holds the original document text, a ``sir_node_id -> SourceMapping``
    table, and a chronological edit history.  Lines and columns are 1-based;
    ``get_text_at_location`` and ``get_coverage_statistics`` treat the end
    column as inclusive.
    """

    def __init__(self, original_content: str, source_file: Optional[str] = None):
        """Create an empty source map over ``original_content``."""
        self.original_content = original_content
        self.source_file = source_file
        self.mappings: Dict[str, SourceMapping] = {}  # sir_node_id -> mapping
        self.edit_history: List[EditOperationMapping] = []
        self.line_offsets: List[int] = self._calculate_line_offsets(original_content)

    def add_mapping(self, sir_node: SIRNode, original_text: str,
                    start_line: int, start_col: int, end_line: int, end_col: int,
                    mapping_type: MappingType = MappingType.EXACT,
                    confidence: float = 1.0,
                    metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Register a mapping for ``sir_node`` and stamp the node with its location.

        Returns ``True`` on success.  Returns ``False`` when the node has no
        ``"id"`` or when anything goes wrong while building the mapping; the
        failure is logged rather than raised (best-effort contract).
        """
        try:
            node_id = sir_node.get("id")
            if not node_id:
                return False

            # NOTE(review): offsets are stored as the placeholder 0, as before.
            location = SourceLocation(
                file=self.source_file,
                start=SourcePosition(line=start_line, column=start_col, offset=0),
                end=SourcePosition(line=end_line, column=end_col, offset=0),
            )
            mapping = SourceMapping(
                sir_node_id=node_id,
                original_text=original_text,
                original_location=location,
                mapping_type=mapping_type,
                confidence=confidence,
                metadata=metadata or {},
            )

            # Update the node first: if that fails we must not leave a
            # half-registered mapping behind while reporting failure.
            sir_node["source_location"] = location
            self.mappings[node_id] = mapping
            return True
        except Exception:
            logger.warning("Failed to add source mapping", exc_info=True)
            return False

    def get_mapping(self, sir_node_id: str) -> Optional[SourceMapping]:
        """Return the mapping for ``sir_node_id``, or ``None`` if unknown."""
        return self.mappings.get(sir_node_id)

    def find_mapping_by_position(self, line: int, col: int) -> Optional[SourceMapping]:
        """Return the first mapping whose span contains ``(line, col)``.

        Mappings are checked in insertion order; both span ends are inclusive.
        """
        target = (line, col)
        for mapping in self.mappings.values():
            start_line, start_col, end_line, end_col = _span(mapping.original_location)
            if (start_line, start_col) <= target <= (end_line, end_col):
                return mapping
        return None

    def find_mapping_by_text(self, text: str, exact_match: bool = True) -> List[SourceMapping]:
        """Return mappings whose original text equals (or contains) ``text``.

        With ``exact_match=False`` a substring match is used instead of
        equality.
        """
        if exact_match:
            return [m for m in self.mappings.values() if m.original_text == text]
        return [m for m in self.mappings.values() if text in m.original_text]

    def record_edit_operation(self, operation_type: str, sir_node_ids: List[str],
                              new_content: str, user: Optional[str] = None) -> str:
        """Append an edit to the history and return its generated operation id.

        The original locations of every node that currently has a mapping are
        captured; nodes without a mapping are skipped.
        """
        original_locations = [
            mapping.original_location
            for mapping in map(self.get_mapping, sir_node_ids)
            if mapping
        ]

        operation_id = f"edit_{uuid.uuid4().hex[:8]}"
        self.edit_history.append(
            EditOperationMapping(
                operation_id=operation_id,
                operation_type=operation_type,
                # Copy so later mutation of the caller's list cannot rewrite history.
                sir_node_ids=list(sir_node_ids),
                original_locations=original_locations,
                new_content=new_content,
                timestamp=time.time(),
                user=user,
            )
        )
        return operation_id

    def get_edit_history(self) -> List[EditOperationMapping]:
        """Return the edit history (the live internal list, oldest first)."""
        return self.edit_history

    def get_original_text(self, sir_node_id: str) -> Optional[str]:
        """Return the original text mapped to a node, or ``None`` if unmapped."""
        mapping = self.get_mapping(sir_node_id)
        return mapping.original_text if mapping else None

    def get_original_location(self, sir_node_id: str) -> Optional[SourceLocation]:
        """Return the original location mapped to a node, or ``None`` if unmapped."""
        mapping = self.get_mapping(sir_node_id)
        return mapping.original_location if mapping else None

    def update_mapping_after_edit(self, sir_node_id: str, new_text: str,
                                  new_start_line: int, new_start_col: int,
                                  new_end_line: int, new_end_col: int) -> bool:
        """Point an existing mapping at its post-edit text and position.

        The mapping is re-typed as ``GENERATED`` and its confidence is lowered
        to 0.8.  Returns ``False`` when the node has no mapping.
        """
        mapping = self.get_mapping(sir_node_id)
        if not mapping:
            return False

        mapping.original_text = new_text
        mapping.original_location = SourceLocation(
            file=self.source_file,
            start=SourcePosition(line=new_start_line, column=new_start_col, offset=0),
            end=SourcePosition(line=new_end_line, column=new_end_col, offset=0),
        )
        mapping.mapping_type = MappingType.GENERATED
        mapping.confidence = 0.8  # edited content is less trustworthy than a parsed span
        return True

    def calculate_position_offset(self, line: int, col: int, content_delta: int) -> Tuple[int, int]:
        """Return the position after a content change.

        Placeholder implementation: the position is returned unchanged and
        ``content_delta`` is ignored.
        """
        return line, col

    def get_text_at_location(self, location: SourceLocation) -> str:
        """Return the slice of the original content covered by ``location``.

        Lines and columns are 1-based and the end column is inclusive.  An
        empty string is returned when the range does not fit the document
        (line below 1, end before start, or line past the end); previously a
        line or column of 0 silently wrapped around via negative indexing.
        """
        start_line, start_col, end_line, end_col = _span(location)
        lines = self.original_content.split('\n')

        if start_line < 1 or end_line < start_line or end_line > len(lines):
            return ""

        # Clamp so that column 0 / negative columns cannot index from the end.
        first_col = max(start_col - 1, 0)
        last_col = max(end_col, 0)

        if start_line == end_line:
            return lines[start_line - 1][first_col:last_col]

        pieces = [lines[start_line - 1][first_col:]]
        pieces.extend(lines[start_line:end_line - 1])  # whole lines in between
        pieces.append(lines[end_line - 1][:last_col])
        return '\n'.join(pieces)

    def _calculate_line_offsets(self, content: str) -> List[int]:
        """Return the character offset at which each line of ``content`` starts."""
        offsets = [0]
        offsets.extend(index + 1 for index, char in enumerate(content) if char == '\n')
        return offsets

    def get_line_offset(self, line_number: int) -> int:
        """Return the start offset of a 1-based line, or 0 if out of range."""
        if 1 <= line_number <= len(self.line_offsets):
            return self.line_offsets[line_number - 1]
        return 0

    def find_node_by_original_text(self, text: str) -> List[str]:
        """Return the ids of all nodes whose original text equals ``text``."""
        return [
            node_id
            for node_id, mapping in self.mappings.items()
            if mapping.original_text == text
        ]

    def get_coverage_statistics(self) -> Dict[str, Any]:
        """Summarize how much of the original content is covered by mappings.

        Single-line spans count ``end.column - start.column + 1`` characters;
        multi-line spans count the actual text they cover (formerly a fixed
        guess of 100).  Overlapping mappings are counted once each, so the
        percentage can exceed 100.
        """
        total_chars = len(self.original_content)

        mapped_chars = 0
        for mapping in self.mappings.values():
            location = mapping.original_location
            start_line, start_col, end_line, end_col = _span(location)
            if start_line == end_line:
                mapped_chars += end_col - start_col + 1
            else:
                mapped_chars += len(self.get_text_at_location(location))

        coverage = (mapped_chars / total_chars) * 100 if total_chars > 0 else 0
        by_type = Counter(m.mapping_type for m in self.mappings.values())

        return {
            "total_characters": total_chars,
            "mapped_characters": mapped_chars,
            "coverage_percentage": round(coverage, 2),
            "total_mappings": len(self.mappings),
            "exact_mappings": by_type[MappingType.EXACT],
            "approximate_mappings": by_type[MappingType.APPROXIMATE],
            "generated_mappings": by_type[MappingType.GENERATED],
            "deleted_mappings": by_type[MappingType.DELETED],
        }

    def to_dict(self) -> Dict[str, Any]:
        """Convert the source map to a plain dict (inverse of ``from_dict``)."""
        return {
            "original_content": self.original_content,
            "source_file": self.source_file,
            "mappings": {
                node_id: {
                    "original_text": mapping.original_text,
                    "original_location": _location_to_dict(mapping.original_location),
                    "mapping_type": mapping.mapping_type.value,
                    "confidence": mapping.confidence,
                    "metadata": mapping.metadata,
                }
                for node_id, mapping in self.mappings.items()
            },
            "edit_history": [
                {
                    "operation_id": op.operation_id,
                    "operation_type": op.operation_type,
                    "sir_node_ids": op.sir_node_ids,
                    "original_locations": [
                        _location_to_dict(loc) for loc in op.original_locations
                    ],
                    "new_content": op.new_content,
                    "timestamp": op.timestamp,
                    "user": op.user,
                }
                for op in self.edit_history
            ],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SourceMap':
        """Create a ``SourceMap`` from the dict produced by ``to_dict``."""
        source_map = cls(data["original_content"], data.get("source_file"))

        # Restore mappings.
        for node_id, mapping_data in data.get("mappings", {}).items():
            source_map.mappings[node_id] = SourceMapping(
                sir_node_id=node_id,
                original_text=mapping_data["original_text"],
                original_location=_location_from_dict(mapping_data["original_location"]),
                mapping_type=MappingType(mapping_data["mapping_type"]),
                confidence=mapping_data["confidence"],
                metadata=mapping_data.get("metadata", {}),
            )

        # Restore edit history.
        for op_data in data.get("edit_history", []):
            source_map.edit_history.append(
                EditOperationMapping(
                    operation_id=op_data["operation_id"],
                    operation_type=op_data["operation_type"],
                    sir_node_ids=op_data["sir_node_ids"],
                    original_locations=[
                        _location_from_dict(loc_data)
                        for loc_data in op_data["original_locations"]
                    ],
                    new_content=op_data["new_content"],
                    timestamp=op_data["timestamp"],
                    user=op_data.get("user"),
                )
            )

        return source_map


def create_source_map(original_content: str, source_file: Optional[str] = None) -> SourceMap:
    """Create a ``SourceMap`` instance for the given content."""
    return SourceMap(original_content, source_file)


def calculate_source_position(content: str, char_offset: int) -> SourcePosition:
    """Return the 1-based line/column that ``char_offset`` falls on.

    Offsets below 0 map to line 1, column 1; offsets past the end map to the
    position just after the last character.
    """
    consumed = max(0, min(char_offset, len(content)))
    line = content.count('\n', 0, consumed) + 1
    # rfind yields -1 when no newline precedes the offset, giving consumed + 1.
    col = consumed - content.rfind('\n', 0, consumed)
    return SourcePosition(line=line, column=col)


def find_text_in_content(content: str, search_text: str,
                         start_line: int = 1, start_col: int = 1) -> Optional[SourceLocation]:
    """Find the first occurrence of ``search_text`` at or after a position.

    The search is done line by line, so text spanning a newline is never
    found.  ``start_col`` only applies to ``start_line``; later lines are
    searched from their first column.  Start values below 1 are clamped
    (previously they wrapped around via negative indexing).

    NOTE(review): the returned end column is ``pos + len(search_text) + 1``,
    i.e. one past the last matched character, whereas
    ``SourceMap.get_text_at_location`` treats end columns as inclusive —
    confirm which convention callers expect.
    """
    lines = content.split('\n')
    first_idx = max(start_line - 1, 0)

    for line_idx in range(first_idx, len(lines)):
        from_col = max(start_col, 1) if line_idx == start_line - 1 else 1
        pos = lines[line_idx].find(search_text, from_col - 1)
        if pos != -1:
            return SourceLocation(
                file=None,
                start=SourcePosition(line=line_idx + 1, column=pos + 1),
                end=SourcePosition(line=line_idx + 1, column=pos + len(search_text) + 1),
            )

    return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ForceInjection/markdown-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.