Skip to main content
Glama
test_pool_limits.py (7.38 kB)
"""Tests for connection pool size limits.""" from unittest.mock import AsyncMock, MagicMock, patch import pytest from scout_mcp.services.pool import ConnectionPool @pytest.fixture(autouse=True) def mock_known_hosts(monkeypatch): """Disable host key verification for all tests.""" monkeypatch.setenv("SCOUT_KNOWN_HOSTS", "none") class TestPoolSizeLimits: """Test pool size limiting and LRU eviction.""" @pytest.fixture def small_pool(self) -> ConnectionPool: """Create a small pool for testing eviction.""" return ConnectionPool(idle_timeout=60, max_size=2, known_hosts=None) def make_host(self, name: str) -> MagicMock: """Create a mock SSH host.""" host = MagicMock() host.name = name host.hostname = f"{name}.local" host.port = 22 host.user = "test" host.identity_file = None return host @pytest.mark.asyncio async def test_evicts_lru_when_full(self, small_pool: ConnectionPool) -> None: """Pool evicts LRU connection when at capacity.""" hosts = [self.make_host(f"host{i}") for i in range(3)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = MagicMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn # Fill pool await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[1]) assert small_pool.pool_size == 2 # Third should evict first (LRU) await small_pool.get_connection(hosts[2]) assert small_pool.pool_size == 2 assert "host0" not in small_pool.active_hosts assert "host1" in small_pool.active_hosts assert "host2" in small_pool.active_hosts @pytest.mark.asyncio async def test_reuse_updates_lru_order(self, small_pool: ConnectionPool) -> None: """Reusing connection updates LRU order.""" hosts = [self.make_host(f"host{i}") for i in range(3)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = MagicMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn # Fill pool await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[1]) # Reuse host0 
(moves to end) await small_pool.get_connection(hosts[0]) # Third should evict host1 (now LRU) await small_pool.get_connection(hosts[2]) assert "host0" in small_pool.active_hosts assert "host1" not in small_pool.active_hosts assert "host2" in small_pool.active_hosts @pytest.mark.asyncio async def test_pool_never_exceeds_max_size(self, small_pool: ConnectionPool) -> None: """Pool size never exceeds max_size.""" hosts = [self.make_host(f"host{i}") for i in range(10)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = MagicMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn for host in hosts: await small_pool.get_connection(host) # Should never exceed max_size=2 assert small_pool.pool_size == 2 @pytest.mark.asyncio async def test_eviction_closes_connection(self, small_pool: ConnectionPool) -> None: """Evicted connections are properly closed.""" hosts = [self.make_host(f"host{i}") for i in range(3)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conns = [MagicMock(is_closed=False) for _ in range(3)] mock_connect.side_effect = mock_conns # Fill pool await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[1]) # Third connection should evict first await small_pool.get_connection(hosts[2]) # Verify first connection was closed mock_conns[0].close.assert_called_once() @pytest.mark.asyncio async def test_max_size_with_default_value(self) -> None: """Pool respects default max_size.""" pool = ConnectionPool(idle_timeout=60, known_hosts=None) assert pool.max_size == 100 @pytest.mark.asyncio async def test_max_size_with_custom_value(self) -> None: """Pool respects custom max_size.""" pool = ConnectionPool(idle_timeout=60, max_size=50, known_hosts=None) assert pool.max_size == 50 @pytest.mark.asyncio async def test_lru_order_maintained_across_multiple_reuses( self, small_pool: ConnectionPool ) -> None: """LRU order correctly maintained with complex access patterns.""" hosts = 
[self.make_host(f"host{i}") for i in range(3)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: mock_conn = MagicMock() mock_conn.is_closed = False mock_connect.return_value = mock_conn # Fill pool: host0, host1 await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[1]) # Reuse host0 (order: host1, host0) await small_pool.get_connection(hosts[0]) # Reuse host1 (order: host0, host1) await small_pool.get_connection(hosts[1]) # Add host2, should evict host0 (LRU) await small_pool.get_connection(hosts[2]) assert "host0" not in small_pool.active_hosts assert "host1" in small_pool.active_hosts assert "host2" in small_pool.active_hosts @pytest.mark.asyncio async def test_eviction_with_stale_connection( self, small_pool: ConnectionPool ) -> None: """Eviction works when replacing stale connection.""" hosts = [self.make_host(f"host{i}") for i in range(3)] with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect: # Need 4 connections: host0, host1, host0 (replacement), host2 mock_conns = [MagicMock(is_closed=False) for _ in range(4)] mock_connect.side_effect = mock_conns # Fill pool await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[1]) # Mark first connection as stale mock_conns[0].is_closed = True # Try to reuse stale connection, then add new await small_pool.get_connection(hosts[0]) await small_pool.get_connection(hosts[2]) # Should have evicted the LRU (host1) assert "host0" in small_pool.active_hosts assert "host1" not in small_pool.active_hosts assert "host2" in small_pool.active_hosts def test_max_size_zero_raises_error(self) -> None: """Pool rejects max_size=0.""" with pytest.raises(ValueError, match="max_size must be > 0"): ConnectionPool(idle_timeout=60, max_size=0, known_hosts=None) def test_max_size_negative_raises_error(self) -> None: """Pool rejects negative max_size.""" with pytest.raises(ValueError, match="max_size must be > 0"): ConnectionPool(idle_timeout=60, 
max_size=-1, known_hosts=None)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/scout_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.