# intent_router.py
"""
Intent classification and routing for agentic layer
"""
import os
from openai import AzureOpenAI
from typing import Dict, Any, List
import json
# Load environment variables first
from dotenv import load_dotenv
load_dotenv()
# Azure OpenAI config
AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT")
client = AzureOpenAI(
api_key=AZURE_OPENAI_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT
)
def get_model_params(model_name: str, max_tokens: int, temperature: float = 0) -> dict:
"""Get appropriate parameters for different model APIs"""
params = {}
# GPT-5 and newer models have different parameter requirements
if 'gpt-5' in model_name.lower() or 'o1' in model_name.lower():
params["max_completion_tokens"] = max_tokens
# GPT-5 models don't support temperature=0, only default (1)
if temperature != 1:
params["temperature"] = 1 # Force to default
else:
# GPT-4o, GPT-4, and older models use max_tokens
params["max_tokens"] = max_tokens
params["temperature"] = temperature
return params
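# Illustrative usage (a sketch; the deployment names below are examples, not
# values read from configuration):
#
#   get_model_params("gpt-4o", 500, 0)
#   # -> {"max_tokens": 500, "temperature": 0}
#   get_model_params("gpt-5-mini", 500, 0)
#   # -> {"max_completion_tokens": 500, "temperature": 1}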
def load_prompt_module(module_name: str) -> str:
"""Load prompt module content from markdown file"""
prompt_path = f"agentic_layer/prompts/{module_name}.md"
try:
with open(prompt_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"Prompt module '{module_name}' not found at {prompt_path}"
def get_available_personas() -> Dict[str, str]:
    """Dynamically discover available persona modules"""
    personas = {}
    personas_dir = "agentic_layer/prompts/personas"
    if os.path.exists(personas_dir):
        for file in os.listdir(personas_dir):
            if not file.endswith('.md'):
                continue
            persona_name = file[:-3]  # Strip the .md extension
            # Read the file to pull a short description for classification context
            try:
                with open(os.path.join(personas_dir, file), 'r', encoding='utf-8') as f:
                    content = f.read()
                # Extract the description from the markdown (first lines after a heading)
                lines = content.split('\n')
                description = "Available persona"
                for i, line in enumerate(lines):
                    if line.startswith('### Context') or line.startswith('### Role') or (line.startswith('## ') and i > 0):
                        desc_lines = lines[i+1:i+3]  # Take the next 1-2 lines
                        description = ' '.join(l.strip() for l in desc_lines if l.strip())
                        break
                personas[persona_name] = (description[:100] + "...") if len(description) > 100 else description
            except Exception:
                personas[persona_name] = "Available persona"
    return personas
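# Illustrative return value (a sketch; the persona names are hypothetical, real
# ones are whatever .md files exist under agentic_layer/prompts/personas/):
#
#   {
#       "product_planning": "Supports product planning and competitor analysis ...",
#       "inventory": "Available persona",
#   }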
def load_intent_template(stage: str) -> str:
"""Load generic intent template for multi-stage execution"""
template_path = f"agentic_layer/prompts/intent/{stage}.md"
try:
with open(template_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"Intent template '{stage}' not found at {template_path}"
def load_persona_module(persona_name: str) -> str:
"""Load persona module content from personas folder"""
persona_path = f"agentic_layer/prompts/personas/{persona_name}.md"
try:
with open(persona_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"Persona module '{persona_name}' not found at {persona_path}"
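# Prompt layout assumed by the loaders above (inferred from the paths they build):
#
#   agentic_layer/prompts/
#       <module_name>.md                  # generic prompt modules
#       intent/
#           stage0_intent.md              # classification template
#           stage1_discovery.md
#           stage2_analysis.md
#           stage3_evaluation.md
#       personas/
#           <persona_name>.md             # business-domain personas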
def classify_intent(user_question: str, request_id: str = None) -> Dict[str, Any]:
"""
Domain-agnostic intent classification and routing
"""
# Load the classification template from markdown
stage0_template = load_intent_template("stage0_intent")
# Get available personas dynamically with sample content for better classification
available_personas = get_available_personas()
# Pre-load key personas to get actual table names for classification
persona_context = {}
for persona_name in available_personas.keys():
try:
content = load_persona_module(persona_name)
# Extract table references from persona
import re
tables = re.findall(r'JPNPROdb_\w+', content)
persona_context[persona_name] = {
"description": available_personas[persona_name],
"tables": list(set(tables)), # Remove duplicates
"sample_content": content[:300] + "..." if len(content) > 300 else content
}
        except Exception:
persona_context[persona_name] = {
"description": available_personas[persona_name],
"tables": [],
"sample_content": "Persona content not available"
}
# Build enhanced persona list with actual table context
persona_list = []
for name, context in persona_context.items():
tables_str = ", ".join(context["tables"]) if context["tables"] else "No specific tables"
persona_list.append(f"- {name}: {context['description']}\n Tables: {tables_str}")
persona_list = "\n".join(persona_list)
# Build the classification prompt using the template
classification_prompt = f"""
{stage0_template}
## Current Analysis Request
Analyze this question: "{user_question}"
Available personas (business domain experts):
{persona_list}
## Required Analysis
1. Extract any competitor names and specific products mentioned
2. Determine the best matching persona based on question content and domain
3. Select appropriate execution strategy and tool chain
4. Provide reasoning for classification decisions
"""
# Get model-appropriate parameters
model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 500, 0)
response = client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT,
messages=[
{"role": "system", "content": "You are a JSON classifier. Return ONLY valid JSON, no other text."},
{"role": "user", "content": classification_prompt}
],
**model_params
)
# Log API call with session logger
if request_id:
try:
from session_logger import get_session_logger
session_logger = get_session_logger(request_id, user_question)
usage = response.usage if hasattr(response, 'usage') else None
prompt_tokens = usage.prompt_tokens if usage else 0
completion_tokens = usage.completion_tokens if usage else 0
session_logger.log_api_call("intent_classification", AZURE_OPENAI_DEPLOYMENT, prompt_tokens, completion_tokens)
except Exception as e:
pass # Don't fail if logging fails
try:
raw_content = response.choices[0].message.content
# Check for empty responses (common with GPT-5 models)
if not raw_content or raw_content.strip() == "":
print(f"⚠️ Empty response from {AZURE_OPENAI_DEPLOYMENT}")
raw_content = "{}" # Treat as empty JSON to trigger fallback
raw_content = raw_content.strip()
# Try to extract JSON if wrapped in code blocks or extra text
if '```json' in raw_content:
json_start = raw_content.find('```json') + 7
json_end = raw_content.find('```', json_start)
raw_content = raw_content[json_start:json_end].strip()
elif '{' in raw_content:
json_start = raw_content.find('{')
json_end = raw_content.rfind('}') + 1
raw_content = raw_content[json_start:json_end]
result = json.loads(raw_content)
# Check if we got an empty or incomplete result
if not result or 'persona' not in result:
print(f"⚠️ Incomplete JSON response from {AZURE_OPENAI_DEPLOYMENT}: {result}")
raise ValueError("Missing required fields in classification response")
return result
except (json.JSONDecodeError, ValueError) as e:
print(f"JSON parsing error: {e}")
print(f"Raw response: {response.choices[0].message.content}")
# Generic fallback - don't try to guess intent
return {
"intent": "general_query",
"persona": "product_planning",
"confidence": 0.5,
"execution_strategy": "single_stage",
"metadata_strategy": "skip", # Always skip for performance
"tool_chain": ["run_sql_query", "summarize_results"],
"reasoning": f"Fallback due to {AZURE_OPENAI_DEPLOYMENT} response issue: {str(e)[:50]}",
"requires_intermediate_processing": False,
"actual_tables": ["JPNPROdb_ps_mstr", "JPNPROdb_pt_mstr"]
}
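# Example of the classification payload downstream code expects (a sketch based
# on the fallback above and on fields read later, e.g. extracted_entities.intent_type
# and enable_ai_evaluation; the values shown are hypothetical, real ones come from
# the LLM and the stage0_intent template):
#
#   {
#       "intent": "product_specification_lookup",
#       "persona": "product_planning",
#       "confidence": 0.9,
#       "execution_strategy": "multi_stage",          # or "single_stage" / "iterative"
#       "metadata_strategy": "skip",
#       "tool_chain": ["run_sql_query", "summarize_results"],
#       "extracted_entities": {"intent_type": "business_query"},
#       "enable_ai_evaluation": True,
#       "actual_tables": ["JPNPROdb_ps_mstr", "JPNPROdb_pt_mstr"],
#       "reasoning": "..."
#   }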
def execute_tool_chain(user_question: str, classification: Dict[str, Any], request_id: str = None) -> Dict[str, Any]:
"""
Domain-agnostic tool chain execution with direct-first optimization and multi-stage support
"""
from mcp_server.tools.sql_tools import get_metadata, run_sql_query
from mcp_server.tools.analysis_tools import summarize_results, generate_visualization
# Load the appropriate persona
persona_content = load_persona_module(classification["persona"])
results = {
"classification": classification,
"persona_used": classification["persona"],
"tool_results": {},
"final_response": ""
}
try:
# Step 1: Try direct tools first (performance optimization)
direct_results = attempt_direct_tools(user_question, classification, request_id)
# Step 2: Execute based on direct tool success AND result quality
if direct_results["success"]:
# Check if direct tool actually found meaningful results
result_data = direct_results["result"]
direct_data = (result_data.get("specifications", []) or
result_data.get("results", []) or
[])
# DEBUG: Log what we're checking
print(f"DEBUG: Checking direct tool results - specifications: {len(result_data.get('specifications', []))}, results: {len(result_data.get('results', []))}, direct_data length: {len(direct_data) if direct_data else 0}")
if direct_data and len(direct_data) > 0:
# Direct path: Fast data retrieval + AI evaluation
print(f"DEBUG: Taking direct path with {len(direct_data)} results")
results["tool_results"] = execute_direct_with_evaluation(
user_question, direct_results, classification, persona_content, request_id
)
print(f"DEBUG: Direct path completed, execution_path: {results['tool_results'].get('execution_path', 'unknown')}")
else:
# Direct tool found no results - fallback to AI workflow
if request_id:
from logging_config import tracker, log_negative_case
tracker.log_direct_tool_fallback(request_id, "direct_tool_found_no_results")
# Log as negative case - direct tool didn't find expected data
log_negative_case(
request_id=request_id,
case_type="direct_tool_no_results",
user_question=user_question,
reason="Direct tool executed successfully but found no matching data",
classification=classification,
additional_context={"direct_tool_result": direct_results}
)
results["tool_results"] = execute_standard_ai_workflow(
user_question, classification, persona_content, request_id, {}
)
else:
# Fallback path: Standard AI workflow
results["tool_results"] = execute_standard_ai_workflow(
user_question, classification, persona_content, request_id, {}
)
# Generate final response
results["final_response"] = generate_final_response(user_question, results, persona_content)
# Log consolidated request flow
if request_id:
from logging_config import log_request_flow
# Extract key data for logging
persona = classification.get("persona", "unknown")
intent_type = classification.get("extracted_entities", {}).get("intent_type", "unknown")
execution_path = results["tool_results"].get("execution_path", "unknown")
success = bool(results.get("final_response"))
# Log the complete request flow
log_request_flow(
request_id=request_id,
user_question=user_question,
persona=persona,
intent_type=intent_type,
execution_path=execution_path,
direct_tools_result=direct_results if direct_results else None,
success=success
)
except Exception as e:
results["error"] = str(e)
results["final_response"] = f"Error executing tool chain: {str(e)}"
# Log negative case - system error
if request_id:
from logging_config import log_negative_case
log_negative_case(
request_id=request_id,
case_type="system_error",
user_question=user_question,
reason=str(e),
classification=classification,
additional_context={"stack_trace": str(e)}
)
return results
def handle_metadata_discovery(strategy: str, classification: Dict[str, Any] = None) -> Dict[str, Any]:
"""Generic metadata discovery handler"""
from mcp_server.tools.sql_tools import get_metadata
if strategy == "minimal":
# Use actual tables from classification if available
actual_tables = classification.get("actual_tables", []) if classification else []
if actual_tables:
validation_results = {}
for table in actual_tables:
validation_results[table] = get_metadata(table)
return {"strategy": "minimal", "validation_results": validation_results}
else:
return {"strategy": "minimal", "basic_schema": get_metadata()}
elif strategy == "full":
return {"strategy": "full", "full_schema": get_metadata()}
else:
return {"strategy": "skip"}
def execute_single_stage_workflow(user_question: str, classification: Dict[str, Any],
persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
"""Standard single-pass execution"""
from mcp_server.tools.sql_tools import run_sql_query
from mcp_server.tools.analysis_tools import summarize_results, generate_visualization
results = existing_results.copy()
# Check if this is a general inquiry that likely doesn't need database access
intent_type = classification.get("extracted_entities", {}).get("intent_type", "")
enable_ai_evaluation = classification.get("enable_ai_evaluation", False)
# For general inquiries with no business terms, skip SQL and go straight to AI evaluation
# This handles cases where enable_ai_evaluation might be false but we still want to answer general questions
if (intent_type == "general_inquiry" and
not any(term in user_question.lower() for term in [
"product", "price", "cost", "jpnprodb", "competitor", "specification",
"part", "component", "inventory", "sales", "mrh-", "spt", "hogy",
"livedo", "hopes", "medline", "table", "database", "schema"
])):
# Skip SQL execution and go straight to stage 3 evaluation for general knowledge
stage3_template = load_intent_template("stage3_evaluation")
stage3_context = f"""
{stage3_template}
PERSONA CONTEXT:
{persona_content}
GENERAL KNOWLEDGE QUESTION: No database lookup needed for this question
USER QUESTION: {user_question}
STAGE 3 TASK: Answer this general question using available knowledge. This does not require business/database data.
IMPORTANT: This appears to be a general knowledge question. Provide a direct, helpful answer without referencing business data.
"""
# Direct stage 3 evaluation without SQL
evaluation_result = evaluate_final_results(stage3_context, request_id)
results["stage3_evaluation"] = evaluation_result
return results
enhanced_question = f"""
Context from {classification['persona']} module:
{persona_content}
User question: {user_question}
"""
# Execute tool chain sequentially
for tool_name in classification["tool_chain"]:
if tool_name == "run_sql_query":
results["run_sql_query"] = run_sql_query(question=enhanced_question, request_id=request_id)
elif tool_name == "summarize_results" and "run_sql_query" in results:
sql_results = results["run_sql_query"].get("results", [])
results["summarize_results"] = summarize_results(sql_results, classification["intent"])
elif tool_name == "generate_visualization" and "run_sql_query" in results:
sql_results = results["run_sql_query"].get("results", [])
results["generate_visualization"] = generate_visualization(sql_results, "table", f"Results for: {user_question}")
# Add AI business evaluation if requested by classification
if enable_ai_evaluation and "run_sql_query" in results:
final_data = results["run_sql_query"].get("results", [])
if final_data:
# Load the stage3 evaluation template
stage3_template = load_intent_template("stage3_evaluation")
stage3_context = f"""
{stage3_template}
PERSONA CONTEXT:
{persona_content}
SINGLE-STAGE EXECUTION WITH AI EVALUATION: Direct SQL query executed successfully
QUERY RESULTS: {len(final_data)} records retrieved
USER QUESTION: {user_question}
STAGE 3 TASK: Evaluate single-stage query results to extract clear business answer using persona domain expertise.
Final Data Sample (compressed):
{compress_data_for_llm(final_data, max_records=10) if final_data else "No data available"}
"""
# Log data compression for single-stage evaluation
if final_data and request_id:
try:
from session_logger import get_session_logger
session_logger = get_session_logger(request_id, user_question)
compressed_sample = compress_data_for_llm(final_data, max_records=10)
compression_stats = get_compression_stats(final_data, compressed_sample)
session_logger.log_data_compression(
"single_stage_evaluation",
compression_stats['original_size'],
compression_stats['compressed_size'],
compression_stats['compression_ratio']
)
except Exception:
pass # Don't fail if logging fails
# AI evaluation of single-stage results
evaluation_result = evaluate_final_results(stage3_context, request_id)
results["stage3_evaluation"] = evaluation_result
return results
def execute_multi_stage_workflow(user_question: str, classification: Dict[str, Any],
persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
"""Multi-stage execution with intermediate AI reasoning using intent templates"""
from mcp_server.tools.sql_tools import run_sql_query
from mcp_server.tools.analysis_tools import summarize_results, generate_visualization
results = existing_results.copy()
# Load generic intent templates
stage1_template = load_intent_template("stage1_discovery")
stage2_template = load_intent_template("stage2_analysis")
stage3_template = load_intent_template("stage3_evaluation")
# Stage 1: Discovery using generic template + persona context
stage1_context = f"""
{stage1_template}
PERSONA CONTEXT:
{persona_content}
USER QUESTION: {user_question}
STAGE 1 TASK: Execute discovery query to find relevant candidates based on user criteria using persona domain knowledge.
"""
stage1_result = run_sql_query(question=stage1_context, request_id=request_id)
results["stage1_query"] = stage1_result
# Check if Stage 1 produced useful results
if stage1_result.get("results") and len(stage1_result["results"]) > 0:
# AI processes Stage 1 results for Stage 2 using persona context
        intermediate_analysis = process_intermediate_results(
            stage1_result["results"], user_question, persona_content, request_id
        )
results["intermediate_analysis"] = intermediate_analysis
# Stage 2: Analysis using generic template + persona context + Stage 1 results
stage2_context = f"""
{stage2_template}
PERSONA CONTEXT:
{persona_content}
STAGE 1 RESULTS SUMMARY:
{intermediate_analysis.get("summary", "No summary available")}
SELECTED ITEMS FROM STAGE 1:
{intermediate_analysis.get("selected_items", "No items selected")}
USER QUESTION: {user_question}
STAGE 2 TASK: Execute detailed analysis query using selected items from Stage 1 to gather comprehensive information.
"""
stage2_result = run_sql_query(question=stage2_context, request_id=request_id)
results["stage2_query"] = stage2_result
# Stage 3: Evaluation using generic template + all previous context
final_data = stage2_result.get("results", [])
if final_data:
stage3_context = f"""
{stage3_template}
PERSONA CONTEXT:
{persona_content}
STAGE 1 RESULTS: {len(stage1_result.get("results", []))} candidates found
INTERMEDIATE ANALYSIS: {intermediate_analysis.get("reasoning", "No reasoning available")}
STAGE 2 RESULTS: {len(final_data)} detailed records retrieved
USER QUESTION: {user_question}
STAGE 3 TASK: Evaluate final results to extract clear business answer using persona domain expertise.
Final Data Sample (compressed):
{compress_data_for_llm(final_data, max_records=10) if final_data else "No data available"}
"""
# Log compression stats for Stage 3 with session logger
if final_data and request_id:
try:
from session_logger import get_session_logger
session_logger = get_session_logger(request_id, user_question)
compressed_sample = compress_data_for_llm(final_data, max_records=10)
compression_stats = get_compression_stats(final_data, compressed_sample)
session_logger.log_data_compression(
"stage3_evaluation",
compression_stats['original_size'],
compression_stats['compressed_size'],
compression_stats['compression_ratio']
)
except Exception as e:
pass # Don't fail if logging fails
# Use LLM for evaluation (no SQL execution - just analysis)
evaluation_result = evaluate_final_results(stage3_context, request_id)
results["stage3_evaluation"] = evaluation_result
else:
final_data = stage1_result.get("results", [])
else:
# Fallback if Stage 1 failed
final_data = stage1_result.get("results", [])
# Execute remaining tools on final data
if "summarize_results" in classification["tool_chain"] and final_data:
results["summarize_results"] = summarize_results(final_data, classification["intent"])
if "generate_visualization" in classification["tool_chain"] and final_data:
results["generate_visualization"] = generate_visualization(final_data, "table", f"Results for: {user_question}")
return results
def evaluate_final_results(evaluation_context: str, request_id: str = None) -> Dict[str, Any]:
"""Stage 3: Pure evaluation and analysis without SQL execution"""
evaluation_prompt = f"""
{evaluation_context}
IMPORTANT: DO NOT generate any SQL queries. Your task is to analyze the data provided above and create a business answer.
Based on the data gathered from Stages 1 and 2, provide a comprehensive business evaluation in JSON format.
CRITICAL: Escape all special characters in JSON strings. Use simple ASCII characters where possible to avoid JSON parsing errors.
{{
"business_answer": "Direct answer to user question - avoid special characters",
"key_findings": ["finding1", "finding2", "finding3"],
"recommended_action": "What user should do next",
"supporting_data": {{
"primary_values": "key metrics or values",
"alternatives": "other options if applicable",
"confidence": "high|medium|low"
}},
"data_quality": "assessment of result reliability",
"sql_executed": null
}}
"""
try:
# Get model-appropriate parameters
model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 500, 0)
response = client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT,
messages=[
{"role": "system", "content": "You are a business analyst. Analyze data and provide insights. DO NOT generate SQL queries."},
{"role": "user", "content": evaluation_prompt}
],
**model_params
)
# Log API call
if request_id:
from logging_config import tracker
usage = response.usage if hasattr(response, 'usage') else None
prompt_tokens = usage.prompt_tokens if usage else 0
completion_tokens = usage.completion_tokens if usage else 0
tracker.log_api_call(request_id, AZURE_OPENAI_DEPLOYMENT, prompt_tokens, completion_tokens, "stage3_evaluation")
import json
raw_content = response.choices[0].message.content.strip()
# Try to extract JSON if wrapped in code blocks
if '```json' in raw_content:
json_start = raw_content.find('```json') + 7
json_end = raw_content.find('```', json_start)
raw_content = raw_content[json_start:json_end].strip()
elif '{' in raw_content:
json_start = raw_content.find('{')
json_end = raw_content.rfind('}') + 1
raw_content = raw_content[json_start:json_end]
try:
result = json.loads(raw_content)
result["sql_executed"] = None # Ensure no SQL was executed
return result
except json.JSONDecodeError as json_error:
# Extract key information from raw response instead of generic fallback
import re
# Try to extract business answer from raw text
business_answer_match = re.search(r'"business_answer":\s*"([^"]*(?:\\.[^"]*)*)"', raw_content)
business_answer = business_answer_match.group(1) if business_answer_match else "Analysis completed (JSON parsing issue)"
# Try to extract key findings
findings_match = re.search(r'"key_findings":\s*\[(.*?)\]', raw_content, re.DOTALL)
key_findings = []
if findings_match:
findings_text = findings_match.group(1)
findings = re.findall(r'"([^"]*(?:\\.[^"]*)*)"', findings_text)
key_findings = findings[:5] # Limit to 5 findings
if not key_findings:
key_findings = ["Analysis completed successfully", "Data analysis performed on available records"]
# Try to extract recommended action
action_match = re.search(r'"recommended_action":\s*"([^"]*(?:\\.[^"]*)*)"', raw_content)
recommended_action = action_match.group(1) if action_match else "Review the analysis results for insights"
# Clean up escaped characters
business_answer = business_answer.replace('\\"', '"').replace('\\n', ' ')
recommended_action = recommended_action.replace('\\"', '"').replace('\\n', ' ')
key_findings = [finding.replace('\\"', '"').replace('\\n', ' ') for finding in key_findings]
return {
"business_answer": business_answer,
"key_findings": key_findings,
"recommended_action": recommended_action,
"supporting_data": {
"primary_values": "Analysis extracted from LLM response",
"alternatives": "See raw response for additional details",
"confidence": "medium"
},
"data_quality": "good - LLM analysis completed",
"sql_executed": None,
"raw_response": raw_content[:500] + "..." if len(raw_content) > 500 else raw_content,
"parsing_note": "JSON parsing failed, extracted from raw text"
}
except Exception as e:
return {
"business_answer": "Error during evaluation stage",
"key_findings": [f"Evaluation error: {str(e)}"],
"recommended_action": "Review previous stage results manually",
"supporting_data": {
"primary_values": "N/A",
"alternatives": "N/A",
"confidence": "low"
},
"data_quality": "poor - evaluation failed",
"sql_executed": None,
"error": str(e)
}
def get_compression_stats(original_records: list, compressed_output: str) -> dict:
"""Calculate compression effectiveness for monitoring"""
if not original_records:
return {"compression_ratio": 0, "token_savings": 0}
original_size = len(str(original_records))
compressed_size = len(compressed_output)
compression_ratio = (original_size - compressed_size) / original_size if original_size > 0 else 0
return {
"original_size": original_size,
"compressed_size": compressed_size,
"compression_ratio": round(compression_ratio, 3),
"token_savings": original_size - compressed_size
}
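# Worked example (a sketch): 1,000 characters of raw records compressed to a
# 200-character summary yields original_size=1000, compressed_size=200,
# compression_ratio=0.8 and token_savings=800. Sizes are character counts, so
# "token_savings" is an approximation rather than a true token delta.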
def compress_data_for_llm(records: list, max_records: int = 10) -> str:
"""Compress data by extracting common fields and reducing redundancy"""
if not records or len(records) == 0:
return "No data available"
# Limit records first
limited_records = records[:max_records]
if len(limited_records) == 1:
# Single record - just format cleanly
record = limited_records[0]
return ", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])
# Detect common fields across all records
common_fields = {}
first_record = limited_records[0]
for key in first_record.keys():
values = [record.get(key) for record in limited_records]
unique_values = set(v for v in values if v is not None)
# If all non-null values are identical, it's a common field
if len(unique_values) <= 1:
common_fields[key] = list(unique_values)[0] if unique_values else None
# Extract varying fields only
varying_records = []
for record in limited_records:
varying = {k: v for k, v in record.items()
if k not in common_fields and v is not None}
if varying: # Only include if there are varying fields
varying_records.append(varying)
# Format compressed output
if common_fields:
common_str = ", ".join([f"{k}: {v}" for k, v in common_fields.items() if v is not None])
varying_str = " | ".join([
", ".join([f"{k}: {v}" for k, v in record.items()])
for record in varying_records[:5] # Limit varying records too
])
return f"Common: {common_str} | Variants: {varying_str}"
else:
# No common fields, just show first few records compactly
return " | ".join([
", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])
for record in limited_records[:3]
])
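# Illustrative compression (a sketch with made-up records):
#
#   records = [
#       {"pt_part": "MRH-001", "vendor": "HOGY", "price": 120},
#       {"pt_part": "MRH-002", "vendor": "HOGY", "price": 135},
#   ]
#   compress_data_for_llm(records)
#   # -> "Common: vendor: HOGY | Variants: pt_part: MRH-001, price: 120 | pt_part: MRH-002, price: 135"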
def process_intermediate_results(stage1_data: list, user_question: str, prompt_context: str, request_id: str = None) -> Dict[str, Any]:
"""AI processes Stage 1 results to inform Stage 2"""
# Use compressed data representation instead of raw stringification
data_summary = compress_data_for_llm(stage1_data, max_records=10)
# Log compression stats with session logger
try:
from session_logger import get_session_logger
session_logger = get_session_logger(request_id or "unknown", user_question)
compression_stats = get_compression_stats(stage1_data, data_summary)
session_logger.log_data_compression(
"stage1_intermediate",
compression_stats['original_size'],
compression_stats['compressed_size'],
compression_stats['compression_ratio']
)
except Exception as e:
pass # Don't fail if logging fails
analysis_prompt = f"""
Analyze these Stage 1 results and select the most relevant items for Stage 2 detailed analysis.
Original question: {user_question}
Context: {prompt_context[:500]}...
Stage 1 Results:
{data_summary}
Provide:
1. Brief summary of findings
2. Select the most relevant 1-3 items for detailed Stage 2 analysis
3. Key identifiers (IDs, part numbers, etc.) to use in Stage 2
Return JSON format:
{{
"summary": "brief summary of Stage 1 findings",
"selected_items": ["item1_id", "item2_id"],
"reasoning": "why these items were selected",
"stage2_focus": "what Stage 2 should analyze"
}}
"""
try:
# Get model-appropriate parameters
model_params = get_model_params(AZURE_OPENAI_DEPLOYMENT, 300, 0)
response = client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT,
messages=[{"role": "user", "content": analysis_prompt}],
**model_params
)
import json
return json.loads(response.choices[0].message.content.strip())
    except Exception:
# Fallback if AI processing fails
# Use compressed representation for fallback too
compressed_items = compress_data_for_llm(stage1_data[:3], max_records=3)
return {
"summary": f"Found {len(stage1_data)} potential matches",
"selected_items": compressed_items,
"reasoning": "Automatic selection due to processing error",
"stage2_focus": "detailed analysis of selected items"
}
def execute_iterative_workflow(user_question: str, classification: Dict[str, Any],
persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
"""Iterative refinement workflow (future enhancement)"""
# For now, fallback to multi-stage
return execute_multi_stage_workflow(user_question, classification, persona_content, request_id, existing_results)
# ===============================================================================
# DIRECT-FIRST OPTIMIZATION FUNCTIONS
# ===============================================================================
def attempt_direct_tools(user_question: str, classification: Dict[str, Any], request_id: str = None) -> Dict[str, Any]:
"""
Try direct tools with comprehensive error handling and performance tracking
"""
import time
try:
from mcp_server.tools.direct_tools_registry import get_direct_tools_for_persona
persona = classification.get("persona", "")
direct_tools = get_direct_tools_for_persona(persona)
if not direct_tools:
return {"success": False, "reason": "no_direct_tools_for_persona"}
# Track pattern matching results for consolidated logging
pattern_results = {}
# Try each applicable direct tool
for tool_config in direct_tools:
try:
# Check if pattern matches (pass classification to AI-powered pattern matcher)
pattern_match = tool_config["pattern_matcher"](user_question, classification)
pattern_results[tool_config["name"]] = pattern_match
if pattern_match:
# Execute direct tool with timing
start_time = time.time()
result = tool_config["executor"](user_question, classification)
execution_time_ms = (time.time() - start_time) * 1000
# Log successful direct execution
if request_id:
from logging_config import tracker
tracker.log_direct_tool_success(request_id, tool_config["name"], execution_time_ms)
return {
"success": True,
"tool_used": tool_config["name"],
"result": result,
"execution_time_ms": execution_time_ms,
"pattern_matched": True
}
except Exception as e:
# Log failure and continue to next tool
if request_id:
from logging_config import tracker
tracker.log_direct_tool_failure(request_id, tool_config["name"], str(e))
# Log and continue to next tool (don't print to console)
continue
return {"success": False, "reason": "no_pattern_match_or_all_failed", "pattern_results": pattern_results}
except ImportError:
# Direct tools registry not available - fallback gracefully
return {"success": False, "reason": "direct_tools_registry_unavailable"}
except Exception as e:
# Unexpected error - log and fallback
if request_id:
from logging_config import tracker
tracker.log_error(request_id, e, "direct_tools_attempt")
return {"success": False, "reason": f"direct_tools_error: {str(e)}"}
def execute_direct_with_evaluation(user_question: str, direct_results: Dict[str, Any],
classification: Dict[str, Any], persona_content: str, request_id: str = None) -> Dict[str, Any]:
"""
Execute direct data retrieval + AI evaluation (Stage 3 equivalent)
Direct results become the 'final_data' for AI evaluation
"""
results = {
"direct_tool_execution": direct_results,
"execution_path": "direct_first_with_ai_evaluation"
}
# Extract data from direct tool results (handle different field names from different direct tools)
result_data = direct_results["result"]
direct_data = (result_data.get("specifications", []) or
result_data.get("results", []))
if direct_data and len(direct_data) > 0:
# Stage 3: AI Evaluation using direct results (same as multi-stage Stage 3)
stage3_template = load_intent_template("stage3_evaluation")
stage3_context = f"""
{stage3_template}
PERSONA CONTEXT:
{persona_content}
DIRECT TOOL USED: {direct_results["tool_used"]}
DIRECT EXECUTION TIME: {direct_results["execution_time_ms"]}ms
USER QUESTION: {user_question}
STAGE 3 TASK: Evaluate direct lookup results to extract clear business answer using persona domain expertise.
Direct Tool Results (compressed):
{compress_data_for_llm(direct_data, max_records=10)}
IMPORTANT: This data was retrieved via direct SQL lookup, not AI-generated queries. Focus on business analysis and insights.
"""
# Log data compression if available
if direct_data and request_id:
try:
from session_logger import get_session_logger
session_logger = get_session_logger(request_id, user_question)
compressed_sample = compress_data_for_llm(direct_data, max_records=10)
compression_stats = get_compression_stats(direct_data, compressed_sample)
session_logger.log_data_compression(
"direct_tool_evaluation",
compression_stats['original_size'],
compression_stats['compressed_size'],
compression_stats['compression_ratio']
)
except Exception:
pass # Don't fail if logging fails
# AI evaluation of direct results
evaluation_result = evaluate_final_results(stage3_context, request_id)
results["stage3_evaluation"] = evaluation_result
else:
# No data from direct tool - create evaluation noting the failure
results["stage3_evaluation"] = {
"business_answer": "Direct lookup found no matching products",
"key_findings": ["No direct mapping available for requested product"],
"recommended_action": "Try alternative search or contact support for manual mapping",
"supporting_data": {
"primary_values": "No results",
"alternatives": "AI workflow may find similar products",
"confidence": "low"
},
"data_quality": "poor - no direct mapping available",
"sql_executed": None
}
# Execute remaining standard tools if specified in tool chain
tool_chain = classification.get("tool_chain", [])
if "summarize_results" in tool_chain and direct_data:
from mcp_server.tools.analysis_tools import summarize_results
results["summarize_results"] = summarize_results(direct_data, classification["intent"])
if "generate_visualization" in tool_chain and direct_data:
from mcp_server.tools.analysis_tools import generate_visualization
results["generate_visualization"] = generate_visualization(direct_data, "table", f"Direct Results: {user_question}")
return results
def execute_standard_ai_workflow(user_question: str, classification: Dict[str, Any],
persona_content: str, request_id: str, existing_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute standard AI workflow (fallback from direct tools)
This maintains the existing single-stage/multi-stage logic
"""
results = existing_results.copy()
results["execution_path"] = "ai_workflow_fallback"
# Log that we're using AI fallback
if request_id:
from logging_config import tracker
tracker.log_direct_tool_fallback(request_id, "falling_back_to_ai_workflow")
# Execute based on original strategy
execution_strategy = classification.get("execution_strategy", "single_stage")
if execution_strategy == "multi_stage":
return execute_multi_stage_workflow(
user_question, classification, persona_content, request_id, results
)
elif execution_strategy == "iterative":
return execute_iterative_workflow(
user_question, classification, persona_content, request_id, results
)
else:
# Single-stage execution
return execute_single_stage_workflow(
user_question, classification, persona_content, request_id, results
)
def generate_final_response(user_question: str, results: Dict[str, Any], persona_context: str) -> str:
"""Generate a final business-friendly response using all tool results"""
# Extract key information from tool results
tool_results = results.get("tool_results", {})
# Check if we have Stage 3 evaluation (multi-stage workflow)
stage3_eval = tool_results.get("stage3_evaluation", {})
if stage3_eval and stage3_eval.get("business_answer"):
# Use Stage 3 business analysis as the primary response
response_parts = []
response_parts.append(f"**{stage3_eval['business_answer']}**")
if stage3_eval.get("key_findings"):
response_parts.append("\n**Key Findings:**")
for finding in stage3_eval["key_findings"]:
response_parts.append(f"• {finding}")
if stage3_eval.get("recommended_action"):
response_parts.append(f"\n**Recommended Action:** {stage3_eval['recommended_action']}")
return "\n".join(response_parts)
# Fallback to single-stage response format
sql_results = tool_results.get("run_sql_query", {}).get("results", [])
summary = tool_results.get("summarize_results", {})
if not sql_results:
# For any question that produces no database results, try stage 3 evaluation as fallback
# This handles both business questions that find no data AND general knowledge questions
stage3_template = load_intent_template("stage3_evaluation")
fallback_context = f"""
{stage3_template}
PERSONA CONTEXT:
{persona_context}
NO DATABASE RESULTS FALLBACK: The database query returned no results for this question.
USER QUESTION: {user_question}
STAGE 3 TASK: Since no database results were found, evaluate if this can be answered using general knowledge or provide appropriate guidance.
IMPORTANT: The database contained no relevant data for this question. Either answer using available knowledge (if it's a general question) or explain why no business data was found.
"""
try:
fallback_evaluation = evaluate_final_results(fallback_context)
if fallback_evaluation and fallback_evaluation.get("business_answer"):
return fallback_evaluation["business_answer"]
except Exception as e:
# If fallback fails, continue to standard error message
pass
return "I wasn't able to retrieve data to answer your question. Please check if the tables are accessible or rephrase your question."
response_parts = []
# Quick answer
response_parts.append(f"**Answer to: {user_question}**")
# Summary from summarize_results tool
if summary.get("summary"):
response_parts.append(f"\n**Summary**: {summary['summary']}")
# Key details (first few records)
if isinstance(sql_results, list) and sql_results:
response_parts.append(f"\n**Key Details**:")
for i, record in enumerate(sql_results[:3]): # Show first 3 records
record_str = ", ".join([f"{k}: {v}" for k, v in record.items() if v is not None])
response_parts.append(f"• Record {i+1}: {record_str}")
# Data summary
if isinstance(sql_results, list):
response_parts.append(f"\n**Data Summary**: Found {len(sql_results)} records")
# Suggestions for product planning context
if "product_planning" in results.get("classification", {}).get("intent", ""):
response_parts.append("\n**Suggestions**: You can also ask about product comparisons, specifications, or part number relationships.")
return "\n".join(response_parts)