csv_manager.py
""" CSV file operations and utilities. """ import os import pandas as pd import shutil from pathlib import Path from typing import Dict, List, Any, Optional, Union from datetime import datetime import logging logger = logging.getLogger(__name__) class CSVManager: """Manages CSV file operations with safety checks and validation.""" def __init__(self, storage_path: str = ".", max_file_size_mb: int = 50, backup_enabled: bool = True): self.storage_path = Path(storage_path) self.max_file_size_mb = max_file_size_mb self.backup_enabled = backup_enabled self.max_file_size_bytes = max_file_size_mb * 1024 * 1024 # Enable absolute path support by default for better filesystem access self.support_absolute_paths = os.getenv("CSV_SUPPORT_ABSOLUTE_PATHS", "true").lower() == "true" # Ensure storage directory exists self.storage_path.mkdir(parents=True, exist_ok=True) # Create backup directory if backups are enabled if self.backup_enabled: self.backup_path = self.storage_path / ".csv_backups" self.backup_path.mkdir(parents=True, exist_ok=True) def _validate_file_path(self, filepath: Path) -> None: """Validate file path for security and safety.""" if not self.support_absolute_paths: return # Check if path is absolute if filepath.is_absolute(): # Ensure the path is within reasonable bounds (not system directories) try: # Resolve any symlinks to get the real path real_path = filepath.resolve() # Check if path is in common system directories (add more as needed) system_dirs = [] # Windows system directories if os.name == 'nt': system_dirs.extend([ Path("C:/Windows"), Path("C:/Program Files"), Path("C:/Program Files (x86)"), Path("C:/System32"), Path("C:/Users/Default"), Path("C:/Users/Public"), Path("C:/Windows/System32"), Path("C:/Windows/SysWOW64") ]) else: # Unix/Linux system directories system_dirs.extend([ Path("/etc"), Path("/var"), Path("/usr"), Path("/bin"), Path("/sbin"), Path("/boot"), Path("/dev"), Path("/proc"), Path("/sys"), Path("/root") ]) for sys_dir in system_dirs: if sys_dir.exists() and real_path.is_relative_to(sys_dir): raise ValueError(f"Access to system directory '{sys_dir}' is not allowed") # Check if path is accessible (parent directories exist and are readable) parent = real_path.parent if not parent.exists(): raise ValueError(f"Parent directory '{parent}' does not exist") if not os.access(parent, os.R_OK | os.W_OK): raise ValueError(f"Parent directory '{parent}' is not accessible") except (OSError, RuntimeError) as e: raise ValueError(f"Invalid file path: {e}") def _get_file_path(self, filename: str) -> Path: """Get the full path for a CSV file, supporting both relative and absolute paths.""" if self.support_absolute_paths and os.path.isabs(filename): # Handle absolute file paths filepath = Path(filename) if not filepath.suffix: filepath = filepath.with_suffix('.csv') # Validate the path for security self._validate_file_path(filepath) return filepath else: # Handle relative paths (existing behavior) filename = Path(filename).name if not filename.endswith('.csv'): filename += '.csv' return self.storage_path / filename def _ensure_directory_exists(self, filepath: Path) -> None: """Ensure the directory for the file exists.""" if filepath.is_absolute(): # For absolute paths, ensure the parent directory exists parent_dir = filepath.parent if not parent_dir.exists(): try: parent_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Created directory: {parent_dir}") except Exception as e: raise ValueError(f"Failed to create directory '{parent_dir}': {e}") def _create_backup(self, filepath: 
Path) -> Optional[Path]: """Create a backup of the file before modification.""" if not self.backup_enabled or not filepath.exists(): return None timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_name = f"{filepath.stem}_{timestamp}.csv" backup_path = self.backup_path / backup_name try: shutil.copy2(filepath, backup_path) logger.info(f"Backup created: {backup_path}") return backup_path except Exception as e: logger.error(f"Failed to create backup: {e}") return None def _validate_file_size(self, filepath: Path) -> None: """Check if file size is within limits.""" if filepath.exists() and filepath.stat().st_size > self.max_file_size_bytes: raise ValueError(f"File size exceeds limit of {self.max_file_size_mb}MB") def create_csv(self, filename: str, headers: List[str], data: Optional[List[List[Any]]] = None) -> Dict[str, Any]: """Create a new CSV file with headers and optional initial data.""" filepath = self._get_file_path(filename) if filepath.exists(): raise FileExistsError(f"CSV file '{filename}' already exists") try: # Ensure directory exists for absolute paths self._ensure_directory_exists(filepath) # Create DataFrame df = pd.DataFrame(columns=headers) if data: df = pd.DataFrame(data, columns=headers) # Save to CSV df.to_csv(filepath, index=False) self._validate_file_size(filepath) logger.info(f"Created CSV file: {filepath}") return { "success": True, "filename": filename, "filepath": str(filepath), "rows_created": len(df), "columns": headers } except Exception as e: logger.error(f"Failed to create CSV: {e}") raise def read_csv(self, filename: str, limit: Optional[int] = None) -> Dict[str, Any]: """Read CSV file contents.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) # Apply limit if specified if limit and limit > 0: df = df.head(limit) return { "success": True, "filename": filename, "data": df.to_dict('records'), "columns": list(df.columns), "total_rows": len(df), "shape": df.shape } except Exception as e: logger.error(f"Failed to read CSV: {e}") raise def update_csv(self, filename: str, row_index: int, column: str, value: Any) -> Dict[str, Any]: """Update a specific cell in the CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") # Create backup self._create_backup(filepath) try: df = pd.read_csv(filepath) if row_index >= len(df): raise IndexError(f"Row index {row_index} out of range (max: {len(df)-1})") if column not in df.columns: raise ValueError(f"Column '{column}' not found in CSV") old_value = df.loc[row_index, column] df.loc[row_index, column] = value df.to_csv(filepath, index=False) self._validate_file_size(filepath) logger.info(f"Updated CSV file: {filepath}") return { "success": True, "filename": filename, "row_index": row_index, "column": column, "old_value": old_value, "new_value": value } except Exception as e: logger.error(f"Failed to update CSV: {e}") raise def add_row(self, filename: str, row_data: Dict[str, Any]) -> Dict[str, Any]: """Add a new row to the CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") # Create backup self._create_backup(filepath) try: df = pd.read_csv(filepath) # Create new row DataFrame new_row = pd.DataFrame([row_data]) # Append the new row df = pd.concat([df, new_row], ignore_index=True) df.to_csv(filepath, index=False) 
self._validate_file_size(filepath) logger.info(f"Added row to CSV file: {filepath}") return { "success": True, "filename": filename, "row_added": row_data, "new_total_rows": len(df) } except Exception as e: logger.error(f"Failed to add row: {e}") raise def remove_row(self, filename: str, row_index: int) -> Dict[str, Any]: """Remove a specific row from the CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") # Create backup self._create_backup(filepath) try: df = pd.read_csv(filepath) if row_index >= len(df): raise IndexError(f"Row index {row_index} out of range (max: {len(df)-1})") removed_row = df.iloc[row_index].to_dict() df = df.drop(df.index[row_index]) df.to_csv(filepath, index=False) logger.info(f"Removed row from CSV file: {filepath}") return { "success": True, "filename": filename, "removed_row": removed_row, "new_total_rows": len(df) } except Exception as e: logger.error(f"Failed to remove row: {e}") raise def get_info(self, filename: str) -> Dict[str, Any]: """Get basic information about a CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) file_stat = filepath.stat() return { "success": True, "filename": filename, "filepath": str(filepath), "size_bytes": file_stat.st_size, "size_mb": round(file_stat.st_size / (1024 * 1024), 2), "modified_time": datetime.fromtimestamp(file_stat.st_mtime).isoformat(), "rows": len(df), "columns": len(df.columns), "column_names": list(df.columns), "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, "memory_usage": df.memory_usage(deep=True).sum() } except Exception as e: logger.error(f"Failed to get CSV info: {e}") raise def get_statistics(self, filename: str) -> Dict[str, Any]: """Get statistical summary of numeric columns in the CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) # Get numeric columns only numeric_df = df.select_dtypes(include=['number']) if numeric_df.empty: return { "success": True, "filename": filename, "message": "No numeric columns found", "statistics": {} } # Convert describe results to serializable format stats_dict = {} for col in numeric_df.columns: col_stats = numeric_df[col].describe() stats_dict[col] = {stat: float(value) if pd.notna(value) else None for stat, value in col_stats.items()} return { "success": True, "filename": filename, "statistics": stats_dict, "numeric_columns": list(numeric_df.columns), "total_columns": len(df.columns), "null_counts": {col: int(count) for col, count in numeric_df.isnull().sum().items()} } except Exception as e: logger.error(f"Failed to get statistics: {e}") raise def filter_data(self, filename: str, conditions: Dict[str, Any], limit: Optional[int] = None) -> Dict[str, Any]: """Filter CSV data based on conditions.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) # Apply filters for column, condition in conditions.items(): if column not in df.columns: raise ValueError(f"Column '{column}' not found in CSV") if isinstance(condition, dict): # Handle complex conditions like {"gt": 5, "lt": 10} if "eq" in condition: df = df[df[column] == condition["eq"]] if "ne" in condition: df = df[df[column] != condition["ne"]] if "gt" in condition: df = 
df[df[column] > condition["gt"]] if "gte" in condition: df = df[df[column] >= condition["gte"]] if "lt" in condition: df = df[df[column] < condition["lt"]] if "lte" in condition: df = df[df[column] <= condition["lte"]] if "contains" in condition: df = df[df[column].astype(str).str.contains(condition["contains"], na=False)] else: # Simple equality filter df = df[df[column] == condition] # Apply limit if specified if limit and limit > 0: df = df.head(limit) return { "success": True, "filename": filename, "conditions": conditions, "filtered_data": df.to_dict('records'), "filtered_rows": len(df), "original_rows": len(pd.read_csv(filepath)) } except Exception as e: logger.error(f"Failed to filter data: {e}") raise def sort_data(self, filename: str, columns: Union[str, List[str]], ascending: bool = True, limit: Optional[int] = None) -> Dict[str, Any]: """Sort CSV data by specified columns.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) # Ensure columns is a list if isinstance(columns, str): columns = [columns] # Validate columns exist for col in columns: if col not in df.columns: raise ValueError(f"Column '{col}' not found in CSV") # Sort data df_sorted = df.sort_values(by=columns, ascending=ascending) # Apply limit if specified if limit and limit > 0: df_sorted = df_sorted.head(limit) return { "success": True, "filename": filename, "sort_columns": columns, "ascending": ascending, "sorted_data": df_sorted.to_dict('records'), "total_rows": len(df_sorted) } except Exception as e: logger.error(f"Failed to sort data: {e}") raise def group_data(self, filename: str, group_by: Union[str, List[str]], aggregations: Dict[str, str]) -> Dict[str, Any]: """Group and aggregate CSV data.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) # Ensure group_by is a list if isinstance(group_by, str): group_by = [group_by] # Validate group_by columns exist for col in group_by: if col not in df.columns: raise ValueError(f"Group by column '{col}' not found in CSV") # Validate aggregation columns exist for col in aggregations.keys(): if col not in df.columns: raise ValueError(f"Aggregation column '{col}' not found in CSV") # Group and aggregate grouped = df.groupby(group_by).agg(aggregations).reset_index() return { "success": True, "filename": filename, "group_by": group_by, "aggregations": aggregations, "grouped_data": grouped.to_dict('records'), "group_count": len(grouped) } except Exception as e: logger.error(f"Failed to group data: {e}") raise def validate_data(self, filename: str) -> Dict[str, Any]: """Validate CSV data integrity and format.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: df = pd.read_csv(filepath) validation_results = { "success": True, "filename": filename, "total_rows": len(df), "total_columns": len(df.columns), "issues": [], "warnings": [] } # Check for empty rows empty_rows = df.isnull().all(axis=1).sum() if empty_rows > 0: validation_results["issues"].append(f"Found {empty_rows} completely empty rows") # Check for duplicate rows duplicate_rows = df.duplicated().sum() if duplicate_rows > 0: validation_results["warnings"].append(f"Found {duplicate_rows} duplicate rows") # Check for missing values by column null_counts = df.isnull().sum() for col, null_count in null_counts.items(): if 
null_count > 0: percentage = (null_count / len(df)) * 100 validation_results["warnings"].append(f"Column '{col}' has {null_count} missing values ({percentage:.1f}%)") # Check for columns with mixed data types (if possible) for col in df.columns: if df[col].dtype == 'object': # Try to detect mixed numeric/text data numeric_count = pd.to_numeric(df[col], errors='coerce').notna().sum() if 0 < numeric_count < len(df): validation_results["warnings"].append(f"Column '{col}' appears to have mixed data types") # Check for unusually long text values for col in df.select_dtypes(include=['object']).columns: max_length = df[col].astype(str).str.len().max() if max_length > 1000: validation_results["warnings"].append(f"Column '{col}' has very long text values (max: {max_length} characters)") validation_results["is_valid"] = len(validation_results["issues"]) == 0 return validation_results except Exception as e: logger.error(f"Failed to validate data: {e}") raise def delete_csv(self, filename: str) -> Dict[str, Any]: """Delete a CSV file.""" filepath = self._get_file_path(filename) if not filepath.exists(): raise FileNotFoundError(f"CSV file '{filename}' not found") try: # Create backup before deletion backup_path = self._create_backup(filepath) # Delete the file filepath.unlink() logger.info(f"Deleted CSV file: {filepath}") return { "success": True, "filename": filename, "deleted_filepath": str(filepath), "backup_created": backup_path is not None, "backup_path": str(backup_path) if backup_path else None } except Exception as e: logger.error(f"Failed to delete CSV: {e}") raise def list_csv_files(self) -> Dict[str, Any]: """List all CSV files in the storage directory.""" try: csv_files = [] for filepath in self.storage_path.glob("*.csv"): if filepath.is_file(): stat = filepath.stat() csv_files.append({ "filename": filepath.name, "size_bytes": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat() }) return { "success": True, "csv_files": csv_files, "total_files": len(csv_files), "storage_path": str(self.storage_path) } except Exception as e: logger.error(f"Failed to list CSV files: {e}") raise def get_absolute_path_info(self, filepath: str) -> Dict[str, Any]: """Get information about a file path, supporting both relative and absolute paths.""" try: path = Path(filepath) # Determine if it's absolute or relative is_absolute = path.is_absolute() # Get the resolved path (follows symlinks) try: resolved_path = path.resolve() except (OSError, RuntimeError): resolved_path = path # Check if file exists exists = path.exists() # Get file info if it exists file_info = None if exists and path.is_file(): stat = path.stat() file_info = { "size_bytes": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), "is_csv": path.suffix.lower() == '.csv' } # Check directory access parent_dir = path.parent parent_exists = parent_dir.exists() parent_accessible = parent_exists and os.access(parent_dir, os.R_OK | os.W_OK) return { "success": True, "original_path": filepath, "is_absolute": is_absolute, "resolved_path": str(resolved_path), "parent_directory": str(parent_dir), "parent_exists": parent_exists, "parent_accessible": parent_accessible, "file_exists": exists, "is_file": exists and path.is_file(), "is_directory": exists and path.is_dir(), "file_info": file_info, "path_valid": self._is_path_safe(path) if is_absolute else True } except Exception as e: logger.error(f"Failed to 
get path info: {e}") return {"success": False, "error": str(e)} def _is_path_safe(self, filepath: Path) -> bool: """Check if a path is safe to access (not in system directories).""" try: if not filepath.is_absolute(): return True real_path = filepath.resolve() # Check if path is in common system directories system_dirs = [] # Windows system directories if os.name == 'nt': system_dirs.extend([ Path("C:/Windows"), Path("C:/Program Files"), Path("C:/Program Files (x86)"), Path("C:/System32"), Path("C:/Users/Default"), Path("C:/Users/Public"), Path("C:/Windows/System32"), Path("C:/Windows/SysWOW64") ]) else: # Unix/Linux system directories system_dirs.extend([ Path("/etc"), Path("/var"), Path("/usr"), Path("/bin"), Path("/sbin"), Path("/boot"), Path("/dev"), Path("/proc"), Path("/sys"), Path("/root") ]) for sys_dir in system_dirs: if sys_dir.exists() and real_path.is_relative_to(sys_dir): return False return True except (OSError, RuntimeError): return False
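
For reference, a minimal usage sketch of the class above. It assumes this file is importable as csv_manager and that pandas is installed; the file name "people" and the sample rows are illustrative only.

# Hypothetical example: create a file, append a row, then filter
# with a compound condition as accepted by filter_data().
from csv_manager import CSVManager

manager = CSVManager(storage_path="./data", max_file_size_mb=10)

# "people" resolves to ./data/people.csv via _get_file_path()
manager.create_csv("people", headers=["name", "age"], data=[["Alice", 30]])
manager.add_row("people", {"name": "Bob", "age": 42})

# Dict conditions combine comparisons, e.g. 35 <= age < 50
result = manager.filter_data("people", conditions={"age": {"gte": 35, "lt": 50}})
print(result["filtered_data"])  # [{'name': 'Bob', 'age': 42}]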

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/NovaAI-innovation/csv-mcp-server'
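
The same lookup can be done from Python; a minimal sketch using the third-party requests library (the response schema is not documented here, so this just prints the returned JSON):

import requests

resp = requests.get("https://glama.ai/api/mcp/v1/servers/NovaAI-innovation/csv-mcp-server")
resp.raise_for_status()
print(resp.json())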

If you have feedback or need assistance with the MCP directory API, please join our Discord server.