file_processor.py (21.4 kB)
""" File processor component for scanning, parsing, and indexing files """ import fnmatch import hashlib import json import logging import os import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Dict, List, Optional, Set, Tuple from tqdm import tqdm logger = logging.getLogger("files-db-mcp.file_processor") class FileProcessor: """ Processes files in a project directory for indexing in the vector database """ def __init__(self, vector_search, project_path: str, ignore_patterns: List[str], data_dir: str): self.vector_search = vector_search self.project_path = Path(project_path) self.ignore_patterns = ignore_patterns.copy() # Create a copy to avoid modifying the original self.data_dir = Path(data_dir) # Indexing state self.indexing_in_progress = False self.files_indexed = 0 self.total_files = 0 self.last_indexed_files: Set[str] = set() # Performance metrics self.last_batch_speed = 0.0 # Enhanced file tracking: file path -> {hash, mtime, size} self.file_metadata: Dict[str, Dict[str, any]] = {} # Create data directory if it doesn't exist os.makedirs(self.data_dir, exist_ok=True) # Add state file to ignore patterns state_file_rel = os.path.relpath(str(self.data_dir / "file_processor_state.json"), self.project_path) if state_file_rel not in self.ignore_patterns: self.ignore_patterns.append(state_file_rel) # Also add a pattern for the directory if it's inside the project state_dir_rel = os.path.relpath(str(self.data_dir), self.project_path) if not state_dir_rel.startswith('..'): self.ignore_patterns.append(f"{state_dir_rel}/**") # Load state and file metadata if available self.load_state() def load_state(self): """Load state from disk""" state_file = self.data_dir / "file_processor_state.json" if state_file.exists(): try: with open(state_file, "r") as f: state = json.load(f) self.last_indexed_files = set(state.get("indexed_files", [])) self.file_metadata = state.get("file_metadata", {}) logger.info( f"Loaded state: {len(self.last_indexed_files)} previously indexed files" ) except Exception as e: logger.error(f"Error loading state: {e!s}") # Initialize empty metadata if loading fails self.file_metadata = {} def save_state(self): """Save state to disk""" state_file = self.data_dir / "file_processor_state.json" try: with open(state_file, "w") as f: json.dump( { "indexed_files": list(self.last_indexed_files), "file_metadata": self.file_metadata, "last_updated": time.time() }, f, indent=2 # Pretty-print for readability ) logger.info(f"Saved state: {len(self.last_indexed_files)} indexed files") except Exception as e: logger.error(f"Error saving state: {e!s}") def is_ignored(self, file_path: str) -> bool: """Check if a file should be ignored""" return any(fnmatch.fnmatch(file_path, pattern) for pattern in self.ignore_patterns) def compute_file_hash(self, file_path: str) -> Optional[str]: """ Compute SHA-256 hash of file content Args: file_path: Absolute path to the file Returns: Hex digest of hash or None if file couldn't be read """ try: hasher = hashlib.sha256() with open(file_path, 'rb') as f: # Read file in chunks to handle large files efficiently for chunk in iter(lambda: f.read(65536), b''): hasher.update(chunk) return hasher.hexdigest() except Exception as e: logger.warning(f"Failed to compute hash for {file_path}: {e!s}") return None def get_file_stats(self, abs_path: str) -> Tuple[Optional[float], Optional[int], Optional[str]]: """ Get file modification time, size, and hash Args: abs_path: Absolute path to file Returns: Tuple of (mtime, size, 
hash) or None values if stats couldn't be retrieved """ try: # Get file stats stat_result = os.stat(abs_path) mtime = stat_result.st_mtime size = stat_result.st_size # Only compute hash for text files and limit by size # to avoid performance issues with large files if size < 10 * 1024 * 1024: # 10 MB limit file_hash = self.compute_file_hash(abs_path) else: # For large files, use size+mtime instead of content hash file_hash = f"size:{size}_mtime:{mtime}" return mtime, size, file_hash except Exception as e: logger.warning(f"Failed to get stats for {abs_path}: {e!s}") return None, None, None def file_needs_update(self, rel_path: str) -> bool: """ Check if a file needs to be reindexed based on its metadata Args: rel_path: Relative path to the file Returns: True if file needs updating, False otherwise """ abs_path = os.path.join(self.project_path, rel_path) # If file doesn't exist, it definitely doesn't need updating if not os.path.isfile(abs_path): return False # If file is not in metadata or not in indexed files, it needs updating if rel_path not in self.file_metadata or rel_path not in self.last_indexed_files: return True # Get current file stats curr_mtime, curr_size, curr_hash = self.get_file_stats(abs_path) if curr_mtime is None: # If we can't get stats, assume it needs updating return True # Get stored metadata metadata = self.file_metadata.get(rel_path, {}) stored_mtime = metadata.get('mtime') stored_size = metadata.get('size') stored_hash = metadata.get('hash') # If hash exists and is unchanged, file is the same if stored_hash and curr_hash and stored_hash == curr_hash: return False # If size and mtime match, probably unchanged if (stored_size == curr_size and stored_mtime == curr_mtime and abs(curr_mtime - stored_mtime) < 0.001): # mtime precision can vary return False # Otherwise, consider the file changed return True def get_file_list(self) -> List[str]: """Get list of files to index""" file_list = [] logger.info(f"Scanning project directory: {self.project_path}") # Walk through all files in the project recursively for root, dirs, files in os.walk(self.project_path): # Filter directories in-place to avoid traversing ignored directories dirs[:] = [d for d in dirs if not self.is_ignored(d)] for file in files: file_path = os.path.join(root, file) rel_path = os.path.relpath(file_path, self.project_path) # Skip ignored files if self.is_ignored(rel_path): continue file_list.append(rel_path) logger.info(f"Found {len(file_list)} files in project") return file_list def get_modified_files(self) -> Tuple[List[str], List[str], int]: """ Get lists of new/modified files and calculate files to remove Returns: Tuple of (files_to_update, files_to_remove, total_files) """ current_files = set(self.get_file_list()) total_files = len(current_files) # Files that have been deleted since last indexing files_to_remove = list(self.last_indexed_files - current_files) # Files that might need updating (new or modified) files_to_check = list(current_files) # Filter to only get files that actually need updating based on metadata files_to_update = [f for f in files_to_check if self.file_needs_update(f)] logger.info(f"Found {len(files_to_update)} files that need indexing") logger.info(f"Found {len(files_to_remove)} files that have been deleted") return files_to_update, files_to_remove, total_files def process_file(self, rel_path: str) -> bool: """Process a single file for indexing""" try: file_path = os.path.join(self.project_path, rel_path) # Check if file exists and is readable if not os.path.isfile(file_path) or 
not os.access(file_path, os.R_OK): logger.warning(f"File {rel_path} is not accessible") return False # Get file metadata mtime, size, file_hash = self.get_file_stats(file_path) # Read file content try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # Skip binary files logger.warning(f"File {rel_path} appears to be binary, skipping content extraction") content = f"[Binary file: {rel_path}]" # Simple content chunking for large files # If content is too large, truncate it to 5000 characters to avoid performance issues if len(content) > 5000: logger.debug(f"File {rel_path} is large ({len(content)} chars), truncating for indexing") content = content[:5000] + f"\n\n[Truncated: file is {len(content)} characters]" # Add to vector search engine with metadata metadata = { "mtime": mtime, "size": size, "hash": file_hash, "indexed_at": time.time() } # Add to vector search engine with metadata self.vector_search.index_file(rel_path, content, metadata) # Update our tracking info self.last_indexed_files.add(rel_path) self.file_metadata[rel_path] = metadata return True except Exception as e: logger.error(f"Error processing file {rel_path}: {e!s}") return False def index_files(self, incremental: bool = True): """ Index files in the project directory Args: incremental: Whether to use incremental indexing (default: True) """ if self.indexing_in_progress: logger.warning("Indexing already in progress") return self.indexing_in_progress = True try: start_time = time.time() if incremental and self.last_indexed_files: # Get files that need updating files_to_update, files_to_remove, self.total_files = self.get_modified_files() # Remove files that have been deleted if files_to_remove: logger.info(f"Removing {len(files_to_remove)} deleted files from index") for rel_path in files_to_remove: self.vector_search.delete_file(rel_path) if rel_path in self.last_indexed_files: self.last_indexed_files.remove(rel_path) if rel_path in self.file_metadata: del self.file_metadata[rel_path] file_list = files_to_update logger.info(f"Running incremental indexing for {len(file_list)} modified files") else: # Full indexing file_list = self.get_file_list() self.total_files = len(file_list) logger.info(f"Running full indexing for {len(file_list)} files") self.files_indexed = 0 # Process files in optimized batches max_batch_size = 50 # Define maximum batch size for each batch operation batch_workers = 8 # Use more worker threads for processing # Break down the file list into batches batches = [file_list[i:i + max_batch_size] for i in range(0, len(file_list), max_batch_size)] logger.info(f"Processing {len(file_list)} files in {len(batches)} batches of maximum {max_batch_size} files") # Process each batch with multiple workers batch_processed = 0 for batch in tqdm(batches, desc="Indexing batches"): batch_files = [] batch_contents = [] batch_metadata = [] # First, read all files in the batch in parallel with ThreadPoolExecutor(max_workers=batch_workers) as executor: # Map function to process files in parallel def read_file(rel_path): try: file_path = os.path.join(self.project_path, rel_path) # Check if file exists and is readable if not os.path.isfile(file_path) or not os.access(file_path, os.R_OK): logger.warning(f"File {rel_path} is not accessible") return None # Get file metadata mtime, size, file_hash = self.get_file_stats(file_path) # Read file content try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # Skip binary files 
logger.warning(f"File {rel_path} appears to be binary, skipping content extraction") content = f"[Binary file: {rel_path}]" # Simple content chunking for large files if len(content) > 5000: logger.debug(f"File {rel_path} is large ({len(content)} chars), truncating for indexing") content = content[:5000] + f"\n\n[Truncated: file is {len(content)} characters]" # Prepare metadata metadata = { "mtime": mtime, "size": size, "hash": file_hash, "indexed_at": time.time() } return (rel_path, content, metadata) except Exception as e: logger.error(f"Error reading file {rel_path}: {e!s}") return None # Process all files in batch results = list(executor.map(read_file, batch)) # Filter out None results and prepare batch data for result in results: if result: rel_path, content, metadata = result batch_files.append(rel_path) batch_contents.append(content) batch_metadata.append(metadata) # Now process all files in a single batch operation if any valid files exist if batch_files: try: # Use the new batch indexing functionality for maximum performance batch_start_time = time.time() logger.info(f"Processing batch {batch_processed+1}/{len(batches)} with {len(batch_files)} files") # Use the batch index method instead of individual indexing success_list = self.vector_search.batch_index_files(batch_files, batch_contents, batch_metadata) # Update tracking variables based on success list for i, (success, rel_path, metadata) in enumerate(zip(success_list, batch_files, batch_metadata)): if success: self.files_indexed += 1 self.last_indexed_files.add(rel_path) self.file_metadata[rel_path] = metadata batch_processed += 1 batch_time = time.time() - batch_start_time files_per_sec = len(batch_files) / batch_time if batch_time > 0 else 0 # Store the batch speed for the health endpoint self.last_batch_speed = files_per_sec # Report progress after each batch files_processed = min(self.files_indexed, len(file_list)) progress_pct = (files_processed / len(file_list) * 100) if file_list else 100.0 logger.info( f"Indexing progress: {files_processed}/{len(file_list)} files ({progress_pct:.1f}%), " f"batch {batch_processed}/{len(batches)}, speed: {files_per_sec:.2f} files/sec" ) except Exception as e: logger.error(f"Error processing batch: {e!s}") # Save state after indexing self.save_state() elapsed_time = time.time() - start_time logger.info(f"Indexing complete: {self.files_indexed} files indexed in {elapsed_time:.2f} seconds") except Exception as e: logger.error(f"Error during indexing: {e!s}") finally: self.indexing_in_progress = False def handle_file_change(self, event_type: str, file_path: str): """Handle file change event from file watcher""" try: # Convert to relative path rel_path = os.path.relpath(file_path, self.project_path) # Skip ignored files if self.is_ignored(rel_path): return # Skip the state file itself to prevent infinite update loops state_file_path = str(self.data_dir / "file_processor_state.json") if os.path.abspath(file_path) == os.path.abspath(state_file_path): logger.debug(f"Ignoring change to state file: {file_path}") return logger.info(f"File change detected: {event_type} - {rel_path}") if event_type in ["created", "modified"]: # Add or update file self.process_file(rel_path) elif event_type == "deleted": # Remove file from index self.vector_search.delete_file(rel_path) if rel_path in self.last_indexed_files: self.last_indexed_files.remove(rel_path) if rel_path in self.file_metadata: del self.file_metadata[rel_path] # Save state after change self.save_state() except Exception as e: logger.error(f"Error 
handling file change {event_type} - {file_path}: {e!s}") def is_indexing_complete(self) -> bool: """Check if initial indexing is complete""" return not self.indexing_in_progress def get_indexing_progress(self) -> float: """Get indexing progress as a percentage""" if self.total_files == 0: return 100.0 return (self.files_indexed / self.total_files) * 100.0 def get_files_indexed(self) -> int: """Get number of files indexed""" return len(self.last_indexed_files) def get_total_files(self) -> int: """Get total number of files to index""" return self.total_files def schedule_indexing(self, incremental: bool = True): """ Schedule indexing to run in a background thread Args: incremental: Whether to use incremental indexing (default: True) """ import threading thread = threading.Thread(target=lambda: self.index_files(incremental=incremental)) thread.daemon = True thread.start()
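
For orientation, here is a minimal sketch of how FileProcessor might be wired up. It assumes only the backend surface that this file itself calls (index_file, batch_index_files, delete_file); the InMemoryVectorSearch stub, the ignore patterns, and the data directory path are illustrative placeholders, not part of the project.

# Illustrative usage sketch for FileProcessor (not the project's own entry point).
import logging

from file_processor import FileProcessor

logging.basicConfig(level=logging.INFO)


class InMemoryVectorSearch:
    """Hypothetical stand-in for the project's vector search engine."""

    def __init__(self):
        self.documents = {}

    def index_file(self, rel_path, content, metadata):
        # Store content and metadata keyed by relative path.
        self.documents[rel_path] = (content, metadata)

    def batch_index_files(self, paths, contents, metadatas):
        # Index each file and report per-file success, as index_files() expects.
        results = []
        for rel_path, content, metadata in zip(paths, contents, metadatas):
            self.documents[rel_path] = (content, metadata)
            results.append(True)
        return results

    def delete_file(self, rel_path):
        self.documents.pop(rel_path, None)


if __name__ == "__main__":
    processor = FileProcessor(
        vector_search=InMemoryVectorSearch(),
        project_path=".",
        ignore_patterns=[".git/*", "*.pyc", "__pycache__/*"],
        data_dir="./.files-db-mcp",
    )
    # Full index on the first run; later runs reuse the saved state file for
    # incremental indexing based on file hashes, sizes, and modification times.
    processor.index_files(incremental=True)
    print(f"Indexed {processor.get_files_indexed()} files")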

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/randomm/files-db-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.