Skip to main content
Glama
main.rs (20 kB)
use anyhow::{Context, Result}; use rs_utcp::config::UtcpClientConfig; use rs_utcp::plugins::codemode::{CodeModeArgs, CodeModeUtcp}; use rs_utcp::repository::in_memory::InMemoryToolRepository; use rs_utcp::tag::tag_search::TagSearchStrategy; use rs_utcp::{UtcpClient, UtcpClientInterface}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tracing::{error, info}; use std::path::PathBuf; /// JSON-RPC 2.0 Request #[derive(Debug, Clone, Serialize, Deserialize)] struct JsonRpcRequest { jsonrpc: String, id: Option<Value>, method: String, params: Option<Value>, } /// JSON-RPC 2.0 Response #[derive(Debug, Clone, Serialize, Deserialize)] struct JsonRpcResponse { jsonrpc: String, id: Option<Value>, #[serde(skip_serializing_if = "Option::is_none")] result: Option<Value>, #[serde(skip_serializing_if = "Option::is_none")] error: Option<JsonRpcError>, } #[derive(Debug, Clone, Serialize, Deserialize)] struct JsonRpcError { code: i32, message: String, #[serde(skip_serializing_if = "Option::is_none")] data: Option<Value>, } /// MCP Server Info #[derive(Debug, Clone, Serialize, Deserialize)] struct ServerInfo { name: String, version: String, } /// MCP Server Capabilities #[derive(Debug, Clone, Serialize, Deserialize)] struct ServerCapabilities { #[serde(skip_serializing_if = "Option::is_none")] tools: Option<Value>, } /// MCP Tool Schema #[derive(Debug, Clone, Serialize, Deserialize)] struct McpTool { name: String, #[serde(skip_serializing_if = "Option::is_none")] description: Option<String>, #[serde(rename = "inputSchema")] input_schema: Value, } /// The main bridge struct pub struct UtcpMcpBridge { utcp_client: Arc<UtcpClient>, utcp_code: Arc<CodeModeUtcp>, } impl UtcpMcpBridge { /// Create a new bridge pub async fn new(config: UtcpClientConfig) -> Result<Self> { let repo = Arc::new(InMemoryToolRepository::new()); let strat = 
Arc::new(TagSearchStrategy::new(repo.clone(), 0.5)); let utcp_client = Arc::new( UtcpClient::new(config, repo, strat).await.context("Failed to create UTCP client")? ); let utcp_code = Arc::new(CodeModeUtcp::new(utcp_client.clone())); Ok(Self { utcp_client, utcp_code, }) } /// Get MCP tool definitions fn get_tools() -> Vec<McpTool> { vec![ McpTool { name: "utcp_call_tool".to_string(), description: Some("Call a UTCP tool by name with arguments".to_string()), input_schema: json!({ "type": "object", "properties": { "tool_name": { "type": "string", "description": "Name of the UTCP tool to call" }, "arguments": { "type": "object", "description": "Arguments to pass to the tool" } }, "required": ["tool_name"] }), }, McpTool { name: "utcp_search_tools".to_string(), description: Some("Search for available UTCP tools".to_string()), input_schema: json!({ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" }, "limit": { "type": "integer", "description": "Maximum number of results", "default": 10 } }, "required": ["query"] }), }, McpTool { name: "utcp_call_tool_stream".to_string(), description: Some("Call a UTCP tool with streaming response".to_string()), input_schema: json!({ "type": "object", "properties": { "tool_name": { "type": "string", "description": "Name of the UTCP tool to call" }, "arguments": { "type": "object", "description": "Arguments to pass to the tool" } }, "required": ["tool_name"] }), }, McpTool { name: "utcp_run_code".to_string(), description: Some("Execute inline code via UTCP CodeMode engine".to_string()), input_schema: json!({ "type": "object", "properties": { "code": { "type": "string", "description": "Rhai script to execute" }, "timeout": { "type": "integer", "description": "Timeout in milliseconds", "default": 3000 } }, "required": ["code"] }), }, ] } /// Handle a JSON-RPC request async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse { let id = request.id.clone(); match 
request.method.as_str() { "initialize" => { let result = json!({ "protocolVersion": "2024-11-05", "serverInfo": { "name": "utcp-bridge", "version": "1.0.0" }, "capabilities": { "tools": {} } }); JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(result), error: None, } } "tools/list" => { let tools = Self::get_tools(); JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(json!({ "tools": tools })), error: None, } } "tools/call" => self.handle_tool_call(id, request.params).await, _ => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32601, message: format!("Method not found: {}", request.method), data: None, }), }, } } async fn handle_tool_call(&self, id: Option<Value>, params: Option<Value>) -> JsonRpcResponse { let params = match params { Some(p) => p, None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "Invalid params".to_string(), data: None, }), } } }; let tool_name = match params.get("name").and_then(|v| v.as_str()) { Some(name) => name, None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "Missing tool name".to_string(), data: None, }), } } }; let arguments = params .get("arguments") .and_then(|v| v.as_object()) .cloned() .unwrap_or_default(); match tool_name { "utcp_call_tool" => self.handle_utcp_call_tool(id, arguments).await, "utcp_search_tools" => self.handle_utcp_search_tools(id, arguments).await, "utcp_call_tool_stream" => self.handle_utcp_call_tool_stream(id, arguments).await, "utcp_run_code" => self.handle_utcp_run_code(id, arguments).await, _ => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: format!("Unknown tool: {}", tool_name), data: None, }), }, } } async fn handle_utcp_run_code( &self, id: Option<Value>, arguments: serde_json::Map<String, Value>, ) -> 
JsonRpcResponse { let code = match arguments.get("code").and_then(|v| v.as_str()) { Some(c) => c.to_string(), None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "code is required".to_string(), data: None, }), } } }; let timeout = arguments .get("timeout") .and_then(|v| v.as_u64()) .map(|t| t); let args = CodeModeArgs { code, timeout, }; match self.utcp_code.execute(args).await { Ok(result) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(json!({ "content": [{ "type": "text", "text": serde_json::to_string_pretty(&result.value).unwrap_or_else(|_| "{}".to_string()) }] })), error: None, }, Err(e) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32603, message: format!("Error executing code: {}", e), data: None, }), }, } } async fn handle_utcp_call_tool( &self, id: Option<Value>, arguments: serde_json::Map<String, Value>, ) -> JsonRpcResponse { let tool_name = match arguments.get("tool_name").and_then(|v| v.as_str()) { Some(name) => name, None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "tool_name is required".to_string(), data: None, }), } } }; let tool_args = arguments .get("arguments") .and_then(|v| v.as_object()) .map(|obj| { obj.iter() .map(|(k, v)| (k.clone(), v.clone())) .collect::<HashMap<String, Value>>() }) .unwrap_or_default(); match self.utcp_client.call_tool(tool_name, tool_args).await { Ok(result) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(json!({ "content": [{ "type": "text", "text": serde_json::to_string_pretty(&result).unwrap_or_else(|_| "{}".to_string()) }] })), error: None, }, Err(e) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32603, message: format!("Error calling tool: {}", e), data: None, }), }, } } async fn handle_utcp_search_tools( &self, 
id: Option<Value>, arguments: serde_json::Map<String, Value>, ) -> JsonRpcResponse { let query = match arguments.get("query").and_then(|v| v.as_str()) { Some(q) => q, None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "query is required".to_string(), data: None, }), } } }; let limit = arguments .get("limit") .and_then(|v| v.as_u64()) .unwrap_or(10) as usize; match self.utcp_client.search_tools(query, limit).await { Ok(tools) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(json!({ "content": [{ "type": "text", "text": serde_json::to_string_pretty(&tools).unwrap_or_else(|_| "[]".to_string()) }] })), error: None, }, Err(e) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32603, message: format!("Error searching tools: {}", e), data: None, }), }, } } async fn handle_utcp_call_tool_stream( &self, id: Option<Value>, arguments: serde_json::Map<String, Value>, ) -> JsonRpcResponse { let tool_name = match arguments.get("tool_name").and_then(|v| v.as_str()) { Some(name) => name, None => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32602, message: "tool_name is required".to_string(), data: None, }), } } }; let tool_args = arguments .get("arguments") .and_then(|v| v.as_object()) .map(|obj| { obj.iter() .map(|(k, v)| (k.clone(), v.clone())) .collect::<HashMap<String, Value>>() }) .unwrap_or_default(); match self.utcp_client.call_tool_stream(tool_name, tool_args).await { Ok(mut stream) => { let mut chunks = Vec::new(); loop { match stream.next().await { Ok(Some(chunk)) => { chunks.push(chunk); } Ok(None) => break, Err(e) => { return JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32603, message: format!("Stream error: {}", e), data: None, }), }; } } } JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: Some(json!({ 
"content": [{ "type": "text", "text": serde_json::to_string_pretty(&json!({ "chunks": chunks })).unwrap_or_else(|_| "{}".to_string()) }] })), error: None, } } Err(e) => JsonRpcResponse { jsonrpc: "2.0".to_string(), id, result: None, error: Some(JsonRpcError { code: -32603, message: format!("Error calling tool stream: {}", e), data: None, }), }, } } /// Run the MCP server over stdio pub async fn run(&self) -> Result<()> { let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); let reader = BufReader::new(stdin); let mut lines = reader.lines(); info!("UTCP MCP Bridge is ready on stdio"); while let Some(line) = lines.next_line().await? { if line.trim().is_empty() { continue; } // Parse the JSON-RPC request let request: JsonRpcRequest = match serde_json::from_str(&line) { Ok(req) => req, Err(e) => { error!("Failed to parse request: {}", e); let error_response = JsonRpcResponse { jsonrpc: "2.0".to_string(), id: None, result: None, error: Some(JsonRpcError { code: -32700, message: format!("Parse error: {}", e), data: None, }), }; let response_str = serde_json::to_string(&error_response)?; stdout.write_all(response_str.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; // Handle the request let response = self.handle_request(request).await; // Send the response let response_str = serde_json::to_string(&response)?; stdout.write_all(response_str.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; } Ok(()) } } #[tokio::main] async fn main() -> Result<()> { // Initialize tracing tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .with_writer(std::io::stderr) // Write logs to stderr to avoid interfering with stdio protocol .init(); info!("Starting UTCP MCP Bridge..."); // Create UTCP client config let providers_file = std::env::var("UTCP_PROVIDERS_FILE").ok(); let mut config = 
UtcpClientConfig::default(); if let Some(path) = providers_file { config.providers_file_path = Some(path.into()); } // Create and run the bridge let bridge = UtcpMcpBridge::new(config).await?; bridge.run().await?; info!("Bridge shutting down"); Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/universal-tool-calling-protocol/utcp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.