
AnyDB MCP Server

by iamayuppie
filetool.py (11 kB)
""" File and vector database operations module for AnyDB MCP Server. Contains all file and vector database functionality including: - ChromaDB vector database management - Document embedding and chunking - Semantic search operations - File management for RAG (Retrieval Augmented Generation) """ import os import hashlib import logging from datetime import datetime from typing import Any, Dict, List, Optional import chromadb from sentence_transformers import SentenceTransformer import tiktoken # Get logger logger = logging.getLogger('mcp_server') class VectorDatabaseManager: """Manages ChromaDB vector database operations for semantic search.""" def __init__(self, collection_name: str = "file_embeddings"): try: script_dir = os.path.dirname(os.path.abspath(__file__)) self.db_path = os.path.join(script_dir, "vector_db") # Initialize ChromaDB self.chroma_client = chromadb.PersistentClient(path=self.db_path) self.collection = self.chroma_client.get_or_create_collection( name=collection_name, metadata={"hnsw:space": "cosine"} ) # Initialize sentence transformer for embeddings self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2') # Initialize tokenizer for text chunking self.tokenizer = tiktoken.get_encoding("cl100k_base") self.available = True logger.info(f"Vector database initialized at: {self.db_path}") logger.info(f"Collection '{collection_name}' ready with {self.collection.count()} documents") except Exception as e: logger.error(f"Failed to initialize vector database: {str(e)}") self.available = False def _chunk_text(self, text: str, max_tokens: int = 500, overlap: int = 50) -> List[str]: """Split text into overlapping chunks based on token count.""" tokens = self.tokenizer.encode(text) chunks = [] for i in range(0, len(tokens), max_tokens - overlap): chunk_tokens = tokens[i:i + max_tokens] chunk_text = self.tokenizer.decode(chunk_tokens) chunks.append(chunk_text) if i + max_tokens >= len(tokens): break return chunks def _generate_file_id(self, filename: str, content: str) -> str: """Generate a unique ID for a file based on filename and content hash.""" content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()[:16] return f"{filename}_{content_hash}" async def add_file(self, filename: str, content: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]: """Add a file to the vector database with embeddings.""" if not getattr(self, 'available', False): return {"error": "Vector database not available", "status": "disabled"} logger.info(f"Adding file to vector database: {filename}") if metadata is None: metadata = {} # Generate unique file ID file_id = self._generate_file_id(filename, content) # Remove existing entries for this file await self.remove_file(filename) # Chunk the content chunks = self._chunk_text(content) logger.debug(f"Split file into {len(chunks)} chunks") # Prepare data for ChromaDB documents = [] metadatas = [] ids = [] for i, chunk in enumerate(chunks): chunk_id = f"{file_id}_chunk_{i}" chunk_metadata = { "filename": filename, "chunk_index": i, "total_chunks": len(chunks), "file_id": file_id, **metadata } documents.append(chunk) metadatas.append(chunk_metadata) ids.append(chunk_id) # Add to ChromaDB (embeddings are generated automatically) try: self.collection.add( documents=documents, metadatas=metadatas, ids=ids ) except Exception as e: logger.error(f"ChromaDB add operation failed: {str(e)}") import traceback logger.error(f"ChromaDB traceback: {traceback.format_exc()}") raise logger.info(f"File '{filename}' added with {len(chunks)} chunks") return { 
"filename": filename, "file_id": file_id, "chunks_added": len(chunks), "status": "success" } async def remove_file(self, filename: str) -> int: """Remove all chunks of a file from the vector database.""" if not getattr(self, 'available', False): return 0 try: # Find all chunks for this file results = self.collection.get( where={"filename": filename} ) if results['ids']: self.collection.delete(ids=results['ids']) logger.info(f"Removed {len(results['ids'])} chunks for file '{filename}'") return len(results['ids']) else: logger.debug(f"No chunks found for file '{filename}'") return 0 except Exception as e: logger.error(f"Error removing file '{filename}': {str(e)}") return 0 async def search_files(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]: """Search for relevant file chunks based on semantic similarity.""" if not getattr(self, 'available', False): return [] logger.debug(f"Searching vector database for: {query}") # Search the collection results = self.collection.query( query_texts=[query], n_results=min(max_results, self.collection.count()) ) # Format results search_results = [] if results['documents'] and results['documents'][0]: for i in range(len(results['documents'][0])): result = { "content": results['documents'][0][i], "metadata": results['metadatas'][0][i], "similarity_score": 1 - results['distances'][0][i], # Convert distance to similarity "id": results['ids'][0][i] } search_results.append(result) logger.debug(f"Found {len(search_results)} relevant chunks") return search_results async def list_files(self) -> List[Dict[str, Any]]: """List all files in the vector database.""" if not getattr(self, 'available', False): return [] try: # Get all unique filenames all_results = self.collection.get() files = {} for metadata in all_results['metadatas']: filename = metadata['filename'] if filename not in files: files[filename] = { "filename": filename, "file_id": metadata['file_id'], "total_chunks": metadata['total_chunks'], "added_date": metadata.get('added_date', 'unknown') } return list(files.values()) except Exception as e: logger.error(f"Error listing files: {str(e)}") return [] class FileTools: """High-level file and vector database tool operations.""" def __init__(self, vector_db_manager: VectorDatabaseManager): self.vector_db_manager = vector_db_manager async def add_file_to_vector_db(self, filename: str, content: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]: """Add a file to the vector database for semantic search and RAG.""" if not getattr(self.vector_db_manager, 'available', False): return {"error": "Vector database not available", "status": "disabled"} if metadata is None: metadata = {} metadata["added_date"] = datetime.now().isoformat() logger.info(f"Adding file to vector database - Filename: {filename}, Content length: {len(content)}") result = await self.vector_db_manager.add_file(filename, content, metadata) if result.get("status") == "success": logger.info(f"File added successfully - {result['chunks_added']} chunks") else: logger.warning(f"File add failed - {result}") return result async def search_vector_db(self, query: str, max_results: int = 5) -> Dict[str, Any]: """Search the vector database for relevant file content using semantic similarity.""" if not getattr(self.vector_db_manager, 'available', False): return {"error": "Vector database not available", "status": "disabled", "results": [], "total_results": 0} logger.info(f"Searching vector database - Query: {query}, Max results: {max_results}") search_results = await 
self.vector_db_manager.search_files(query, max_results) response_data = { "query": query, "results": search_results, "total_results": len(search_results) } logger.info(f"Found {len(search_results)} relevant results") return response_data async def list_vector_files(self) -> Dict[str, Any]: """List all files stored in the vector database.""" if not getattr(self.vector_db_manager, 'available', False): return {"error": "Vector database not available", "files": [], "total_files": 0} logger.info("Listing files in vector database") files = await self.vector_db_manager.list_files() response_data = { "files": files, "total_files": len(files) } logger.info(f"Listed {len(files)} files") return response_data async def remove_file_from_vector_db(self, filename: str) -> Dict[str, Any]: """Remove a file from the vector database.""" if not getattr(self.vector_db_manager, 'available', False): return {"error": "Vector database not available", "status": "disabled"} logger.info(f"Removing file from vector database - Filename: {filename}") chunks_removed = await self.vector_db_manager.remove_file(filename) response_data = { "filename": filename, "chunks_removed": chunks_removed, "status": "success" if chunks_removed > 0 else "no_file_found" } logger.info(f"Removed {chunks_removed} chunks") return response_data
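For orientation, here is a minimal sketch of how these classes might be driven from an async context. This driver is not part of the module; the filename, sample content, and query below are hypothetical placeholders.

# demo.py - hypothetical driver for filetool.py; the sample filename,
# content, and query are illustrative only.
import asyncio

from filetool import FileTools, VectorDatabaseManager


async def main() -> None:
    manager = VectorDatabaseManager()  # creates ./vector_db next to filetool.py
    tools = FileTools(manager)

    # Index a document: FileTools stamps added_date, the manager chunks the
    # text (~500 tokens per chunk, 50-token overlap) and stores embeddings.
    result = await tools.add_file_to_vector_db(
        "notes.md",
        "ChromaDB stores embeddings so chunks can be retrieved by meaning...",
    )
    print(result)  # e.g. {"filename": "notes.md", ..., "status": "success"}

    # Semantic search returns chunks ranked by cosine similarity.
    hits = await tools.search_vector_db("how are embeddings stored?", max_results=3)
    for hit in hits["results"]:
        print(hit["similarity_score"], hit["metadata"]["filename"])

    # Housekeeping: list indexed files, then remove one.
    print(await tools.list_vector_files())
    print(await tools.remove_file_from_vector_db("notes.md"))


if __name__ == "__main__":
    asyncio.run(main())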
