# mcp_starter.py (17.6 kB)
import asyncio
from typing import Annotated, List
import os
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.server.auth.providers.bearer import BearerAuthProvider, RSAKeyPair
from mcp import ErrorData, McpError
from mcp.server.auth.provider import AccessToken
from mcp.types import TextContent, ImageContent, INVALID_PARAMS, INTERNAL_ERROR
from pydantic import BaseModel, Field, AnyUrl
import markdownify
import httpx
import readabilipy
import sys
import os
# Make the sibling ../data directory importable. This must run BEFORE the two
# project imports below, which presumably live in that directory
# (NOTE(review): expects database.py and visibility_service.py in ../data — confirm).
# The path is built from this file's location (os.path.dirname(__file__)), so it
# does not depend on the current working directory. Because it is *appended*,
# any earlier sys.path entry with a module named `database` or
# `visibility_service` would shadow these.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'data'))
# Import visibility modules (resolved via the path entry added above)
from database import create_db_and_tables
from visibility_service import visibility_service
# --- Load environment variables ---
# Populate os.environ from a local .env file, then read the two required settings.
load_dotenv()
TOKEN = os.environ.get("AUTH_TOKEN")
MY_NUMBER = os.environ.get("MY_NUMBER")
# Explicit checks rather than `assert`: assertions are stripped when Python runs
# with -O, which would let the server start without an auth token. Empty values
# are rejected too — an empty AUTH_TOKEN would make "" an accepted bearer token.
if not TOKEN:
    raise RuntimeError("Please set AUTH_TOKEN in your .env file")
if not MY_NUMBER:
    raise RuntimeError("Please set MY_NUMBER in your .env file")
# --- Auth Provider ---
class SimpleBearerAuthProvider(BearerAuthProvider):
    """Bearer-auth provider that accepts exactly one static token.

    The parent is constructed with a freshly generated RSA public key and no
    JWKS URI, issuer or audience. The overridden ``load_access_token`` decides
    acceptance by comparing the presented token with ``token``.
    NOTE(review): the generated key presumably exists only to satisfy the
    parent constructor — confirm no other code path validates JWTs with it.
    """

    def __init__(self, token: str):
        k = RSAKeyPair.generate()
        super().__init__(public_key=k.public_key, jwks_uri=None, issuer=None, audience=None)
        self.token = token

    async def load_access_token(self, token: str) -> AccessToken | None:
        """Return an all-scope ``AccessToken`` if *token* matches, else ``None``."""
        import hmac

        # Constant-time comparison: `==` on strings can stop at the first
        # differing character, leaking how much of a guessed token was right
        # through response timing. Comparing bytes also avoids the TypeError
        # compare_digest raises for non-ASCII str arguments.
        if hmac.compare_digest(token.encode("utf-8"), self.token.encode("utf-8")):
            return AccessToken(
                token=token,
                client_id="puch-client",
                scopes=["*"],
                expires_at=None,
            )
        return None
# --- Rich Tool Description model ---
class RichToolDescription(BaseModel):
    """Structured tool description, serialised with ``model_dump_json()``
    and passed as ``@mcp.tool(description=...)``.

    Fields serialise in declaration order, so the emitted JSON lists
    ``description``, ``use_when`` and ``side_effects`` in that order.
    """

    description: str  # what the tool does
    use_when: str  # when a client should choose this tool
    side_effects: str | None = Field(default=None)  # optional; null when not given
# --- Fetch Utility Class ---
class Fetch:
USER_AGENT = "Puch/1.0 (Autonomous)"
@classmethod
async def fetch_url(
cls,
url: str,
user_agent: str,
force_raw: bool = False,
) -> tuple[str, str]:
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url,
follow_redirects=True,
headers={"User-Agent": user_agent},
timeout=30,
)
except httpx.HTTPError as e:
raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}"))
if response.status_code >= 400:
raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url} - status code {response.status_code}"))
page_raw = response.text
content_type = response.headers.get("content-type", "")
is_page_html = "text/html" in content_type
if is_page_html and not force_raw:
return cls.extract_content_from_html(page_raw), ""
return (
page_raw,
f"Content type {content_type} cannot be simplified to markdown, but here is the raw content:\n",
)
@staticmethod
def extract_content_from_html(html: str) -> str:
"""Extract and convert HTML content to Markdown format."""
ret = readabilipy.simple_json.simple_json_from_html_string(html, use_readability=True)
if not ret or not ret.get("content"):
return "<error>Page failed to be simplified from HTML</error>"
content = markdownify.markdownify(ret["content"], heading_style=markdownify.ATX)
return content
@staticmethod
async def google_search_links(query: str, num_results: int = 5) -> list[str]:
"""
Perform a scoped DuckDuckGo search and return a list of job posting URLs.
(Using DuckDuckGo because Google blocks most programmatic scraping.)
"""
ddg_url = f"https://html.duckduckgo.com/html/?q={query.replace(' ', '+')}"
links = []
async with httpx.AsyncClient() as client:
resp = await client.get(ddg_url, headers={"User-Agent": Fetch.USER_AGENT})
if resp.status_code != 200:
return ["<error>Failed to perform search.</error>"]
from bs4 import BeautifulSoup
soup = BeautifulSoup(resp.text, "html.parser")
for a in soup.find_all("a", class_="result__a", href=True):
href = a["href"]
if "http" in href:
links.append(href)
if len(links) >= num_results:
break
return links or ["<error>No results found.</error>"]
# --- MCP Server Setup ---
# The single server instance; every @mcp.tool below registers on it.
# Transport-level auth is the static bearer-token provider defined above.
mcp = FastMCP(
    name="Job Finder MCP Server",
    auth=SimpleBearerAuthProvider(TOKEN),
)
# --- Tool: validate (required by Puch) ---
@mcp.tool
async def validate() -> str:
    # Required by Puch: returns the MY_NUMBER value read from the environment
    # when this module is imported.
    # NOTE(review): intentionally no docstring — FastMCP appears to publish a
    # tool function's docstring as its description when no `description=` is
    # passed, so adding one would change what clients see.
    return MY_NUMBER
# --- Tool: job_finder (analyze a description, fetch a URL, or search) ---
JobFinderDescription = RichToolDescription(
    description="Smart job tool: analyze descriptions, fetch URLs, or search jobs based on free text.",
    use_when="Use this to evaluate job descriptions or search for jobs using freeform goals.",
    side_effects="Returns insights, fetched job descriptions, or relevant job links.",
)


@mcp.tool(description=JobFinderDescription.model_dump_json())
async def job_finder(
    user_goal: Annotated[str, Field(description="The user's goal (can be a description, intent, or freeform query)")],
    job_description: Annotated[str | None, Field(description="Full job description text, if available.")] = None,
    job_url: Annotated[AnyUrl | None, Field(description="A URL to fetch a job description from.")] = None,
    raw: Annotated[bool, Field(description="Return raw HTML content if True")] = False,
) -> str:
    """
    Handles multiple job discovery methods: direct description, URL fetch, or freeform search query.
    """
    # Routing, first match wins:
    #   1. non-blank job_description -> echo it back with generic suggestions
    #   2. job_url                   -> fetch the page (Markdown unless raw=True)
    #   3. user_goal mentions "look for"/"find" -> DuckDuckGo link search
    #   otherwise                    -> INVALID_PARAMS
    # A whitespace-only description counts as absent, so it falls through to
    # the URL/search paths instead of producing an empty "analysis".
    if job_description and job_description.strip():
        return (
            f"📝 **Job Description Analysis**\n\n"
            f"---\n{job_description.strip()}\n---\n\n"
            f"User Goal: **{user_goal}**\n\n"
            f"💡 Suggestions:\n- Tailor your resume.\n- Evaluate skill match.\n- Consider applying if relevant."
        )
    if job_url:
        # The explanatory prefix from fetch_url is intentionally discarded.
        content, _ = await Fetch.fetch_url(str(job_url), Fetch.USER_AGENT, force_raw=raw)
        return (
            f"🔗 **Fetched Job Posting from URL**: {job_url}\n\n"
            f"---\n{content.strip()}\n---\n\n"
            f"User Goal: **{user_goal}**"
        )
    goal_lower = user_goal.lower()
    if "look for" in goal_lower or "find" in goal_lower:
        links = await Fetch.google_search_links(user_goal)
        return (
            f"🔍 **Search Results for**: _{user_goal}_\n\n" +
            "\n".join(f"- {link}" for link in links)
        )
    raise McpError(ErrorData(code=INVALID_PARAMS, message="Please provide either a job description, a job URL, or a search query in user_goal."))
# --- Tool: make_img_black_and_white (accepts an image, returns an image) ---
MAKE_IMG_BLACK_AND_WHITE_DESCRIPTION = RichToolDescription(
    description="Convert an image to black and white and return it as a PNG.",
    use_when="Use this tool when the user provides an image and requests it to be converted to black and white.",
    side_effects="None; the converted image is returned to the caller and nothing is stored.",
)


@mcp.tool(description=MAKE_IMG_BLACK_AND_WHITE_DESCRIPTION.model_dump_json())
async def make_img_black_and_white(
    puch_image_data: Annotated[str | None, Field(description="Base64-encoded image data to convert to black and white")] = None,
) -> list[TextContent | ImageContent]:
    """Decode a base64 image, convert it to 8-bit grayscale ("L") and return it as PNG.

    Raises:
        McpError: ``INVALID_PARAMS`` when no data is given or it cannot be
            decoded as base64 / parsed as an image; ``INTERNAL_ERROR`` for any
            other failure during conversion or encoding.
    """
    import base64
    import io

    from PIL import Image

    if not puch_image_data:
        raise McpError(
            ErrorData(code=INVALID_PARAMS, message="No image data provided (puch_image_data is empty).")
        )

    try:
        image_bytes = base64.b64decode(puch_image_data)
        image = Image.open(io.BytesIO(image_bytes))
        # Image.open is lazy; load() forces a full decode so corrupt or
        # truncated input is reported as a caller error, not a server error.
        image.load()
    except (ValueError, OSError) as e:
        # ValueError covers binascii.Error / non-ASCII base64; OSError covers
        # PIL.UnidentifiedImageError and truncated-image errors.
        raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid image data: {e}")) from e
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e

    try:
        bw_image = image.convert("L")
        buf = io.BytesIO()
        bw_image.save(buf, format="PNG")
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e

    bw_base64 = base64.b64encode(buf.getvalue()).decode("utf-8")
    return [ImageContent(type="image", mimeType="image/png", data=bw_base64)]
# --- Visibility Tools ---
@mcp.tool(description="Add or update a tracked query for LLM visibility monitoring")
async def add_tracking_query(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query: Annotated[str, Field(description="The search query to track")],
    competitors: Annotated[list[str] | None, Field(description="List of competitor brands to monitor")] = None,
) -> str:
    """Add or update a tracked query for visibility monitoring.

    Raises:
        McpError: ``INVALID_PARAMS`` if *bearer_token* does not match
            AUTH_TOKEN; ``INTERNAL_ERROR`` if the visibility service fails.
    """
    import hmac

    # Constant-time comparison so response timing does not leak token prefixes.
    # NOTE(review): passing the secret as a tool argument exposes it to the
    # LLM context; transport auth already checks the same token.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        result = await visibility_service.add_tracking_query(MY_NUMBER, query, competitors)
        return f"✅ Query tracked successfully! Query ID: {result['query_id']}"
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to add tracking query: {str(e)}")) from e
@mcp.tool(description="List all tracked queries for the user")
async def list_tracked_queries(
    bearer_token: Annotated[str, Field(description="Authentication token")],
) -> str:
    """List all tracked queries for the user as a Markdown bullet list.

    Raises:
        McpError: ``INVALID_PARAMS`` on a wrong token; ``INTERNAL_ERROR`` if
            the service call or formatting fails.
    """
    import hmac

    # Constant-time comparison so response timing does not leak token prefixes.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        # Called without await: the service method is synchronous here.
        result = visibility_service.list_tracked_queries(MY_NUMBER)
        queries = result["queries"]
        if not queries:
            return "📝 No tracked queries found. Use add_tracking_query to start monitoring."
        entries = []
        for q in queries:
            competitors = ", ".join(q["competitors"]) if q["competitors"] else "Auto-detect"
            entries.append(
                f"• **{q['query_text']}** (ID: {q['id']})\n Competitors: {competitors}\n Created: {q['created_at']}"
            )
        return "📊 **Tracked Queries:**\n\n" + "\n".join(entries)
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to list queries: {str(e)}")) from e
@mcp.tool(description="Run visibility check for a specific query across LLM platforms")
async def run_visibility_check(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query to check")],
    platforms: Annotated[List[str] | None, Field(description="List of platforms to check (default: azure_openai, perplexity)")] = None,
) -> str:
    """Run a visibility check for one tracked query and format the summary as Markdown.

    Sections: platform status (always), then top brands, share of voice and
    up to five citation domains when the service summary contains them.
    A service-reported ``error`` key is returned as a message, not raised.
    """
    import hmac

    # Constant-time comparison so response timing does not leak token prefixes.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        result = await visibility_service.run_visibility_check(MY_NUMBER, query_id, platforms)
        if "error" in result:
            return f"❌ Error: {result['error']}"
        summary = result["summary"]

        parts = [f"🔍 **Visibility Check Results** (Run ID: {result['run_id']})\n\n"]

        parts.append("**Platform Status:**\n")
        for platform in summary["platforms"]:
            status_emoji = "✅" if platform["status"] == "ok" else "❌"
            line = f"{status_emoji} {platform['platform']}: {platform['status']}"
            latency = platform.get("last_latency_ms")
            # `is not None` so a measured 0 ms is still shown.
            if latency is not None:
                line += f" ({latency}ms)"
            parts.append(line + "\n")

        top_brands = summary.get("top_brands")
        if top_brands:
            parts.append(f"\n**Top Brands Mentioned:** {', '.join(top_brands)}\n")

        share_of_voice = summary.get("share_of_voice")
        if share_of_voice:
            parts.append("\n**Share of Voice:**\n")
            parts.extend(f"• {sov['brand']}: {sov['overall']:.1%}\n" for sov in share_of_voice)

        domains = summary.get("top_citation_domains")
        if domains:
            parts.append("\n**Top Citation Domains:**\n")
            parts.extend(f"• {d['domain']}: {d['count']} citations\n" for d in domains[:5])

        return "".join(parts)
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to run visibility check: {str(e)}")) from e
@mcp.tool(description="Fetch comprehensive visibility report for a tracked query")
async def fetch_visibility_report(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query")],
    range_days: Annotated[str, Field(description="Time range: 7d, 30d, or all")] = "7d",
) -> str:
    """Fetch a visibility report for a tracked query and format it as Markdown.

    *range_days* is forwarded unchanged to the service. Optional sections
    (share of voice, sentiment, top five citation domains, platform status)
    are rendered only when present and non-empty.
    """
    import hmac

    # Constant-time comparison so response timing does not leak token prefixes.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        # Called without await: the service method is synchronous here.
        result = visibility_service.fetch_visibility_report(MY_NUMBER, query_id, range_days)
        if "error" in result:
            return f"❌ Error: {result['error']}"

        parts = [
            "📊 **Visibility Report**\n\n",
            f"**Query:** {result['query']}\n",
            f"**Period:** {result['period']}\n\n",
        ]

        share_of_voice = result.get("share_of_voice")
        if share_of_voice:
            parts.append("**Share of Voice:**\n")
            parts.extend(f"• {sov['brand']}: {sov['overall']:.1%}\n" for sov in share_of_voice)

        sentiments = result.get("sentiment")
        if sentiments:
            parts.append("\n**Sentiment Analysis:**\n")
            for s in sentiments:
                parts.append(
                    f"• {s['brand']}: 😊{s['positive']:.1%} 😐{s['neutral']:.1%} 😞{s['negative']:.1%}\n"
                )

        domains = result.get("top_citation_domains")
        if domains:
            parts.append("\n**Top Citation Domains:**\n")
            parts.extend(f"• {d['domain']}: {d['count']} citations\n" for d in domains[:5])

        platform_status = result.get("platform_status")
        if platform_status:
            parts.append("\n**Platform Status:**\n")
            for platform in platform_status:
                status_emoji = "✅" if platform["status"] == "ok" else "❌"
                parts.append(f"{status_emoji} {platform['platform']}: {platform['status']}\n")

        return "".join(parts)
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch report: {str(e)}")) from e
@mcp.tool(description="Get platform snapshot for a specific query and platform")
async def get_platform_snapshot(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query")],
    platform: Annotated[str, Field(description="Platform name (azure_openai, perplexity)")],
    date: Annotated[str | None, Field(description="Date in YYYY-MM-DD format (optional)")] = None,
) -> str:
    """Get one platform's stored answer for a query and format it as Markdown.

    The raw answer is previewed up to 500 characters; "..." is appended only
    when it was actually truncated. Brand mentions and up to five citation
    domains follow when present.
    """
    import hmac

    preview_chars = 500

    # Constant-time comparison so response timing does not leak token prefixes.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        # Called without await: the service method is synchronous here.
        result = visibility_service.get_platform_snapshot(MY_NUMBER, query_id, platform, date)
        if "error" in result:
            return f"❌ Error: {result['error']}"

        # A missing/None answer renders as empty instead of raising TypeError.
        raw_answer = result.get("raw_answer") or ""
        preview = raw_answer if len(raw_answer) <= preview_chars else raw_answer[:preview_chars] + "..."

        parts = [
            "📸 **Platform Snapshot**\n\n",
            f"**Query:** {result['query']}\n",
            f"**Platform:** {result['platform']}\n",
            f"**Date:** {result['date']}\n\n",
            f"**Raw Answer:**\n{preview}\n\n",
        ]

        mentions = result.get("mentions")
        if mentions:
            parts.append("**Brand Mentions:**\n")
            for m in mentions:
                parts.append(
                    f"• {m['brand']}: {m['count']} mentions (first at position {m['first_index']})\n"
                )

        citations = result.get("citations")
        if citations:
            parts.append("\n**Citations:**\n")
            parts.extend(f"• {c['domain']}: {c['count']} citations\n" for c in citations[:5])

        return "".join(parts)
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to get snapshot: {str(e)}")) from e
@mcp.tool(description="Run visibility check for all tracked queries (helper for automation)")
async def run_all_today(
    bearer_token: Annotated[str, Field(description="Authentication token")],
) -> str:
    """Run a visibility check for every tracked query and report the run IDs.

    Raises:
        McpError: ``INVALID_PARAMS`` on a wrong token; ``INTERNAL_ERROR`` if
            the service fails or returns no ``run_ids`` key.
    """
    import hmac

    # Constant-time comparison so response timing does not leak token prefixes.
    if not hmac.compare_digest(bearer_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))
    try:
        result = await visibility_service.run_all_today(MY_NUMBER)
        run_ids = result["run_ids"]
        if not run_ids:
            return "ℹ️ No tracked queries found to run visibility checks on."
        return f"✅ Successfully ran {len(run_ids)} visibility checks. Run IDs: {', '.join(map(str, run_ids))}"
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to run all checks: {str(e)}")) from e
# --- Run MCP Server ---
async def main(host: str = "0.0.0.0", port: int = 8086) -> None:
    """Create the database tables, then serve the MCP app over streamable HTTP.

    Args:
        host: Address to bind; the default "0.0.0.0" listens on all IPv4 interfaces.
        port: TCP port to listen on.
    """
    # host/port feed both the banner and the actual bind, so the printed URL
    # cannot drift from where the server really listens.
    print(f"🚀 Starting MCP server on http://{host}:{port}")
    # Initialize database
    print("📊 Initializing database...")
    create_db_and_tables()
    print("✅ Database initialized successfully!")
    await mcp.run_async("streamable-http", host=host, port=port)


if __name__ == "__main__":
    asyncio.run(main())