Skip to main content
Glama
sparql_client.rs • 11.1 kB
//! Utilities, such as executing a SPARQL query on a remote endpoint use crate::error::McpError; use oxigraph::io::RdfFormat; use oxigraph::sparql::QueryResults; use oxigraph::store::Store; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; use std::io::Cursor; use std::time::Duration; /// SPARQL JSON Results format structs #[derive(Debug, Serialize, Deserialize)] pub struct SparqlResults { pub results: SparqlBindings, } #[derive(Debug, Serialize, Deserialize)] pub struct SparqlBindings { pub bindings: Vec<SparqlBindingList>, } #[derive(Debug, Serialize, Deserialize)] pub struct SparqlBindingList { #[serde(flatten)] pub values: std::collections::HashMap<String, SparqlBinding>, } #[derive(Debug, Serialize, Deserialize)] pub struct SparqlBinding { pub value: String, // #[serde(rename = "type")] // pub kind: String, // pub datatype: Option<String>, // pub lang: Option<String>, } /// Builder for SparqlClient #[derive(Clone)] pub struct SparqlClientBuilder { post: bool, timeout: Option<Duration>, client: Option<Client>, use_file: Option<String>, check_service_desc: bool, } impl Default for SparqlClientBuilder { fn default() -> Self { Self { post: false, timeout: None, client: None, use_file: None, check_service_desc: true, } } } impl SparqlClientBuilder { pub fn new() -> Self { Self::default() } pub fn post(mut self, post: bool) -> Self { self.post = post; self } pub fn timeout(mut self, timeout: Duration) -> Self { self.timeout = Some(timeout); self } pub fn client(mut self, client: Client) -> Self { self.client = Some(client); self } pub fn use_file(mut self, file: impl Into<String>) -> Self { self.use_file = Some(file.into()); self } pub fn check_service_desc(mut self, check: bool) -> Self { self.check_service_desc = check; self } pub fn build(self) -> Result<SparqlClient, McpError> { // let endpoint_url = self.endpoint_url.ok_or_else(|| McpError::Internal("endpoint_url is required".to_string()))?; let 
client = match self.client { Some(c) => c, None => { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( reqwest::header::USER_AGENT, reqwest::header::HeaderValue::from_static("SparqlMcp/0.1"), ); headers.insert( reqwest::header::ACCEPT, reqwest::header::HeaderValue::from_static( "application/sparql-results+json, text/turtle;q=0.9", ), ); let mut client_builder = Client::builder() .default_headers(headers) .redirect(reqwest::redirect::Policy::limited(10)); if let Some(timeout) = self.timeout { client_builder = client_builder.timeout(timeout); } client_builder .build() .map_err(|e| McpError::Internal(format!("Failed to build HTTP client: {e}")))? } }; Ok(SparqlClient { post: self.post, client, use_file: self.use_file, check_service_desc: self.check_service_desc, }) } } /// SPARQL client for querying remote endpoints or local files #[derive(Clone)] pub struct SparqlClient { post: bool, client: Client, use_file: Option<String>, check_service_desc: bool, } impl SparqlClient { pub fn builder() -> SparqlClientBuilder { SparqlClientBuilder::new() } /// Query the endpoint or file pub async fn query_endpoint( &self, endpoint_url: &str, query: &str, ) -> Result<String, McpError> { let post = if self.post { true } else { query.len() > 2000 }; if let Some(file_path) = &self.use_file { let json = query_turtle_file(query, file_path).await?; return Ok(json.to_string()); } let response = if post { self.client .post(endpoint_url) .form(&[("query", query)]) .send() .await } else { self.client .get(endpoint_url) .header("Accept", "application/sparql-results+json") .query(&[("query", query)]) .send() .await } .map_err(|e| McpError::Internal(format!("HTTP request failed: {e}")))?; if !response.status().is_success() { return Err(McpError::Internal(format!( "HTTP error: {}", response.status() ))); } let text = response .text() .await .map_err(|e| McpError::Internal(format!("Failed to get plain text response: {e}")))?; Ok(text) } /// Query the endpoint or file pub async 
fn query_select( &self, endpoint_url: &str, query: &str, ) -> Result<SparqlResults, McpError> { let text = self.query_endpoint(endpoint_url, query).await?; let results: SparqlResults = serde_json::from_str(&text) .map_err(|e| McpError::Internal(format!("Failed to parse SPARQL results: {e}")))?; // Check if we got results, if not and check_service_desc is true, try service description if self.check_service_desc && results.results.bindings.is_empty() { let json = query_service_description(&self.client, query, endpoint_url).await?; let results: SparqlResults = serde_json::from_value(json).map_err(|e| { McpError::Internal(format!("Failed to parse service description results: {e}")) })?; // tracing::debug!("{:?}", results); return Ok(results); } Ok(results) } } /// Query a turtle file using oxigraph async fn query_turtle_file(query: &str, file_path: &str) -> Result<serde_json::Value, McpError> { let store = Store::new().map_err(|e| McpError::Internal(format!("Failed to create store: {e}")))?; // Read and parse the turtle file let turtle_content = tokio::fs::read_to_string(file_path) .await .map_err(|e| McpError::Internal(format!("Failed to read file {file_path}: {e}")))?; store .load_from_reader(RdfFormat::Turtle, Cursor::new(turtle_content.as_bytes())) .map_err(|e| McpError::Internal(format!("Failed to parse turtle file: {e}")))?; // Execute the query let results = store .query(query) .map_err(|e| McpError::Internal(format!("Failed to execute query: {e}")))?; // Convert results to JSON format compatible with SPARQL JSON results format match results { QueryResults::Solutions(solutions) => { let mut bindings = Vec::new(); for solution in solutions { let solution = solution .map_err(|e| McpError::Internal(format!("Failed to process solution: {e}")))?; let mut binding = HashMap::new(); for (var, term) in solution.iter() { binding.insert(var.as_str().to_string(), json!({"value": term.to_string()})); } bindings.push(binding); } Ok(json!({"results": {"bindings": bindings}})) 
} QueryResults::Boolean(result) => Ok(json!({"boolean": result})), QueryResults::Graph(graph) => { // For CONSTRUCT/DESCRIBE queries, we'll serialize as triples let mut triples = Vec::new(); for triple in graph { let triple = triple .map_err(|e| McpError::Internal(format!("Failed to process triple: {e}")))?; triples.push(json!({ "subject": {"value": triple.subject.to_string()}, "predicate": {"value": triple.predicate.to_string()}, "object": {"value": triple.object.to_string()} })); } Ok(json!({"results": {"bindings": triples}})) } } } /// Query the service description when no direct results are found async fn query_service_description( client: &Client, query: &str, endpoint_url: &str, ) -> Result<serde_json::Value, McpError> { // Get service description in turtle format let response = client .get(endpoint_url) .header("Accept", "text/turtle") .send() .await .map_err(|e| McpError::Internal(format!("Failed to get service description: {e}")))?; if !response.status().is_success() { return Err(McpError::Internal(format!( "Service description HTTP error: {}", response.status() ))); } let turtle_content = response .text() .await .map_err(|e| McpError::Internal(format!("Failed to read service description: {e}")))?; // Parse and query the service description let store = Store::new().map_err(|e| McpError::Internal(format!("Failed to create store: {e}")))?; store .load_from_reader(RdfFormat::Turtle, Cursor::new(turtle_content.as_bytes())) .map_err(|e| McpError::Internal(format!("Failed to parse service description: {e}")))?; let results = store .query(query) .map_err(|e| McpError::Internal(format!("Failed to query service description: {e}")))?; // Convert results to JSON format match results { QueryResults::Solutions(solutions) => { let mut bindings = Vec::new(); for solution in solutions { let solution = solution .map_err(|e| McpError::Internal(format!("Failed to process solution: {e}")))?; let mut binding = HashMap::new(); for (var, term) in solution.iter() { 
binding.insert(var.as_str().to_string(), json!({"value": term.to_string()})); } bindings.push(binding); } Ok(json!({"results": {"bindings": bindings}})) } QueryResults::Boolean(result) => Ok(json!({"boolean": result})), QueryResults::Graph(graph) => { let mut triples = Vec::new(); for triple in graph { let triple = triple .map_err(|e| McpError::Internal(format!("Failed to process triple: {e}")))?; triples.push(json!({ "subject": {"value": triple.subject.to_string()}, "predicate": {"value": triple.predicate.to_string()}, "object": {"value": triple.object.to_string()} })); } Ok(json!({"results": {"bindings": triples}})) } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sib-swiss/sparql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server