// sparql_client.rs • 11.1 kB
//! Utilities, such as executing a SPARQL query on a remote endpoint
use crate::error::McpError;
use oxigraph::io::RdfFormat;
use oxigraph::sparql::QueryResults;
use oxigraph::store::Store;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::io::Cursor;
use std::time::Duration;
/// SPARQL JSON Results format structs
#[derive(Debug, Serialize, Deserialize)]
pub struct SparqlResults {
pub results: SparqlBindings,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SparqlBindings {
pub bindings: Vec<SparqlBindingList>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SparqlBindingList {
#[serde(flatten)]
pub values: std::collections::HashMap<String, SparqlBinding>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SparqlBinding {
pub value: String,
// #[serde(rename = "type")]
// pub kind: String,
// pub datatype: Option<String>,
// pub lang: Option<String>,
}
/// Builder for [`SparqlClient`].
///
/// By default no timeout is set, the HTTP client is built internally, no local
/// file is used, POST is not forced, and the service-description fallback for
/// empty SELECT results is enabled.
#[derive(Clone)]
pub struct SparqlClientBuilder {
    post: bool,
    timeout: Option<Duration>,
    client: Option<Client>,
    use_file: Option<String>,
    check_service_desc: bool,
}

impl Default for SparqlClientBuilder {
    fn default() -> Self {
        // Everything is off/unset except the service-description fallback.
        Self {
            check_service_desc: true,
            post: false,
            use_file: None,
            client: None,
            timeout: None,
        }
    }
}
impl SparqlClientBuilder {
    /// Create a builder populated with the [`Default`] settings.
    pub fn new() -> Self {
        Default::default()
    }

    /// Force every query to be sent with HTTP POST when `post` is `true`.
    pub fn post(self, post: bool) -> Self {
        Self { post, ..self }
    }

    /// Set a request timeout for the internally built HTTP client.
    ///
    /// This has no effect when a custom client is supplied via [`Self::client`].
    pub fn timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Use `client` as-is instead of building a default HTTP client.
    pub fn client(self, client: Client) -> Self {
        Self {
            client: Some(client),
            ..self
        }
    }

    /// Answer queries from a local Turtle file instead of the remote endpoint.
    pub fn use_file(self, file: impl Into<String>) -> Self {
        Self {
            use_file: Some(file.into()),
            ..self
        }
    }

    /// Enable or disable the service-description fallback for empty SELECT results.
    pub fn check_service_desc(self, check: bool) -> Self {
        Self {
            check_service_desc: check,
            ..self
        }
    }

    /// Finish configuration and produce a [`SparqlClient`].
    ///
    /// # Errors
    /// Returns [`McpError::Internal`] if no client was supplied and the default
    /// HTTP client cannot be constructed.
    pub fn build(self) -> Result<SparqlClient, McpError> {
        let client = if let Some(custom) = self.client {
            custom
        } else {
            Self::default_http_client(self.timeout)?
        };
        Ok(SparqlClient {
            post: self.post,
            client,
            use_file: self.use_file,
            check_service_desc: self.check_service_desc,
        })
    }

    /// Build the HTTP client used when the caller did not supply one.
    ///
    /// It sends a `SparqlMcp/0.1` user agent, prefers SPARQL JSON results over
    /// Turtle, follows at most 10 redirects and applies `timeout` if given.
    fn default_http_client(timeout: Option<Duration>) -> Result<Client, McpError> {
        let mut default_headers = reqwest::header::HeaderMap::new();
        default_headers.insert(
            reqwest::header::USER_AGENT,
            reqwest::header::HeaderValue::from_static("SparqlMcp/0.1"),
        );
        default_headers.insert(
            reqwest::header::ACCEPT,
            reqwest::header::HeaderValue::from_static(
                "application/sparql-results+json, text/turtle;q=0.9",
            ),
        );
        let builder = Client::builder()
            .default_headers(default_headers)
            .redirect(reqwest::redirect::Policy::limited(10));
        let builder = match timeout {
            Some(limit) => builder.timeout(limit),
            None => builder,
        };
        builder
            .build()
            .map_err(|e| McpError::Internal(format!("Failed to build HTTP client: {e}")))
    }
}
/// SPARQL client for querying remote endpoints or a local Turtle file.
///
/// Construct one with [`SparqlClient::builder`].
#[derive(Clone)]
pub struct SparqlClient {
    /// HTTP client used for query and service-description requests.
    client: Client,
    /// Local Turtle file to query instead of the remote endpoint, if set.
    use_file: Option<String>,
    /// Send every query with POST (otherwise only long queries use POST).
    post: bool,
    /// Whether an empty SELECT result triggers a service-description lookup.
    check_service_desc: bool,
}
impl SparqlClient {
    /// Queries longer than this many bytes are sent with POST even when POST
    /// is not forced, to keep GET request URLs short.
    const MAX_GET_QUERY_LEN: usize = 2000;

    /// Maximum number of characters of an error response body kept in the
    /// error message for a non-success HTTP status.
    const MAX_ERROR_BODY_CHARS: usize = 500;

    /// Start building a [`SparqlClient`].
    pub fn builder() -> SparqlClientBuilder {
        SparqlClientBuilder::new()
    }

    /// Run `query` and return the raw response body as text.
    ///
    /// When a local file is configured, the query is evaluated against that
    /// file (ignoring `endpoint_url`) and the result is returned as JSON text.
    /// Otherwise the query is sent to `endpoint_url` via GET, or via a
    /// form-encoded POST when POST is forced or the query exceeds
    /// [`Self::MAX_GET_QUERY_LEN`] bytes.
    ///
    /// # Errors
    /// Returns [`McpError::Internal`] if the local query fails, the request
    /// cannot be sent, the endpoint answers with a non-success status (the
    /// message includes an excerpt of the response body), or the body cannot
    /// be read.
    pub async fn query_endpoint(
        &self,
        endpoint_url: &str,
        query: &str,
    ) -> Result<String, McpError> {
        if let Some(file_path) = &self.use_file {
            let json = query_turtle_file(query, file_path).await?;
            return Ok(json.to_string());
        }
        let use_post = self.post || query.len() > Self::MAX_GET_QUERY_LEN;
        let request = if use_post {
            self.client.post(endpoint_url).form(&[("query", query)])
        } else {
            self.client
                .get(endpoint_url)
                .header("Accept", "application/sparql-results+json")
                .query(&[("query", query)])
        };
        let response = request
            .send()
            .await
            .map_err(|e| McpError::Internal(format!("HTTP request failed: {e}")))?;
        let status = response.status();
        if !status.is_success() {
            // SPARQL endpoints usually explain failures (e.g. query syntax
            // errors) in the body; keep a bounded excerpt for the caller.
            let body = response.text().await.unwrap_or_default();
            let excerpt: String = body
                .trim()
                .chars()
                .take(Self::MAX_ERROR_BODY_CHARS)
                .collect();
            return Err(McpError::Internal(if excerpt.is_empty() {
                format!("HTTP error: {status}")
            } else {
                format!("HTTP error: {status}: {excerpt}")
            }));
        }
        response
            .text()
            .await
            .map_err(|e| McpError::Internal(format!("Failed to get plain text response: {e}")))
    }

    /// Run a SELECT `query` and parse the SPARQL JSON results.
    ///
    /// If the query yields no bindings and the service-description fallback is
    /// enabled, the same query is evaluated against the endpoint's service
    /// description. That fallback is best-effort. If it fails (endpoint
    /// unreachable, non-Turtle description, unparsable result), the original
    /// empty results are returned instead of an error.
    ///
    /// # Errors
    /// Returns [`McpError::Internal`] if the primary query fails or its
    /// response is not valid SPARQL JSON results.
    pub async fn query_select(
        &self,
        endpoint_url: &str,
        query: &str,
    ) -> Result<SparqlResults, McpError> {
        let text = self.query_endpoint(endpoint_url, query).await?;
        let results: SparqlResults = serde_json::from_str(&text)
            .map_err(|e| McpError::Internal(format!("Failed to parse SPARQL results: {e}")))?;
        if !self.check_service_desc || !results.results.bindings.is_empty() {
            return Ok(results);
        }
        // A failing fallback must not turn a successful (empty) answer into
        // an error, so fall back to the primary results on any failure.
        let described = query_service_description(&self.client, query, endpoint_url)
            .await
            .and_then(|json| {
                serde_json::from_value::<SparqlResults>(json).map_err(|e| {
                    McpError::Internal(format!(
                        "Failed to parse service description results: {e}"
                    ))
                })
            });
        Ok(described.unwrap_or(results))
    }
}
/// Query a turtle file using oxigraph
async fn query_turtle_file(query: &str, file_path: &str) -> Result<serde_json::Value, McpError> {
let store =
Store::new().map_err(|e| McpError::Internal(format!("Failed to create store: {e}")))?;
// Read and parse the turtle file
let turtle_content = tokio::fs::read_to_string(file_path)
.await
.map_err(|e| McpError::Internal(format!("Failed to read file {file_path}: {e}")))?;
store
.load_from_reader(RdfFormat::Turtle, Cursor::new(turtle_content.as_bytes()))
.map_err(|e| McpError::Internal(format!("Failed to parse turtle file: {e}")))?;
// Execute the query
let results = store
.query(query)
.map_err(|e| McpError::Internal(format!("Failed to execute query: {e}")))?;
// Convert results to JSON format compatible with SPARQL JSON results format
match results {
QueryResults::Solutions(solutions) => {
let mut bindings = Vec::new();
for solution in solutions {
let solution = solution
.map_err(|e| McpError::Internal(format!("Failed to process solution: {e}")))?;
let mut binding = HashMap::new();
for (var, term) in solution.iter() {
binding.insert(var.as_str().to_string(), json!({"value": term.to_string()}));
}
bindings.push(binding);
}
Ok(json!({"results": {"bindings": bindings}}))
}
QueryResults::Boolean(result) => Ok(json!({"boolean": result})),
QueryResults::Graph(graph) => {
// For CONSTRUCT/DESCRIBE queries, we'll serialize as triples
let mut triples = Vec::new();
for triple in graph {
let triple = triple
.map_err(|e| McpError::Internal(format!("Failed to process triple: {e}")))?;
triples.push(json!({
"subject": {"value": triple.subject.to_string()},
"predicate": {"value": triple.predicate.to_string()},
"object": {"value": triple.object.to_string()}
}));
}
Ok(json!({"results": {"bindings": triples}}))
}
}
}
/// Query the service description when no direct results are found
async fn query_service_description(
client: &Client,
query: &str,
endpoint_url: &str,
) -> Result<serde_json::Value, McpError> {
// Get service description in turtle format
let response = client
.get(endpoint_url)
.header("Accept", "text/turtle")
.send()
.await
.map_err(|e| McpError::Internal(format!("Failed to get service description: {e}")))?;
if !response.status().is_success() {
return Err(McpError::Internal(format!(
"Service description HTTP error: {}",
response.status()
)));
}
let turtle_content = response
.text()
.await
.map_err(|e| McpError::Internal(format!("Failed to read service description: {e}")))?;
// Parse and query the service description
let store =
Store::new().map_err(|e| McpError::Internal(format!("Failed to create store: {e}")))?;
store
.load_from_reader(RdfFormat::Turtle, Cursor::new(turtle_content.as_bytes()))
.map_err(|e| McpError::Internal(format!("Failed to parse service description: {e}")))?;
let results = store
.query(query)
.map_err(|e| McpError::Internal(format!("Failed to query service description: {e}")))?;
// Convert results to JSON format
match results {
QueryResults::Solutions(solutions) => {
let mut bindings = Vec::new();
for solution in solutions {
let solution = solution
.map_err(|e| McpError::Internal(format!("Failed to process solution: {e}")))?;
let mut binding = HashMap::new();
for (var, term) in solution.iter() {
binding.insert(var.as_str().to_string(), json!({"value": term.to_string()}));
}
bindings.push(binding);
}
Ok(json!({"results": {"bindings": bindings}}))
}
QueryResults::Boolean(result) => Ok(json!({"boolean": result})),
QueryResults::Graph(graph) => {
let mut triples = Vec::new();
for triple in graph {
let triple = triple
.map_err(|e| McpError::Internal(format!("Failed to process triple: {e}")))?;
triples.push(json!({
"subject": {"value": triple.subject.to_string()},
"predicate": {"value": triple.predicate.to_string()},
"object": {"value": triple.object.to_string()}
}));
}
Ok(json!({"results": {"bindings": triples}}))
}
}
}