# metadata_store.py
"""SQLite-based metadata storage for repositories and skills. Design Decision: SQLite Replacement for JSON Storage Rationale: Migrating from JSON to SQLite provides O(1) indexed lookups, transaction safety, and relational queries without adding external dependencies (sqlite3 is in Python standard library). Trade-offs: - Performance: O(1) indexed queries vs. O(n) JSON linear scans - Complexity: Slightly more code than JSON, but standard SQL patterns - Dependencies: None (sqlite3 is stdlib) - Migration: Automatic migration from JSON preserves backward compatibility Scalability: Handles 10K+ repositories efficiently. For >100K repositories, consider connection pooling and read replicas. Error Handling: - IntegrityError: Duplicate primary keys or foreign key violations - OperationalError: Database locked, disk full, corrupted database - All operations use transactions for atomicity """ import logging import sqlite3 from collections.abc import Iterator from contextlib import contextmanager from datetime import datetime from pathlib import Path from mcp_skills.models.repository import Repository logger = logging.getLogger(__name__) class MetadataStore: """SQLite-based metadata storage for repositories and skills. Provides O(1) indexed access to repository metadata with transaction safety and relational integrity. Replaces JSON file storage. Performance: - Time Complexity: O(1) for get operations via indexed queries - Space Complexity: O(n) for n repositories - Transaction Overhead: ~1-2ms per operation (negligible) Schema Features: - Foreign keys ensure referential integrity - Indexes on priority, category, and repo_id for fast lookups - ON DELETE CASCADE prevents orphaned skill records """ SCHEMA_VERSION = 1 def __init__(self, db_path: Path | None = None) -> None: """Initialize metadata store. Args: db_path: Path to SQLite database file. Defaults to ~/.mcp-skillset/metadata.db Error Handling: - Database creation failure: Propagates OperationalError - Schema initialization failure: Rolls back transaction """ self.db_path = db_path or (Path.home() / ".mcp-skillset" / "metadata.db") self.db_path.parent.mkdir(parents=True, exist_ok=True) # Initialize database schema self._init_db() def _init_db(self) -> None: """Initialize database schema if not exists. Creates tables with indexes and enables foreign key constraints. Uses IF NOT EXISTS to allow safe re-initialization. Design Decision: Enable Foreign Keys SQLite disables foreign keys by default for backward compatibility. We explicitly enable them to enforce referential integrity and cascade deletes when repositories are removed. 
""" with self._get_connection() as conn: # Enable foreign key constraints conn.execute("PRAGMA foreign_keys = ON") # Create repositories table conn.execute( """ CREATE TABLE IF NOT EXISTS repositories ( id TEXT PRIMARY KEY, url TEXT NOT NULL, local_path TEXT NOT NULL, priority INTEGER DEFAULT 0, last_updated TIMESTAMP, skill_count INTEGER DEFAULT 0, license TEXT ) """ ) # Create skills table for future use conn.execute( """ CREATE TABLE IF NOT EXISTS skills ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, category TEXT, repository_id TEXT, file_path TEXT, version TEXT, author TEXT, FOREIGN KEY (repository_id) REFERENCES repositories(id) ON DELETE CASCADE ) """ ) # Create skill tags table for many-to-many relationship conn.execute( """ CREATE TABLE IF NOT EXISTS skill_tags ( skill_id TEXT, tag TEXT, FOREIGN KEY (skill_id) REFERENCES skills(id) ON DELETE CASCADE, PRIMARY KEY (skill_id, tag) ) """ ) # Create skill dependencies table for skill relationships conn.execute( """ CREATE TABLE IF NOT EXISTS skill_dependencies ( skill_id TEXT, dependency_id TEXT, FOREIGN KEY (skill_id) REFERENCES skills(id) ON DELETE CASCADE, PRIMARY KEY (skill_id, dependency_id) ) """ ) # Create indexes for fast lookups conn.execute( """ CREATE INDEX IF NOT EXISTS idx_repos_priority ON repositories(priority DESC) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_skills_category ON skills(category) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_skills_repo ON skills(repository_id) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_skill_tags_tag ON skill_tags(tag) """ ) conn.commit() @contextmanager def _get_connection(self) -> Iterator[sqlite3.Connection]: """Get database connection with context manager. Yields: SQLite connection with row_factory set for dict-like access Design Decision: Context Manager for Connections Rationale: Use context manager to ensure connections are properly closed and transactions are committed/rolled back automatically. Sets row_factory to sqlite3.Row for dict-like column access. Error Handling: - Connection errors propagate to caller - Transactions auto-rollback on exception - Connection always closed in finally block """ conn = sqlite3.connect(str(self.db_path)) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() # Repository CRUD Operations def add_repository(self, repository: Repository) -> None: """Add new repository to metadata store. Args: repository: Repository object to persist Raises: sqlite3.IntegrityError: If repository ID already exists Error Handling: - Duplicate ID: Raises IntegrityError (caller should check first) - Database locked: Retries handled by SQLite default (5 seconds) - Transaction failure: Automatically rolled back """ with self._get_connection() as conn: conn.execute( """ INSERT INTO repositories (id, url, local_path, priority, last_updated, skill_count, license) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( repository.id, repository.url, str(repository.local_path), repository.priority, repository.last_updated.isoformat(), repository.skill_count, repository.license, ), ) conn.commit() logger.debug(f"Added repository {repository.id} to metadata store") def get_repository(self, repo_id: str) -> Repository | None: """Get repository by ID. 
    def get_repository(self, repo_id: str) -> Repository | None:
        """Get repository by ID.

        Args:
            repo_id: Repository identifier

        Returns:
            Repository object or None if not found

        Performance:
        - Time Complexity: O(log n) via the primary key's B-tree index
        - No table scan required
        """
        with self._get_connection() as conn:
            cursor = conn.execute(
                "SELECT * FROM repositories WHERE id = ?", (repo_id,)
            )
            row = cursor.fetchone()

            if not row:
                return None

            return self._row_to_repository(row)

    def list_repositories(self) -> list[Repository]:
        """List all repositories sorted by priority.

        Returns:
            List of Repository objects sorted by priority (highest first)

        Performance:
        - Time Complexity: O(n); the ORDER BY is satisfied by scanning
          idx_repos_priority, so no separate sort is needed
        - For current scale (<100 repos), this is <1ms

        Index Optimization:
        The ORDER BY priority DESC clause can be served by the
        idx_repos_priority index, avoiding a temporary B-tree sort.
        """
        with self._get_connection() as conn:
            cursor = conn.execute(
                "SELECT * FROM repositories ORDER BY priority DESC"
            )
            rows = cursor.fetchall()

            return [self._row_to_repository(row) for row in rows]

    def update_repository(self, repository: Repository) -> None:
        """Update existing repository metadata.

        Args:
            repository: Repository with updated fields

        Raises:
            ValueError: If repository ID not found

        Error Handling:
        - Repository not found: Raises ValueError
        - Transaction failure: Automatically rolled back
        """
        with self._get_connection() as conn:
            cursor = conn.execute(
                """
                UPDATE repositories
                SET url = ?, local_path = ?, priority = ?, last_updated = ?,
                    skill_count = ?, license = ?
                WHERE id = ?
                """,
                (
                    repository.url,
                    str(repository.local_path),
                    repository.priority,
                    repository.last_updated.isoformat(),
                    repository.skill_count,
                    repository.license,
                    repository.id,
                ),
            )

            if cursor.rowcount == 0:
                raise ValueError(f"Repository not found: {repository.id}")

            conn.commit()

        logger.debug(f"Updated repository {repository.id} in metadata store")

    def delete_repository(self, repo_id: str) -> None:
        """Delete repository and cascade to related skills.

        Args:
            repo_id: Repository identifier to delete

        Raises:
            ValueError: If repository not found

        Data Consistency:
        - Uses ON DELETE CASCADE to remove related skills automatically
        - Transaction ensures atomic deletion (all or nothing)
        - No orphaned skill records possible
        """
        with self._get_connection() as conn:
            cursor = conn.execute(
                "DELETE FROM repositories WHERE id = ?", (repo_id,)
            )

            if cursor.rowcount == 0:
                raise ValueError(f"Repository not found: {repo_id}")

            conn.commit()

        logger.debug(f"Deleted repository {repo_id} from metadata store")
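    # Illustrative sketch (not in the original module): because skills
    # references repositories with ON DELETE CASCADE, and skill_tags and
    # skill_dependencies reference skills the same way, one delete cascades
    # through all three tables atomically. The repository ID is hypothetical.
    #
    #   store.delete_repository("example-repo")
    #   # Any skills rows for this repository, plus their skill_tags and
    #   # skill_dependencies rows, are removed in the same transaction.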
    # Skill CRUD Operations (for future use)

    def add_skill(self, skill_id: str, skill_data: dict) -> None:
        """Add skill to metadata store (placeholder for future use).

        Args:
            skill_id: Unique skill identifier
            skill_data: Dictionary with skill fields

        Note: Full skill management will be implemented in Phase 1 Task 4
        """
        raise NotImplementedError("Skill management not yet implemented")

    def get_skill(self, skill_id: str) -> dict | None:
        """Get skill by ID (placeholder for future use).

        Args:
            skill_id: Skill identifier

        Returns:
            Skill data dictionary or None if not found

        Note: Full skill management will be implemented in Phase 1 Task 4
        """
        raise NotImplementedError("Skill management not yet implemented")

    def list_skills(self, repo_id: str | None = None) -> list[dict]:
        """List skills, optionally filtered by repository (placeholder).

        Args:
            repo_id: Optional repository ID to filter by

        Returns:
            List of skill data dictionaries

        Note: Full skill management will be implemented in Phase 1 Task 4
        """
        raise NotImplementedError("Skill management not yet implemented")

    def delete_skills_by_repository(self, repo_id: str) -> None:
        """Delete all skills for a repository (placeholder).

        Args:
            repo_id: Repository identifier

        Note: This is already handled by ON DELETE CASCADE. Placeholder
        for explicit cleanup if ever needed.
        """
        # Cascade delete handles this automatically
        pass

    # Migration from JSON

    def migrate_from_json(self, json_path: Path) -> int:
        """Migrate repository data from JSON file to SQLite.

        Args:
            json_path: Path to repos.json file

        Returns:
            Number of repositories migrated

        Error Handling:
        - JSON parse errors: Logs error and returns 0
        - Duplicate entries: Skips and logs at debug level
        - Transaction failure: Rolls back all changes (atomic migration)

        Migration Strategy:
        - Atomic migration: Either all repos migrate or none
        - Preserves all repository metadata
        - Idempotent: Safe to run multiple times (skips duplicates)
        """
        if not json_path.exists():
            logger.warning(f"JSON file not found for migration: {json_path}")
            return 0

        try:
            with open(json_path) as f:
                data = json.load(f)

            repositories = []
            for repo_data in data.get("repositories", []):
                try:
                    repo = Repository.from_dict(repo_data)
                    repositories.append(repo)
                except (KeyError, ValueError) as e:
                    logger.warning(f"Skipping malformed repository entry: {e}")

            # Atomic migration using transaction
            with self._get_connection() as conn:
                migrated_count = 0
                for repo in repositories:
                    try:
                        conn.execute(
                            """
                            INSERT INTO repositories
                            (id, url, local_path, priority, last_updated,
                             skill_count, license)
                            VALUES (?, ?, ?, ?, ?, ?, ?)
                            """,
                            (
                                repo.id,
                                repo.url,
                                str(repo.local_path),
                                repo.priority,
                                repo.last_updated.isoformat(),
                                repo.skill_count,
                                repo.license,
                            ),
                        )
                        migrated_count += 1
                    except sqlite3.IntegrityError:
                        # Skip duplicate entries
                        logger.debug(f"Skipping duplicate repository: {repo.id}")

                conn.commit()

            logger.info(
                f"Migrated {migrated_count}/{len(repositories)} repositories "
                f"from JSON to SQLite"
            )
            return migrated_count

        except (json.JSONDecodeError, OSError) as e:
            logger.error(f"Failed to migrate from JSON: {e}")
            return 0

    def has_data(self) -> bool:
        """Check if database has any repository data.

        Returns:
            True if database contains at least one repository

        Usage: Used to determine if migration is needed
        """
        with self._get_connection() as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM repositories")
            result = cursor.fetchone()
            count: int = result[0] if result else 0
            return count > 0

    # Helper Methods

    def _row_to_repository(self, row: sqlite3.Row) -> Repository:
        """Convert SQLite row to Repository object.

        Args:
            row: SQLite Row object from query

        Returns:
            Repository instance with data from row
        """
        return Repository(
            id=row["id"],
            url=row["url"],
            local_path=Path(row["local_path"]),
            priority=row["priority"],
            last_updated=datetime.fromisoformat(row["last_updated"]),
            skill_count=row["skill_count"],
            license=row["license"],
        )
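# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). Assumes the Repository
# constructor accepts the same keyword fields that _row_to_repository passes;
# the database path, repository values, and legacy JSON path below are
# hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    store = MetadataStore(db_path=Path("/tmp/metadata-demo.db"))

    # One-time migration from the legacy JSON store, only if the DB is empty.
    legacy_json = Path.home() / ".mcp-skillset" / "repos.json"
    if not store.has_data():
        store.migrate_from_json(legacy_json)

    demo = Repository(
        id="example-repo",
        url="https://github.com/example/skills",
        local_path=Path("/tmp/example-skills"),
        priority=10,
        last_updated=datetime.now(),
        skill_count=0,
        license="MIT",
    )
    try:
        store.add_repository(demo)
    except sqlite3.IntegrityError:
        # Repository already exists; fall back to updating it.
        store.update_repository(demo)

    for repo in store.list_repositories():
        print(f"{repo.priority:>4}  {repo.id}")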
