MCP Power - Knowledge Search Server

by wspotter
indexer.py • 8.47 kB
#!/usr/bin/env python3
"""Dataset indexing utilities for MCPower."""
from __future__ import annotations

import json
import time
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Tuple

import numpy as np
import typer

app = typer.Typer(help="Create FAISS datasets from local documents")

SUPPORTED_EXTENSIONS = {".txt", ".md"}
DEFAULT_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


class DatasetIndexerError(Exception):
    """Raised when the dataset indexer encounters an error."""


def validate_dataset_id(dataset_id: str) -> None:
    if not dataset_id or not dataset_id.isascii():
        raise DatasetIndexerError("Dataset ID must be ASCII lowercase alphanumeric with hyphens")
    if any(ch for ch in dataset_id if ch not in "abcdefghijklmnopqrstuvwxyz0123456789-"):
        raise DatasetIndexerError("Dataset ID must be lowercase alphanumeric with hyphens only")


def discover_files(source: Path) -> Iterable[Path]:
    if source.is_file():
        if source.suffix.lower() in SUPPORTED_EXTENSIONS:
            yield source
        return
    for path in sorted(source.rglob('*')):
        if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS:
            yield path


def read_text(path: Path) -> str:
    try:
        return path.read_text(encoding='utf-8')
    except UnicodeDecodeError:
        return path.read_text(encoding='utf-8', errors='ignore')


def extract_title(path: Path, text: str) -> str:
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        if path.suffix.lower() == '.md' and stripped.startswith('#'):
            return stripped.lstrip('#').strip()
        return stripped[:120]
    return path.stem.replace('-', ' ').title()


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    words = text.split()
    if not words:
        return []
    if chunk_size <= 0:
        raise DatasetIndexerError('Chunk size must be greater than zero')
    if chunk_overlap >= chunk_size:
        chunk_overlap = chunk_size // 4
    step = max(chunk_size - chunk_overlap, 1)
    chunks: List[str] = []
    for start in range(0, len(words), step):
        chunk_words = words[start:start + chunk_size]
        if not chunk_words:
            continue
        chunk = ' '.join(chunk_words)
        chunks.append(chunk)
        if start + chunk_size >= len(words):
            break
    return chunks


def safe_relative(path: Path, base: Path) -> str:
    try:
        return str(path.relative_to(base))
    except ValueError:
        return path.name


def _require_faiss():
    try:
        import faiss  # type: ignore
    except ImportError as exc:  # pragma: no cover - runtime dependency check
        raise DatasetIndexerError('faiss-cpu is not installed') from exc
    return faiss


def build_index(
    texts: List[str],
    model_name: str,
    batch_size: int = 32
) -> Tuple[np.ndarray, int]:
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as exc:  # pragma: no cover - runtime dependency check
        raise DatasetIndexerError('sentence-transformers is not installed') from exc
    model = SentenceTransformer(model_name)
    embeddings = model.encode(
        texts,
        batch_size=batch_size,
        convert_to_numpy=True,
        show_progress_bar=False
    )
    faiss = _require_faiss()
    embeddings = embeddings.astype('float32')
    faiss.normalize_L2(embeddings)
    dimension = embeddings.shape[1]
    return embeddings, dimension


def write_index(path: Path, embeddings: np.ndarray) -> None:
    faiss = _require_faiss()
    dimension = embeddings.shape[1]
    faiss_index = faiss.IndexFlatIP(dimension)
    faiss_index.add(embeddings)
    path.parent.mkdir(parents=True, exist_ok=True)
    faiss.write_index(faiss_index, str(path))


@app.command()
def index(
    source: Path = typer.Option(..., '--source', exists=True, file_okay=True, dir_okay=True, readable=True, help='Source file or directory'),
    dataset_id: str = typer.Option(..., '--dataset-id', help='Dataset identifier (lowercase, hyphenated)'),
    output: Path = typer.Option(Path('./datasets'), '--output', help='Output datasets directory'),
    name: str = typer.Option(..., '--name', help='Dataset display name'),
    description: str = typer.Option('', '--description', help='Dataset description'),
    model: str = typer.Option(DEFAULT_MODEL, '--model', help='Embedding model name'),
    chunk_size: int = typer.Option(512, '--chunk-size', min=64, max=4096, help='Number of words per chunk'),
    chunk_overlap: int = typer.Option(64, '--chunk-overlap', min=0, max=1024, help='Number of overlapping words between chunks')
) -> None:
    start_time = time.time()
    try:
        validate_dataset_id(dataset_id)
        files = list(discover_files(source))
        if not files:
            raise DatasetIndexerError('No supported files found to index')

        dataset_dir = output.joinpath(dataset_id)
        if dataset_dir.exists():
            raise DatasetIndexerError(f"Dataset '{dataset_id}' already exists")
        dataset_dir.mkdir(parents=True, exist_ok=False)
        index_dir = dataset_dir.joinpath('index')
        metadata_path = dataset_dir.joinpath('metadata.json')
        manifest_path = dataset_dir.joinpath('manifest.json')

        documents: List[dict] = []
        embedding_texts: List[str] = []
        indexed_files = 0
        total_chunks = 0

        for file_path in files:
            text = read_text(file_path)
            if not text.strip():
                continue
            chunks = chunk_text(text, chunk_size, chunk_overlap)
            if not chunks:
                continue
            indexed_files += 1
            title = extract_title(file_path, text)
            rel_path = safe_relative(file_path, source)
            for idx, chunk in enumerate(chunks):
                doc_id = f"{dataset_id}-{total_chunks + 1}"
                snippet = chunk[:200].strip()
                documents.append({
                    'id': doc_id,
                    'title': title,
                    'path': rel_path,
                    'content': chunk,
                    'snippet': snippet,
                    'chunk': idx + 1,
                    'chunk_total': len(chunks)
                })
                embedding_texts.append(chunk)
                total_chunks += 1

        if not embedding_texts:
            raise DatasetIndexerError('No text content available to index')

        embeddings, dimension = build_index(embedding_texts, model)
        write_index(index_dir.joinpath('docs.index'), embeddings)

        now_iso = datetime.utcnow().isoformat() + 'Z'
        metadata = {
            'datasetId': dataset_id,
            'name': name,
            'description': description,
            'model': model,
            'createdAt': now_iso,
            'stats': {
                'totalFiles': indexed_files,
                'totalChunks': len(documents),
                'embeddingDimensions': dimension,
                'indexedAt': now_iso
            },
            'documents': documents
        }
        metadata_path.write_text(json.dumps(metadata, indent=2), encoding='utf-8')

        manifest = {
            'id': dataset_id,
            'name': name,
            'description': description or f'Dataset created from {source}',
            'index': 'index',
            'metadata': 'metadata.json',
            'defaultTopK': 5
        }
        manifest_path.write_text(json.dumps(manifest, indent=2), encoding='utf-8')

        result = {
            'datasetId': dataset_id,
            'documents': metadata['stats']['totalFiles'],
            'chunks': metadata['stats']['totalChunks'],
            'model': model,
            'embeddingDimensions': dimension,
            'indexPath': str(index_dir.resolve()),
            'metadataPath': str(metadata_path.resolve()),
            'durationMs': int((time.time() - start_time) * 1000)
        }
        print(json.dumps(result))
    except DatasetIndexerError as error:
        typer.echo(json.dumps({'error': str(error)}), err=True)
        raise typer.Exit(code=1)
    except Exception as error:  # pragma: no cover - safeguard
        typer.echo(json.dumps({'error': str(error)}), err=True)
        raise typer.Exit(code=1)


if __name__ == '__main__':
    app()
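A typical invocation, using an illustrative source folder and dataset name, looks like:

python indexer.py --source ./docs --dataset-id my-notes --name "My Notes" --chunk-size 512 --chunk-overlap 64

Once a dataset has been built, it can be queried by loading the FAISS index and the metadata the script writes. The snippet below is a minimal sketch, not part of the server itself; it assumes faiss-cpu and sentence-transformers are installed and that the hypothetical dataset above was written to ./datasets/my-notes.

import json
from pathlib import Path

import faiss
from sentence_transformers import SentenceTransformer

dataset_dir = Path('./datasets/my-notes')  # hypothetical dataset created by the command above
metadata = json.loads(dataset_dir.joinpath('metadata.json').read_text(encoding='utf-8'))
index = faiss.read_index(str(dataset_dir / 'index' / 'docs.index'))

# Encode the query with the same model used at indexing time and normalize it,
# so inner-product search over the IndexFlatIP behaves like cosine similarity.
model = SentenceTransformer(metadata['model'])
query = model.encode(['how do I configure the server?'], convert_to_numpy=True).astype('float32')
faiss.normalize_L2(query)

scores, positions = index.search(query, 5)
for score, pos in zip(scores[0], positions[0]):
    if pos < 0:  # fewer than k results available
        continue
    doc = metadata['documents'][pos]
    print(f"{score:.3f}  {doc['title']}  ({doc['path']})")

Because write_index stores normalized embeddings in an inner-product index, the scores printed here are cosine similarities between the query and each chunk.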

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wspotter/mcpower'
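
For scripted access, the same endpoint can be fetched from Python using only the standard library. This is a small sketch and assumes the endpoint returns JSON.

import json
from urllib.request import urlopen

# Fetch this server's directory entry from the Glama MCP API.
with urlopen('https://glama.ai/api/mcp/v1/servers/wspotter/mcpower') as response:
    server_info = json.loads(response.read().decode('utf-8'))

print(json.dumps(server_info, indent=2))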

If you have feedback or need assistance with the MCP directory API, please join our Discord server.