Skip to main content
Glama
security.py10.5 kB
""" Sistema de segurança SQL para o MCP Databases. Implementa validações rigorosas para prevenir execução de comandos perigosos. """ import re from typing import Dict, Any, List, Tuple from mcp_databases.logger import MCPLogger logger = MCPLogger.get_logger("mcp_databases.security") class SQLSecurityError(Exception): """Exceção levantada quando uma query SQL é considerada insegura.""" pass class SQLSecurityValidator: """ Validador de segurança SQL que implementa múltiplas camadas de proteção. """ # Comandos SQL perigosos que são completamente bloqueados DANGEROUS_COMMANDS = { 'DELETE', 'DROP', 'TRUNCATE', 'ALTER', 'CREATE', 'EXEC', 'EXECUTE', 'INSERT', 'UPDATE', 'MERGE', 'BULK', 'OPENROWSET', 'OPENDATASOURCE', 'xp_', 'sp_', 'SHUTDOWN', 'KILL', 'RESTORE', 'BACKUP' } # Padrões regex para detectar SQL dinâmico e comandos perigosos DANGEROUS_PATTERNS = [ r'\bEXEC\s*\(', # EXEC( r'\bEXECUTE\s*\(', # EXECUTE( r'\bsp_executesql\b', # sp_executesql r'\bEVAL\s*\(', # EVAL( r'@@\w+', # @@version, @@servername, etc. r'\bxp_\w+', # xp_cmdshell, etc. r'\bOPENROWSET\b', # OPENROWSET r'\bOPENDATASOURCE\b', # OPENDATASOURCE r'\bBULK\s+INSERT\b', # BULK INSERT r'\bSHUTDOWN\b', # SHUTDOWN r'\bKILL\b', # KILL r';.*\s*(DELETE|DROP|TRUNCATE|ALTER|CREATE|INSERT|UPDATE)', # Múltiplos comandos r'--.*\s*(DELETE|DROP|TRUNCATE|ALTER|CREATE|INSERT|UPDATE)', # Comandos em comentários r'/\*.*\s*(DELETE|DROP|TRUNCATE|ALTER|CREATE|INSERT|UPDATE).*\*/', # Comandos em comentários de bloco ] # Comandos permitidos (whitelist) - mais restritivo ALLOWED_COMMANDS = {'SELECT', 'WITH'} # Comandos que precisam de análise especial (podem ser perigosos em contextos específicos) CONDITIONAL_COMMANDS = {'UNION', 'INTERSECT', 'EXCEPT'} @classmethod def validate_query(cls, query: str, allow_modifications: bool = False) -> Tuple[bool, str]: """ Valida se uma query SQL é segura para execução. Args: query: Query SQL a ser validada allow_modifications: Se True, permite comandos de modificação (INSERT, UPDATE, DELETE) - CUIDADO! Returns: Tuple[bool, str]: (é_segura, mensagem_de_erro_ou_aviso) """ if not query or not isinstance(query, str): return False, "Query vazia ou inválida" # Remove comentários e normaliza a query cleaned_query = cls._clean_query(query) if not cleaned_query.strip(): return False, "Query vazia após limpeza" # 1. Verificação de comandos perigosos (sempre bloqueados) dangerous_found = cls._check_dangerous_commands(cleaned_query) if dangerous_found: error_msg = f"COMANDO PERIGOSO DETECTADO: {', '.join(dangerous_found)}. Execução BLOQUEADA por segurança." logger.error(f"Tentativa de execução de comando perigoso bloqueada: {dangerous_found} na query: {query[:100]}...") return False, error_msg # 2. Verificação de padrões perigosos via regex dangerous_patterns = cls._check_dangerous_patterns(cleaned_query) if dangerous_patterns: error_msg = f"PADRÃO PERIGOSO DETECTADO: SQL dinâmico ou comando suspeito. Execução BLOQUEADA por segurança." logger.error(f"Padrão perigoso detectado na query: {query[:100]}...") return False, error_msg # 3. Verificação de comandos de modificação (se não permitidos) if not allow_modifications: modification_commands = cls._check_modification_commands(cleaned_query) if modification_commands: error_msg = f"COMANDOS DE MODIFICAÇÃO DETECTADOS: {', '.join(modification_commands)}. Apenas comandos SELECT são permitidos." logger.warning(f"Tentativa de execução de comando de modificação bloqueada: {modification_commands}") return False, error_msg # 4. Verificação de comandos condicionais (UNION, etc.) conditional_commands = cls._check_conditional_commands(cleaned_query) if conditional_commands: # UNION e similares podem ser perigosos se mal usados, então vamos ser mais cautelosos logger.warning(f"Comandos condicionais detectados: {conditional_commands}") # Por segurança, vamos bloquear por padrão - pode ser relaxado se necessário error_msg = f"COMANDOS CONDICIONAIS DETECTADOS: {', '.join(conditional_commands)}. Bloqueado por precaução de segurança." return False, error_msg # 5. Verificação de whitelist (apenas comandos permitidos) first_command = cls._get_first_command(cleaned_query) if first_command and first_command not in cls.ALLOWED_COMMANDS: if not allow_modifications or first_command in cls.DANGEROUS_COMMANDS: error_msg = f"COMANDO NÃO PERMITIDO: '{first_command}'. Apenas comandos SELECT e WITH são permitidos." logger.warning(f"Comando não permitido detectado: {first_command}") return False, error_msg # Query passou em todas as validações logger.info(f"Query validada com sucesso: {query[:100]}...") return True, "Query validada com sucesso - segura para execução" @classmethod def _clean_query(cls, query: str) -> str: """Remove comentários e normaliza a query.""" # Remove comentários de linha query = re.sub(r'--.*$', '', query, flags=re.MULTILINE) # Remove comentários de bloco query = re.sub(r'/\*.*?\*/', '', query, flags=re.DOTALL) # Normaliza espaços em branco query = ' '.join(query.split()) return query.strip() @classmethod def _check_dangerous_commands(cls, query: str) -> List[str]: """Verifica se a query contém comandos perigosos.""" query_upper = query.upper() found_dangerous = [] for dangerous_cmd in cls.DANGEROUS_COMMANDS: # Verifica se o comando aparece como palavra completa pattern = r'\b' + re.escape(dangerous_cmd.upper()) + r'\b' if re.search(pattern, query_upper): found_dangerous.append(dangerous_cmd) return found_dangerous @classmethod def _check_dangerous_patterns(cls, query: str) -> List[str]: """Verifica se a query contém padrões perigosos.""" found_patterns = [] for pattern in cls.DANGEROUS_PATTERNS: if re.search(pattern, query, re.IGNORECASE): found_patterns.append(pattern) return found_patterns @classmethod def _check_modification_commands(cls, query: str) -> List[str]: """Verifica se a query contém comandos de modificação.""" modification_commands = {'INSERT', 'UPDATE', 'DELETE', 'MERGE'} query_upper = query.upper() found_modifications = [] for mod_cmd in modification_commands: pattern = r'\b' + re.escape(mod_cmd) + r'\b' if re.search(pattern, query_upper): found_modifications.append(mod_cmd) return found_modifications @classmethod def _check_conditional_commands(cls, query: str) -> List[str]: """Verifica se a query contém comandos condicionais que podem ser perigosos.""" query_upper = query.upper() found_conditional = [] for cond_cmd in cls.CONDITIONAL_COMMANDS: pattern = r'\b' + re.escape(cond_cmd) + r'\b' if re.search(pattern, query_upper): found_conditional.append(cond_cmd) return found_conditional @classmethod def _get_first_command(cls, query: str) -> str: """Extrai o primeiro comando SQL da query.""" query_upper = query.upper().strip() if not query_upper: return "" # Extrai a primeira palavra que parece ser um comando SQL match = re.match(r'^(\w+)', query_upper) if match: return match.group(1) return "" def validate_sql_security(query: str, allow_modifications: bool = False) -> None: """ Função utilitária para validar segurança SQL. Levanta SQLSecurityError se a query for considerada insegura. Args: query: Query SQL a ser validada allow_modifications: Se True, permite comandos de modificação Raises: SQLSecurityError: Se a query for considerada insegura """ is_safe, message = SQLSecurityValidator.validate_query(query, allow_modifications) if not is_safe: raise SQLSecurityError(message) logger.info(f"Validação de segurança passou: {message}") def get_security_report(query: str) -> Dict[str, Any]: """ Gera um relatório detalhado de segurança para uma query. Args: query: Query SQL a ser analisada Returns: Dict com informações detalhadas sobre a segurança da query """ is_safe, message = SQLSecurityValidator.validate_query(query, allow_modifications=False) cleaned_query = SQLSecurityValidator._clean_query(query) dangerous_commands = SQLSecurityValidator._check_dangerous_commands(cleaned_query) dangerous_patterns = SQLSecurityValidator._check_dangerous_patterns(cleaned_query) modification_commands = SQLSecurityValidator._check_modification_commands(cleaned_query) first_command = SQLSecurityValidator._get_first_command(cleaned_query) return { "is_safe": is_safe, "message": message, "first_command": first_command, "dangerous_commands": dangerous_commands, "dangerous_patterns": dangerous_patterns, "modification_commands": modification_commands, "cleaned_query": cleaned_query[:200] + "..." if len(cleaned_query) > 200 else cleaned_query, "original_query": query[:200] + "..." if len(query) > 200 else query }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fean-developer/mcp-databases'

If you have feedback or need assistance with the MCP directory API, please join our Discord server