Skip to main content
Glama

OpenSCAD MCP Server

by quellant
test_performance.pyโ€ข15.7 kB
""" Performance tests for OpenSCAD MCP Server. Tests performance characteristics including: - Parser performance - Response size estimation speed - Large data handling - Concurrent operations """ import pytest import time import json import sys from pathlib import Path from typing import Any, Dict, List from unittest.mock import Mock, patch import asyncio # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) from openscad_mcp.server import ( parse_list_param, parse_dict_param, parse_image_size_param, estimate_response_size, manage_response_size, compress_base64_image, save_image_to_file ) @pytest.mark.performance class TestPerformance: """Performance tests for critical functions.""" def test_parse_list_param_performance(self): """Test parse_list_param completes within acceptable time.""" test_cases = [ '["item1", "item2", "item3", "item4", "item5"]', "item1,item2,item3,item4,item5", ["item1", "item2", "item3", "item4", "item5"], "single_item" ] start = time.perf_counter() for _ in range(1000): for test_case in test_cases: parse_list_param(test_case, []) elapsed_ms = (time.perf_counter() - start) * 1000 # Should complete 4000 iterations in under 100ms assert elapsed_ms < 100, f"Parser took {elapsed_ms}ms, expected < 100ms" # Calculate operations per second ops_per_second = (1000 * len(test_cases)) / (elapsed_ms / 1000) print(f"Parse list param: {ops_per_second:.0f} ops/sec") def test_parse_dict_param_performance(self): """Test parse_dict_param completes within acceptable time.""" test_cases = [ '{"key1": "value1", "key2": 123, "key3": true}', "key1=value1,key2=123,key3=true", {"key1": "value1", "key2": 123, "key3": True} ] start = time.perf_counter() for _ in range(1000): for test_case in test_cases: parse_dict_param(test_case, {}) elapsed_ms = (time.perf_counter() - start) * 1000 # Should complete 3000 iterations in under 150ms assert elapsed_ms < 150, f"Parser took {elapsed_ms}ms, expected < 150ms" ops_per_second = (1000 * len(test_cases)) / (elapsed_ms / 1000) print(f"Parse dict param: {ops_per_second:.0f} ops/sec") def test_parse_image_size_performance(self): """Test parse_image_size_param with various formats.""" test_cases = [ [800, 600], "800x600", "800,600", (800, 600), "[800, 600]" ] start = time.perf_counter() for _ in range(1000): for test_case in test_cases: parse_image_size_param(test_case, []) elapsed_ms = (time.perf_counter() - start) * 1000 # Should complete 5000 iterations in under 50ms assert elapsed_ms < 50, f"Parser took {elapsed_ms}ms, expected < 50ms" ops_per_second = (1000 * len(test_cases)) / (elapsed_ms / 1000) print(f"Parse image size: {ops_per_second:.0f} ops/sec") def test_response_size_estimation_performance(self): """Test speed of response size estimation.""" # Small data small_data = {"image": "A" * 100} start = time.perf_counter() for _ in range(10000): estimate_response_size(small_data) elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 50, f"Small estimation took {elapsed_ms}ms" # Medium data medium_data = { "images": {"view_" + str(i): "A" * 1000 for i in range(10)} } start = time.perf_counter() for _ in range(1000): estimate_response_size(medium_data) elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 100, f"Medium estimation took {elapsed_ms}ms" # Large data large_data = { "images": {"view_" + str(i): "A" * 10000 for i in range(20)}, "metadata": {"info": "data" * 100} } start = time.perf_counter() for _ in range(100): estimate_response_size(large_data) elapsed_ms = (time.perf_counter() - start) * 1000 assert elapsed_ms < 200, f"Large estimation took {elapsed_ms}ms" @pytest.mark.slow def test_large_batch_parsing(self, performance_test_data): """Test parsing performance with large datasets.""" large_list = performance_test_data["large_list"] large_dict = performance_test_data["large_dict"] # Convert to JSON strings large_list_json = json.dumps(large_list) large_dict_json = json.dumps(large_dict) # Test large list parsing start = time.perf_counter() result = parse_list_param(large_list_json, []) list_time = (time.perf_counter() - start) * 1000 assert len(result) == len(large_list) assert list_time < 100, f"Large list parsing took {list_time}ms" # Test large dict parsing start = time.perf_counter() result = parse_dict_param(large_dict_json, {}) dict_time = (time.perf_counter() - start) * 1000 assert len(result) == len(large_dict) assert dict_time < 150, f"Large dict parsing took {dict_time}ms" @patch('openscad_mcp.server.save_image_to_file') def test_manage_response_size_performance(self, mock_save, performance_test_data): """Test response management with many images.""" mock_save.return_value = "/tmp/test.png" many_images = performance_test_data["many_images"] # Auto mode performance start = time.perf_counter() result = manage_response_size(many_images, output_format="auto", max_size=1000) auto_time = (time.perf_counter() - start) * 1000 assert auto_time < 500, f"Auto mode took {auto_time}ms for {len(many_images)} images" # File path mode performance start = time.perf_counter() result = manage_response_size(many_images, output_format="file_path") file_time = (time.perf_counter() - start) * 1000 assert file_time < 200, f"File mode took {file_time}ms" def test_concurrent_parsing(self): """Test thread safety of parsing functions.""" import concurrent.futures import threading # Test data test_lists = [f'["item{i}", "item{i+1}"]' for i in range(100)] test_dicts = [f'{{"key{i}": {i}, "key{i+1}": {i+1}}}' for i in range(100)] results = [] lock = threading.Lock() def parse_list_worker(data): result = parse_list_param(data, []) with lock: results.append(("list", result)) def parse_dict_worker(data): result = parse_dict_param(data, {}) with lock: results.append(("dict", result)) start = time.perf_counter() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # Submit all tasks list_futures = [ executor.submit(parse_list_worker, data) for data in test_lists ] dict_futures = [ executor.submit(parse_dict_worker, data) for data in test_dicts ] # Wait for completion concurrent.futures.wait(list_futures + dict_futures) elapsed = time.perf_counter() - start assert len(results) == 200 assert elapsed < 1.0, f"Concurrent parsing took {elapsed}s" # Verify correctness list_results = [r for t, r in results if t == "list"] dict_results = [r for t, r in results if t == "dict"] assert len(list_results) == 100 assert len(dict_results) == 100 @pytest.mark.performance class TestMemoryEfficiency: """Test memory efficiency of operations.""" def test_large_image_handling_memory(self, large_base64_image, temp_test_dir): """Test memory efficiency when handling large images.""" import tracemalloc # Start memory tracking tracemalloc.start() baseline = tracemalloc.get_traced_memory()[0] # Process large image for _ in range(10): save_image_to_file(large_base64_image, f"test_{_}.png", temp_test_dir) peak = tracemalloc.get_traced_memory()[1] tracemalloc.stop() # Memory usage should be reasonable (< 50MB for 10 large images) memory_used_mb = (peak - baseline) / 1024 / 1024 assert memory_used_mb < 50, f"Used {memory_used_mb:.2f}MB, expected < 50MB" @patch('openscad_mcp.server.Image') def test_compression_memory_efficiency(self, mock_image_class): """Test memory efficiency of compression.""" import tracemalloc import io # Mock PIL Image mock_image = Mock() mock_image_class.open.return_value = mock_image def save_side_effect(buffer, **kwargs): # Simulate saving compressed data buffer.write(b"compressed_data" * 100) mock_image.save.side_effect = save_side_effect # Large image data large_data = "A" * 100000 test_image = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJ" + large_data tracemalloc.start() baseline = tracemalloc.get_traced_memory()[0] # Compress multiple times for _ in range(5): compress_base64_image(test_image, quality=85) peak = tracemalloc.get_traced_memory()[1] tracemalloc.stop() memory_used_mb = (peak - baseline) / 1024 / 1024 assert memory_used_mb < 20, f"Compression used {memory_used_mb:.2f}MB" @pytest.mark.performance class TestScalability: """Test scalability with increasing load.""" def test_parsing_scalability(self): """Test how parsing performance scales with input size.""" sizes = [10, 50, 100, 500, 1000] times = [] for size in sizes: # Generate test data test_list = list(range(size)) test_json = json.dumps(test_list) # Measure parsing time start = time.perf_counter() for _ in range(100): parse_list_param(test_json, []) elapsed = time.perf_counter() - start times.append(elapsed) print(f"Size {size}: {elapsed:.4f}s") # Check that time increases sub-linearly (good scalability) # Time should not increase more than O(n log n) for i in range(1, len(sizes)): size_ratio = sizes[i] / sizes[i-1] time_ratio = times[i] / times[i-1] # Allow for some variance but ensure sub-quadratic growth assert time_ratio < size_ratio * 2, \ f"Poor scalability: {size_ratio}x size -> {time_ratio}x time" def test_response_size_estimation_scalability(self): """Test estimation performance with increasing data size.""" sizes = [100, 1000, 10000, 50000] times = [] for size in sizes: data = {"data": "A" * size} start = time.perf_counter() for _ in range(1000): estimate_response_size(data) elapsed = time.perf_counter() - start times.append(elapsed) print(f"Data size {size}: {elapsed:.4f}s for 1000 estimations") # Linear scalability is acceptable for estimation for i in range(1, len(sizes)): size_ratio = sizes[i] / sizes[i-1] time_ratio = times[i] / times[i-1] assert time_ratio < size_ratio * 1.5, \ f"Poor scalability at size {sizes[i]}" @pytest.mark.performance @pytest.mark.asyncio class TestAsyncPerformance: """Test performance of async operations.""" async def test_concurrent_renders(self): """Test concurrent render operations.""" from openscad_mcp.server import render_single with patch('openscad_mcp.server.render_scad_to_png') as mock_render: mock_render.return_value = "base64imagedata" # Create multiple render tasks tasks = [] for i in range(10): task = render_single( scad_content=f"cube({i+10});", camera_position=[i*10, i*10, i*10] ) tasks.append(task) # Run concurrently start = time.perf_counter() results = await asyncio.gather(*tasks) elapsed = time.perf_counter() - start assert len(results) == 10 assert all(r["success"] for r in results) assert elapsed < 1.0, f"Concurrent renders took {elapsed}s" async def test_render_queue_performance(self): """Test performance with queued renders.""" from openscad_mcp.server import render_single with patch('openscad_mcp.server.render_scad_to_png') as mock_render: # Simulate varying render times mock_render.side_effect = lambda *args: "base64data" tasks = [] views = ["front", "top", "isometric", "left", "right"] for i, view in enumerate(views): task = render_single( scad_content=f"sphere({i*5});", view=view ) tasks.append(task) start = time.perf_counter() results = await asyncio.gather(*tasks) elapsed = time.perf_counter() - start assert len(results) == 5 # Should handle 5 single renders efficiently assert elapsed < 2.0, f"Queue processing took {elapsed}s" def benchmark_all_parsers(): """Benchmark all parser functions.""" results = {} iterations = 10000 # Benchmark parse_list_param test_list = '["item1", "item2", "item3"]' start = time.perf_counter() for _ in range(iterations): parse_list_param(test_list, []) results["parse_list_param"] = (time.perf_counter() - start) * 1000 # Benchmark parse_dict_param test_dict = '{"key1": "value1", "key2": 123}' start = time.perf_counter() for _ in range(iterations): parse_dict_param(test_dict, {}) results["parse_dict_param"] = (time.perf_counter() - start) * 1000 # Benchmark parse_image_size_param test_size = "800x600" start = time.perf_counter() for _ in range(iterations): parse_image_size_param(test_size, []) results["parse_image_size_param"] = (time.perf_counter() - start) * 1000 # Print results print("\n=== Parser Benchmarks ===") print(f"Iterations: {iterations}") for func, time_ms in results.items(): ops_per_sec = iterations / (time_ms / 1000) print(f"{func}: {time_ms:.2f}ms total, {ops_per_sec:.0f} ops/sec") return results if __name__ == "__main__": # Run benchmarks when executed directly benchmark_all_parsers()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/quellant/openscad-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server