"""
CSV file operations and utilities.
"""
import os
import pandas as pd
import shutil
from pathlib import Path
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class CSVManager:
"""Manages CSV file operations with safety checks and validation."""
def __init__(self, storage_path: str = ".", max_file_size_mb: int = 50, backup_enabled: bool = True):
self.storage_path = Path(storage_path)
self.max_file_size_mb = max_file_size_mb
self.backup_enabled = backup_enabled
self.max_file_size_bytes = max_file_size_mb * 1024 * 1024
# Enable absolute path support by default for better filesystem access
self.support_absolute_paths = os.getenv("CSV_SUPPORT_ABSOLUTE_PATHS", "true").lower() == "true"
# Ensure storage directory exists
self.storage_path.mkdir(parents=True, exist_ok=True)
# Create backup directory if backups are enabled
if self.backup_enabled:
self.backup_path = self.storage_path / ".csv_backups"
self.backup_path.mkdir(parents=True, exist_ok=True)
def _validate_file_path(self, filepath: Path) -> None:
"""Validate file path for security and safety."""
if not self.support_absolute_paths:
return
# Check if path is absolute
if filepath.is_absolute():
# Ensure the path is within reasonable bounds (not system directories)
try:
# Resolve any symlinks to get the real path
real_path = filepath.resolve()
# Check if path is in common system directories (add more as needed)
system_dirs = []
# Windows system directories
if os.name == 'nt':
system_dirs.extend([
Path("C:/Windows"), Path("C:/Program Files"), Path("C:/Program Files (x86)"),
Path("C:/System32"), Path("C:/Users/Default"), Path("C:/Users/Public"),
Path("C:/Windows/System32"), Path("C:/Windows/SysWOW64")
])
else:
# Unix/Linux system directories
system_dirs.extend([
Path("/etc"), Path("/var"), Path("/usr"), Path("/bin"), Path("/sbin"),
Path("/boot"), Path("/dev"), Path("/proc"), Path("/sys"), Path("/root")
])
for sys_dir in system_dirs:
if sys_dir.exists() and real_path.is_relative_to(sys_dir):
raise ValueError(f"Access to system directory '{sys_dir}' is not allowed")
# Check if path is accessible (parent directories exist and are readable)
parent = real_path.parent
if not parent.exists():
raise ValueError(f"Parent directory '{parent}' does not exist")
if not os.access(parent, os.R_OK | os.W_OK):
raise ValueError(f"Parent directory '{parent}' is not accessible")
except (OSError, RuntimeError) as e:
raise ValueError(f"Invalid file path: {e}")
def _get_file_path(self, filename: str) -> Path:
"""Get the full path for a CSV file, supporting both relative and absolute paths."""
if self.support_absolute_paths and os.path.isabs(filename):
# Handle absolute file paths
filepath = Path(filename)
if not filepath.suffix:
filepath = filepath.with_suffix('.csv')
# Validate the path for security
self._validate_file_path(filepath)
return filepath
else:
# Handle relative paths (existing behavior)
filename = Path(filename).name
if not filename.endswith('.csv'):
filename += '.csv'
return self.storage_path / filename
def _ensure_directory_exists(self, filepath: Path) -> None:
"""Ensure the directory for the file exists."""
if filepath.is_absolute():
# For absolute paths, ensure the parent directory exists
parent_dir = filepath.parent
if not parent_dir.exists():
try:
parent_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Created directory: {parent_dir}")
except Exception as e:
raise ValueError(f"Failed to create directory '{parent_dir}': {e}")
def _create_backup(self, filepath: Path) -> Optional[Path]:
"""Create a backup of the file before modification."""
if not self.backup_enabled or not filepath.exists():
return None
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"{filepath.stem}_{timestamp}.csv"
backup_path = self.backup_path / backup_name
try:
shutil.copy2(filepath, backup_path)
logger.info(f"Backup created: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Failed to create backup: {e}")
return None
def _validate_file_size(self, filepath: Path) -> None:
"""Check if file size is within limits."""
if filepath.exists() and filepath.stat().st_size > self.max_file_size_bytes:
raise ValueError(f"File size exceeds limit of {self.max_file_size_mb}MB")
def create_csv(self, filename: str, headers: List[str], data: Optional[List[List[Any]]] = None) -> Dict[str, Any]:
"""Create a new CSV file with headers and optional initial data."""
filepath = self._get_file_path(filename)
if filepath.exists():
raise FileExistsError(f"CSV file '{filename}' already exists")
try:
# Ensure directory exists for absolute paths
self._ensure_directory_exists(filepath)
# Create DataFrame
df = pd.DataFrame(columns=headers)
if data:
df = pd.DataFrame(data, columns=headers)
# Save to CSV
df.to_csv(filepath, index=False)
self._validate_file_size(filepath)
logger.info(f"Created CSV file: {filepath}")
return {
"success": True,
"filename": filename,
"filepath": str(filepath),
"rows_created": len(df),
"columns": headers
}
except Exception as e:
logger.error(f"Failed to create CSV: {e}")
raise
def read_csv(self, filename: str, limit: Optional[int] = None) -> Dict[str, Any]:
"""Read CSV file contents."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
# Apply limit if specified
if limit and limit > 0:
df = df.head(limit)
return {
"success": True,
"filename": filename,
"data": df.to_dict('records'),
"columns": list(df.columns),
"total_rows": len(df),
"shape": df.shape
}
except Exception as e:
logger.error(f"Failed to read CSV: {e}")
raise
def update_csv(self, filename: str, row_index: int, column: str, value: Any) -> Dict[str, Any]:
"""Update a specific cell in the CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
# Create backup
self._create_backup(filepath)
try:
df = pd.read_csv(filepath)
if row_index >= len(df):
raise IndexError(f"Row index {row_index} out of range (max: {len(df)-1})")
if column not in df.columns:
raise ValueError(f"Column '{column}' not found in CSV")
old_value = df.loc[row_index, column]
df.loc[row_index, column] = value
df.to_csv(filepath, index=False)
self._validate_file_size(filepath)
logger.info(f"Updated CSV file: {filepath}")
return {
"success": True,
"filename": filename,
"row_index": row_index,
"column": column,
"old_value": old_value,
"new_value": value
}
except Exception as e:
logger.error(f"Failed to update CSV: {e}")
raise
def add_row(self, filename: str, row_data: Dict[str, Any]) -> Dict[str, Any]:
"""Add a new row to the CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
# Create backup
self._create_backup(filepath)
try:
df = pd.read_csv(filepath)
# Create new row DataFrame
new_row = pd.DataFrame([row_data])
# Append the new row
df = pd.concat([df, new_row], ignore_index=True)
df.to_csv(filepath, index=False)
self._validate_file_size(filepath)
logger.info(f"Added row to CSV file: {filepath}")
return {
"success": True,
"filename": filename,
"row_added": row_data,
"new_total_rows": len(df)
}
except Exception as e:
logger.error(f"Failed to add row: {e}")
raise
def remove_row(self, filename: str, row_index: int) -> Dict[str, Any]:
"""Remove a specific row from the CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
# Create backup
self._create_backup(filepath)
try:
df = pd.read_csv(filepath)
if row_index >= len(df):
raise IndexError(f"Row index {row_index} out of range (max: {len(df)-1})")
removed_row = df.iloc[row_index].to_dict()
df = df.drop(df.index[row_index])
df.to_csv(filepath, index=False)
logger.info(f"Removed row from CSV file: {filepath}")
return {
"success": True,
"filename": filename,
"removed_row": removed_row,
"new_total_rows": len(df)
}
except Exception as e:
logger.error(f"Failed to remove row: {e}")
raise
def get_info(self, filename: str) -> Dict[str, Any]:
"""Get basic information about a CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
file_stat = filepath.stat()
return {
"success": True,
"filename": filename,
"filepath": str(filepath),
"size_bytes": file_stat.st_size,
"size_mb": round(file_stat.st_size / (1024 * 1024), 2),
"modified_time": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns),
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
"memory_usage": df.memory_usage(deep=True).sum()
}
except Exception as e:
logger.error(f"Failed to get CSV info: {e}")
raise
def get_statistics(self, filename: str) -> Dict[str, Any]:
"""Get statistical summary of numeric columns in the CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
# Get numeric columns only
numeric_df = df.select_dtypes(include=['number'])
if numeric_df.empty:
return {
"success": True,
"filename": filename,
"message": "No numeric columns found",
"statistics": {}
}
# Convert describe results to serializable format
stats_dict = {}
for col in numeric_df.columns:
col_stats = numeric_df[col].describe()
stats_dict[col] = {stat: float(value) if pd.notna(value) else None
for stat, value in col_stats.items()}
return {
"success": True,
"filename": filename,
"statistics": stats_dict,
"numeric_columns": list(numeric_df.columns),
"total_columns": len(df.columns),
"null_counts": {col: int(count) for col, count in numeric_df.isnull().sum().items()}
}
except Exception as e:
logger.error(f"Failed to get statistics: {e}")
raise
def filter_data(self, filename: str, conditions: Dict[str, Any], limit: Optional[int] = None) -> Dict[str, Any]:
"""Filter CSV data based on conditions."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
# Apply filters
for column, condition in conditions.items():
if column not in df.columns:
raise ValueError(f"Column '{column}' not found in CSV")
if isinstance(condition, dict):
# Handle complex conditions like {"gt": 5, "lt": 10}
if "eq" in condition:
df = df[df[column] == condition["eq"]]
if "ne" in condition:
df = df[df[column] != condition["ne"]]
if "gt" in condition:
df = df[df[column] > condition["gt"]]
if "gte" in condition:
df = df[df[column] >= condition["gte"]]
if "lt" in condition:
df = df[df[column] < condition["lt"]]
if "lte" in condition:
df = df[df[column] <= condition["lte"]]
if "contains" in condition:
df = df[df[column].astype(str).str.contains(condition["contains"], na=False)]
else:
# Simple equality filter
df = df[df[column] == condition]
# Apply limit if specified
if limit and limit > 0:
df = df.head(limit)
return {
"success": True,
"filename": filename,
"conditions": conditions,
"filtered_data": df.to_dict('records'),
"filtered_rows": len(df),
"original_rows": len(pd.read_csv(filepath))
}
except Exception as e:
logger.error(f"Failed to filter data: {e}")
raise
def sort_data(self, filename: str, columns: Union[str, List[str]], ascending: bool = True, limit: Optional[int] = None) -> Dict[str, Any]:
"""Sort CSV data by specified columns."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
# Ensure columns is a list
if isinstance(columns, str):
columns = [columns]
# Validate columns exist
for col in columns:
if col not in df.columns:
raise ValueError(f"Column '{col}' not found in CSV")
# Sort data
df_sorted = df.sort_values(by=columns, ascending=ascending)
# Apply limit if specified
if limit and limit > 0:
df_sorted = df_sorted.head(limit)
return {
"success": True,
"filename": filename,
"sort_columns": columns,
"ascending": ascending,
"sorted_data": df_sorted.to_dict('records'),
"total_rows": len(df_sorted)
}
except Exception as e:
logger.error(f"Failed to sort data: {e}")
raise
def group_data(self, filename: str, group_by: Union[str, List[str]], aggregations: Dict[str, str]) -> Dict[str, Any]:
"""Group and aggregate CSV data."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
# Ensure group_by is a list
if isinstance(group_by, str):
group_by = [group_by]
# Validate group_by columns exist
for col in group_by:
if col not in df.columns:
raise ValueError(f"Group by column '{col}' not found in CSV")
# Validate aggregation columns exist
for col in aggregations.keys():
if col not in df.columns:
raise ValueError(f"Aggregation column '{col}' not found in CSV")
# Group and aggregate
grouped = df.groupby(group_by).agg(aggregations).reset_index()
return {
"success": True,
"filename": filename,
"group_by": group_by,
"aggregations": aggregations,
"grouped_data": grouped.to_dict('records'),
"group_count": len(grouped)
}
except Exception as e:
logger.error(f"Failed to group data: {e}")
raise
def validate_data(self, filename: str) -> Dict[str, Any]:
"""Validate CSV data integrity and format."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
df = pd.read_csv(filepath)
validation_results = {
"success": True,
"filename": filename,
"total_rows": len(df),
"total_columns": len(df.columns),
"issues": [],
"warnings": []
}
# Check for empty rows
empty_rows = df.isnull().all(axis=1).sum()
if empty_rows > 0:
validation_results["issues"].append(f"Found {empty_rows} completely empty rows")
# Check for duplicate rows
duplicate_rows = df.duplicated().sum()
if duplicate_rows > 0:
validation_results["warnings"].append(f"Found {duplicate_rows} duplicate rows")
# Check for missing values by column
null_counts = df.isnull().sum()
for col, null_count in null_counts.items():
if null_count > 0:
percentage = (null_count / len(df)) * 100
validation_results["warnings"].append(f"Column '{col}' has {null_count} missing values ({percentage:.1f}%)")
# Check for columns with mixed data types (if possible)
for col in df.columns:
if df[col].dtype == 'object':
# Try to detect mixed numeric/text data
numeric_count = pd.to_numeric(df[col], errors='coerce').notna().sum()
if 0 < numeric_count < len(df):
validation_results["warnings"].append(f"Column '{col}' appears to have mixed data types")
# Check for unusually long text values
for col in df.select_dtypes(include=['object']).columns:
max_length = df[col].astype(str).str.len().max()
if max_length > 1000:
validation_results["warnings"].append(f"Column '{col}' has very long text values (max: {max_length} characters)")
validation_results["is_valid"] = len(validation_results["issues"]) == 0
return validation_results
except Exception as e:
logger.error(f"Failed to validate data: {e}")
raise
def delete_csv(self, filename: str) -> Dict[str, Any]:
"""Delete a CSV file."""
filepath = self._get_file_path(filename)
if not filepath.exists():
raise FileNotFoundError(f"CSV file '{filename}' not found")
try:
# Create backup before deletion
backup_path = self._create_backup(filepath)
# Delete the file
filepath.unlink()
logger.info(f"Deleted CSV file: {filepath}")
return {
"success": True,
"filename": filename,
"deleted_filepath": str(filepath),
"backup_created": backup_path is not None,
"backup_path": str(backup_path) if backup_path else None
}
except Exception as e:
logger.error(f"Failed to delete CSV: {e}")
raise
def list_csv_files(self) -> Dict[str, Any]:
"""List all CSV files in the storage directory."""
try:
csv_files = []
for filepath in self.storage_path.glob("*.csv"):
if filepath.is_file():
stat = filepath.stat()
csv_files.append({
"filename": filepath.name,
"size_bytes": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat()
})
return {
"success": True,
"csv_files": csv_files,
"total_files": len(csv_files),
"storage_path": str(self.storage_path)
}
except Exception as e:
logger.error(f"Failed to list CSV files: {e}")
raise
def get_absolute_path_info(self, filepath: str) -> Dict[str, Any]:
"""Get information about a file path, supporting both relative and absolute paths."""
try:
path = Path(filepath)
# Determine if it's absolute or relative
is_absolute = path.is_absolute()
# Get the resolved path (follows symlinks)
try:
resolved_path = path.resolve()
except (OSError, RuntimeError):
resolved_path = path
# Check if file exists
exists = path.exists()
# Get file info if it exists
file_info = None
if exists and path.is_file():
stat = path.stat()
file_info = {
"size_bytes": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"is_csv": path.suffix.lower() == '.csv'
}
# Check directory access
parent_dir = path.parent
parent_exists = parent_dir.exists()
parent_accessible = parent_exists and os.access(parent_dir, os.R_OK | os.W_OK)
return {
"success": True,
"original_path": filepath,
"is_absolute": is_absolute,
"resolved_path": str(resolved_path),
"parent_directory": str(parent_dir),
"parent_exists": parent_exists,
"parent_accessible": parent_accessible,
"file_exists": exists,
"is_file": exists and path.is_file(),
"is_directory": exists and path.is_dir(),
"file_info": file_info,
"path_valid": self._is_path_safe(path) if is_absolute else True
}
except Exception as e:
logger.error(f"Failed to get path info: {e}")
return {"success": False, "error": str(e)}
def _is_path_safe(self, filepath: Path) -> bool:
"""Check if a path is safe to access (not in system directories)."""
try:
if not filepath.is_absolute():
return True
real_path = filepath.resolve()
# Check if path is in common system directories
system_dirs = []
# Windows system directories
if os.name == 'nt':
system_dirs.extend([
Path("C:/Windows"), Path("C:/Program Files"), Path("C:/Program Files (x86)"),
Path("C:/System32"), Path("C:/Users/Default"), Path("C:/Users/Public"),
Path("C:/Windows/System32"), Path("C:/Windows/SysWOW64")
])
else:
# Unix/Linux system directories
system_dirs.extend([
Path("/etc"), Path("/var"), Path("/usr"), Path("/bin"), Path("/sbin"),
Path("/boot"), Path("/dev"), Path("/proc"), Path("/sys"), Path("/root")
])
for sys_dir in system_dirs:
if sys_dir.exists() and real_path.is_relative_to(sys_dir):
return False
return True
except (OSError, RuntimeError):
return False