# orchestration.py

from typing import Any, Dict, List

from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field

from providers import PROVIDERS
from parsing import (
    extract_domains, extract_mentions, naive_sentiment,
    compute_sov, compute_first_position,
)


# State definition
class VisibilityState(BaseModel):
    """Shared state for the visibility workflow (LangGraph requires a typed
    schema such as a TypedDict, dataclass, or Pydantic model)."""
    phone: str
    query_text: str
    brands: List[str]
    platforms: List[str]
    raw_results: Dict[str, Any] = Field(default_factory=dict)
    parsed: Dict[str, Any] = Field(default_factory=dict)
    summary: Dict[str, Any] = Field(default_factory=dict)
    # Final structured payload assembled by the `final` node.
    response: Dict[str, Any] = Field(default_factory=dict)


async def run_platforms(state: VisibilityState) -> Dict[str, Any]:
    """Run the visibility check on all specified platforms."""
    prompt = (
        f'For the query "{state.query_text}", list the top 5 recommended '
        'brands/tools. Provide brief reasoning and citations (links/domains). '
        'Keep under 200 words.'
    )
    raw_results: Dict[str, Any] = {}
    # Providers are called sequentially; asyncio.gather could fan them out
    # concurrently if latency becomes an issue.
    for platform in state.platforms:
        if platform in PROVIDERS:
            provider = PROVIDERS[platform]
            raw_results[platform] = await provider.call(prompt)
        else:
            raw_results[platform] = {"error": "unknown_platform"}
    # LangGraph nodes return partial state updates, not the full state object.
    return {"raw_results": raw_results}


def parse_results(state: VisibilityState) -> Dict[str, Any]:
    """Parse raw results to extract mentions, citations, and sentiment."""
    parsed: Dict[str, Any] = {}
    for platform, result in state.raw_results.items():
        if "error" in result:
            parsed[platform] = {"error": result["error"]}
            continue
        raw_answer = result["raw_answer"]
        # Brand mentions and cited domains found in the raw answer
        mentions = extract_mentions(raw_answer, set(state.brands))
        citations = extract_domains(raw_answer)
        # Per-brand sentiment for this platform's answer
        sentiment = {brand: naive_sentiment(raw_answer, brand) for brand in state.brands}
        parsed[platform] = {
            "mentions": mentions,
            "citations": citations,
            "sentiment": sentiment,
            "first_position_brand": compute_first_position(mentions),
            "stats": {
                "tokens_used": result.get("tokens_used"),
                "latency_ms": result.get("latency_ms"),
            },
        }
    return {"parsed": parsed}


def aggregate(state: VisibilityState) -> Dict[str, Any]:
    """Compute aggregated metrics across platforms."""
    all_mentions: List[Dict[str, Any]] = []
    all_citations: List[Dict[str, Any]] = []
    platform_status: List[Dict[str, Any]] = []
    # Collect mentions and citations, recording per-platform status as we go
    for platform, parsed in state.parsed.items():
        if "error" in parsed:
            platform_status.append({
                "platform": platform,
                "status": parsed["error"],
                "last_latency_ms": None,
            })
            continue
        all_mentions.extend(parsed["mentions"])
        all_citations.extend(parsed["citations"])
        platform_status.append({
            "platform": platform,
            "status": "ok",
            "last_latency_ms": parsed["stats"].get("latency_ms"),
        })
    # Overall share of voice across all platforms
    overall_sov = compute_sov(all_mentions)
    # Aggregate citation counts per domain
    citation_counts: Dict[str, int] = {}
    for citation in all_citations:
        domain = citation["domain"]
        citation_counts[domain] = citation_counts.get(domain, 0) + citation["count"]
    top_citation_domains = [
        {"domain": domain, "count": count}
        for domain, count in sorted(citation_counts.items(), key=lambda x: x[1], reverse=True)
    ]
    # Average sentiment per brand over the platforms that returned data
    sentiment_summary: Dict[str, Dict[str, float]] = {}
    for brand in state.brands:
        brand_sentiment = {"positive": 0.0, "neutral": 0.0, "negative": 0.0}
        count = 0
        for platform, parsed in state.parsed.items():
            if "error" not in parsed and brand in parsed["sentiment"]:
                sentiment = parsed["sentiment"][brand]
                brand_sentiment["positive"] += sentiment["positive"]
                brand_sentiment["neutral"] += sentiment["neutral"]
                brand_sentiment["negative"] += sentiment["negative"]
                count += 1
        if count > 0:
            sentiment_summary[brand] = {
                "positive": round(brand_sentiment["positive"] / count, 3),
                "neutral": round(brand_sentiment["neutral"] / count, 3),
                "negative": round(brand_sentiment["negative"] / count, 3),
            }
    # Top brands: first five distinct brands in order of appearance
    # (duplicate mentions of the same brand across platforms are collapsed)
    top_brands: List[str] = []
    for mention in all_mentions:
        if mention["brand"] not in top_brands:
            top_brands.append(mention["brand"])
    summary = {
        "platforms": platform_status,
        "top_brands": top_brands[:5],
        "share_of_voice": overall_sov,
        "top_citation_domains": top_citation_domains[:10],
        "sentiment_summary": sentiment_summary,
        "notes": [],
    }
    return {"summary": summary}
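

# For reference, the "summary" update produced above has this shape (values are
# illustrative placeholders, not real output):
#   {
#     "platforms": [{"platform": "openai", "status": "ok", "last_latency_ms": 812}],
#     "top_brands": ["Asana", "Trello"],
#     "share_of_voice": {...},  # shape depends on compute_sov()
#     "top_citation_domains": [{"domain": "example.com", "count": 3}],
#     "sentiment_summary": {"Asana": {"positive": 0.4, "neutral": 0.5, "negative": 0.1}},
#     "notes": [],
#   }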


def final(state: VisibilityState) -> Dict[str, Any]:
    """Assemble the final structured response."""
    # Node updates must map onto state fields, so the payload is stored under
    # the `response` key rather than returned directly.
    return {
        "response": {
            "ok": True,
            "summary": state.summary,
            "raw_results": state.raw_results,
            "parsed": state.parsed,
        }
    }


# Create the workflow
def create_visibility_workflow():
    """Create the LangGraph workflow for visibility checks."""
    workflow = StateGraph(VisibilityState)
    workflow.add_node("run_platforms", run_platforms)
    workflow.add_node("parse_results", parse_results)
    workflow.add_node("aggregate", aggregate)
    workflow.add_node("final", final)
    workflow.set_entry_point("run_platforms")
    workflow.add_edge("run_platforms", "parse_results")
    workflow.add_edge("parse_results", "aggregate")
    workflow.add_edge("aggregate", "final")
    workflow.add_edge("final", END)
    return workflow.compile()


# Global workflow instance
visibility_workflow = create_visibility_workflow()
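

# Minimal usage sketch. Assumptions: the phone number, query, brand names, and
# platform keys below are placeholders, and the platform keys must match entries
# in PROVIDERS. A compiled LangGraph graph is a Runnable, so `ainvoke` runs the
# async workflow; the result is a dict of the final state values.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        result = await visibility_workflow.ainvoke({
            "phone": "+10000000000",                        # placeholder
            "query_text": "best project management tools",  # placeholder query
            "brands": ["Asana", "Trello", "Notion"],        # placeholder brands
            "platforms": ["openai", "perplexity"],          # placeholder platform keys
        })
        print(result["response"]["summary"])

    asyncio.run(_demo())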