Skip to main content
Glama
test_config.py58.1 kB
"""Tests for configuration loading and credential resolution.""" import os import tempfile import pytest import yaml from mcp_ssh.config import ( MAX_ENV_FILE_SIZE, MAX_KEY_PATH_LENGTH, MAX_SECRET_NAME_LENGTH, MAX_YAML_FILE_SIZE, Config, _load_env_file, _load_yaml, _log_security_event, _resolve_key_path, _resolve_secret, _validate_file_path, ) @pytest.fixture def temp_config_dir(): """Create temporary config directory with test files.""" with tempfile.TemporaryDirectory() as tmpdir: # Create test servers.yml servers = { "hosts": [ { "alias": "test1", "host": "10.0.0.1", "port": 22, "credentials": "cred1", "tags": ["web", "prod"], }, { "alias": "test2", "host": "10.0.0.2", "port": 2222, "credentials": "cred2", "tags": ["db"], }, ] } with open(os.path.join(tmpdir, "servers.yml"), "w") as f: yaml.dump(servers, f) # Create test credentials.yml credentials = { "entries": [ { "name": "cred1", "username": "user1", "key_path": "id_ed25519", }, { "name": "cred2", "username": "user2", "password_secret": "db_password", }, ] } with open(os.path.join(tmpdir, "credentials.yml"), "w") as f: yaml.dump(credentials, f) # Create test policy.yml policy = { "rules": [ { "action": "allow", "aliases": ["*"], "commands": ["uptime*"], } ] } with open(os.path.join(tmpdir, "policy.yml"), "w") as f: yaml.dump(policy, f) yield tmpdir def test_config_loads_files(temp_config_dir): """Test that Config loads all YAML files correctly.""" config = Config(temp_config_dir) # Check hosts loaded host = config.get_host("test1") assert host["host"] == "10.0.0.1" assert host["port"] == 22 # Check credentials loaded creds = config.get_credentials("cred1") assert creds["username"] == "user1" # key_path is resolved to absolute path during loading assert "id_ed25519" in creds.get("key_path", "") # Check policy loaded pol = config.get_policy() assert len(pol.get("rules", [])) > 0 def test_get_host_not_found(temp_config_dir): """Test getting non-existent host.""" config = Config(temp_config_dir) # get_host raises ValueError when host not found with pytest.raises(ValueError, match="Host alias not found"): config.get_host("nonexistent") def test_get_credentials_not_found(temp_config_dir): """Test getting non-existent credentials.""" config = Config(temp_config_dir) creds = config.get_credentials("nonexistent") assert creds == {} def test_get_credentials_key_passphrase_secret_docker_mcp(monkeypatch, temp_config_dir): """Test get_credentials resolves key_passphrase_secret via Docker MCP Gateway.""" # Update credentials.yml creds_path = os.path.join(temp_config_dir, "credentials.yml") credentials = { "entries": [ { "name": "test_cred", "username": "testuser", "key_path": "id_ed25519", "key_passphrase_secret": "SSH_KEY_PASSPHRASE_01", } ] } with open(creds_path, "w") as f: yaml.dump(credentials, f) # Set Docker MCP Gateway env var monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "my-passphrase") config = Config(temp_config_dir) creds = config.get_credentials("test_cred") assert creds["passphrase"] == "my-passphrase" def test_get_credentials_password_secret_docker_mcp(monkeypatch, temp_config_dir): """Test get_credentials resolves password_secret via Docker MCP Gateway.""" # Update credentials.yml creds_path = os.path.join(temp_config_dir, "credentials.yml") credentials = { "entries": [ { "name": "test_cred", "username": "testuser", "password_secret": "SSH_PASSWORD_SECRET_01", } ] } with open(creds_path, "w") as f: yaml.dump(credentials, f) # Set Docker MCP Gateway env var monkeypatch.setenv("SSH_PASSWORD_SECRET_01", "my-password") config = Config(temp_config_dir) creds = config.get_credentials("test_cred") assert creds["password"] == "my-password" def test_get_credentials_both_secrets_docker_mcp(monkeypatch, temp_config_dir): """Test get_credentials resolves both key_passphrase_secret and password_secret.""" # Update credentials.yml creds_path = os.path.join(temp_config_dir, "credentials.yml") credentials = { "entries": [ { "name": "test_cred", "username": "testuser", "key_path": "id_ed25519", "key_passphrase_secret": "SSH_KEY_PASSPHRASE_01", "password_secret": "SSH_PASSWORD_SECRET_01", } ] } with open(creds_path, "w") as f: yaml.dump(credentials, f) # Set both Docker MCP Gateway env vars monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "my-passphrase") monkeypatch.setenv("SSH_PASSWORD_SECRET_01", "my-password") config = Config(temp_config_dir) creds = config.get_credentials("test_cred") assert creds["passphrase"] == "my-passphrase" assert creds["password"] == "my-password" def test_get_credentials_all_secrets_docker_mcp(monkeypatch, temp_config_dir): """Test get_credentials resolves all 10 secrets (5 passphrase + 5 password).""" # Create credentials.yml with all secrets creds_path = os.path.join(temp_config_dir, "credentials.yml") entries = [] # 5 entries with passphrase secrets for i in range(1, 6): entries.append( { "name": f"passphrase_cred_{i}", "username": "testuser", "key_path": "id_ed25519", "key_passphrase_secret": f"SSH_KEY_PASSPHRASE_{i:02d}", } ) monkeypatch.setenv(f"SSH_KEY_PASSPHRASE_{i:02d}", f"passphrase-{i}") # 5 entries with password secrets for i in range(1, 6): entries.append( { "name": f"password_cred_{i}", "username": "testuser", "password_secret": f"SSH_PASSWORD_SECRET_{i:02d}", } ) monkeypatch.setenv(f"SSH_PASSWORD_SECRET_{i:02d}", f"password-{i}") credentials = {"entries": entries} with open(creds_path, "w") as f: yaml.dump(credentials, f) config = Config(temp_config_dir) # Verify all passphrase secrets for i in range(1, 6): creds = config.get_credentials(f"passphrase_cred_{i}") assert creds["passphrase"] == f"passphrase-{i}" # Verify all password secrets for i in range(1, 6): creds = config.get_credentials(f"password_cred_{i}") assert creds["password"] == f"password-{i}" def test_get_host_tags(temp_config_dir): """Test getting tags for a host.""" config = Config(temp_config_dir) tags = config.get_host_tags("test1") assert "web" in tags assert "prod" in tags def test_list_hosts(temp_config_dir): """Test listing all hosts.""" config = Config(temp_config_dir) hosts = config.list_hosts() assert len(hosts) == 2 assert "test1" in hosts assert "test2" in hosts def test_find_hosts_by_tag(temp_config_dir): """Test finding hosts by tag.""" config = Config(temp_config_dir) hosts = config.find_hosts_by_tag("web") assert "test1" in hosts assert "test2" not in hosts def test_reload_config(temp_config_dir): """Test reloading configuration.""" config = Config(temp_config_dir) # Modify servers.yml servers_path = os.path.join(temp_config_dir, "servers.yml") new_servers = { "hosts": [ { "alias": "newhost", "host": "10.0.0.3", "port": 22, "credentials": "cred1", } ] } with open(servers_path, "w") as f: yaml.dump(new_servers, f) # Reload config.reload() # Check new host loaded host = config.get_host("newhost") assert host["host"] == "10.0.0.3" def test_resolve_secret_from_env(monkeypatch): """Test resolving secret from environment variable.""" monkeypatch.setenv("MCP_SSH_SECRET_TEST_SECRET", "env-secret-value") result = _resolve_secret("test_secret") assert result == "env-secret-value" def test_resolve_secret_from_direct_env_var(monkeypatch): """Test resolving secret from direct environment variable (Docker MCP Gateway pattern).""" monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "direct-env-secret-value") result = _resolve_secret("SSH_KEY_PASSPHRASE_01") assert result == "direct-env-secret-value" def test_resolve_secret_from_prefixed_env_var(monkeypatch): """Test resolving secret from prefixed environment variable (standalone pattern).""" monkeypatch.setenv( "MCP_SSH_SECRET_SSH_KEY_PASSPHRASE_01", "prefixed-env-secret-value" ) result = _resolve_secret("SSH_KEY_PASSPHRASE_01") assert result == "prefixed-env-secret-value" def test_resolve_secret_priority_direct_env_over_prefixed(monkeypatch): """Test that direct env var takes precedence over prefixed env var.""" monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "direct-value") monkeypatch.setenv("MCP_SSH_SECRET_SSH_KEY_PASSPHRASE_01", "prefixed-value") result = _resolve_secret("SSH_KEY_PASSPHRASE_01") assert result == "direct-value" # Direct env var should win def test_resolve_secret_priority_env_over_file(monkeypatch): """Test that environment variable takes precedence over file.""" with tempfile.TemporaryDirectory() as tmpdir: secret_file = os.path.join(tmpdir, "SSH_KEY_PASSPHRASE_01") with open(secret_file, "w") as f: f.write("file-value") monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "env-value") result = _resolve_secret("SSH_KEY_PASSPHRASE_01", secrets_dir=tmpdir) assert result == "env-value" # Env var should win def test_resolve_secret_all_passphrase_secrets_docker_mcp(monkeypatch): """Test resolving all 5 passphrase secrets via Docker MCP Gateway pattern.""" for i in range(1, 6): secret_name = f"SSH_KEY_PASSPHRASE_{i:02d}" secret_value = f"passphrase-secret-{i}" monkeypatch.setenv(secret_name, secret_value) result = _resolve_secret(secret_name) assert result == secret_value def test_resolve_secret_all_password_secrets_docker_mcp(monkeypatch): """Test resolving all 5 password secrets via Docker MCP Gateway pattern.""" for i in range(1, 6): secret_name = f"SSH_PASSWORD_SECRET_{i:02d}" secret_value = f"password-secret-{i}" monkeypatch.setenv(secret_name, secret_value) result = _resolve_secret(secret_name) assert result == secret_value def test_resolve_secret_mixed_modes(monkeypatch): """Test resolving secrets in mixed mode (some direct, some prefixed).""" # Direct env var (Docker MCP Gateway) monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "direct-value") result1 = _resolve_secret("SSH_KEY_PASSPHRASE_01") assert result1 == "direct-value" # Prefixed env var (standalone) monkeypatch.setenv("MCP_SSH_SECRET_SSH_KEY_PASSPHRASE_02", "prefixed-value") result2 = _resolve_secret("SSH_KEY_PASSPHRASE_02") assert result2 == "prefixed-value" def test_resolve_secret_case_sensitivity(monkeypatch): """Test that secret resolution is case-insensitive for env var lookup.""" monkeypatch.setenv("SSH_KEY_PASSPHRASE_01", "uppercase-env") # Secret name in credentials.yml might be lowercase result = _resolve_secret("ssh_key_passphrase_01") assert result == "uppercase-env" # Should find it via .upper() def test_resolve_secret_from_file(): """Test resolving secret from file.""" with tempfile.TemporaryDirectory() as tmpdir: secret_file = os.path.join(tmpdir, "test_secret") with open(secret_file, "w") as f: f.write("file-secret-value\n") result = _resolve_secret("test_secret", secrets_dir=tmpdir) assert result == "file-secret-value" def test_resolve_secret_not_found(): """Test resolving non-existent secret.""" with tempfile.TemporaryDirectory() as tmpdir: result = _resolve_secret("nonexistent", secrets_dir=tmpdir) assert result == "" def test_resolve_secret_path_traversal_forward_slash(): """Test that path traversal with ../ is blocked.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a secret file secret_file = os.path.join(tmpdir, "valid_secret") with open(secret_file, "w") as f: f.write("valid-secret") # Create a file outside the secrets directory parent_dir = os.path.dirname(tmpdir) outside_file = os.path.join(parent_dir, "outside_secret") with open(outside_file, "w") as f: f.write("outside-secret") # Try to access it via path traversal result = _resolve_secret("../outside_secret", secrets_dir=tmpdir) assert result == "" # Clean up os.remove(outside_file) def test_resolve_secret_path_traversal_backslash(): """Test that path traversal with ..\\ is blocked.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a secret file secret_file = os.path.join(tmpdir, "valid_secret") with open(secret_file, "w") as f: f.write("valid-secret") # Try path traversal with backslash (Windows style) # Note: os.path.normpath will normalize this on any OS result = _resolve_secret("..\\outside_secret", secrets_dir=tmpdir) assert result == "" def test_resolve_secret_absolute_path_rejection(): """Test that absolute paths are rejected for secrets.""" with tempfile.TemporaryDirectory() as tmpdir: # Create secret file secret_file = os.path.join(tmpdir, "valid_secret") with open(secret_file, "w") as f: f.write("valid-secret") # Try absolute path (should be rejected) result = _resolve_secret(os.path.abspath(secret_file), secrets_dir=tmpdir) assert result == "" def test_resolve_secret_special_characters_rejection(): """Test that special characters in secret names are rejected.""" invalid_names = [ "../secret", "../../etc/passwd", "secret/../file", "secret@name", "secret#name", "secret$name", "secret%name", "secret&name", "secret*name", "secret+name", "secret=name", "secret|name", "secret<name", "secret>name", "secret?name", "secret:name", "secret;name", "secret'name", 'secret"name', "secret\nname", "secret\tname", "secret name", # space ] for invalid_name in invalid_names: result = _resolve_secret(invalid_name, secrets_dir="/app/secrets") assert result == "", f"Should reject: {invalid_name}" def test_resolve_secret_valid_characters_allowed(): """Test that valid characters in secret names are allowed.""" with tempfile.TemporaryDirectory() as tmpdir: # Valid secret names valid_names = [ "secret123", "SECRET_NAME", "secret-name", "secret_name", "SecretName123", "123secret", "a", "A1", ] for valid_name in valid_names: secret_file = os.path.join(tmpdir, valid_name) with open(secret_file, "w") as f: f.write(f"content-{valid_name}") result = _resolve_secret(valid_name, secrets_dir=tmpdir) assert result == f"content-{valid_name}", f"Should allow: {valid_name}" def test_resolve_secret_multiple_traversal_attempts(): """Test multiple levels of path traversal are blocked.""" with tempfile.TemporaryDirectory() as tmpdir: # Create nested structure nested_dir = os.path.join(tmpdir, "nested", "deep") os.makedirs(nested_dir, exist_ok=True) # Create file in nested dir nested_file = os.path.join(nested_dir, "nested_secret") with open(nested_file, "w") as f: f.write("nested-secret") # Try multiple traversal levels traversal_attempts = [ "../../outside", "../../../etc/passwd", "....//....//etc/passwd", ] for attempt in traversal_attempts: result = _resolve_secret(attempt, secrets_dir=nested_dir) assert result == "", f"Should block traversal: {attempt}" def test_resolve_secret_normal_file_access_still_works(): """Test that normal file access still works after security fixes.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a valid secret file secret_file = os.path.join(tmpdir, "valid_secret") with open(secret_file, "w") as f: f.write("valid-secret-content") # Should still work result = _resolve_secret("valid_secret", secrets_dir=tmpdir) assert result == "valid-secret-content" def test_resolve_key_path_relative(): """Test resolving relative key path.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a key file key_path = os.path.join(tmpdir, "id_ed25519") with open(key_path, "w") as f: f.write("valid-key") result = _resolve_key_path("id_ed25519", keys_dir=tmpdir) assert result == os.path.abspath(key_path) def test_resolve_key_path_absolute_within_keys_dir(): """Test resolving absolute key path within keys_dir.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a key file key_path = os.path.join(tmpdir, "id_ed25519") with open(key_path, "w") as f: f.write("valid-key") # Absolute path within keys_dir should work result = _resolve_key_path(key_path, keys_dir=tmpdir) assert result == os.path.abspath(key_path) def test_resolve_key_path_absolute_outside_keys_dir(): """Test that absolute key path outside keys_dir is rejected.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a key file outside keys_dir parent_dir = os.path.dirname(tmpdir) outside_key = os.path.join(parent_dir, "outside_key") with open(outside_key, "w") as f: f.write("outside-key") # Absolute path outside keys_dir should be rejected result = _resolve_key_path(outside_key, keys_dir=tmpdir) assert result == "" # Clean up os.remove(outside_key) def test_resolve_key_path_empty(): """Test resolving empty key path.""" path = _resolve_key_path("", keys_dir="/app/keys") assert path == "" def test_resolve_key_path_traversal_forward_slash(): """Test that path traversal with ../ is blocked.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a key file key_path = os.path.join(tmpdir, "valid_key") with open(key_path, "w") as f: f.write("valid-key") # Create a file outside the keys directory parent_dir = os.path.dirname(tmpdir) outside_file = os.path.join(parent_dir, "outside_key") with open(outside_file, "w") as f: f.write("outside-key") # Try to access it via path traversal result = _resolve_key_path("../outside_key", keys_dir=tmpdir) assert result == "" # Clean up os.remove(outside_file) def test_resolve_key_path_traversal_backslash(): """Test that path traversal with ..\\ is blocked on Windows.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a key file key_path = os.path.join(tmpdir, "valid_key") with open(key_path, "w") as f: f.write("valid-key") # Try path traversal with backslash (Windows style) # Note: os.path.normpath will normalize this on any OS result = _resolve_key_path("..\\outside_key", keys_dir=tmpdir) assert result == "" def test_resolve_key_path_multiple_traversal_attempts(): """Test multiple levels of path traversal are blocked.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a nested structure nested_dir = os.path.join(tmpdir, "nested", "deep") os.makedirs(nested_dir, exist_ok=True) # Create file in nested dir nested_file = os.path.join(nested_dir, "nested_key") with open(nested_file, "w") as f: f.write("nested-key") # Try multiple traversal levels traversal_attempts = [ "../../outside", "../../../etc/passwd", "....//....//etc/passwd", ] for attempt in traversal_attempts: result = _resolve_key_path(attempt, keys_dir=nested_dir) assert result == "", f"Should block traversal: {attempt}" def test_resolve_key_path_normal_file_access_still_works(): """Test that normal file access still works after security fixes.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a valid key file key_path = os.path.join(tmpdir, "valid_key") with open(key_path, "w") as f: f.write("valid-key-content") # Relative path should still work result = _resolve_key_path("valid_key", keys_dir=tmpdir) assert result == os.path.abspath(key_path) # Absolute path within keys_dir should still work result2 = _resolve_key_path(key_path, keys_dir=tmpdir) assert result2 == os.path.abspath(key_path) def test_resolve_key_path_absolute_same_as_keys_dir(): """Test that absolute path pointing to keys_dir itself is rejected (not a file).""" with tempfile.TemporaryDirectory() as tmpdir: # Absolute path to keys_dir itself should be rejected (it's a directory, not a file) result = _resolve_key_path(tmpdir, keys_dir=tmpdir) assert result == "" def test_validate_file_path_directory_rejection(): """Test that directory paths are rejected.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a subdirectory subdir = os.path.join(tmpdir, "subdir") os.makedirs(subdir, exist_ok=True) # Directory should be rejected result = _validate_file_path(subdir, tmpdir, require_exists=True) assert result is False def test_validate_file_path_symlink_rejection(): """Test that symlink paths are rejected.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file target_file = os.path.join(tmpdir, "target") with open(target_file, "w") as f: f.write("target-content") # Create a symlink to it symlink_path = os.path.join(tmpdir, "symlink") os.symlink(target_file, symlink_path) # Symlink should be rejected result = _validate_file_path(symlink_path, tmpdir, require_exists=True) assert result is False def test_validate_file_path_outside_base_dir(): """Test that paths outside base directory are rejected.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a file outside the base directory parent_dir = os.path.dirname(tmpdir) outside_file = os.path.join(parent_dir, "outside_file") with open(outside_file, "w") as f: f.write("outside-content") # Path outside base_dir should be rejected result = _validate_file_path(outside_file, tmpdir, require_exists=True) assert result is False # Clean up os.remove(outside_file) def test_validate_file_path_regular_file_allowed(): """Test that regular files within base directory are allowed.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file file_path = os.path.join(tmpdir, "regular_file") with open(file_path, "w") as f: f.write("file-content") # Regular file should be allowed result = _validate_file_path(file_path, tmpdir, require_exists=True) assert result is True def test_resolve_secret_directory_rejection(): """Test that directories are rejected when resolving secrets.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a subdirectory subdir = os.path.join(tmpdir, "subdir") os.makedirs(subdir, exist_ok=True) # Directory should be rejected result = _resolve_secret("subdir", secrets_dir=tmpdir) assert result == "" def test_resolve_secret_symlink_rejection(): """Test that symlinks are rejected when resolving secrets.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file target_file = os.path.join(tmpdir, "target") with open(target_file, "w") as f: f.write("target-content") # Create a symlink to it symlink_path = os.path.join(tmpdir, "symlink") os.symlink(target_file, symlink_path) # Symlink should be rejected result = _resolve_secret("symlink", secrets_dir=tmpdir) assert result == "" def test_resolve_key_path_directory_rejection(): """Test that directories are rejected when resolving key paths.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a subdirectory subdir = os.path.join(tmpdir, "subdir") os.makedirs(subdir, exist_ok=True) # Directory should be rejected (relative path) result = _resolve_key_path("subdir", keys_dir=tmpdir) assert result == "" # Directory should be rejected (absolute path) result2 = _resolve_key_path(subdir, keys_dir=tmpdir) assert result2 == "" def test_resolve_key_path_symlink_rejection(): """Test that symlinks are rejected when resolving key paths.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file target_file = os.path.join(tmpdir, "target") with open(target_file, "w") as f: f.write("target-content") # Create a symlink to it symlink_path = os.path.join(tmpdir, "symlink") os.symlink(target_file, symlink_path) # Symlink should be rejected (relative path) result = _resolve_key_path("symlink", keys_dir=tmpdir) assert result == "" # Symlink should be rejected (absolute path) result2 = _resolve_key_path(symlink_path, keys_dir=tmpdir) assert result2 == "" def test_resolve_secret_regular_file_still_works(): """Test that regular files still work after file validation.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file file_path = os.path.join(tmpdir, "regular_secret") with open(file_path, "w") as f: f.write("regular-secret-content") # Should work result = _resolve_secret("regular_secret", secrets_dir=tmpdir) assert result == "regular-secret-content" def test_resolve_key_path_regular_file_still_works(): """Test that regular files still work after file validation.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file file_path = os.path.join(tmpdir, "regular_key") with open(file_path, "w") as f: f.write("regular-key-content") # Should work with relative path result = _resolve_key_path("regular_key", keys_dir=tmpdir) assert result == os.path.abspath(file_path) # Should work with absolute path result2 = _resolve_key_path(file_path, keys_dir=tmpdir) assert result2 == os.path.abspath(file_path) def test_load_yaml_normal_size(): """Test that normal sized YAML files load correctly.""" with tempfile.TemporaryDirectory() as tmpdir: yaml_path = os.path.join(tmpdir, "test.yml") # Create a small YAML file (much smaller than limit) yaml_content = { "test": "data", "key": "value", "list": [1, 2, 3], } with open(yaml_path, "w") as f: yaml.dump(yaml_content, f) result = _load_yaml(yaml_path) assert result == yaml_content def test_load_yaml_oversized_file(): """Test that oversized YAML files are rejected.""" with tempfile.TemporaryDirectory() as tmpdir: yaml_path = os.path.join(tmpdir, "oversized.yml") # Create a file larger than MAX_YAML_FILE_SIZE # We'll create a file that's slightly larger than the limit oversized_size = MAX_YAML_FILE_SIZE + 1024 # 1KB over limit with open(yaml_path, "w") as f: # Write enough data to exceed the limit # YAML files are text, so we'll write a large string large_content = "key: " + "x" * oversized_size f.write(large_content) # Verify file size exceeds limit assert os.path.getsize(yaml_path) > MAX_YAML_FILE_SIZE # Should return empty dict and log security event result = _load_yaml(yaml_path) assert result == {} def test_load_yaml_at_size_limit(): """Test that YAML file at size limit is accepted.""" with tempfile.TemporaryDirectory() as tmpdir: yaml_path = os.path.join(tmpdir, "at_limit.yml") # Create a file exactly at the size limit yaml_content = { "test": "data", "large_field": "x" * (MAX_YAML_FILE_SIZE - 100), } with open(yaml_path, "w") as f: yaml.dump(yaml_content, f) # Adjust if file is slightly larger than expected current_size = os.path.getsize(yaml_path) if current_size > MAX_YAML_FILE_SIZE: # Reduce content to fit yaml_content = { "test": "data", "large_field": "x" * (MAX_YAML_FILE_SIZE - 200), } with open(yaml_path, "w") as f: yaml.dump(yaml_content, f) # File at limit should be accepted result = _load_yaml(yaml_path) assert "test" in result def test_load_yaml_missing_file(): """Test that missing YAML file returns empty dict.""" result = _load_yaml("/nonexistent/file.yml") assert result == {} def test_load_yaml_invalid_yaml(): """Test that invalid YAML returns empty dict.""" with tempfile.TemporaryDirectory() as tmpdir: yaml_path = os.path.join(tmpdir, "invalid.yml") with open(yaml_path, "w") as f: f.write("invalid: yaml: content: [unclosed") # Should return empty dict on parse error result = _load_yaml(yaml_path) assert result == {} def test_resolve_secret_name_length_at_limit(): """Test that secret name at length limit is accepted.""" with tempfile.TemporaryDirectory() as tmpdir: # Create secret file with max length name secret_name = "a" * MAX_SECRET_NAME_LENGTH secret_file = os.path.join(tmpdir, secret_name) with open(secret_file, "w") as f: f.write("secret-content") # Should work at limit result = _resolve_secret(secret_name, secrets_dir=tmpdir) assert result == "secret-content" def test_resolve_secret_name_length_exceeds_limit(): """Test that secret name exceeding length limit is rejected.""" with tempfile.TemporaryDirectory() as tmpdir: # Secret name exceeding limit secret_name = "a" * (MAX_SECRET_NAME_LENGTH + 1) # Should be rejected result = _resolve_secret(secret_name, secrets_dir=tmpdir) assert result == "" def test_resolve_secret_name_length_under_limit(): """Test that secret name under limit is accepted.""" with tempfile.TemporaryDirectory() as tmpdir: # Secret name well under limit secret_name = "normal_secret" secret_file = os.path.join(tmpdir, secret_name) with open(secret_file, "w") as f: f.write("secret-content") # Should work result = _resolve_secret(secret_name, secrets_dir=tmpdir) assert result == "secret-content" def test_resolve_key_path_length_at_limit(): """Test that key path at length limit is accepted.""" with tempfile.TemporaryDirectory() as tmpdir: # Create key file with path at limit (but reasonable for filesystem) # Filesystem has limits, so use a reasonable but still long path # Use nested path to test long path handling without exceeding filesystem limits max_reasonable_length = min( MAX_KEY_PATH_LENGTH, 200 ) # Stay within filesystem limits key_name = "a" * max_reasonable_length key_path = os.path.join(tmpdir, key_name) with open(key_path, "w") as f: f.write("key-content") # Should work at limit (use relative path) relative_path = key_name result = _resolve_key_path(relative_path, keys_dir=tmpdir) assert result == os.path.abspath(key_path) def test_resolve_key_path_length_exceeds_limit(): """Test that key path exceeding length limit is rejected.""" # Key path exceeding limit key_path = "a" * (MAX_KEY_PATH_LENGTH + 1) # Should be rejected result = _resolve_key_path(key_path, keys_dir="/app/keys") assert result == "" def test_resolve_key_path_length_under_limit(): """Test that key path under limit is accepted.""" with tempfile.TemporaryDirectory() as tmpdir: # Key path well under limit key_name = "normal_key" key_path = os.path.join(tmpdir, key_name) with open(key_path, "w") as f: f.write("key-content") # Should work result = _resolve_key_path("normal_key", keys_dir=tmpdir) assert result == os.path.abspath(key_path) def test_resolve_secret_length_validation_before_character_validation(): """Test that length validation happens before character validation.""" # Secret name that's too long but also has invalid characters secret_name = "a" * (MAX_SECRET_NAME_LENGTH + 1) + "!" # Should be rejected for length first result = _resolve_secret(secret_name, secrets_dir="/app/secrets") assert result == "" def test_resolve_key_path_length_validation_before_traversal_check(): """Test that length validation happens before traversal check.""" # Key path that's too long and contains traversal key_path = "a" * (MAX_KEY_PATH_LENGTH + 1) + "../" # Should be rejected for length first result = _resolve_key_path(key_path, keys_dir="/app/keys") assert result == "" def test_log_security_event_format(): """Test that security audit events are logged in valid JSON format.""" import json import sys from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: _log_security_event( event_type="path_traversal_attempt", attempted_path="../etc/passwd", resolved_path="/app/secrets/../etc/passwd", reason="path_outside_allowed_directory", additional_data={"base_dir": "/app/secrets"}, ) output = sys.stderr.getvalue() assert output, "Should have logged to stderr" # Parse as JSON data = json.loads(output.strip()) assert data["level"] == "error" assert data["kind"] == "security_audit" assert data["type"] == "security_event" assert data["event_type"] == "path_traversal_attempt" assert "ts" in data assert "timestamp" in data assert data["attempted_path"] == "../etc/passwd" assert data["resolved_path"] == "/app/secrets/../etc/passwd" assert data["reason"] == "path_outside_allowed_directory" assert data["base_dir"] == "/app/secrets" finally: sys.stderr = old_stderr def test_log_security_event_minimal(): """Test that security audit events work with minimal parameters.""" import json import sys from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: _log_security_event(event_type="test_event") output = sys.stderr.getvalue() assert output, "Should have logged to stderr" # Parse as JSON data = json.loads(output.strip()) assert data["event_type"] == "test_event" assert "ts" in data assert "timestamp" in data # Optional fields should not be present if not provided assert "attempted_path" not in data assert "resolved_path" not in data assert "reason" not in data finally: sys.stderr = old_stderr def test_audit_logging_path_traversal_attempt(): """Test that path traversal attempts are logged via audit logging.""" import json import sys from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: with tempfile.TemporaryDirectory() as tmpdir: # Create a file outside the secrets directory parent_dir = os.path.dirname(tmpdir) outside_file = os.path.join(parent_dir, "outside_secret") with open(outside_file, "w") as f: f.write("outside-secret") # Trigger path traversal attempt with valid secret name that has traversal # Use a secret name that passes character validation but triggers path traversal secret_name = "valid_secret" # Valid characters secret_file = os.path.join(tmpdir, secret_name) with open(secret_file, "w") as f: f.write("valid-content") # Now try to access outside file using key_path with traversal # Use key_path which allows path traversal attempts result = _resolve_key_path("../outside_secret", keys_dir=tmpdir) # Should be rejected assert result == "" # Check audit log was written output = sys.stderr.getvalue() assert output, "Should have logged path traversal attempt" # Find the audit log entry lines = output.strip().split("\n") audit_line = None for line in lines: if line.strip(): try: data = json.loads(line) if data.get("event_type") == "path_traversal_attempt": audit_line = line break except json.JSONDecodeError: continue assert audit_line is not None, "Should have security audit log entry" data = json.loads(audit_line) assert data["event_type"] == "path_traversal_attempt" assert "attempted_path" in data assert "reason" in data # Clean up os.remove(outside_file) finally: sys.stderr = old_stderr def test_audit_logging_invalid_file_access(): """Test that invalid file access attempts are logged via audit logging.""" import json import sys from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: with tempfile.TemporaryDirectory() as tmpdir: # Create a directory (not a file) subdir = os.path.join(tmpdir, "subdir") os.makedirs(subdir, exist_ok=True) # Try to resolve directory as secret (should fail) result = _resolve_secret("subdir", secrets_dir=tmpdir) # Should be rejected assert result == "" # Check audit log was written output = sys.stderr.getvalue() assert output, "Should have logged invalid file access" # Find the audit log entry for file validation failure lines = output.strip().split("\n") audit_line = None for line in lines: if line.strip(): try: data = json.loads(line) if data.get("event_type") == "file_validation_failed": audit_line = line break except json.JSONDecodeError: continue assert audit_line is not None, "Should have security audit log entry" data = json.loads(audit_line) assert data["event_type"] == "file_validation_failed" assert "attempted_path" in data assert "reason" in data finally: sys.stderr = old_stderr def test_audit_logging_oversized_file(): """Test that oversized file attempts are logged via audit logging.""" import json import sys from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: with tempfile.TemporaryDirectory() as tmpdir: yaml_path = os.path.join(tmpdir, "oversized.yml") # Create oversized file oversized_size = MAX_YAML_FILE_SIZE + 1024 with open(yaml_path, "w") as f: large_content = "key: " + "x" * oversized_size f.write(large_content) # Try to load oversized file result = _load_yaml(yaml_path) # Should return empty dict assert result == {} # Check audit log was written output = sys.stderr.getvalue() assert output, "Should have logged oversized file attempt" # Find the audit log entry lines = output.strip().split("\n") audit_line = None for line in lines: if line.strip(): try: data = json.loads(line) if data.get("event_type") == "file_size_limit_exceeded": audit_line = line break except json.JSONDecodeError: continue assert audit_line is not None, "Should have security audit log entry" data = json.loads(audit_line) assert data["event_type"] == "file_size_limit_exceeded" assert "attempted_path" in data assert "file_size" in data assert "max_size" in data assert data["file_size"] > data["max_size"] finally: sys.stderr = old_stderr def test_audit_logging_does_not_break_functionality(): """Test that audit logging failures don't break functionality.""" from unittest.mock import patch # Mock stderr.write to raise exception with patch("sys.stderr.write", side_effect=Exception("Logging failed")): # Should still function normally (logging errors are silent) with tempfile.TemporaryDirectory() as tmpdir: secret_file = os.path.join(tmpdir, "valid_secret") with open(secret_file, "w") as f: f.write("secret-content") # Should still work despite logging failure result = _resolve_secret("valid_secret", secrets_dir=tmpdir) assert result == "secret-content" def test_audit_logging_includes_timestamp(): """Test that audit logs include both Unix timestamp and ISO format.""" import json import sys import time from io import StringIO # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: before = time.time() _log_security_event(event_type="test_event") after = time.time() output = sys.stderr.getvalue() data = json.loads(output.strip()) # Check Unix timestamp assert "ts" in data assert isinstance(data["ts"], int | float) assert before <= data["ts"] <= after # Check ISO timestamp assert "timestamp" in data assert isinstance(data["timestamp"], str) # Should be in format YYYY-MM-DDTHH:MM:SS+offset assert "T" in data["timestamp"] finally: sys.stderr = old_stderr # ============================================================================ # Tests for .env file loading # ============================================================================ def test_load_env_file_basic(): """Test basic .env file parsing with KEY=value format.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("KEY1=value1\n") f.write("KEY2=value2\n") f.write("KEY3=value3\n") result = _load_env_file(tmpdir) assert result == {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"} def test_load_env_file_with_comments(): """Test .env file parsing with comments and empty lines.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("# This is a comment\n") f.write("KEY1=value1\n") f.write("\n") # Empty line f.write("# Another comment\n") f.write("KEY2=value2\n") result = _load_env_file(tmpdir) assert result == {"KEY1": "value1", "KEY2": "value2"} def test_load_env_file_with_quotes(): """Test .env file parsing with quoted values.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write('KEY1="value with spaces"\n') f.write("KEY2='single quoted value'\n") f.write("KEY3=unquoted_value\n") result = _load_env_file(tmpdir) assert result == { "KEY1": "value with spaces", "KEY2": "single quoted value", "KEY3": "unquoted_value", } def test_load_env_file_with_equals_in_value(): """Test .env file parsing with values containing = characters.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("KEY1=value1=value2=value3\n") f.write("KEY2=key=value&another=value\n") result = _load_env_file(tmpdir) assert result == { "KEY1": "value1=value2=value3", "KEY2": "key=value&another=value", } def test_load_env_file_missing(): """Test that missing .env file returns empty dict.""" with tempfile.TemporaryDirectory() as tmpdir: result = _load_env_file(tmpdir) assert result == {} def test_load_env_file_oversized(): """Test that oversized .env file is rejected.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") # Create a file larger than MAX_ENV_FILE_SIZE oversized_size = MAX_ENV_FILE_SIZE + 1024 with open(env_file, "w") as f: f.write("KEY=" + "x" * oversized_size) result = _load_env_file(tmpdir) assert result == {} def test_load_env_file_permissive_permissions(): """Test that permissive permissions are logged but file still loads.""" import sys from io import StringIO with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("KEY1=value1\n") # Set permissive permissions (644 - group and other can read) os.chmod(env_file, 0o644) # Capture stderr old_stderr = sys.stderr sys.stderr = StringIO() try: result = _load_env_file(tmpdir) # File should still load assert result == {"KEY1": "value1"} # Check that security event was logged output = sys.stderr.getvalue() assert "insecure_file_permissions" in output or output == "" finally: sys.stderr = old_stderr # Restore permissions for cleanup os.chmod(env_file, 0o600) def test_load_env_file_symlink_rejection(): """Test that symlinks are rejected when loading .env file.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a regular file target_file = os.path.join(tmpdir, "target") with open(target_file, "w") as f: f.write("KEY1=value1\n") # Create a symlink to it symlink_path = os.path.join(tmpdir, ".env") os.symlink(target_file, symlink_path) # Symlink should be rejected result = _load_env_file(tmpdir) assert result == {} def test_load_env_file_directory_rejection(): """Test that directories are rejected when loading .env file.""" with tempfile.TemporaryDirectory() as tmpdir: # Create a directory named .env (shouldn't happen, but test it) env_dir = os.path.join(tmpdir, ".env") os.makedirs(env_dir, exist_ok=True) # Directory should be rejected result = _load_env_file(tmpdir) assert result == {} def test_load_env_file_strips_whitespace(): """Test that keys and values are stripped of whitespace.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write(" KEY1 = value1 \n") f.write("KEY2=value2\n") result = _load_env_file(tmpdir) assert result == {"KEY1": "value1", "KEY2": "value2"} def test_load_env_file_caching(): """Test that .env file is cached after first load.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("KEY1=value1\n") # First load result1 = _load_env_file(tmpdir) assert result1 == {"KEY1": "value1"} # Modify file with open(env_file, "w") as f: f.write("KEY1=modified_value\n") # Second load should return cached value result2 = _load_env_file(tmpdir) assert result2 == {"KEY1": "value1"} # Cached value, not modified def test_load_env_file_no_cache_on_missing_file(): """Test that missing .env file is not cached, allowing later file creation.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") # First call: file doesn't exist result1 = _load_env_file(tmpdir) assert result1 == {} # Create file after first call with open(env_file, "w") as f: f.write("KEY1=value1\n") # Second call: should load the newly created file (not cached empty result) result2 = _load_env_file(tmpdir) assert result2 == {"KEY1": "value1"} # ============================================================================ # Tests for secret resolution with .env file # ============================================================================ def test_resolve_secret_from_env_file(): """Test resolving secret from .env file.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("TEST_SECRET=env-file-value\n") result = _resolve_secret("TEST_SECRET", secrets_dir=tmpdir) assert result == "env-file-value" def test_resolve_secret_priority_env_over_env_file(monkeypatch): """Test that environment variable takes precedence over .env file.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("TEST_SECRET=env-file-value\n") # Set environment variable monkeypatch.setenv("TEST_SECRET", "env-var-value") result = _resolve_secret("TEST_SECRET", secrets_dir=tmpdir) assert result == "env-var-value" # Env var should win def test_resolve_secret_priority_env_file_over_individual_file(): """Test that .env file takes precedence over individual files.""" with tempfile.TemporaryDirectory() as tmpdir: # Create .env file env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("TEST_SECRET=env-file-value\n") # Create individual secret file secret_file = os.path.join(tmpdir, "TEST_SECRET") with open(secret_file, "w") as f: f.write("individual-file-value\n") result = _resolve_secret("TEST_SECRET", secrets_dir=tmpdir) assert result == "env-file-value" # .env file should win def test_resolve_secret_case_insensitive_env_file(): """Test case-insensitive key lookup in .env file.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("TEST_SECRET=uppercase-key-value\n") # Try lowercase secret name result = _resolve_secret("test_secret", secrets_dir=tmpdir) assert result == "uppercase-key-value" def test_resolve_secret_env_file_with_special_characters(): """Test .env file values with special characters.""" with tempfile.TemporaryDirectory() as tmpdir: env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write('TEST_SECRET="value with spaces and !@#$%^&*()"\n') result = _resolve_secret("TEST_SECRET", secrets_dir=tmpdir) assert result == "value with spaces and !@#$%^&*()" def test_resolve_secret_mixed_sources(monkeypatch): """Test resolving secrets from mixed sources.""" with tempfile.TemporaryDirectory() as tmpdir: # Create .env file env_file = os.path.join(tmpdir, ".env") with open(env_file, "w") as f: f.write("ENV_FILE_SECRET=env-file-value\n") f.write("INDIVIDUAL_FILE_SECRET=should-not-be-used\n") # Create individual secret file secret_file = os.path.join(tmpdir, "INDIVIDUAL_FILE_SECRET") with open(secret_file, "w") as f: f.write("individual-file-value\n") # Set environment variable monkeypatch.setenv("ENV_VAR_SECRET", "env-var-value") # Test env var (highest priority) result1 = _resolve_secret("ENV_VAR_SECRET", secrets_dir=tmpdir) assert result1 == "env-var-value" # Test .env file (medium priority) result2 = _resolve_secret("ENV_FILE_SECRET", secrets_dir=tmpdir) assert result2 == "env-file-value" # Test individual file (lowest priority, but .env takes precedence) result3 = _resolve_secret("INDIVIDUAL_FILE_SECRET", secrets_dir=tmpdir) assert result3 == "should-not-be-used" # .env file value, not individual file # ============================================================================ # Integration tests for credential resolution using .env file # ============================================================================ def test_get_credentials_with_env_file(temp_config_dir): """Test full credential resolution using .env file.""" import tempfile # Create credentials.yml with secret reference creds_path = os.path.join(temp_config_dir, "credentials.yml") credentials = { "entries": [ { "name": "test_cred", "username": "testuser", "key_path": "id_ed25519", "key_passphrase_secret": "SSH_KEY_PASSPHRASE_01", "password_secret": "SSH_PASSWORD_SECRET_01", } ] } with open(creds_path, "w") as f: yaml.dump(credentials, f) # Create .env file with secrets with tempfile.TemporaryDirectory() as secrets_dir: env_file = os.path.join(secrets_dir, ".env") with open(env_file, "w") as f: f.write("SSH_KEY_PASSPHRASE_01=my-passphrase\n") f.write("SSH_PASSWORD_SECRET_01=my-password\n") config = Config(temp_config_dir, secrets_dir=secrets_dir) creds = config.get_credentials("test_cred") assert creds["passphrase"] == "my-passphrase" assert creds["password"] == "my-password" def test_get_credentials_env_file_fallback(temp_config_dir): """Test fallback to individual files if secret not in .env file.""" import tempfile # Create credentials.yml creds_path = os.path.join(temp_config_dir, "credentials.yml") credentials = { "entries": [ { "name": "test_cred", "username": "testuser", "password_secret": "INDIVIDUAL_FILE_SECRET", } ] } with open(creds_path, "w") as f: yaml.dump(credentials, f) # Create .env file (without the secret) with tempfile.TemporaryDirectory() as secrets_dir: env_file = os.path.join(secrets_dir, ".env") with open(env_file, "w") as f: f.write("OTHER_SECRET=other-value\n") # Create individual secret file secret_file = os.path.join(secrets_dir, "INDIVIDUAL_FILE_SECRET") with open(secret_file, "w") as f: f.write("individual-file-value\n") config = Config(temp_config_dir, secrets_dir=secrets_dir) creds = config.get_credentials("test_cred") assert ( creds["password"] == "individual-file-value" ) # Fallback to individual file

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/samerfarida/mcp-ssh-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server