mcp_app.py•19.8 kB
import base64
import json
import os
import asyncio
from datetime import datetime
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
from .render import ASPECT_RATIO, RenderOptions, render_markdown_text_to_image
from .store import ImageStore
_store = ImageStore(base_dir=os.path.join(os.getcwd(), "outputs"))
# Create the server instance
server = Server("word2img-mcp")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools."""
return [
types.Tool(
name="submit_markdown",
description="接收Markdown文本并生成高质量图片,返回任务ID和详细状态信息",
inputSchema={
"type": "object",
"properties": {
"markdown_text": {"type": "string", "description": "要渲染的Markdown文本"},
"align": {"type": "string", "enum": ["center", "left", "right"], "default": "center", "description": "文本对齐方式"},
"bold": {"type": "boolean", "default": False, "description": "是否加粗显示"},
"width": {"type": "integer", "default": 1200, "minimum": 300, "maximum": 4000, "description": "图片宽度(像素)"},
"height": {"type": "integer", "default": 1600, "minimum": 400, "maximum": 6000, "description": "图片高度(像素),默认按3:4比例计算"},
"background_color": {"type": "string", "default": "#FFFFFF", "description": "背景颜色,支持HEX、RGB、RGBA格式"},
"text_color": {"type": "string", "default": "#000000", "description": "文字颜色,支持HEX、RGB、RGBA格式"},
"accent_color": {"type": "string", "default": "#4682B4", "description": "强调色,用于链接、代码块等"},
"font_family": {"type": "string", "default": "Microsoft YaHei, PingFang SC, Helvetica Neue, Arial, sans-serif", "description": "字体家族"},
"font_size": {"type": "integer", "default": 20, "minimum": 8, "maximum": 48, "description": "基础字体大小(像素)"},
"line_height": {"type": "number", "default": 1.6, "minimum": 1.0, "maximum": 3.0, "description": "行高倍数"},
"header_scale": {"type": "number", "default": 1.5, "minimum": 1.0, "maximum": 3.0, "description": "标题字体缩放比例"},
"theme": {"type": "string", "enum": ["default", "dark", "light", "professional", "casual"], "default": "default", "description": "主题样式"},
"shadow": {"type": "boolean", "default": True, "description": "是否添加文字阴影效果"},
"watermark": {"type": "boolean", "default": False, "description": "是否添加水印"},
"watermark_text": {"type": "string", "default": "Generated by word2img-mcp", "description": "水印文字"},
"output_format": {"type": "string", "enum": ["png", "jpg", "jpeg", "webp"], "default": "png", "description": "输出图片格式"},
"quality": {"type": "integer", "default": 95, "minimum": 1, "maximum": 100, "description": "图片质量(仅JPG/WebP有效)"},
"backend_preference": {"type": "string", "enum": ["auto", "imgkit", "markdown-pdf", "md-to-image", "pil"], "default": "auto", "description": "渲染后端偏好设置"}
},
"required": ["markdown_text"]
}
),
types.Tool(
name="get_image",
description="根据任务ID返回图片和详细信息。智能处理大文件以避免token限制",
inputSchema={
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "任务ID"},
"as_base64": {"type": "boolean", "default": True, "description": "是否返回base64编码(否则返回文件路径)"},
"include_metadata": {"type": "boolean", "default": False, "description": "是否包含图片元数据信息"},
"show_in_chat": {"type": "boolean", "default": True, "description": "是否在会话中显示图片"},
"include_full_base64": {"type": "boolean", "default": False, "description": "是否包含完整base64数据(大文件可能导致token限制)"}
},
"required": ["task_id"]
}
),
types.Tool(
name="get_render_info",
description="获取渲染器状态信息和可用后端列表",
inputSchema={
"type": "object",
"properties": {
"detailed": {"type": "boolean", "default": False, "description": "是否返回详细的后端信息"}
}
}
),
types.Tool(
name="list_tasks",
description="列出所有任务状态和统计信息",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 10, "minimum": 1, "maximum": 100, "description": "返回的任务数量限制"},
"status": {"type": "string", "enum": ["all", "completed", "failed", "processing"], "default": "all", "description": "任务状态过滤"}
}
}
)
]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle tool calls with detailed error information."""
try:
if name == "submit_markdown":
return await _handle_submit_markdown(arguments)
elif name == "get_image":
return await _handle_get_image(arguments)
elif name == "get_render_info":
return await _handle_get_render_info(arguments)
elif name == "list_tasks":
return await _handle_list_tasks(arguments)
else:
raise ValueError(f"未知工具: {name}")
except Exception as e:
# 提供详细的错误信息
error_details = {
"error": str(e),
"error_type": type(e).__name__,
"tool": name,
"arguments": arguments
}
error_message = f"工具调用失败: {json.dumps(error_details, ensure_ascii=False)}"
raise ValueError(error_message) from e
async def _handle_submit_markdown(arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle submit_markdown tool with detailed options."""
try:
markdown_text = arguments["markdown_text"]
align = arguments.get("align", "center")
bold = arguments.get("bold", False)
width = arguments.get("width", 1200)
height = arguments.get("height", int(width * ASPECT_RATIO[1] / ASPECT_RATIO[0]))
background_color = arguments.get("background_color", "#FFFFFF")
text_color = arguments.get("text_color", "#000000")
accent_color = arguments.get("accent_color", "#4682B4")
font_family = arguments.get("font_family", "Microsoft YaHei, PingFang SC, Helvetica Neue, Arial, sans-serif")
font_size = arguments.get("font_size", 20)
line_height = arguments.get("line_height", 1.6)
header_scale = arguments.get("header_scale", 1.5)
theme = arguments.get("theme", "default")
shadow = arguments.get("shadow", True)
watermark = arguments.get("watermark", False)
watermark_text = arguments.get("watermark_text", "Generated by word2img-mcp")
output_format = arguments.get("output_format", "png")
quality = arguments.get("quality", 95)
backend_preference = arguments.get("backend_preference", "auto")
options = RenderOptions(
width=width,
height=height,
background_color=background_color,
text_color=text_color,
accent_color=accent_color,
align=align,
font_family=font_family,
font_size=font_size,
line_height=line_height,
header_scale=header_scale,
theme=theme,
shadow=shadow,
watermark=watermark,
watermark_text=watermark_text,
output_format=output_format
)
img_path = render_markdown_text_to_image(markdown_text, options)
# Load the generated image
from PIL import Image
img = Image.open(img_path)
# Prepare options for storage
storage_options = {
"align": align,
"bold": bold,
"width": width,
"height": height,
"background_color": background_color,
"text_color": text_color,
"accent_color": accent_color,
"font_family": font_family,
"font_size": font_size,
"line_height": line_height,
"header_scale": header_scale,
"theme": theme,
"shadow": shadow,
"watermark": watermark,
"watermark_text": watermark_text,
"output_format": output_format,
"quality": quality,
"backend_preference": backend_preference,
"backend_used": getattr(options, 'backend_used', 'unknown'),
"original_path": img_path
}
task_id = _store.save_image(img, format=output_format, options=storage_options)
# Clean up the temporary file if it's different from the stored one
try:
stored_path = _store.get_path(task_id)
if img_path != stored_path and os.path.exists(img_path):
os.remove(img_path)
except:
pass # Ignore cleanup errors
# 返回详细的任务信息
task_info = {
"task_id": task_id,
"status": "completed",
"image_size": f"{width}x{height}",
"format": output_format,
"created_at": datetime.now().isoformat(),
"options": storage_options
}
return [types.TextContent(type="text", text=json.dumps(task_info, ensure_ascii=False))]
except Exception as e:
error_details = {
"error": str(e),
"error_type": type(e).__name__,
"tool": "submit_markdown",
"arguments": arguments
}
raise ValueError(f"Markdown渲染失败: {json.dumps(error_details, ensure_ascii=False)}") from e
async def _handle_get_image(arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle get_image tool with size optimization to avoid token limits."""
try:
task_id = arguments["task_id"]
as_base64 = arguments.get("as_base64", True)
include_metadata = arguments.get("include_metadata", False)
# 新增参数:控制是否在会话中显示图片
show_in_chat = arguments.get("show_in_chat", True)
# 新增参数:控制是否包含完整的 base64 数据
include_full_base64 = arguments.get("include_full_base64", False)
path = _store.get_path(task_id)
if not path or not os.path.exists(path):
raise ValueError(f"任务ID无效或图片不存在: {task_id}")
# 获取文件大小信息
file_size = os.path.getsize(path)
if not as_base64:
result = {
"file_path": path,
"file_size": file_size,
"display_info": "仅返回文件路径,未包含图片数据"
}
if include_metadata:
result["metadata"] = _get_image_metadata(path)
return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
# 读取图片数据
with open(path, "rb") as f:
b = f.read()
# 获取图片格式
format_ext = os.path.splitext(path)[1][1:].lower()
if format_ext == 'jpg':
format_ext = 'jpeg' # 标准化格式名称
content_list = []
# 根据文件大小决定处理方式
if file_size > 50 * 1024: # 如果文件大于 50KB,采用保守策略
if show_in_chat and not include_full_base64:
# 只在会话中显示图片,不返回完整 base64 数据
b64_data = base64.b64encode(b).decode("utf-8")
content_list.append(types.ImageContent(
type="image",
data=b64_data,
mimeType=f"image/{format_ext}"
))
# 返回简化的文本信息(不包含完整 base64)
result = {
"format": format_ext,
"file_path": path,
"file_size": file_size,
"display_info": "图片已在上方显示,为避免 token 限制未返回完整 base64 数据",
"data_url_info": f"完整 data URL 长度约 {len(b64_data) + 50} 字符",
"note": "如需完整 base64 数据,请设置 include_full_base64=true"
}
elif include_full_base64:
# 用户明确要求完整数据(可能导致 token 限制)
b64_data = base64.b64encode(b).decode("utf-8")
data_url = f"data:image/{format_ext};base64,{b64_data}"
if show_in_chat:
content_list.append(types.ImageContent(
type="image",
data=b64_data,
mimeType=f"image/{format_ext}"
))
result = {
"image_data": b64_data,
"data_url": data_url,
"format": format_ext,
"file_path": path,
"file_size": file_size,
"display_info": "已返回完整数据",
"warning": f"大文件 ({file_size//1024}KB) 可能导致 token 限制问题"
}
else:
# 只返回文件信息,不显示图片
result = {
"format": format_ext,
"file_path": path,
"file_size": file_size,
"display_info": f"文件较大 ({file_size//1024}KB),未显示图片以避免 token 限制",
"note": "设置 show_in_chat=true 可在会话中显示图片(不含 base64 数据)"
}
else:
# 小文件处理:正常返回所有数据
b64_data = base64.b64encode(b).decode("utf-8")
data_url = f"data:image/{format_ext};base64,{b64_data}"
if show_in_chat:
content_list.append(types.ImageContent(
type="image",
data=b64_data,
mimeType=f"image/{format_ext}"
))
result = {
"image_data": b64_data,
"data_url": data_url,
"format": format_ext,
"file_path": path,
"file_size": file_size,
"display_info": "小文件,已返回完整数据"
}
if include_metadata:
result["metadata"] = _get_image_metadata(path)
content_list.append(types.TextContent(
type="text",
text=json.dumps(result, ensure_ascii=False, indent=2)
))
return content_list
except Exception as e:
error_details = {
"error": str(e),
"error_type": type(e).__name__,
"tool": "get_image",
"arguments": arguments
}
raise ValueError(f"图片获取失败: {json.dumps(error_details, ensure_ascii=False)}") from e
async def _handle_get_render_info(arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle get_render_info tool."""
try:
detailed = arguments.get("detailed", False)
from word2img_mcp.render import get_available_backends, get_renderer_status
backends = get_available_backends()
status = get_renderer_status()
info = {
"available_backends": backends,
"renderer_status": status,
"default_options": {
"width": 1200,
"height": 1600,
"aspect_ratio": f"{ASPECT_RATIO[0]}:{ASPECT_RATIO[1]}",
"supported_formats": ["png", "jpg", "jpeg", "webp"],
"max_dimensions": {"width": 4000, "height": 6000}
}
}
if detailed:
info["detailed_backend_info"] = _get_detailed_backend_info()
return [types.TextContent(type="text", text=json.dumps(info, ensure_ascii=False))]
except Exception as e:
error_details = {
"error": str(e),
"error_type": type(e).__name__,
"tool": "get_render_info",
"arguments": arguments
}
raise ValueError(f"渲染器信息获取失败: {json.dumps(error_details, ensure_ascii=False)}") from e
async def _handle_list_tasks(arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle list_tasks tool."""
try:
limit = arguments.get("limit", 10)
status_filter = arguments.get("status", "all")
tasks = _store.list_tasks(limit, status_filter)
stats = _store.get_task_statistics()
result = {
"tasks": tasks,
"statistics": stats,
"total_count": len(tasks),
"filter": {"limit": limit, "status": status_filter}
}
return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
except Exception as e:
error_details = {
"error": str(e),
"error_type": type(e).__name__,
"tool": "list_tasks",
"arguments": arguments
}
raise ValueError(f"任务列表获取失败: {json.dumps(error_details, ensure_ascii=False)}") from e
def _get_image_metadata(file_path: str) -> dict:
"""Get image metadata information."""
try:
import PIL.Image as Image
from PIL.ExifTags import TAGS
with Image.open(file_path) as img:
metadata = {
"format": img.format,
"size": img.size,
"mode": img.mode,
"file_size": os.path.getsize(file_path)
}
# Get EXIF data if available
exif_data = {}
if hasattr(img, '_getexif') and img._getexif():
for tag, value in img._getexif().items():
tag_name = TAGS.get(tag, tag)
exif_data[tag_name] = str(value)
metadata["exif"] = exif_data
return metadata
except Exception:
return {"error": "无法读取图片元数据"}
def _get_detailed_backend_info() -> dict:
"""Get detailed backend information."""
from word2img_mcp.render import get_backend_details
return get_backend_details()
async def run_server() -> None:
"""Run the MCP server."""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)