Skip to main content
Glama

MCP Ahrefs

by SAGAAIDEV
parallelize.py8.61 kB
"""Parallelization decorator for MCP tools - SAAGA Pattern. This decorator transforms function signatures to accept List[Dict] parameters for parallel execution following SAAGA standards: - Async-only pattern - Signature transformation: func(args) → func(kwargs_list: List[Dict]) - Batch processing support - Integration with SAAGA logging and error handling - Fail-fast behavior: if any item fails, the entire batch fails - Automatic type conversion for MCP string parameters Features: - Automatic parallelization of compatible tools - Signature transformation for batch processing - Concurrent execution with asyncio.gather - Fail-fast error handling (SAAGA standard) - Type validation for input parameters - Automatic type conversion from strings (MCP protocol compatibility) Type Conversion: Since MCP protocol sends all parameters as strings, this decorator automatically converts string parameters to their annotated types: - str → int: Numeric strings are converted to integers - str → float: Numeric strings are converted to floats - str → bool: "true", "1", "yes" (case-insensitive) → True, others → False - Other types: Passed through unchanged Usage: @parallelize async def batch_process_tool(item: str, count: int) -> str: # Tool implementation that can be parallelized return processed_item # After decoration, signature becomes: # async def batch_process_tool(kwargs_list: List[Dict]) -> List[str] # And handles {"item": "text", "count": "5"} by converting count to int """ import asyncio from functools import wraps from typing import Callable, Any, Awaitable, List, Dict, Union import logging import inspect logger = logging.getLogger(__name__) def _set_parallelized_signature_and_annotations( wrapper_func: Callable, param_name: str, param_annotation: Any, return_annotation: Any ): """Sets the __signature__ and __annotations__ for the wrapper function.""" new_param = inspect.Parameter( name=param_name, kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=param_annotation ) new_sig = inspect.Signature( parameters=[new_param], return_annotation=return_annotation ) wrapper_func.__signature__ = new_sig wrapper_func.__annotations__ = { param_name: param_annotation, 'return': return_annotation } def _build_parallelized_docstring(func: Callable) -> str: """Constructs the docstring for the parallelized wrapper function.""" original_doc = func.__doc__.strip() if func.__doc__ else "No original docstring provided." func_name = func.__name__ sig = inspect.signature(func) params = [] for name, param in sig.parameters.items(): if param.annotation != inspect.Parameter.empty: # Get the type name instead of the full class representation type_name = param.annotation.__name__ if hasattr(param.annotation, '__name__') else str(param.annotation) params.append(f"{name}: {type_name}") else: params.append(name) params_str = ", ".join(params) return f"""Parallelized version of `{func_name}`. This function accepts a list of keyword argument dictionaries and executes `{func_name}` concurrently for each set of arguments. Original function signature: {func_name}({params_str}) Args: kwargs_list (List[Dict[str, Any]]): A list of dictionaries, where each dictionary provides the keyword arguments for a single call to `{func_name}`. Returns: List[Any]: A list containing the results of each call to `{func_name}`, in the same order as the input `kwargs_list`. Original docstring: {original_doc} """ def parallelize(func: Callable[..., Awaitable[Any]]) -> Callable[[List[Dict]], Awaitable[List[Any]]]: """Decorator to enable parallel execution of MCP tools - SAAGA Pattern. Transforms function signature from: async def func(param1: str, param2: int) -> str To: async def func(kwargs_list: List[Dict]) -> List[str] Uses fail-fast behavior: if any task fails, the entire batch fails immediately. This maintains SAAGA compatibility and prevents partial results. The decorator preserves the original function's parameter information for better introspection and documentation, while transforming the execution signature to accept batch parameters. Automatic Type Conversion: Since MCP protocol sends all parameters as strings, this decorator automatically converts string values to match the function's type annotations: - int: "123" → 123 - float: "12.34" → 12.34 - bool: "true"/"1"/"yes" → True (case-insensitive) Args: func: The async function to decorate Returns: The decorated function that accepts List[Dict] and returns List[results] Raises: TypeError: If kwargs_list is not a List[Dict] Exception: Any exception from the first failing task (fail-fast) """ # Store original function signature for introspection original_signature = inspect.signature(func) @wraps(func) async def wrapper(kwargs_list: List[Dict]) -> List[Any]: """Execute function in parallel for each kwargs dict.""" if not isinstance(kwargs_list, list): raise TypeError("Parallel tools require List[Dict] parameter") if not kwargs_list: logger.warning(f"Empty kwargs_list provided to {func.__name__}") return [] logger.info(f"Parallel execution of {func.__name__} with {len(kwargs_list)} items") # Validate all items are dictionaries for i, kwargs in enumerate(kwargs_list): if not isinstance(kwargs, dict): raise TypeError(f"Item {i} in kwargs_list must be a dict, got {type(kwargs).__name__}") # Execute all calls concurrently tasks = [] for i, kwargs in enumerate(kwargs_list): try: # Validate parameters against original function signature bound_args = original_signature.bind(**kwargs) bound_args.apply_defaults() # Convert string parameters to appropriate types based on annotations converted_kwargs = {} for param_name, param_value in bound_args.arguments.items(): param = original_signature.parameters.get(param_name) if param and param.annotation != inspect.Parameter.empty: # Try to convert the value to the expected type try: if param.annotation == int and isinstance(param_value, str): converted_kwargs[param_name] = int(param_value) elif param.annotation == float and isinstance(param_value, str): converted_kwargs[param_name] = float(param_value) elif param.annotation == bool and isinstance(param_value, str): converted_kwargs[param_name] = param_value.lower() in ('true', '1', 'yes') else: converted_kwargs[param_name] = param_value except (ValueError, TypeError): # If conversion fails, use original value and let the function handle it converted_kwargs[param_name] = param_value else: converted_kwargs[param_name] = param_value task = func(**converted_kwargs) tasks.append(task) except Exception as e: # If function call fails immediately, create a failed task async def failed_task(): raise e tasks.append(failed_task()) # Wait for all tasks to complete - SAAGA fail-fast behavior results = await asyncio.gather(*tasks) return results # Update the docstring and signature for the wrapper function wrapper.__doc__ = _build_parallelized_docstring(func) _set_parallelized_signature_and_annotations( wrapper_func=wrapper, param_name="kwargs_list", param_annotation=List[Dict[str, Any]], return_annotation=List[Any] ) return wrapper

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SAGAAIDEV/mcp-ahrefs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server