Skip to main content
Glama

CFM Tips - Cost Optimization MCP Server

by aws-samples
cleanup_manager.py10.6 kB
""" Cleanup Manager for CFM Tips MCP Server Handles automatic cleanup of sessions, results, and temporary data. """ import logging import threading import time import os import glob from datetime import datetime, timedelta from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class CleanupManager: """Manages automatic cleanup of sessions and temporary data.""" def __init__(self, session_timeout_minutes: int = 60, result_retention_minutes: int = 120, cleanup_interval_minutes: int = 15): self.session_timeout_minutes = session_timeout_minutes self.result_retention_minutes = result_retention_minutes self.cleanup_interval_minutes = cleanup_interval_minutes self._shutdown = False self._cleanup_thread = None # Start cleanup thread self._start_cleanup_thread() def _start_cleanup_thread(self): """Start the background cleanup thread.""" if self._cleanup_thread is None or not self._cleanup_thread.is_alive(): self._cleanup_thread = threading.Thread( target=self._cleanup_worker, daemon=True, name="CleanupManager" ) self._cleanup_thread.start() logger.info(f"Cleanup manager started (session timeout: {self.session_timeout_minutes}min)") def _cleanup_worker(self): """Background worker for periodic cleanup.""" while not self._shutdown: try: self._perform_cleanup() time.sleep(self.cleanup_interval_minutes * 60) except Exception as e: logger.error(f"Error in cleanup worker: {e}") time.sleep(60) # Wait 1 minute on error def _perform_cleanup(self): """Perform all cleanup operations.""" logger.debug("Starting periodic cleanup") # Clean up session files self._cleanup_session_files() # Clean up temporary files self._cleanup_temp_files() # Clean up old log files self._cleanup_log_files() logger.debug("Periodic cleanup completed") def _cleanup_session_files(self): """Clean up old session database files.""" try: sessions_dir = "sessions" if not os.path.exists(sessions_dir): return cutoff_time = datetime.now() - timedelta(minutes=self.session_timeout_minutes) cleaned_count = 0 # Find all session database files session_files = glob.glob(os.path.join(sessions_dir, "session_*.db")) for session_file in session_files: try: # Check file modification time file_mtime = datetime.fromtimestamp(os.path.getmtime(session_file)) if file_mtime < cutoff_time: # Remove old session file os.remove(session_file) cleaned_count += 1 logger.debug(f"Cleaned up old session file: {session_file}") # Also remove any associated WAL and SHM files for ext in ['-wal', '-shm']: wal_file = session_file + ext if os.path.exists(wal_file): os.remove(wal_file) except Exception as e: logger.warning(f"Error cleaning session file {session_file}: {e}") if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} old session files") except Exception as e: logger.error(f"Error in session file cleanup: {e}") def _cleanup_temp_files(self): """Clean up temporary files and directories.""" try: temp_patterns = [ "*.tmp", "*.temp", "__pycache__/*.pyc", "*.log.old" ] cleaned_count = 0 for pattern in temp_patterns: temp_files = glob.glob(pattern, recursive=True) for temp_file in temp_files: try: # Check if file is older than retention period file_mtime = datetime.fromtimestamp(os.path.getmtime(temp_file)) cutoff_time = datetime.now() - timedelta(minutes=self.result_retention_minutes) if file_mtime < cutoff_time: if os.path.isfile(temp_file): os.remove(temp_file) cleaned_count += 1 elif os.path.isdir(temp_file): import shutil shutil.rmtree(temp_file) cleaned_count += 1 except Exception as e: logger.warning(f"Error cleaning temp file {temp_file}: {e}") if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} temporary files") except Exception as e: logger.error(f"Error in temp file cleanup: {e}") def _cleanup_log_files(self): """Clean up old log files.""" try: logs_dir = "logs" if not os.path.exists(logs_dir): return # Keep logs for 7 days cutoff_time = datetime.now() - timedelta(days=7) cleaned_count = 0 log_files = glob.glob(os.path.join(logs_dir, "*.log.*")) for log_file in log_files: try: file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file)) if file_mtime < cutoff_time: os.remove(log_file) cleaned_count += 1 logger.debug(f"Cleaned up old log file: {log_file}") except Exception as e: logger.warning(f"Error cleaning log file {log_file}: {e}") if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} old log files") except Exception as e: logger.error(f"Error in log file cleanup: {e}") def force_cleanup(self): """Force immediate cleanup of all resources.""" logger.info("Forcing immediate cleanup") self._perform_cleanup() # Also clean up from session manager and parallel executor try: from . import get_session_manager, get_parallel_executor # Clean up session manager session_manager = get_session_manager() session_manager._cleanup_expired_sessions() # Clean up parallel executor results executor = get_parallel_executor() executor.clear_results(older_than_minutes=self.result_retention_minutes) logger.info("Force cleanup completed") except Exception as e: logger.error(f"Error in force cleanup: {e}") def get_cleanup_stats(self) -> Dict[str, Any]: """Get statistics about cleanup operations.""" try: stats = { 'session_timeout_minutes': self.session_timeout_minutes, 'result_retention_minutes': self.result_retention_minutes, 'cleanup_interval_minutes': self.cleanup_interval_minutes, 'cleanup_thread_alive': self._cleanup_thread.is_alive() if self._cleanup_thread else False, 'sessions_directory_exists': os.path.exists('sessions'), 'logs_directory_exists': os.path.exists('logs') } # Count current files if os.path.exists('sessions'): session_files = glob.glob('sessions/session_*.db') stats['active_session_files'] = len(session_files) else: stats['active_session_files'] = 0 if os.path.exists('logs'): log_files = glob.glob('logs/*.log*') stats['log_files'] = len(log_files) else: stats['log_files'] = 0 return stats except Exception as e: logger.error(f"Error getting cleanup stats: {e}") return {'error': str(e)} def update_settings(self, session_timeout_minutes: Optional[int] = None, result_retention_minutes: Optional[int] = None, cleanup_interval_minutes: Optional[int] = None): """Update cleanup settings.""" if session_timeout_minutes is not None: self.session_timeout_minutes = session_timeout_minutes logger.info(f"Updated session timeout to {session_timeout_minutes} minutes") if result_retention_minutes is not None: self.result_retention_minutes = result_retention_minutes logger.info(f"Updated result retention to {result_retention_minutes} minutes") if cleanup_interval_minutes is not None: self.cleanup_interval_minutes = cleanup_interval_minutes logger.info(f"Updated cleanup interval to {cleanup_interval_minutes} minutes") def shutdown(self): """Shutdown the cleanup manager.""" logger.info("Shutting down cleanup manager") self._shutdown = True # Perform final cleanup try: self._perform_cleanup() except Exception as e: logger.error(f"Error in final cleanup: {e}") # Wait for cleanup thread to finish if self._cleanup_thread and self._cleanup_thread.is_alive(): self._cleanup_thread.join(timeout=10) # Global cleanup manager instance _cleanup_manager = None def get_cleanup_manager() -> CleanupManager: """Get the global cleanup manager instance.""" global _cleanup_manager if _cleanup_manager is None: _cleanup_manager = CleanupManager() return _cleanup_manager

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server