Fabric MCP Agent

by yingkiat
intent_router.py (47.9 kB)
""" Intent classification and routing for agentic layer """ import os from openai import AzureOpenAI from typing import Dict, Any, List import json # Load environment variables first from dotenv import load_dotenv load_dotenv() # Azure OpenAI config AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY") AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT") client = AzureOpenAI( api_key=AZURE_OPENAI_KEY, api_version="2024-02-15-preview", azure_endpoint=AZURE_OPENAI_ENDPOINT ) def get_model_params(model_name: str, max_tokens: int, temperature: float = 0) -> dict: """Get appropriate parameters for different model APIs""" params = {} # GPT-5 and newer models have different parameter requirements if 'gpt-5' in model_name.lower() or 'o1' in model_name.lower(): params["max_completion_tokens"] = max_tokens # GPT-5 models don't support temperature=0, only default (1) if temperature != 1: params["temperature"] = 1 # Force to default else: # GPT-4o, GPT-4, and older models use max_tokens params["max_tokens"] = max_tokens params["temperature"] = temperature return params def load_prompt_module(module_name: str) -> str: """Load prompt module content from markdown file""" prompt_path = f"agentic_layer/prompts/{module_name}.md" try: with open(prompt_path, 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: return f"Prompt module '{module_name}' not found at {prompt_path}" def get_available_personas() -> Dict[str, str]: """Dynamically discover available persona modules""" import os personas = {} personas_dir = "agentic_layer/prompts/personas" if os.path.exists(personas_dir): for file in os.listdir(personas_dir): if file.endswith('.md'): persona_name = file[:-3] # Remove .md extension # Read first few lines to get persona description try: with open(os.path.join(personas_dir, file), 'r', encoding='utf-8') as f: content = f.read() # Extract description from markdown (first paragraph after title) lines = content.split('\n') description = "Available persona" for i, line in enumerate(lines): if line.startswith('### Context') or line.startswith('### Role') or (line.startswith('## ') and i > 0): description = lines[i+1:i+3] # Get next 1-2 lines description = ' '.join([l.strip() for l in description if l.strip()]) break personas[persona_name] = description[:100] + "..." 
                        personas[persona_name] = description[:100] + "..." if len(description) > 100 else description
                except:
                    personas[persona_name] = "Available persona"

    return personas


def load_intent_template(stage: str) -> str:
    """Load generic intent template for multi-stage execution"""
    template_path = f"agentic_layer/prompts/intent/{stage}.md"
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return f"Intent template '{stage}' not found at {template_path}"


def load_persona_module(persona_name: str) -> str:
    """Load persona module content from personas folder"""
    persona_path = f"agentic_layer/prompts/personas/{persona_name}.md"
    try:
        with open(persona_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return f"Persona module '{persona_name}' not found at {persona_path}"


def classify_intent(user_question: str, request_id: str = None) -> Dict[str, Any]:
    """
    Domain-agnostic intent classification and routing
    """
    # Load the classification template from markdown
    stage0_template = load_intent_template("stage0_intent")

    # Get available personas dynamically with sample content for better classification
    available_personas = get_available_personas()

    # Pre-load key personas to get actual table names for classification
    persona_context = {}
    for persona_name in available_personas.keys():
        try:
            content = load_persona_module(persona_name)
            # Extract table references from persona
            import re
            tables = re.findall(r'JPNPROdb_\w+', content)
            persona_context[persona_name] = {
                "description": available_personas[persona_name],
                "tables": list(set(tables)),  # Remove duplicates
                "sample_content": content[:300] + "..." if len(content) > 300 else content
            }
        except:
            persona_context[persona_name] = {
                "description": available_personas[persona_name],
                "tables": [],
                "sample_content": "Persona content not available"
            }

    # Build enhanced persona list with actual table context
    persona_list = []
    for name, context in persona_context.items():
        tables_str = ", ".join(context["tables"]) if context["tables"] else "No specific tables"
        persona_list.append(f"- {name}: {context['description']}\n  Tables: {tables_str}")
    persona_list = "\n".join(persona_list)

    # Build the classification prompt using the template
    classification_prompt = f"""
{stage0_template}

## Current Analysis Request
Analyze this question: "{user_question}"

Available personas (business domain experts):
{persona_list}

## Required Analysis
1. Extract any competitor names and specific products mentioned
2. Determine the best matching persona based on question content and domain
3. Select appropriate execution strategy and tool chain
4. Provide reasoning for classification decisions
"""

    # Get model-appropriate parameters
    model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 500, 0)

    response = client.chat.completions.create(
        model=AZURE_OPENAI_DEPLOYMENT,
        messages=[
            {"role": "system", "content": "You are a JSON classifier. Return ONLY valid JSON, no other text."},
            {"role": "user", "content": classification_prompt}
        ],
        **model_params
    )

    # Log API call with session logger
    if request_id:
        try:
            from session_logger import get_session_logger
            session_logger = get_session_logger(request_id, user_question)
            usage = response.usage if hasattr(response, 'usage') else None
            prompt_tokens = usage.prompt_tokens if usage else 0
            completion_tokens = usage.completion_tokens if usage else 0
            session_logger.log_api_call("intent_classification", AZURE_OPENAI_DEPLOYMENT, prompt_tokens, completion_tokens)
        except Exception as e:
            pass  # Don't fail if logging fails

    try:
        raw_content = response.choices[0].message.content

        # Check for empty responses (common with GPT-5 models)
        if not raw_content or raw_content.strip() == "":
            print(f"⚠️ Empty response from {AZURE_OPENAI_DEPLOYMENT}")
            raw_content = "{}"  # Treat as empty JSON to trigger fallback

        raw_content = raw_content.strip()

        # Try to extract JSON if wrapped in code blocks or extra text
        if '```json' in raw_content:
            json_start = raw_content.find('```json') + 7
            json_end = raw_content.find('```', json_start)
            raw_content = raw_content[json_start:json_end].strip()
        elif '{' in raw_content:
            json_start = raw_content.find('{')
            json_end = raw_content.rfind('}') + 1
            raw_content = raw_content[json_start:json_end]

        result = json.loads(raw_content)

        # Check if we got an empty or incomplete result
        if not result or 'persona' not in result:
            print(f"⚠️ Incomplete JSON response from {AZURE_OPENAI_DEPLOYMENT}: {result}")
            raise ValueError("Missing required fields in classification response")

        return result

    except (json.JSONDecodeError, ValueError) as e:
        print(f"JSON parsing error: {e}")
        print(f"Raw response: {response.choices[0].message.content}")
        # Generic fallback - don't try to guess intent
        return {
            "intent": "general_query",
            "persona": "product_planning",
            "confidence": 0.5,
            "execution_strategy": "single_stage",
            "metadata_strategy": "skip",  # Always skip for performance
            "tool_chain": ["run_sql_query", "summarize_results"],
            "reasoning": f"Fallback due to {AZURE_OPENAI_DEPLOYMENT} response issue: {str(e)[:50]}",
            "requires_intermediate_processing": False,
            "actual_tables": ["JPNPROdb_ps_mstr", "JPNPROdb_pt_mstr"]
        }


def execute_tool_chain(user_question: str, classification: Dict[str, Any], request_id: str = None) -> Dict[str, Any]:
    """
    Domain-agnostic tool chain execution with direct-first optimization and multi-stage support
    """
    from mcp_server.tools.sql_tools import get_metadata, run_sql_query
    from mcp_server.tools.analysis_tools import summarize_results, generate_visualization

    # Load the appropriate persona
    persona_content = load_persona_module(classification["persona"])

    results = {
        "classification": classification,
        "persona_used": classification["persona"],
        "tool_results": {},
        "final_response": ""
    }

    try:
        # Step 1: Try direct tools first (performance optimization)
        direct_results = attempt_direct_tools(user_question, classification, request_id)

        # Step 2: Execute based on direct tool success AND result quality
        if direct_results["success"]:
            # Check if direct tool actually found meaningful results
            result_data = direct_results["result"]
            direct_data = (result_data.get("specifications", []) or
                           result_data.get("results", []) or [])

            # DEBUG: Log what we're checking
            print(f"DEBUG: Checking direct tool results - specifications: {len(result_data.get('specifications', []))}, results: {len(result_data.get('results', []))}, direct_data length: {len(direct_data) if direct_data else 0}")
            if direct_data and len(direct_data) > 0:
                # Direct path: Fast data retrieval + AI evaluation
                print(f"DEBUG: Taking direct path with {len(direct_data)} results")
                results["tool_results"] = execute_direct_with_evaluation(
                    user_question, direct_results, classification, persona_content, request_id
                )
                print(f"DEBUG: Direct path completed, execution_path: {results['tool_results'].get('execution_path', 'unknown')}")
            else:
                # Direct tool found no results - fallback to AI workflow
                if request_id:
                    from logging_config import tracker, log_negative_case
                    tracker.log_direct_tool_fallback(request_id, "direct_tool_found_no_results")
                    # Log as negative case - direct tool didn't find expected data
                    log_negative_case(
                        request_id=request_id,
                        case_type="direct_tool_no_results",
                        user_question=user_question,
                        reason="Direct tool executed successfully but found no matching data",
                        classification=classification,
                        additional_context={"direct_tool_result": direct_results}
                    )
                results["tool_results"] = execute_standard_ai_workflow(
                    user_question, classification, persona_content, request_id, {}
                )
        else:
            # Fallback path: Standard AI workflow
            results["tool_results"] = execute_standard_ai_workflow(
                user_question, classification, persona_content, request_id, {}
            )

        # Generate final response
        results["final_response"] = generate_final_response(user_question, results, persona_content)

        # Log consolidated request flow
        if request_id:
            from logging_config import log_request_flow

            # Extract key data for logging
            persona = classification.get("persona", "unknown")
            intent_type = classification.get("extracted_entities", {}).get("intent_type", "unknown")
            execution_path = results["tool_results"].get("execution_path", "unknown")
            success = bool(results.get("final_response"))

            # Log the complete request flow
            log_request_flow(
                request_id=request_id,
                user_question=user_question,
                persona=persona,
                intent_type=intent_type,
                execution_path=execution_path,
                direct_tools_result=direct_results if direct_results else None,
                success=success
            )

    except Exception as e:
        results["error"] = str(e)
        results["final_response"] = f"Error executing tool chain: {str(e)}"

        # Log negative case - system error
        if request_id:
            from logging_config import log_negative_case
            log_negative_case(
                request_id=request_id,
                case_type="system_error",
                user_question=user_question,
                reason=str(e),
                classification=classification,
                additional_context={"stack_trace": str(e)}
            )

    return results


def handle_metadata_discovery(strategy: str, classification: Dict[str, Any] = None) -> Dict[str, Any]:
    """Generic metadata discovery handler"""
    from mcp_server.tools.sql_tools import get_metadata

    if strategy == "minimal":
        # Use actual tables from classification if available
        actual_tables = classification.get("actual_tables", []) if classification else []
        if actual_tables:
            validation_results = {}
            for table in actual_tables:
                validation_results[table] = get_metadata(table)
            return {"strategy": "minimal", "validation_results": validation_results}
        else:
            return {"strategy": "minimal", "basic_schema": get_metadata()}
    elif strategy == "full":
        return {"strategy": "full", "full_schema": get_metadata()}
    else:
        return {"strategy": "skip"}


def execute_single_stage_workflow(user_question: str, classification: Dict[str, Any], persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
    """Standard single-pass execution"""
    from mcp_server.tools.sql_tools import run_sql_query
    from mcp_server.tools.analysis_tools import summarize_results, generate_visualization

    results = existing_results.copy()
    # Check if this is a general inquiry that likely doesn't need database access
    intent_type = classification.get("extracted_entities", {}).get("intent_type", "")
    enable_ai_evaluation = classification.get("enable_ai_evaluation", False)

    # For general inquiries with no business terms, skip SQL and go straight to AI evaluation
    # This handles cases where enable_ai_evaluation might be false but we still want to answer general questions
    if (intent_type == "general_inquiry" and
            not any(term in user_question.lower() for term in [
                "product", "price", "cost", "jpnprodb", "competitor", "specification",
                "part", "component", "inventory", "sales", "mrh-", "spt", "hogy",
                "livedo", "hopes", "medline", "table", "database", "schema"
            ])):
        # Skip SQL execution and go straight to stage 3 evaluation for general knowledge
        stage3_template = load_intent_template("stage3_evaluation")

        stage3_context = f"""
{stage3_template}

PERSONA CONTEXT:
{persona_content}

GENERAL KNOWLEDGE QUESTION: No database lookup needed for this question

USER QUESTION: {user_question}

STAGE 3 TASK: Answer this general question using available knowledge. This does not require business/database data.

IMPORTANT: This appears to be a general knowledge question. Provide a direct, helpful answer without referencing business data.
"""

        # Direct stage 3 evaluation without SQL
        evaluation_result = evaluate_final_results(stage3_context, request_id)
        results["stage3_evaluation"] = evaluation_result
        return results

    enhanced_question = f"""
Context from {classification['persona']} module:
{persona_content}

User question: {user_question}
"""

    # Execute tool chain sequentially
    for tool_name in classification["tool_chain"]:
        if tool_name == "run_sql_query":
            results["run_sql_query"] = run_sql_query(question=enhanced_question, request_id=request_id)
        elif tool_name == "summarize_results" and "run_sql_query" in results:
            sql_results = results["run_sql_query"].get("results", [])
            results["summarize_results"] = summarize_results(sql_results, classification["intent"])
        elif tool_name == "generate_visualization" and "run_sql_query" in results:
            sql_results = results["run_sql_query"].get("results", [])
            results["generate_visualization"] = generate_visualization(sql_results, "table", f"Results for: {user_question}")

    # Add AI business evaluation if requested by classification
    if enable_ai_evaluation and "run_sql_query" in results:
        final_data = results["run_sql_query"].get("results", [])
        if final_data:
            # Load the stage3 evaluation template
            stage3_template = load_intent_template("stage3_evaluation")

            stage3_context = f"""
{stage3_template}

PERSONA CONTEXT:
{persona_content}

SINGLE-STAGE EXECUTION WITH AI EVALUATION: Direct SQL query executed successfully

QUERY RESULTS: {len(final_data)} records retrieved

USER QUESTION: {user_question}

STAGE 3 TASK: Evaluate single-stage query results to extract clear business answer using persona domain expertise.

Final Data Sample (compressed):
{compress_data_for_llm(final_data, max_records=10) if final_data else "No data available"}
"""

            # Log data compression for single-stage evaluation
            if final_data and request_id:
                try:
                    from session_logger import get_session_logger
                    session_logger = get_session_logger(request_id, user_question)
                    compressed_sample = compress_data_for_llm(final_data, max_records=10)
                    compression_stats = get_compression_stats(final_data, compressed_sample)
                    session_logger.log_data_compression(
                        "single_stage_evaluation",
                        compression_stats['original_size'],
                        compression_stats['compressed_size'],
                        compression_stats['compression_ratio']
                    )
                except Exception:
                    pass  # Don't fail if logging fails

            # AI evaluation of single-stage results
            evaluation_result = evaluate_final_results(stage3_context, request_id)
            results["stage3_evaluation"] = evaluation_result

    return results


def execute_multi_stage_workflow(user_question: str, classification: Dict[str, Any], persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
    """Multi-stage execution with intermediate AI reasoning using intent templates"""
    from mcp_server.tools.sql_tools import run_sql_query
    from mcp_server.tools.analysis_tools import summarize_results, generate_visualization

    results = existing_results.copy()

    # Load generic intent templates
    stage1_template = load_intent_template("stage1_discovery")
    stage2_template = load_intent_template("stage2_analysis")
    stage3_template = load_intent_template("stage3_evaluation")

    # Stage 1: Discovery using generic template + persona context
    stage1_context = f"""
{stage1_template}

PERSONA CONTEXT:
{persona_content}

USER QUESTION: {user_question}

STAGE 1 TASK: Execute discovery query to find relevant candidates based on user criteria using persona domain knowledge.
"""

    stage1_result = run_sql_query(question=stage1_context, request_id=request_id)
    results["stage1_query"] = stage1_result

    # Check if Stage 1 produced useful results
    if stage1_result.get("results") and len(stage1_result["results"]) > 0:
        # AI processes Stage 1 results for Stage 2 using persona context
        intermediate_analysis = process_intermediate_results(
            stage1_result["results"], user_question, persona_content
        )
        results["intermediate_analysis"] = intermediate_analysis

        # Stage 2: Analysis using generic template + persona context + Stage 1 results
        stage2_context = f"""
{stage2_template}

PERSONA CONTEXT:
{persona_content}

STAGE 1 RESULTS SUMMARY:
{intermediate_analysis.get("summary", "No summary available")}

SELECTED ITEMS FROM STAGE 1:
{intermediate_analysis.get("selected_items", "No items selected")}

USER QUESTION: {user_question}

STAGE 2 TASK: Execute detailed analysis query using selected items from Stage 1 to gather comprehensive information.
"""

        stage2_result = run_sql_query(question=stage2_context, request_id=request_id)
        results["stage2_query"] = stage2_result

        # Stage 3: Evaluation using generic template + all previous context
        final_data = stage2_result.get("results", [])
        if final_data:
            stage3_context = f"""
{stage3_template}

PERSONA CONTEXT:
{persona_content}

STAGE 1 RESULTS: {len(stage1_result.get("results", []))} candidates found

INTERMEDIATE ANALYSIS:
{intermediate_analysis.get("reasoning", "No reasoning available")}

STAGE 2 RESULTS: {len(final_data)} detailed records retrieved

USER QUESTION: {user_question}

STAGE 3 TASK: Evaluate final results to extract clear business answer using persona domain expertise.

Final Data Sample (compressed):
{compress_data_for_llm(final_data, max_records=10) if final_data else "No data available"}
"""

            # Log compression stats for Stage 3 with session logger
            if final_data and request_id:
                try:
                    from session_logger import get_session_logger
                    session_logger = get_session_logger(request_id, user_question)
                    compressed_sample = compress_data_for_llm(final_data, max_records=10)
                    compression_stats = get_compression_stats(final_data, compressed_sample)
                    session_logger.log_data_compression(
                        "stage3_evaluation",
                        compression_stats['original_size'],
                        compression_stats['compressed_size'],
                        compression_stats['compression_ratio']
                    )
                except Exception as e:
                    pass  # Don't fail if logging fails

            # Use LLM for evaluation (no SQL execution - just analysis)
            evaluation_result = evaluate_final_results(stage3_context, request_id)
            results["stage3_evaluation"] = evaluation_result
        else:
            final_data = stage1_result.get("results", [])
    else:
        # Fallback if Stage 1 failed
        final_data = stage1_result.get("results", [])

    # Execute remaining tools on final data
    if "summarize_results" in classification["tool_chain"] and final_data:
        results["summarize_results"] = summarize_results(final_data, classification["intent"])

    if "generate_visualization" in classification["tool_chain"] and final_data:
        results["generate_visualization"] = generate_visualization(final_data, "table", f"Results for: {user_question}")

    return results


def evaluate_final_results(evaluation_context: str, request_id: str = None) -> Dict[str, Any]:
    """Stage 3: Pure evaluation and analysis without SQL execution"""

    evaluation_prompt = f"""
{evaluation_context}

IMPORTANT: DO NOT generate any SQL queries. Your task is to analyze the data provided above and create a business answer.

Based on the data gathered from Stages 1 and 2, provide a comprehensive business evaluation in JSON format.

CRITICAL: Escape all special characters in JSON strings. Use simple ASCII characters where possible to avoid JSON parsing errors.

{{
    "business_answer": "Direct answer to user question - avoid special characters",
    "key_findings": ["finding1", "finding2", "finding3"],
    "recommended_action": "What user should do next",
    "supporting_data": {{
        "primary_values": "key metrics or values",
        "alternatives": "other options if applicable",
        "confidence": "high|medium|low"
    }},
    "data_quality": "assessment of result reliability",
    "sql_executed": null
}}
"""

    try:
        # Get model-appropriate parameters
        model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 500, 0)

        response = client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=[
                {"role": "system", "content": "You are a business analyst. Analyze data and provide insights. DO NOT generate SQL queries."},
                {"role": "user", "content": evaluation_prompt}
            ],
            **model_params
        )

        # Log API call
        if request_id:
            from logging_config import tracker
            usage = response.usage if hasattr(response, 'usage') else None
            prompt_tokens = usage.prompt_tokens if usage else 0
            completion_tokens = usage.completion_tokens if usage else 0
            tracker.log_api_call(request_id, AZURE_OPENAI_DEPLOYMENT, prompt_tokens, completion_tokens, "stage3_evaluation")

        import json
        raw_content = response.choices[0].message.content.strip()

        # Try to extract JSON if wrapped in code blocks
        if '```json' in raw_content:
            json_start = raw_content.find('```json') + 7
            json_end = raw_content.find('```', json_start)
            raw_content = raw_content[json_start:json_end].strip()
        elif '{' in raw_content:
            json_start = raw_content.find('{')
            json_end = raw_content.rfind('}') + 1
            raw_content = raw_content[json_start:json_end]

        try:
            result = json.loads(raw_content)
            result["sql_executed"] = None  # Ensure no SQL was executed
            return result
        except json.JSONDecodeError as json_error:
            # Extract key information from raw response instead of generic fallback
            import re

            # Try to extract business answer from raw text
            business_answer_match = re.search(r'"business_answer":\s*"([^"]*(?:\\.[^"]*)*)"', raw_content)
            business_answer = business_answer_match.group(1) if business_answer_match else "Analysis completed (JSON parsing issue)"

            # Try to extract key findings
            findings_match = re.search(r'"key_findings":\s*\[(.*?)\]', raw_content, re.DOTALL)
            key_findings = []
            if findings_match:
                findings_text = findings_match.group(1)
                findings = re.findall(r'"([^"]*(?:\\.[^"]*)*)"', findings_text)
                key_findings = findings[:5]  # Limit to 5 findings
            if not key_findings:
                key_findings = ["Analysis completed successfully", "Data analysis performed on available records"]

            # Try to extract recommended action
            action_match = re.search(r'"recommended_action":\s*"([^"]*(?:\\.[^"]*)*)"', raw_content)
            recommended_action = action_match.group(1) if action_match else "Review the analysis results for insights"

            # Clean up escaped characters
            business_answer = business_answer.replace('\\"', '"').replace('\\n', ' ')
            recommended_action = recommended_action.replace('\\"', '"').replace('\\n', ' ')
            key_findings = [finding.replace('\\"', '"').replace('\\n', ' ') for finding in key_findings]

            return {
                "business_answer": business_answer,
                "key_findings": key_findings,
                "recommended_action": recommended_action,
                "supporting_data": {
                    "primary_values": "Analysis extracted from LLM response",
                    "alternatives": "See raw response for additional details",
                    "confidence": "medium"
                },
                "data_quality": "good - LLM analysis completed",
                "sql_executed": None,
                "raw_response": raw_content[:500] + "..." if len(raw_content) > 500 else raw_content,
                "parsing_note": "JSON parsing failed, extracted from raw text"
            }

    except Exception as e:
        return {
            "business_answer": "Error during evaluation stage",
            "key_findings": [f"Evaluation error: {str(e)}"],
            "recommended_action": "Review previous stage results manually",
            "supporting_data": {
                "primary_values": "N/A",
                "alternatives": "N/A",
                "confidence": "low"
            },
            "data_quality": "poor - evaluation failed",
            "sql_executed": None,
            "error": str(e)
        }


def get_compression_stats(original_records: list, compressed_output: str) -> dict:
    """Calculate compression effectiveness for monitoring"""
    if not original_records:
        return {"compression_ratio": 0, "token_savings": 0}

    original_size = len(str(original_records))
    compressed_size = len(compressed_output)
    compression_ratio = (original_size - compressed_size) / original_size if original_size > 0 else 0

    return {
        "original_size": original_size,
        "compressed_size": compressed_size,
        "compression_ratio": round(compression_ratio, 3),
        "token_savings": original_size - compressed_size
    }


def compress_data_for_llm(records: list, max_records: int = 10) -> str:
    """Compress data by extracting common fields and reducing redundancy"""
    if not records or len(records) == 0:
        return "No data available"

    # Limit records first
    limited_records = records[:max_records]

    if len(limited_records) == 1:
        # Single record - just format cleanly
        record = limited_records[0]
        return ", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])

    # Detect common fields across all records
    common_fields = {}
    first_record = limited_records[0]
    for key in first_record.keys():
        values = [record.get(key) for record in limited_records]
        unique_values = set(v for v in values if v is not None)
        # If all non-null values are identical, it's a common field
        if len(unique_values) <= 1:
            common_fields[key] = list(unique_values)[0] if unique_values else None

    # Extract varying fields only
    varying_records = []
    for record in limited_records:
        varying = {k: v for k, v in record.items() if k not in common_fields and v is not None}
        if varying:  # Only include if there are varying fields
            varying_records.append(varying)

    # Format compressed output
    if common_fields:
        common_str = ", ".join([f"{k}: {v}" for k, v in common_fields.items() if v is not None])
        varying_str = " | ".join([
            ", ".join([f"{k}: {v}" for k, v in record.items()])
            for record in varying_records[:5]  # Limit varying records too
        ])
        return f"Common: {common_str} | Variants: {varying_str}"
    else:
        # No common fields, just show first few records compactly
        return " | ".join([
            ", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])
            for record in limited_records[:3]
        ])


def process_intermediate_results(stage1_data: list, user_question: str, prompt_context: str, request_id: str = None) -> Dict[str, Any]:
    """AI processes Stage 1 results to inform Stage 2"""
    # Note: request_id is accepted with a None default so the session-logger call
    # below does not raise a NameError when no request identifier is supplied.

    # Use compressed data representation instead of raw stringification
    data_summary = compress_data_for_llm(stage1_data, max_records=10)

    # Log compression stats with session logger
    try:
        from session_logger import get_session_logger
        session_logger = get_session_logger(request_id or "unknown", user_question)
        compression_stats = get_compression_stats(stage1_data, data_summary)
        session_logger.log_data_compression(
            "stage1_intermediate",
            compression_stats['original_size'],
            compression_stats['compressed_size'],
            compression_stats['compression_ratio']
        )
    except Exception as e:
        pass  # Don't fail if logging fails

    analysis_prompt = f"""
Analyze these Stage 1 results and select the most relevant items for Stage 2 detailed analysis.

Original question: {user_question}

Context: {prompt_context[:500]}...

Stage 1 Results:
{data_summary}

Provide:
1. Brief summary of findings
2. Select the most relevant 1-3 items for detailed Stage 2 analysis
3. Key identifiers (IDs, part numbers, etc.) to use in Stage 2

Return JSON format:
{{
    "summary": "brief summary of Stage 1 findings",
    "selected_items": ["item1_id", "item2_id"],
    "reasoning": "why these items were selected",
    "stage2_focus": "what Stage 2 should analyze"
}}
"""

    try:
        # Get model-appropriate parameters
        model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 300, 0)

        response = client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT,
            messages=[{"role": "user", "content": analysis_prompt}],
            **model_params
        )
        import json
        return json.loads(response.choices[0].message.content.strip())
    except:
        # Fallback if AI processing fails
        # Use compressed representation for fallback too
        compressed_items = compress_data_for_llm(stage1_data[:3], max_records=3)
        return {
            "summary": f"Found {len(stage1_data)} potential matches",
            "selected_items": compressed_items,
            "reasoning": "Automatic selection due to processing error",
            "stage2_focus": "detailed analysis of selected items"
        }


def execute_iterative_workflow(user_question: str, classification: Dict[str, Any], persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
    """Iterative refinement workflow (future enhancement)"""
    # For now, fallback to multi-stage
    return execute_multi_stage_workflow(user_question, classification, persona_content, request_id, existing_results)


# ===============================================================================
# DIRECT-FIRST OPTIMIZATION FUNCTIONS
# ===============================================================================

def attempt_direct_tools(user_question: str, classification: Dict[str, Any], request_id: str = None) -> Dict[str, Any]:
    """
    Try direct tools with comprehensive error handling and performance tracking
    """
    import time

    try:
        from mcp_server.tools.direct_tools_registry import get_direct_tools_for_persona

        persona = classification.get("persona", "")
        direct_tools = get_direct_tools_for_persona(persona)

        if not direct_tools:
            return {"success": False, "reason": "no_direct_tools_for_persona"}

        # Track pattern matching results for consolidated logging
        pattern_results = {}

        # Try each applicable direct tool
        for tool_config in direct_tools:
            try:
                # Check if pattern matches (pass classification to AI-powered pattern matcher)
                pattern_match = tool_config["pattern_matcher"](user_question, classification)
                pattern_results[tool_config["name"]] = pattern_match

                if pattern_match:
                    # Execute direct tool with timing
                    start_time = time.time()
                    result = tool_config["executor"](user_question, classification)
                    execution_time_ms = (time.time() - start_time) * 1000

                    # Log successful direct execution
                    if request_id:
                        from logging_config import tracker
                        tracker.log_direct_tool_success(request_id, tool_config["name"], execution_time_ms)

                    return {
                        "success": True,
                        "tool_used": tool_config["name"],
                        "result": result,
                        "execution_time_ms": execution_time_ms,
                        "pattern_matched": True
                    }

            except Exception as e:
                # Log failure and continue to next tool
                if request_id:
                    from logging_config import tracker
                    tracker.log_direct_tool_failure(request_id, tool_config["name"], str(e))
                # Log and continue to next tool (don't print to console)
                continue

        return {"success": False, "reason": "no_pattern_match_or_all_failed", "pattern_results": pattern_results}
    except ImportError:
        # Direct tools registry not available - fallback gracefully
        return {"success": False, "reason": "direct_tools_registry_unavailable"}
    except Exception as e:
        # Unexpected error - log and fallback
        if request_id:
            from logging_config import tracker
            tracker.log_error(request_id, e, "direct_tools_attempt")
        return {"success": False, "reason": f"direct_tools_error: {str(e)}"}


def execute_direct_with_evaluation(user_question: str, direct_results: Dict[str, Any], classification: Dict[str, Any], persona_content: str, request_id: str = None) -> Dict[str, Any]:
    """
    Execute direct data retrieval + AI evaluation (Stage 3 equivalent)
    Direct results become the 'final_data' for AI evaluation
    """
    results = {
        "direct_tool_execution": direct_results,
        "execution_path": "direct_first_with_ai_evaluation"
    }

    # Extract data from direct tool results (handle different field names from different direct tools)
    result_data = direct_results["result"]
    direct_data = (result_data.get("specifications", []) or
                   result_data.get("results", []))

    if direct_data and len(direct_data) > 0:
        # Stage 3: AI Evaluation using direct results (same as multi-stage Stage 3)
        stage3_template = load_intent_template("stage3_evaluation")

        stage3_context = f"""
{stage3_template}

PERSONA CONTEXT:
{persona_content}

DIRECT TOOL USED: {direct_results["tool_used"]}
DIRECT EXECUTION TIME: {direct_results["execution_time_ms"]}ms

USER QUESTION: {user_question}

STAGE 3 TASK: Evaluate direct lookup results to extract clear business answer using persona domain expertise.

Direct Tool Results (compressed):
{compress_data_for_llm(direct_data, max_records=10)}

IMPORTANT: This data was retrieved via direct SQL lookup, not AI-generated queries. Focus on business analysis and insights.
"""

        # Log data compression if available
        if direct_data and request_id:
            try:
                from session_logger import get_session_logger
                session_logger = get_session_logger(request_id, user_question)
                compressed_sample = compress_data_for_llm(direct_data, max_records=10)
                compression_stats = get_compression_stats(direct_data, compressed_sample)
                session_logger.log_data_compression(
                    "direct_tool_evaluation",
                    compression_stats['original_size'],
                    compression_stats['compressed_size'],
                    compression_stats['compression_ratio']
                )
            except Exception:
                pass  # Don't fail if logging fails

        # AI evaluation of direct results
        evaluation_result = evaluate_final_results(stage3_context, request_id)
        results["stage3_evaluation"] = evaluation_result
    else:
        # No data from direct tool - create evaluation noting the failure
        results["stage3_evaluation"] = {
            "business_answer": "Direct lookup found no matching products",
            "key_findings": ["No direct mapping available for requested product"],
            "recommended_action": "Try alternative search or contact support for manual mapping",
            "supporting_data": {
                "primary_values": "No results",
                "alternatives": "AI workflow may find similar products",
                "confidence": "low"
            },
            "data_quality": "poor - no direct mapping available",
            "sql_executed": None
        }

    # Execute remaining standard tools if specified in tool chain
    tool_chain = classification.get("tool_chain", [])

    if "summarize_results" in tool_chain and direct_data:
        from mcp_server.tools.analysis_tools import summarize_results
        results["summarize_results"] = summarize_results(direct_data, classification["intent"])

    if "generate_visualization" in tool_chain and direct_data:
        from mcp_server.tools.analysis_tools import generate_visualization
        results["generate_visualization"] = generate_visualization(direct_data, "table", f"Direct Results: {user_question}")

    return results
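
# --- Illustrative sketch (not from the original file) -------------------------
# attempt_direct_tools() above consumes registry entries returned by
# get_direct_tools_for_persona(): each entry exposes a "name", a
# "pattern_matcher" callable taking (user_question, classification), and an
# "executor" callable returning a dict whose "results" or "specifications"
# list feeds execute_direct_with_evaluation(). The entry below is a purely
# hypothetical example of that shape, not a real registry item.
#
# EXAMPLE_DIRECT_TOOL = {
#     "name": "example_spec_lookup",
#     "pattern_matcher": lambda question, classification: "specification" in question.lower(),
#     "executor": lambda question, classification: {"results": []},
# }

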
def execute_standard_ai_workflow(user_question: str, classification: Dict[str, Any], persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute standard AI workflow (fallback from direct tools)
    This maintains the existing single-stage/multi-stage logic
    """
    results = existing_results.copy()
    results["execution_path"] = "ai_workflow_fallback"

    # Log that we're using AI fallback
    if request_id:
        from logging_config import tracker
        tracker.log_direct_tool_fallback(request_id, "falling_back_to_ai_workflow")

    # Execute based on original strategy
    execution_strategy = classification.get("execution_strategy", "single_stage")

    if execution_strategy == "multi_stage":
        return execute_multi_stage_workflow(
            user_question, classification, persona_content, request_id, results
        )
    elif execution_strategy == "iterative":
        return execute_iterative_workflow(
            user_question, classification, persona_content, request_id, results
        )
    else:
        # Single-stage execution
        return execute_single_stage_workflow(
            user_question, classification, persona_content, request_id, results
        )


def generate_final_response(user_question: str, results: Dict[str, Any], persona_context: str) -> str:
    """Generate a final business-friendly response using all tool results"""

    # Extract key information from tool results
    tool_results = results.get("tool_results", {})

    # Check if we have Stage 3 evaluation (multi-stage workflow)
    stage3_eval = tool_results.get("stage3_evaluation", {})

    if stage3_eval and stage3_eval.get("business_answer"):
        # Use Stage 3 business analysis as the primary response
        response_parts = []
        response_parts.append(f"**{stage3_eval['business_answer']}**")

        if stage3_eval.get("key_findings"):
            response_parts.append("\n**Key Findings:**")
            for finding in stage3_eval["key_findings"]:
                response_parts.append(f"• {finding}")

        if stage3_eval.get("recommended_action"):
            response_parts.append(f"\n**Recommended Action:** {stage3_eval['recommended_action']}")

        return "\n".join(response_parts)

    # Fallback to single-stage response format
    sql_results = tool_results.get("run_sql_query", {}).get("results", [])
    summary = tool_results.get("summarize_results", {})

    if not sql_results:
        # For any question that produces no database results, try stage 3 evaluation as fallback
        # This handles both business questions that find no data AND general knowledge questions
        stage3_template = load_intent_template("stage3_evaluation")

        fallback_context = f"""
{stage3_template}

PERSONA CONTEXT:
{persona_context}

NO DATABASE RESULTS FALLBACK: The database query returned no results for this question.

USER QUESTION: {user_question}

STAGE 3 TASK: Since no database results were found, evaluate if this can be answered using general knowledge or provide appropriate guidance.

IMPORTANT: The database contained no relevant data for this question. Either answer using available knowledge (if it's a general question) or explain why no business data was found.
"""

        try:
            fallback_evaluation = evaluate_final_results(fallback_context)
            if fallback_evaluation and fallback_evaluation.get("business_answer"):
                return fallback_evaluation["business_answer"]
        except Exception as e:
            # If fallback fails, continue to standard error message
            pass

        return "I wasn't able to retrieve data to answer your question. Please check if the tables are accessible or rephrase your question."
    response_parts = []

    # Quick answer
    response_parts.append(f"**Answer to: {user_question}**")

    # Summary from summarize_results tool
    if summary.get("summary"):
        response_parts.append(f"\n**Summary**: {summary['summary']}")

    # Key details (first few records)
    if isinstance(sql_results, list) and sql_results:
        response_parts.append(f"\n**Key Details**:")
        for i, record in enumerate(sql_results[:3]):  # Show first 3 records
            record_str = ", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])
            response_parts.append(f"• Record {i+1}: {record_str}")

    # Data summary
    if isinstance(sql_results, list):
        response_parts.append(f"\n**Data Summary**: Found {len(sql_results)} records")

    # Suggestions for product planning context
    if "product_planning" in results.get("classification", {}).get("intent", ""):
        response_parts.append("\n**Suggestions**: You can also ask about product comparisons, specifications, or part number relationships.")

    return "\n".join(response_parts)
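

# --- Usage sketch (not part of the original file) -----------------------------
# A minimal example of how a caller might wire classify_intent() and
# execute_tool_chain() together; the question text and request_id value are
# hypothetical placeholders.
if __name__ == "__main__":
    demo_question = "Which competitor products match our MRH-series specifications?"
    demo_classification = classify_intent(demo_question, request_id="demo-request-001")
    demo_outcome = execute_tool_chain(demo_question, demo_classification, request_id="demo-request-001")
    print(demo_outcome["final_response"])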
