Skip to main content
Glama
test_cache.py10.3 kB
"""Comprehensive cache behavior and expiration tests.""" from __future__ import annotations import asyncio import time import httpx import pytest from src.cache import CacheEntry, TTLCache from src.client import FourGetClient from src.config import Config pytestmark = pytest.mark.asyncio class TestTTLCache: """Test cache implementation in isolation.""" async def test_cache_entry_expiration(self) -> None: """Test CacheEntry expiration logic.""" entry = CacheEntry(value='test', expires_at=time.monotonic() + 1.0) # Should not be expired initially assert not entry.expired() # Should not be expired with explicit current time now = time.monotonic() assert not entry.expired(now) # Should be expired in the future future_time = time.monotonic() + 2.0 assert entry.expired(future_time) async def test_cache_basic_operations(self) -> None: """Test basic cache set/get operations.""" cache = TTLCache(ttl_seconds=1.0, maxsize=3) # Initially empty assert await cache.get('key1') is None # Set and get await cache.set('key1', 'value1') assert await cache.get('key1') == 'value1' # Set multiple await cache.set('key2', 'value2') await cache.set('key3', 'value3') assert await cache.get('key2') == 'value2' assert await cache.get('key3') == 'value3' async def test_cache_ttl_expiration(self) -> None: """Test that cache entries expire after TTL.""" cache = TTLCache(ttl_seconds=0.1, maxsize=10) # Very short TTL await cache.set('key1', 'value1') assert await cache.get('key1') == 'value1' # Wait for expiration await asyncio.sleep(0.15) assert await cache.get('key1') is None async def test_cache_maxsize_eviction(self) -> None: """Test that cache evicts oldest entries when maxsize exceeded.""" cache = TTLCache(ttl_seconds=10.0, maxsize=2) # Long TTL, small size await cache.set('key1', 'value1') await cache.set('key2', 'value2') # Both should be present assert await cache.get('key1') == 'value1' assert await cache.get('key2') == 'value2' # Adding third should evict oldest (key1) await cache.set('key3', 'value3') assert await cache.get('key1') is None # Evicted assert await cache.get('key2') == 'value2' # Still present assert await cache.get('key3') == 'value3' # New entry async def test_cache_zero_ttl_disabled(self) -> None: """Test that zero TTL disables caching.""" cache = TTLCache(ttl_seconds=0.0, maxsize=10) await cache.set('key1', 'value1') # With zero TTL, nothing should be cached assert await cache.get('key1') is None async def test_cache_clear(self) -> None: """Test cache clearing.""" cache = TTLCache(ttl_seconds=10.0, maxsize=10) await cache.set('key1', 'value1') await cache.set('key2', 'value2') assert await cache.get('key1') == 'value1' assert await cache.get('key2') == 'value2' await cache.clear() assert await cache.get('key1') is None assert await cache.get('key2') is None async def test_cache_concurrent_access(self) -> None: """Test cache behavior under concurrent access.""" cache = TTLCache(ttl_seconds=1.0, maxsize=100) async def set_values(prefix: str) -> None: for i in range(10): await cache.set(f'{prefix}_{i}', f'value_{prefix}_{i}') async def get_values(prefix: str) -> list[str | None]: values = [] for i in range(10): values.append(await cache.get(f'{prefix}_{i}')) return values # Set values concurrently await asyncio.gather( set_values('a'), set_values('b'), set_values('c'), ) # Get values concurrently results = await asyncio.gather( get_values('a'), get_values('b'), get_values('c'), ) # All values should be present for result_set in results: assert len(result_set) == 10 assert all(v is not None for v in result_set) class TestClientCaching: """Test caching behavior in the client.""" @pytest.fixture def fast_expiry_config(self) -> Config: """Config with very short cache TTL for testing.""" return Config( base_url='https://example.test', cache_ttl=0.1, # Very short TTL cache_maxsize=10, max_retries=0, # No retries for faster tests ) @pytest.fixture def no_cache_config(self) -> Config: """Config with caching disabled.""" return Config( base_url='https://example.test', cache_ttl=0.0, # Disabled cache_maxsize=10, max_retries=0, ) async def test_cache_expiration_forces_new_request( self, mock_api, fast_expiry_config: Config ) -> None: """Test that expired cache entries trigger new API requests.""" api, transport = mock_api client = FourGetClient(fast_expiry_config, transport=transport) api.add_json('/api/v1/web', {'status': 'ok', 'results': ['first']}) # First request result1 = await client.web_search('test') assert result1['results'] == ['first'] assert len(api.calls) == 1 # Second request immediately (should use cache) result2 = await client.web_search('test') assert result2['results'] == ['first'] assert len(api.calls) == 1 # No new request # Wait for cache to expire await asyncio.sleep(0.15) # Update API response api.add_json('/api/v1/web', {'status': 'ok', 'results': ['second']}) # Third request after expiration (should make new request) result3 = await client.web_search('test') assert result3['results'] == ['second'] assert len(api.calls) == 2 # New request made async def test_disabled_cache_always_requests(self, mock_api, no_cache_config: Config) -> None: """Test that disabled cache always makes new requests.""" api, transport = mock_api client = FourGetClient(no_cache_config, transport=transport) api.add_json('/api/v1/web', {'status': 'ok', 'results': ['test']}) # Multiple identical requests await client.web_search('test') await client.web_search('test') await client.web_search('test') # All should result in API calls assert len(api.calls) == 3 async def test_cache_key_generation(self, mock_api, config: Config) -> None: """Test that different parameters generate different cache keys.""" api, transport = mock_api client = FourGetClient(config, transport=transport) # Different responses for different endpoints api.add_json('/api/v1/web', {'status': 'ok', 'type': 'web'}) api.add_json('/api/v1/images', {'status': 'ok', 'type': 'images'}) api.add_json('/api/v1/news', {'status': 'ok', 'type': 'news'}) # Different search types should not share cache web_result = await client.web_search('test') image_result = await client.image_search('test') news_result = await client.news_search('test') assert web_result['type'] == 'web' assert image_result['type'] == 'images' assert news_result['type'] == 'news' assert len(api.calls) == 3 # All different requests async def test_cache_with_different_parameters(self, mock_api, config: Config) -> None: """Test caching behavior with different search parameters.""" api, transport = mock_api client = FourGetClient(config, transport=transport) def response_handler(request): params = dict(request.url.params) return { 'status': 'ok', 'query': params.get('s', 'unknown'), 'extended': params.get('extendedsearch', 'false'), } api.add_responder( '/api/v1/web', lambda req: httpx.Response(200, json=response_handler(req)) ) # Different queries should have separate cache entries result1 = await client.web_search('query1') result2 = await client.web_search('query2') result3 = await client.web_search('query1') # Should use cache assert result1['query'] == 'query1' assert result2['query'] == 'query2' assert result3['query'] == 'query1' assert len(api.calls) == 2 # Third was cached async def test_cache_eviction_under_pressure(self, mock_api, config: Config) -> None: """Test cache behavior when maxsize is exceeded.""" # Create config with very small cache small_cache_config = Config( base_url=config.base_url, cache_ttl=60.0, # Long TTL cache_maxsize=2, # Very small cache max_retries=0, ) api, transport = mock_api client = FourGetClient(small_cache_config, transport=transport) def unique_response(request): params = dict(request.url.params) query = params.get('s', 'unknown') return httpx.Response(200, json={'status': 'ok', 'query': query}) api.add_responder('/api/v1/web', unique_response) # Fill cache beyond capacity await client.web_search('query1') # Cache: [query1] await client.web_search('query2') # Cache: [query1, query2] await client.web_search('query3') # Cache: [query2, query3] (query1 evicted) assert len(api.calls) == 3 # Requesting query1 again should require new API call (was evicted) await client.web_search('query1') # New request await client.web_search('query2') # Should be cache hit # query1 needed new request, query2 might also need new request due to eviction assert len(api.calls) >= 4 # At least query1 needed new request assert len(api.calls) <= 5 # query2 might also be evicted

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yshalsager/mcp-4get'

If you have feedback or need assistance with the MCP directory API, please join our Discord server