Skip to main content
Glama

MCP Starter for Puch AI

by Kulraj69
mcp_starter.py17.6 kB
"""MCP starter server for Puch AI.

Exposes a bearer-token protected FastMCP server with a job-finder tool, an
image black-and-white conversion tool, and a set of LLM-visibility tracking
tools that delegate to ``visibility_service``.
"""

import asyncio
import hmac
import os
import sys
from typing import Annotated, List
from urllib.parse import quote_plus

import httpx
import markdownify
import readabilipy
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.server.auth.providers.bearer import BearerAuthProvider, RSAKeyPair
from mcp import ErrorData, McpError
from mcp.server.auth.provider import AccessToken
from mcp.types import TextContent, ImageContent, INVALID_PARAMS, INTERNAL_ERROR
from pydantic import BaseModel, Field, AnyUrl

# Add the data directory to the path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'data'))

# Import visibility modules
from database import create_db_and_tables
from visibility_service import visibility_service

# --- Load environment variables ---
load_dotenv()

TOKEN = os.environ.get("AUTH_TOKEN")
MY_NUMBER = os.environ.get("MY_NUMBER")

# Raise explicitly instead of using `assert`: assertions are stripped when
# Python runs with -O, which would let the server start without credentials.
if TOKEN is None:
    raise RuntimeError("Please set AUTH_TOKEN in your .env file")
if MY_NUMBER is None:
    raise RuntimeError("Please set MY_NUMBER in your .env file")


def _tokens_match(candidate: str, expected: str) -> bool:
    """Return True when both tokens are equal, compared in constant time."""
    # Compare bytes rather than str: hmac.compare_digest only accepts ASCII
    # strings, and the candidate value comes from an untrusted client.
    return hmac.compare_digest(
        candidate.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def _require_valid_token(bearer_token: str) -> None:
    """Raise ``McpError(INVALID_PARAMS)`` unless ``bearer_token`` equals ``TOKEN``."""
    if not _tokens_match(bearer_token, TOKEN):
        raise McpError(ErrorData(code=INVALID_PARAMS, message="Invalid bearer token"))


# --- Auth Provider ---
class SimpleBearerAuthProvider(BearerAuthProvider):
    """Bearer auth provider that accepts exactly one pre-shared token."""

    def __init__(self, token: str):
        # The parent constructor is given a freshly generated public key; the
        # actual check in load_access_token is a comparison with `token`.
        k = RSAKeyPair.generate()
        super().__init__(public_key=k.public_key, jwks_uri=None, issuer=None, audience=None)
        self.token = token

    async def load_access_token(self, token: str) -> AccessToken | None:
        """Return an ``AccessToken`` when ``token`` matches, otherwise ``None``."""
        if _tokens_match(token, self.token):
            return AccessToken(
                token=token,
                client_id="puch-client",
                scopes=["*"],
                expires_at=None,
            )
        return None


# --- Rich Tool Description model ---
class RichToolDescription(BaseModel):
    """Structured tool description, serialized to JSON for ``mcp.tool``."""

    description: str
    use_when: str
    side_effects: str | None = None


# --- Fetch Utility Class ---
class Fetch:
    """HTTP helpers for fetching pages and running a web search."""

    USER_AGENT = "Puch/1.0 (Autonomous)"

    @classmethod
    async def fetch_url(
        cls,
        url: str,
        user_agent: str,
        force_raw: bool = False,
    ) -> tuple[str, str]:
        """Fetch ``url`` and return ``(content, prefix)``.

        HTML responses are converted to Markdown unless ``force_raw`` is set,
        in which case (or for non-HTML content types) the raw body is returned
        together with an explanatory prefix.

        Raises:
            McpError: with ``INTERNAL_ERROR`` when the request fails or the
                response status code is 400 or higher.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    url,
                    follow_redirects=True,
                    headers={"User-Agent": user_agent},
                    timeout=30,
                )
            except httpx.HTTPError as e:
                raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}")) from e

            if response.status_code >= 400:
                raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url} - status code {response.status_code}"))

            page_raw = response.text

        content_type = response.headers.get("content-type", "")
        is_page_html = "text/html" in content_type

        if is_page_html and not force_raw:
            return cls.extract_content_from_html(page_raw), ""

        return (
            page_raw,
            f"Content type {content_type} cannot be simplified to markdown, but here is the raw content:\n",
        )

    @staticmethod
    def extract_content_from_html(html: str) -> str:
        """Extract and convert HTML content to Markdown format."""
        ret = readabilipy.simple_json.simple_json_from_html_string(html, use_readability=True)
        if not ret or not ret.get("content"):
            return "<error>Page failed to be simplified from HTML</error>"
        content = markdownify.markdownify(ret["content"], heading_style=markdownify.ATX)
        return content

    @staticmethod
    async def google_search_links(query: str, num_results: int = 5) -> list[str]:
        """
        Perform a scoped DuckDuckGo search and return a list of job posting URLs.
        (Using DuckDuckGo because Google blocks most programmatic scraping.)

        Returns at most ``num_results`` links. On a non-200 response or when no
        link is found, a single-element list with an ``<error>`` marker is
        returned instead of raising.
        """
        # quote_plus escapes '&', '#', '?', '+' and non-ASCII characters that a
        # bare space-to-plus replacement would leave to corrupt the query string.
        ddg_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
        links = []

        async with httpx.AsyncClient() as client:
            resp = await client.get(ddg_url, headers={"User-Agent": Fetch.USER_AGENT})
            if resp.status_code != 200:
                return ["<error>Failed to perform search.</error>"]

        from bs4 import BeautifulSoup
        soup = BeautifulSoup(resp.text, "html.parser")
        for a in soup.find_all("a", class_="result__a", href=True):
            href = a["href"]
            if "http" in href:
                links.append(href)
            if len(links) >= num_results:
                break

        return links or ["<error>No results found.</error>"]


# --- MCP Server Setup ---
mcp = FastMCP(
    "Job Finder MCP Server",
    auth=SimpleBearerAuthProvider(TOKEN),
)


# --- Tool: validate (required by Puch) ---
# Returns the configured MY_NUMBER value. Documented with a comment rather
# than a docstring so the tool metadata exposed to clients stays unchanged.
@mcp.tool
async def validate() -> str:
    return MY_NUMBER


# --- Tool: job_finder (now smart!) ---
JobFinderDescription = RichToolDescription(
    description="Smart job tool: analyze descriptions, fetch URLs, or search jobs based on free text.",
    use_when="Use this to evaluate job descriptions or search for jobs using freeform goals.",
    side_effects="Returns insights, fetched job descriptions, or relevant job links.",
)


@mcp.tool(description=JobFinderDescription.model_dump_json())
async def job_finder(
    user_goal: Annotated[str, Field(description="The user's goal (can be a description, intent, or freeform query)")],
    job_description: Annotated[str | None, Field(description="Full job description text, if available.")] = None,
    job_url: Annotated[AnyUrl | None, Field(description="A URL to fetch a job description from.")] = None,
    raw: Annotated[bool, Field(description="Return raw HTML content if True")] = False,
) -> str:
    """
    Handles multiple job discovery methods: direct description, URL fetch, or freeform search query.

    Inputs are tried in order: ``job_description``, then ``job_url``, then a
    search when ``user_goal`` contains "look for" or "find". If none applies,
    an ``McpError`` with ``INVALID_PARAMS`` is raised.
    """
    if job_description:
        return (
            f"📝 **Job Description Analysis**\n\n"
            f"---\n{job_description.strip()}\n---\n\n"
            f"User Goal: **{user_goal}**\n\n"
            f"💡 Suggestions:\n- Tailor your resume.\n- Evaluate skill match.\n- Consider applying if relevant."
        )

    if job_url:
        content, _ = await Fetch.fetch_url(str(job_url), Fetch.USER_AGENT, force_raw=raw)
        return (
            f"🔗 **Fetched Job Posting from URL**: {job_url}\n\n"
            f"---\n{content.strip()}\n---\n\n"
            f"User Goal: **{user_goal}**"
        )

    if "look for" in user_goal.lower() or "find" in user_goal.lower():
        links = await Fetch.google_search_links(user_goal)
        return (
            f"🔍 **Search Results for**: _{user_goal}_\n\n" +
            "\n".join(f"- {link}" for link in links)
        )

    raise McpError(ErrorData(code=INVALID_PARAMS, message="Please provide either a job description, a job URL, or a search query in user_goal."))


# Image inputs and sending images
MAKE_IMG_BLACK_AND_WHITE_DESCRIPTION = RichToolDescription(
    description="Convert an image to black and white and save it.",
    use_when="Use this tool when the user provides an image URL and requests it to be converted to black and white.",
    side_effects="The image will be processed and saved in a black and white format.",
)


@mcp.tool(description=MAKE_IMG_BLACK_AND_WHITE_DESCRIPTION.model_dump_json())
async def make_img_black_and_white(
    puch_image_data: Annotated[str, Field(description="Base64-encoded image data to convert to black and white")] = None,
) -> list[TextContent | ImageContent]:
    """Decode a base64 image, convert it to mode "L", and return it as a base64 PNG.

    Raises:
        McpError: ``INVALID_PARAMS`` when no image data is supplied;
            ``INTERNAL_ERROR`` when decoding or conversion fails.
    """
    import base64
    import io

    from PIL import Image

    # The parameter defaults to None; reject that up front so the caller gets
    # a parameter error instead of an opaque decoding failure.
    if not puch_image_data:
        raise McpError(ErrorData(code=INVALID_PARAMS, message="puch_image_data is required"))

    try:
        image_bytes = base64.b64decode(puch_image_data)
        image = Image.open(io.BytesIO(image_bytes))

        bw_image = image.convert("L")

        buf = io.BytesIO()
        bw_image.save(buf, format="PNG")
        bw_bytes = buf.getvalue()
        bw_base64 = base64.b64encode(bw_bytes).decode("utf-8")

        return [ImageContent(type="image", mimeType="image/png", data=bw_base64)]
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e


# --- Visibility Tools ---
@mcp.tool(description="Add or update a tracked query for LLM visibility monitoring")
async def add_tracking_query(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query: Annotated[str, Field(description="The search query to track")],
    competitors: Annotated[list[str] | None, Field(description="List of competitor brands to monitor")] = None,
) -> str:
    """Add or update a tracked query for visibility monitoring"""
    _require_valid_token(bearer_token)

    try:
        result = await visibility_service.add_tracking_query(MY_NUMBER, query, competitors)
        return f"✅ Query tracked successfully! Query ID: {result['query_id']}"
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to add tracking query: {str(e)}")) from e


@mcp.tool(description="List all tracked queries for the user")
async def list_tracked_queries(
    bearer_token: Annotated[str, Field(description="Authentication token")],
) -> str:
    """List all tracked queries for the user"""
    _require_valid_token(bearer_token)

    try:
        result = visibility_service.list_tracked_queries(MY_NUMBER)
        if not result["queries"]:
            return "📝 No tracked queries found. Use add_tracking_query to start monitoring."

        queries_text = "\n".join([
            f"• **{q['query_text']}** (ID: {q['id']})\n Competitors: {', '.join(q['competitors']) if q['competitors'] else 'Auto-detect'}\n Created: {q['created_at']}"
            for q in result["queries"]
        ])

        return f"📊 **Tracked Queries:**\n\n{queries_text}"
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to list queries: {str(e)}")) from e


@mcp.tool(description="Run visibility check for a specific query across LLM platforms")
async def run_visibility_check(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query to check")],
    platforms: Annotated[List[str] | None, Field(description="List of platforms to check (default: azure_openai, perplexity)")] = None,
) -> str:
    """Run visibility check for a specific query"""
    _require_valid_token(bearer_token)

    try:
        result = await visibility_service.run_visibility_check(MY_NUMBER, query_id, platforms)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        summary = result["summary"]

        # Format the response
        response = f"🔍 **Visibility Check Results** (Run ID: {result['run_id']})\n\n"

        # Platform status
        response += "**Platform Status:**\n"
        for platform in summary["platforms"]:
            status_emoji = "✅" if platform["status"] == "ok" else "❌"
            response += f"{status_emoji} {platform['platform']}: {platform['status']}"
            if platform.get("last_latency_ms"):
                response += f" ({platform['last_latency_ms']}ms)"
            response += "\n"

        # Top brands
        if summary["top_brands"]:
            response += f"\n**Top Brands Mentioned:** {', '.join(summary['top_brands'])}\n"

        # Share of voice
        if summary["share_of_voice"]:
            response += "\n**Share of Voice:**\n"
            for sov in summary["share_of_voice"]:
                response += f"• {sov['brand']}: {sov['overall']:.1%}\n"

        # Top citation domains
        if summary["top_citation_domains"]:
            response += "\n**Top Citation Domains:**\n"
            for domain in summary["top_citation_domains"][:5]:
                response += f"• {domain['domain']}: {domain['count']} citations\n"

        return response
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to run visibility check: {str(e)}")) from e


@mcp.tool(description="Fetch comprehensive visibility report for a tracked query")
async def fetch_visibility_report(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query")],
    range_days: Annotated[str, Field(description="Time range: 7d, 30d, or all")] = "7d",
) -> str:
    """Fetch visibility report for a query"""
    _require_valid_token(bearer_token)

    try:
        result = visibility_service.fetch_visibility_report(MY_NUMBER, query_id, range_days)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        response = "📊 **Visibility Report**\n\n"
        response += f"**Query:** {result['query']}\n"
        response += f"**Period:** {result['period']}\n\n"

        # Share of voice
        if result["share_of_voice"]:
            response += "**Share of Voice:**\n"
            for sov in result["share_of_voice"]:
                response += f"• {sov['brand']}: {sov['overall']:.1%}\n"

        # Sentiment
        if result["sentiment"]:
            response += "\n**Sentiment Analysis:**\n"
            for sentiment in result["sentiment"]:
                brand = sentiment["brand"]
                pos = sentiment["positive"]
                neu = sentiment["neutral"]
                neg = sentiment["negative"]
                response += f"• {brand}: 😊{pos:.1%} 😐{neu:.1%} 😞{neg:.1%}\n"

        # Top citation domains
        if result["top_citation_domains"]:
            response += "\n**Top Citation Domains:**\n"
            for domain in result["top_citation_domains"][:5]:
                response += f"• {domain['domain']}: {domain['count']} citations\n"

        # Platform status
        if result["platform_status"]:
            response += "\n**Platform Status:**\n"
            for platform in result["platform_status"]:
                status_emoji = "✅" if platform["status"] == "ok" else "❌"
                response += f"{status_emoji} {platform['platform']}: {platform['status']}\n"

        return response
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch report: {str(e)}")) from e


@mcp.tool(description="Get platform snapshot for a specific query and platform")
async def get_platform_snapshot(
    bearer_token: Annotated[str, Field(description="Authentication token")],
    query_id: Annotated[int, Field(description="ID of the tracked query")],
    platform: Annotated[str, Field(description="Platform name (azure_openai, perplexity)")],
    date: Annotated[str | None, Field(description="Date in YYYY-MM-DD format (optional)")] = None,
) -> str:
    """Get platform snapshot for a specific query and platform"""
    _require_valid_token(bearer_token)

    try:
        result = visibility_service.get_platform_snapshot(MY_NUMBER, query_id, platform, date)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        response = "📸 **Platform Snapshot**\n\n"
        response += f"**Query:** {result['query']}\n"
        response += f"**Platform:** {result['platform']}\n"
        response += f"**Date:** {result['date']}\n\n"

        # Raw answer
        response += f"**Raw Answer:**\n{result['raw_answer'][:500]}...\n\n"

        # Mentions
        if result["mentions"]:
            response += "**Brand Mentions:**\n"
            for mention in result["mentions"]:
                response += f"• {mention['brand']}: {mention['count']} mentions (first at position {mention['first_index']})\n"

        # Citations
        if result["citations"]:
            response += "\n**Citations:**\n"
            for citation in result["citations"][:5]:
                response += f"• {citation['domain']}: {citation['count']} citations\n"

        return response
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to get snapshot: {str(e)}")) from e


@mcp.tool(description="Run visibility check for all tracked queries (helper for automation)")
async def run_all_today(
    bearer_token: Annotated[str, Field(description="Authentication token")],
) -> str:
    """Run visibility check for all tracked queries"""
    _require_valid_token(bearer_token)

    try:
        result = await visibility_service.run_all_today(MY_NUMBER)

        if result["run_ids"]:
            return f"✅ Successfully ran {len(result['run_ids'])} visibility checks. Run IDs: {', '.join(map(str, result['run_ids']))}"
        else:
            return "ℹ️ No tracked queries found to run visibility checks on."
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to run all checks: {str(e)}")) from e


# --- Run MCP Server ---
async def main():
    """Initialize the database, then serve MCP over streamable HTTP on port 8086."""
    print("🚀 Starting MCP server on http://0.0.0.0:8086")

    # Initialize database
    print("📊 Initializing database...")
    create_db_and_tables()
    print("✅ Database initialized successfully!")

    await mcp.run_async("streamable-http", host="0.0.0.0", port=8086)


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kulraj69/mcp-llm'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.