Skip to main content
Glama
tools.py36.4 kB
import asyncio import contextlib import json import logging import os import subprocess import sys import tempfile from typing import Any, Dict, List, Optional, Union # Azure AI Evaluation Imports from azure.ai.evaluation import ( # Agent Converter BleuScoreEvaluator, CodeVulnerabilityEvaluator, CoherenceEvaluator, ContentSafetyEvaluator, F1ScoreEvaluator, FluencyEvaluator, # Text Evaluators GroundednessEvaluator, HateUnfairnessEvaluator, IndirectAttackEvaluator, # Agent Evaluators IntentResolutionEvaluator, MeteorScoreEvaluator, ProtectedMaterialEvaluator, QAEvaluator, RelevanceEvaluator, RetrievalEvaluator, RougeScoreEvaluator, SelfHarmEvaluator, SexualEvaluator, SimilarityEvaluator, TaskAdherenceEvaluator, ToolCallAccuracyEvaluator, UngroundedAttributesEvaluator, ViolenceEvaluator, evaluate, ) from azure.ai.projects.aio import AIProjectClient from azure.ai.agents.models import Agent, MessageRole, MessageTextContent # Azure Imports from azure.identity import DefaultAzureCredential from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential from dotenv import load_dotenv from mcp_foundry.mcp_server import mcp logger = logging.getLogger(__name__) # Configure PromptFlow logging to go to stderr def configure_promptflow_logging(): import logging promptflow_logger = logging.getLogger("promptflow") for handler in promptflow_logger.handlers: promptflow_logger.removeHandler(handler) handler = logging.StreamHandler(sys.stderr) handler.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) promptflow_logger.addHandler(handler) promptflow_logger.propagate = False # Don't propagate to root logger # Call this function early in your script's execution configure_promptflow_logging() # Load environment variables load_dotenv() ####################### # CONFIGURATION SETUP # ####################### # Initialize Azure AI project and Azure OpenAI connection with environment variables try: # Sync credential for evaluations CREDENTIAL = DefaultAzureCredential() # Azure OpenAI model configuration MODEL_CONFIG = { "azure_endpoint": os.environ.get("AZURE_OPENAI_ENDPOINT"), "api_key": os.environ.get("AZURE_OPENAI_API_KEY"), "azure_deployment": os.environ.get("AZURE_OPENAI_DEPLOYMENT"), "api_version": os.environ.get("AZURE_OPENAI_API_VERSION"), } # Directory for evaluation data files EVAL_DATA_DIR = os.environ.get("EVAL_DATA_DIR", ".") # Azure AI Agent configuration DEFAULT_AGENT_ID = os.environ.get("DEFAULT_AGENT_ID") AZURE_AI_PROJECT_ENDPOINT = os.environ.get("AZURE_AI_PROJECT_ENDPOINT") # Initialization flags EVALUATION_INITIALIZED = True if not all([AZURE_AI_PROJECT_ENDPOINT, MODEL_CONFIG["azure_endpoint"]]): EVALUATION_INITIALIZED = False logger.warning("Some evaluation credentials are missing, some evaluators may not work") AGENT_INITIALIZED = bool(AZURE_AI_PROJECT_ENDPOINT) if not AGENT_INITIALIZED: logger.warning("AZURE_AI_PROJECT_ENDPOINT is missing, agent features will not work") except Exception as e: logger.error(f"Initialization error: {str(e)}") CREDENTIAL = None AZURE_AI_PROJECT_ENDPOINT = None MODEL_CONFIG = None EVALUATION_INITIALIZED = False AGENT_INITIALIZED = False # Global variables for agent client and cache AI_CLIENT: Optional[AIProjectClient] = None AGENT_CACHE = {} USER_AGENT = "foundry-mcp" # Custom user agent for Azure AI Project client async def initialize_agent_client(): """Initialize the Azure AI Agent client asynchronously.""" global AI_CLIENT if not AGENT_INITIALIZED: return False try: async_credential = AsyncDefaultAzureCredential() AI_CLIENT = AIProjectClient(endpoint=AZURE_AI_PROJECT_ENDPOINT, credential=async_credential, user_agent=USER_AGENT) return True except Exception as e: logger.error(f"Failed to initialize AIProjectClient: {str(e)}") return False ####################### # EVALUATOR MAPPINGS # ####################### # Map evaluator names to classes for dynamic instantiation TEXT_EVALUATOR_MAP = { "groundedness": GroundednessEvaluator, "relevance": RelevanceEvaluator, "coherence": CoherenceEvaluator, "fluency": FluencyEvaluator, "similarity": SimilarityEvaluator, "retrieval": RetrievalEvaluator, "f1": F1ScoreEvaluator, "rouge": RougeScoreEvaluator, "bleu": BleuScoreEvaluator, "meteor": MeteorScoreEvaluator, "violence": ViolenceEvaluator, "sexual": SexualEvaluator, "self_harm": SelfHarmEvaluator, "hate_unfairness": HateUnfairnessEvaluator, "indirect_attack": IndirectAttackEvaluator, "protected_material": ProtectedMaterialEvaluator, "ungrounded_attributes": UngroundedAttributesEvaluator, "code_vulnerability": CodeVulnerabilityEvaluator, "qa": QAEvaluator, "content_safety": ContentSafetyEvaluator, } # Map agent evaluator names to classes AGENT_EVALUATOR_MAP = { "intent_resolution": IntentResolutionEvaluator, "tool_call_accuracy": ToolCallAccuracyEvaluator, "task_adherence": TaskAdherenceEvaluator, } # Required parameters for each text evaluator TEXT_EVALUATOR_REQUIREMENTS = { "groundedness": {"query": "Optional", "response": "Required", "context": "Required"}, "relevance": {"query": "Required", "response": "Required"}, "coherence": {"query": "Required", "response": "Required"}, "fluency": {"response": "Required"}, "similarity": {"query": "Required", "response": "Required", "ground_truth": "Required"}, "retrieval": {"query": "Required", "context": "Required"}, "f1": {"response": "Required", "ground_truth": "Required"}, "rouge": {"response": "Required", "ground_truth": "Required"}, "bleu": {"response": "Required", "ground_truth": "Required"}, "meteor": {"response": "Required", "ground_truth": "Required"}, "violence": {"query": "Required", "response": "Required"}, "sexual": {"query": "Required", "response": "Required"}, "self_harm": {"query": "Required", "response": "Required"}, "hate_unfairness": {"query": "Required", "response": "Required"}, "indirect_attack": {"query": "Required", "response": "Required", "context": "Required"}, "protected_material": {"query": "Required", "response": "Required"}, "ungrounded_attributes": {"query": "Required", "response": "Required", "context": "Required"}, "code_vulnerability": {"query": "Required", "response": "Required"}, "qa": {"query": "Required", "response": "Required", "context": "Required", "ground_truth": "Required"}, "content_safety": {"query": "Required", "response": "Required"}, } # Required parameters for each agent evaluator agent_evaluator_requirements = { "intent_resolution": { "query": "Required (Union[str, list[Message]])", "response": "Required (Union[str, list[Message]])", "tool_definitions": "Optional (list[ToolDefinition])", }, "tool_call_accuracy": { "query": "Required (Union[str, list[Message]])", "response": "Optional (Union[str, list[Message]])", "tool_calls": "Optional (Union[dict, list[ToolCall]])", "tool_definitions": "Required (list[ToolDefinition])", }, "task_adherence": { "query": "Required (Union[str, list[Message]])", "response": "Required (Union[str, list[Message]])", "tool_definitions": "Optional (list[ToolCall])", }, } ###################### # HELPER FUNCTIONS # ###################### def create_text_evaluator(evaluator_name: str) -> Any: """Create and configure an appropriate text evaluator instance.""" if evaluator_name not in TEXT_EVALUATOR_MAP: raise ValueError(f"Unknown text evaluator: {evaluator_name}") EvaluatorClass = TEXT_EVALUATOR_MAP[evaluator_name] # AI-assisted quality evaluators need a model if evaluator_name in ["groundedness", "relevance", "coherence", "fluency", "similarity"]: if not MODEL_CONFIG or not all([MODEL_CONFIG["azure_endpoint"], MODEL_CONFIG["api_key"]]): raise ValueError(f"Model configuration required for {evaluator_name} evaluator") return EvaluatorClass(MODEL_CONFIG) # AI-assisted risk and safety evaluators need Azure credentials elif evaluator_name in [ "violence", "sexual", "self_harm", "hate_unfairness", "indirect_attack", "protected_material", "ungrounded_attributes", "code_vulnerability", "content_safety", ]: if CREDENTIAL is None or AZURE_AI_PROJECT_ENDPOINT is None: raise ValueError(f"Azure credentials required for {evaluator_name} evaluator") return EvaluatorClass(credential=CREDENTIAL, azure_ai_project=AZURE_AI_PROJECT_ENDPOINT) # NLP evaluators don't need special configuration else: return EvaluatorClass() def create_agent_evaluator(evaluator_name: str) -> Any: """Create and configure an appropriate agent evaluator instance.""" if evaluator_name not in AGENT_EVALUATOR_MAP: raise ValueError(f"Unknown agent evaluator: {evaluator_name}") if not MODEL_CONFIG or not all([MODEL_CONFIG["azure_endpoint"], MODEL_CONFIG["api_key"]]): raise ValueError(f"Model configuration required for {evaluator_name} evaluator") EvaluatorClass = AGENT_EVALUATOR_MAP[evaluator_name] return EvaluatorClass(model_config=MODEL_CONFIG) async def get_agent(client: AIProjectClient, agent_id: str) -> Agent: """Get an agent by ID with simple caching.""" global AGENT_CACHE # Check cache first if agent_id in AGENT_CACHE: return AGENT_CACHE[agent_id] # Fetch agent if not in cache try: agent = await client.agents.get_agent(agent_id=agent_id) AGENT_CACHE[agent_id] = agent return agent except Exception as e: logger.error(f"Agent retrieval failed - ID: {agent_id}, Error: {str(e)}") raise ValueError(f"Agent not found or inaccessible: {agent_id}") async def query_agent(client: AIProjectClient, agent_id: str, query: str) -> Dict: """Query an Azure AI Agent and get the response with full thread/run data.""" try: # Get agent (from cache or fetch it) agent = await get_agent(client, agent_id) # Always create a new thread thread = await client.agents.threads.create() thread_id = thread.id # Add message to thread await client.agents.messages.create(thread_id=thread_id, role=MessageRole.USER, content=query) # Process the run run = await client.agents.runs.create(thread_id=thread_id, agent_id=agent_id) run_id = run.id # Poll until the run is complete while run.status in ["queued", "in_progress", "requires_action"]: await asyncio.sleep(1) # Non-blocking sleep run = await client.agents.runs.get(thread_id=thread_id, run_id=run.id) if run.status == "failed": error_msg = f"Agent run failed: {run.last_error}" logger.error(error_msg) return { "success": False, "error": error_msg, "thread_id": thread_id, "run_id": run_id, "result": f"Error: {error_msg}", } # Get the agent's response response_messages = client.agents.messages.list(thread_id=thread_id) response_message = None async for msg in response_messages: if msg.role == MessageRole.AGENT: response_message = msg result = "" citations = [] if response_message: # Collect text content for text_message in response_message.text_messages: result += text_message.text.value + "\n" # Collect citations for annotation in response_message.url_citation_annotations: citation = f"[{annotation.url_citation.title}]({annotation.url_citation.url})" if citation not in citations: citations.append(citation) # Add citations if any if citations: result += "\n\n## Sources\n" for citation in citations: result += f"- {citation}\n" return { "success": True, "thread_id": thread_id, "run_id": run_id, "result": result.strip(), "citations": citations, } except Exception as e: logger.error(f"Agent query failed - ID: {agent_id}, Error: {str(e)}") raise def az(*args: str) -> dict: """Run azure-cli and return output with improved error handling""" cmd = [sys.executable, "-m", "azure.cli", *args, "-o", "json"] # Log the command that's about to be executed logger.info(f"Attempting to run: {' '.join(cmd)}") try: # Run with full logging result = subprocess.run( cmd, text=True, capture_output=True, check=False, # Don't raise exception to see all errors ) # Log the results logger.info(f"Command exit code: {result.returncode}") logger.info(f"Command stdout (first 100 chars): {result.stdout[:100] if result.stdout else 'Empty'}") logger.warning(f"Command stderr (first 100 chars): {result.stderr[:100] if result.stderr else 'Empty'}") if result.returncode != 0: # Command failed return {"error": "Command failed", "stderr": result.stderr, "returncode": result.returncode} try: # Try to parse JSON return json.loads(result.stdout.strip()) except json.JSONDecodeError as json_err: # JSON parsing failed return { "error": f"Failed to parse JSON: {str(json_err)}", "raw_output": result.stdout[:500], # First 500 chars for debugging } except Exception as e: # Catch all other exceptions logger.error(f"Exception executing command: {str(e)}") return {"error": f"Exception: {str(e)}", "type": type(e).__name__} ######################## # TEXT EVALUATION TOOLS # ######################## @mcp.tool() def list_text_evaluators() -> List[str]: """ Returns a list of available text evaluator names for evaluating text outputs. """ return list(TEXT_EVALUATOR_MAP.keys()) @mcp.tool() def list_agent_evaluators() -> List[str]: """ Returns a list of available agent evaluator names for evaluating agent behaviors. """ return list(AGENT_EVALUATOR_MAP.keys()) @mcp.tool() def get_text_evaluator_requirements(evaluator_name: str = None) -> Dict: """ Get the required input fields for a specific text evaluator or all text evaluators. Parameters: - evaluator_name: Optional name of evaluator. If None, returns requirements for all evaluators. """ if evaluator_name is not None: if evaluator_name not in TEXT_EVALUATOR_MAP: raise ValueError(f"Unknown evaluator {evaluator_name}") return {evaluator_name: TEXT_EVALUATOR_REQUIREMENTS[evaluator_name]} else: return TEXT_EVALUATOR_REQUIREMENTS @mcp.tool() def get_agent_evaluator_requirements(evaluator_name: str = None) -> Dict: """ Get the required input fields for a specific agent evaluator or all agent evaluators. Parameters: - evaluator_name: Optional name of evaluator. If None, returns requirements for all evaluators. """ if evaluator_name is not None: if evaluator_name not in AGENT_EVALUATOR_MAP: raise ValueError(f"Unknown evaluator {evaluator_name}") return {evaluator_name: agent_evaluator_requirements[evaluator_name]} else: return agent_evaluator_requirements @mcp.tool() def run_text_eval( evaluator_names: Union[str, List[str]], # Single evaluator name or list of evaluator names file_path: Optional[str] = None, # Path to JSONL file content: Optional[str] = None, # JSONL content as a string (optional) include_studio_url: bool = True, # Option to include studio URL in response return_row_results: bool = False, # Option to include detailed row results ) -> Dict: """ Run one or multiple evaluators on a JSONL file or content string. Parameters: - evaluator_names: Either a single evaluator name (string) or a list of evaluator names - file_path: Path to a JSONL file to evaluate (preferred for efficiency) - content: JSONL content as a string (alternative if file_path not available) - include_studio_url: Whether to include the Azure AI studio URL in the response - return_row_results: Whether to include detailed row results (False by default for large datasets) """ # Save original stdout so we can restore it later original_stdout = sys.stdout # Redirect stdout to stderr to prevent PromptFlow output from breaking MCP sys.stdout = sys.stderr # Heartbeat mechanism import threading import time # Set up a heartbeat mechanism to keep the connection alive heartbeat_active = True def send_heartbeats(): count = 0 while heartbeat_active: count += 1 logger.info(f"Heartbeat {count} - Evaluation in progress...") # Print to stderr to keep connection alive print(f"Evaluation in progress... ({count * 15}s)", file=sys.stderr, flush=True) time.sleep(15) # Send heartbeat every 15 seconds # Start heartbeat thread heartbeat_thread = threading.Thread(target=send_heartbeats, daemon=True) heartbeat_thread.start() try: if not EVALUATION_INITIALIZED: heartbeat_active = False # Stop heartbeat return {"error": "Evaluation not initialized. Check environment variables."} # Validate inputs if content is None and file_path is None: heartbeat_active = False # Stop heartbeat return {"error": "Either file_path or content must be provided"} # Convert single evaluator to list for unified processing if isinstance(evaluator_names, str): evaluator_names = [evaluator_names] # Validate evaluator names for name in evaluator_names: if name not in TEXT_EVALUATOR_MAP: heartbeat_active = False # Stop heartbeat return {"error": f"Unknown evaluator: {name}"} # Variable to track if we need to clean up a temp file temp_file = None try: # Determine which input to use (prioritize file_path for efficiency) input_file = None if file_path: # Resolve file path if os.path.isfile(file_path): input_file = file_path else: # Check in data directory data_dir = os.environ.get("EVAL_DATA_DIR", ".") alternate_path = os.path.join(data_dir, file_path) if os.path.isfile(alternate_path): input_file = alternate_path else: heartbeat_active = False # Stop heartbeat return {"error": f"File not found: {file_path} (also checked in {data_dir})"} # Count rows quickly using file iteration with open(input_file, "r", encoding="utf-8") as f: row_count = sum(1 for line in f if line.strip()) elif content: # Create temporary file for content string fd, temp_file = tempfile.mkstemp(suffix=".jsonl") os.close(fd) # Write content to temp file with open(temp_file, "w", encoding="utf-8") as f: f.write(content) input_file = temp_file row_count = content.count("\n") + (0 if content.endswith("\n") else 1) logger.info(f"Processing {row_count} rows for {len(evaluator_names)} evaluator(s)") # Prepare evaluators evaluators = {} eval_config = {} for name in evaluator_names: # Create evaluator instance evaluators[name] = create_text_evaluator(name) # Set up column mapping for this evaluator requirements = TEXT_EVALUATOR_REQUIREMENTS[name] column_mapping = {} for field, requirement in requirements.items(): if requirement == "Required": column_mapping[field] = f"${{data.{field}}}" eval_config[name] = {"column_mapping": column_mapping} # Prepare evaluation args eval_args = {"data": input_file, "evaluators": evaluators, "evaluator_config": eval_config} # Add Azure AI project info if initialized if AZURE_AI_PROJECT_ENDPOINT and include_studio_url: eval_args["azure_ai_project"] = AZURE_AI_PROJECT_ENDPOINT eval_args["user_agent"] = USER_AGENT # Run evaluation with additional stdout redirection for extra safety with contextlib.redirect_stdout(sys.stderr): result = evaluate(**eval_args) # Prepare response response = {"evaluators": evaluator_names, "row_count": row_count, "metrics": result.get("metrics", {})} # Only include detailed row results if explicitly requested if return_row_results: response["row_results"] = result.get("rows", []) # Include studio URL if available if include_studio_url and "studio_url" in result: response["studio_url"] = result.get("studio_url") heartbeat_active = False # Stop heartbeat return response except Exception as e: logger.error(f"Evaluation error: {str(e)}") heartbeat_active = False # Stop heartbeat return {"error": str(e)} finally: # Clean up temp file if we created one if temp_file and os.path.exists(temp_file): try: os.remove(temp_file) except Exception: pass # Make sure heartbeat is stopped heartbeat_active = False finally: # Always restore stdout, even if an exception occurs sys.stdout = original_stdout heartbeat_active = False @mcp.tool() async def agent_query_and_evaluate( agent_id: str, query: str, evaluator_names: List[str] = None, include_studio_url: bool = True, # Option to include studio URL ) -> Dict: """ Query an agent and evaluate its response in a single operation. Parameters: - agent_id: ID of the agent to query - query: Text query to send to the agent - evaluator_names: Optional list of agent evaluator names to use (defaults to all) - include_studio_url: Whether to include the Azure AI studio URL in the response Returns both the agent response and evaluation results """ # Save original stdout so we can restore it later original_stdout = sys.stdout # Redirect stdout to stderr to prevent PromptFlow output from breaking MCP sys.stdout = sys.stderr # Heartbeat mechanism to keep connection alive during long operations import threading import time # Set up a heartbeat mechanism to keep the connection alive heartbeat_active = True def send_heartbeats(): count = 0 while heartbeat_active: count += 1 logger.info(f"Heartbeat {count} - Evaluation in progress...") # Print to stderr to keep connection alive print(f"Evaluation in progress... ({count * 15}s)", file=sys.stderr, flush=True) time.sleep(15) # Send heartbeat every 15 seconds # Start heartbeat thread heartbeat_thread = threading.Thread(target=send_heartbeats, daemon=True) heartbeat_thread.start() try: if not AGENT_INITIALIZED or not EVALUATION_INITIALIZED: heartbeat_active = False # Stop heartbeat return {"error": "Services not fully initialized. Check environment variables."} if AI_CLIENT is None: success = await initialize_agent_client() if not success or AI_CLIENT is None: heartbeat_active = False # Stop heartbeat return {"error": "Failed to initialize Azure AI Agent client."} try: # Query the agent (this part remains async) query_response = await query_agent(AI_CLIENT, agent_id, query) if not query_response.get("success", False): heartbeat_active = False # Stop heartbeat return query_response # Get the thread and run IDs thread_id = query_response["thread_id"] run_id = query_response["run_id"] # Now we'll switch to synchronous mode, exactly like the GitHub example # Step 1: Create a synchronous client (this is what GitHub example uses) from azure.ai.projects import AIProjectClient # This is the sync version from azure.identity import DefaultAzureCredential sync_client = AIProjectClient(endpoint=AZURE_AI_PROJECT_ENDPOINT, credential=DefaultAzureCredential(), user_agent=USER_AGENT) # Step 2: Create converter with the sync client, exactly like example from azure.ai.evaluation import AIAgentConverter converter = AIAgentConverter(sync_client) # Step 3: Create a temp file name temp_filename = "temp_evaluation_data.jsonl" try: # Step 4: Convert data synchronously, exactly as in their example evaluation_data = converter.convert(thread_id=thread_id, run_id=run_id) # Step 5: Write to file with open(temp_filename, "w") as f: json.dump(evaluation_data, f) # Step 6: Default to all agent evaluators if none specified if not evaluator_names: evaluator_names = list(AGENT_EVALUATOR_MAP.keys()) # Step 7: Create evaluators evaluators = {} for name in evaluator_names: evaluators[name] = create_agent_evaluator(name) # Step 8: Run evaluation, exactly as in their example # Use contextlib to ensure all stdout is redirected with contextlib.redirect_stdout(sys.stderr): from azure.ai.evaluation import evaluate evaluation_result = evaluate( data=temp_filename, evaluators=evaluators, azure_ai_project=AZURE_AI_PROJECT_ENDPOINT if include_studio_url else None, user_agent=USER_AGENT, ) # Step 9: Prepare response response = { "success": True, "agent_id": agent_id, "thread_id": thread_id, "run_id": run_id, "query": query, "response": query_response["result"], "citations": query_response.get("citations", []), "evaluation_metrics": evaluation_result.get("metrics", {}), } # Include studio URL if available if include_studio_url and "studio_url" in evaluation_result: response["studio_url"] = evaluation_result.get("studio_url") heartbeat_active = False # Stop heartbeat return response except Exception as e: logger.error(f"Evaluation error: {str(e)}") import traceback logger.error(traceback.format_exc()) heartbeat_active = False # Stop heartbeat return {"error": f"Evaluation error: {str(e)}"} finally: # Clean up temp file if os.path.exists(temp_filename): try: os.remove(temp_filename) except Exception: pass except Exception as e: logger.error(f"Error in query and evaluate: {str(e)}") heartbeat_active = False # Stop heartbeat return {"error": f"Error in query and evaluate: {str(e)}"} finally: # Always restore stdout, even if an exception occurs sys.stdout = original_stdout heartbeat_active = False # Stop heartbeat # Add this new helper function to format evaluation outputs @mcp.tool() def format_evaluation_report(evaluation_result: Dict) -> str: """ Format evaluation results into a readable report with metrics and Studio URL. Parameters: - evaluation_result: The evaluation result dictionary from run_text_eval or agent_query_and_evaluate Returns a formatted report with metrics and Azure AI Studio URL if available """ if "error" in evaluation_result: return f"❌ Evaluation Error: {evaluation_result['error']}" # Start the report report = ["# Evaluation Report\n"] # Add evaluator info evaluator = evaluation_result.get("evaluator") if evaluator: report.append(f"## Evaluator: {evaluator}\n") # Add metrics metrics = evaluation_result.get("metrics", {}) if metrics: report.append("## Metrics\n") for metric_name, metric_value in metrics.items(): # Format metric value based on type if isinstance(metric_value, (int, float)): formatted_value = f"{metric_value:.4f}" if isinstance(metric_value, float) else str(metric_value) else: formatted_value = str(metric_value) report.append(f"- **{metric_name}**: {formatted_value}") report.append("\n") # Add studio URL if available studio_url = evaluation_result.get("studio_url") if studio_url: report.append("## Azure AI Studio\n") report.append(f"📊 [View detailed evaluation results in Azure AI Studio]({studio_url})\n") # Return the formatted report return "\n".join(report) @mcp.tool() def run_agent_eval( evaluator_name: str, query: str, response: Optional[str] = None, tool_calls: Optional[str] = None, tool_definitions: Optional[str] = None, ) -> Dict: """ Run agent evaluation on agent data. Accepts both plain text and JSON strings. Parameters: - evaluator_name: Name of the agent evaluator to use (intent_resolution, tool_call_accuracy, task_adherence) - query: User query (plain text or JSON string) - response: Agent response (plain text or JSON string) - tool_calls: Optional tool calls data (JSON string) - tool_definitions: Optional tool definitions (JSON string) """ if not EVALUATION_INITIALIZED: return {"error": "Evaluation not initialized. Check environment variables."} if evaluator_name not in AGENT_EVALUATOR_MAP: raise ValueError(f"Unknown agent evaluator: {evaluator_name}") try: # Helper function to process inputs def process_input(input_str): if not input_str: return None # Check if it's already a valid JSON string try: # Try to parse as JSON return json.loads(input_str) except json.JSONDecodeError: # If not a JSON string, treat as plain text return input_str # Process inputs - handle both direct text and JSON strings query_data = process_input(query) response_data = process_input(response) if response else None tool_calls_data = process_input(tool_calls) if tool_calls else None tool_definitions_data = process_input(tool_definitions) if tool_definitions else None # If query/response are plain text, wrap them in the expected format if isinstance(query_data, str): query_data = {"content": query_data} if isinstance(response_data, str): response_data = {"content": response_data} # Create evaluator instance evaluator = create_agent_evaluator(evaluator_name) # Prepare kwargs for the evaluator kwargs = {"query": query_data} if response_data: kwargs["response"] = response_data if tool_calls_data: kwargs["tool_calls"] = tool_calls_data if tool_definitions_data: kwargs["tool_definitions"] = tool_definitions_data # Run evaluation result = evaluator(**kwargs) return {"evaluator": evaluator_name, "result": result} except Exception as e: logger.error(f"Agent evaluation error: {str(e)}") return {"error": str(e)} ######################## # AGENT SERVICE TOOLS # ######################## @mcp.tool() async def list_agents() -> str: """List available agents in the Azure AI Agent Service.""" if not AGENT_INITIALIZED: return "Error: Azure AI Agent service is not initialized. Check environment variables." if AI_CLIENT is None: await initialize_agent_client() if AI_CLIENT is None: return "Error: Failed to initialize Azure AI Agent client." try: agents = AI_CLIENT.agents.list_agents() if not agents: return "No agents found in the Azure AI Agent Service." result = "## Available Azure AI Agents\n\n" async for agent in agents: result += f"- **{agent.name}**: `{agent.id}`\n" if DEFAULT_AGENT_ID: result += f"\n**Default Agent ID**: `{DEFAULT_AGENT_ID}`" return result except Exception as e: logger.error(f"Error listing agents: {str(e)}") return f"Error listing agents: {str(e)}" @mcp.tool() async def connect_agent(agent_id: str, query: str) -> Dict: """ Connect to a specific Azure AI Agent and run a query. Parameters: - agent_id: ID of the agent to connect to - query: Text query to send to the agent Returns a dict with the agent's response and thread/run IDs for potential evaluation """ if not AGENT_INITIALIZED: return {"error": "Azure AI Agent service is not initialized. Check environment variables."} if AI_CLIENT is None: await initialize_agent_client() if AI_CLIENT is None: return {"error": "Failed to initialize Azure AI Agent client."} try: response = await query_agent(AI_CLIENT, agent_id, query) return response except Exception as e: logger.error(f"Error connecting to agent: {str(e)}") return {"error": f"Error connecting to agent: {str(e)}"} @mcp.tool() async def query_default_agent(query: str) -> Dict: """ Send a query to the default configured Azure AI Agent. Parameters: - query: Text query to send to the default agent Returns a dict with the agent's response and thread/run IDs for potential evaluation """ if not AGENT_INITIALIZED: return {"error": "Azure AI Agent service is not initialized. Check environment variables."} if not DEFAULT_AGENT_ID: return { "error": "No default agent configured. Set DEFAULT_AGENT_ID environment variable or use connect_agent tool." } if AI_CLIENT is None: await initialize_agent_client() if AI_CLIENT is None: return {"error": "Failed to initialize Azure AI Agent client."} try: response = await query_agent(AI_CLIENT, DEFAULT_AGENT_ID, query) return response except Exception as e: logger.error(f"Error querying default agent: {str(e)}") return {"error": f"Error querying default agent: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/youssef7788/mcp-foundry'

If you have feedback or need assistance with the MCP directory API, please join our Discord server