Skip to main content
Glama
workloads.py10.9 kB
""" Workload definitions for load testing the MCP server. Defines realistic operation mixes and individual operation functions. """ import logging import random import time import uuid from mcp import ClientSession logger = logging.getLogger(__name__) class OperationResult: """Result of a single operation execution.""" def __init__( self, operation: str, success: bool, duration: float, error: str | None = None, ): self.operation = operation self.success = success self.duration = duration self.error = error self.timestamp = time.time() class WorkloadOperations: """Collection of MCP operations for load testing.""" def __init__(self, session: ClientSession): self.session = session self._created_notes: list[int] = [] self._created_boards: list[int] = [] async def get_capabilities(self) -> OperationResult: """Fetch server capabilities (lightweight operation).""" start = time.time() try: await self.session.read_resource("nc://capabilities") duration = time.time() - start return OperationResult("get_capabilities", True, duration) except Exception as e: duration = time.time() - start return OperationResult("get_capabilities", False, duration, str(e)) async def list_notes(self) -> OperationResult: """List all notes (read operation).""" start = time.time() try: await self.session.call_tool("nc_notes_search_notes", {"query": ""}) duration = time.time() - start return OperationResult("list_notes", True, duration) except Exception as e: duration = time.time() - start return OperationResult("list_notes", False, duration, str(e)) async def search_notes(self, query: str = "test") -> OperationResult: """Search notes by query (read operation with filtering).""" start = time.time() try: await self.session.call_tool("nc_notes_search_notes", {"query": query}) duration = time.time() - start return OperationResult("search_notes", True, duration) except Exception as e: duration = time.time() - start return OperationResult("search_notes", False, duration, str(e)) async def create_note(self) -> OperationResult: """Create a new note (write operation).""" start = time.time() unique_id = uuid.uuid4().hex[:8] try: result = await self.session.call_tool( "nc_notes_create_note", { "title": f"Load Test Note {unique_id}", "content": f"Content for load test note {unique_id}", "category": "LoadTesting", }, ) duration = time.time() - start # Track created note ID for cleanup if result and len(result.content) > 0: content = result.content[0] if hasattr(content, "text"): import json note_data = json.loads(content.text) note_id = note_data.get("id") if note_id: self._created_notes.append(note_id) return OperationResult("create_note", True, duration) except Exception as e: duration = time.time() - start return OperationResult("create_note", False, duration, str(e)) async def get_note(self, note_id: int) -> OperationResult: """Get a specific note by ID (read operation).""" start = time.time() try: await self.session.call_tool("nc_notes_get_note", {"note_id": note_id}) duration = time.time() - start return OperationResult("get_note", True, duration) except Exception as e: duration = time.time() - start return OperationResult("get_note", False, duration, str(e)) async def update_note(self, note_id: int, etag: str) -> OperationResult: """Update an existing note (write operation).""" start = time.time() try: await self.session.call_tool( "nc_notes_update_note", { "note_id": note_id, "etag": etag, "title": f"Updated Note {note_id}", "content": f"Updated content at {time.time()}", "category": "LoadTesting", }, ) duration = time.time() - start return OperationResult("update_note", True, duration) except Exception as e: duration = time.time() - start return OperationResult("update_note", False, duration, str(e)) async def delete_note(self, note_id: int) -> OperationResult: """Delete a note (write operation).""" start = time.time() try: await self.session.call_tool("nc_notes_delete_note", {"note_id": note_id}) duration = time.time() - start # Remove from tracking if note_id in self._created_notes: self._created_notes.remove(note_id) return OperationResult("delete_note", True, duration) except Exception as e: duration = time.time() - start return OperationResult("delete_note", False, duration, str(e)) async def list_webdav_files(self, path: str = "/") -> OperationResult: """List files via WebDAV (read operation).""" start = time.time() try: await self.session.call_tool("nc_webdav_list", {"path": path}) duration = time.time() - start return OperationResult("list_webdav_files", True, duration) except Exception as e: duration = time.time() - start return OperationResult("list_webdav_files", False, duration, str(e)) async def list_calendars(self) -> OperationResult: """List calendars (read operation).""" start = time.time() try: await self.session.call_tool("nc_calendar_list_calendars", {}) duration = time.time() - start return OperationResult("list_calendars", True, duration) except Exception as e: duration = time.time() - start return OperationResult("list_calendars", False, duration, str(e)) async def list_deck_boards(self) -> OperationResult: """List deck boards (read operation).""" start = time.time() try: await self.session.call_tool("nc_deck_list_boards", {}) duration = time.time() - start return OperationResult("list_deck_boards", True, duration) except Exception as e: duration = time.time() - start return OperationResult("list_deck_boards", False, duration, str(e)) async def cleanup(self): """Clean up any resources created during testing.""" logger.info(f"Cleaning up {len(self._created_notes)} test notes...") for note_id in self._created_notes[:]: try: await self.delete_note(note_id) except Exception as e: logger.warning(f"Failed to delete note {note_id}: {e}") class MixedWorkload: """ Realistic mixed workload simulating typical MCP server usage. Operation distribution: - 40% Notes read (list/get/search) - 20% Notes write (create/update/delete) - 15% Notes search - 10% WebDAV operations - 10% Calendar operations - 5% Other (capabilities, deck) """ def __init__(self, operations: WorkloadOperations): self.ops = operations # Pre-create some notes for read/update operations self._warmup_note_ids: list[tuple[int, str]] = [] async def warmup(self, count: int = 10): """Create initial notes for read/update operations.""" logger.info(f"Warming up with {count} test notes...") for _ in range(count): result = await self.ops.create_note() if result.success and self.ops._created_notes: note_id = self.ops._created_notes[-1] # Get the note to fetch its etag try: get_result = await self.ops.session.call_tool( "nc_notes_get_note", {"note_id": note_id} ) if get_result and len(get_result.content) > 0: import json note_data = json.loads(get_result.content[0].text) etag = note_data.get("etag", "") self._warmup_note_ids.append((note_id, etag)) except Exception as e: logger.warning(f"Failed to get etag for note {note_id}: {e}") async def run_operation(self) -> OperationResult: """Execute one random operation based on the workload distribution.""" rand = random.random() # 40% reads (list/get/search) if rand < 0.40: op_rand = random.random() if op_rand < 0.5: return await self.ops.list_notes() elif op_rand < 0.8 and self._warmup_note_ids: note_id, _ = random.choice(self._warmup_note_ids) return await self.ops.get_note(note_id) else: return await self.ops.search_notes() # 20% writes (create/update/delete) elif rand < 0.60: op_rand = random.random() if op_rand < 0.5: return await self.ops.create_note() elif op_rand < 0.8 and self._warmup_note_ids: note_id, etag = random.choice(self._warmup_note_ids) return await self.ops.update_note(note_id, etag) elif self.ops._created_notes and len(self.ops._created_notes) > 5: # Only delete if we have enough notes note_id = random.choice(self.ops._created_notes) return await self.ops.delete_note(note_id) else: return await self.ops.create_note() # 15% search elif rand < 0.75: queries = ["test", "load", "note", "content", ""] return await self.ops.search_notes(random.choice(queries)) # 10% WebDAV elif rand < 0.85: return await self.ops.list_webdav_files() # 10% Calendar elif rand < 0.95: return await self.ops.list_calendars() # 5% Other else: op_rand = random.random() if op_rand < 0.5: return await self.ops.get_capabilities() else: return await self.ops.list_deck_boards()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/No-Smoke/nextcloud-mcp-comprehensive'

If you have feedback or need assistance with the MCP directory API, please join our Discord server