Skip to main content
Glama
schema.rs13.8 kB
pub mod event; mod schema_stream; use std::convert::Infallible; use std::str::FromStr; use std::path::PathBuf; use std::pin::Pin; use std::time::Duration; use crate::uplink::UplinkConfig; use crate::uplink::stream_from_uplink; use derive_more::Display; use derive_more::From; use educe::Educe; use event::Event; use event::Event::{NoMoreSchema, UpdateSchema}; use futures::prelude::*; pub(crate) use schema_stream::SupergraphSdlQuery; use url::Url; /// Represents the new state of a schema after an update. #[derive(Eq, PartialEq, Debug)] pub struct SchemaState { pub sdl: String, pub(crate) launch_id: Option<String>, } impl FromStr for SchemaState { type Err = Infallible; fn from_str(s: &str) -> Result<Self, Self::Err> { Ok(Self { sdl: s.to_string(), launch_id: None, }) } } type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>; /// The user supplied schema. Either a static string or a stream for hot reloading. #[derive(From, Display, Educe)] #[educe(Debug)] #[non_exhaustive] pub enum SchemaSource { /// A static schema. #[display("String")] Static { schema_sdl: String }, /// A stream of schema. #[display("Stream")] Stream(#[educe(Debug(ignore))] SchemaStream), /// A YAML file that may be watched for changes. #[display("File")] File { /// The path of the schema file. path: PathBuf, /// `true` to watch the file for changes and hot apply them. watch: bool, }, /// Apollo managed federation. #[display("Registry")] Registry(UplinkConfig), /// A list of URLs to fetch the schema from. #[display("URLs")] URLs { /// The URLs to fetch the schema from. urls: Vec<Url>, }, } impl From<&'_ str> for SchemaSource { fn from(s: &'_ str) -> Self { Self::Static { schema_sdl: s.to_owned(), } } } impl SchemaSource { /// Convert this schema into a stream regardless of if is static or not. Allows for unified handling later. pub fn into_stream(self) -> impl Stream<Item = Event> { match self { SchemaSource::Static { schema_sdl: schema } => { let update_schema = UpdateSchema(SchemaState { sdl: schema, launch_id: None, }); stream::once(future::ready(update_schema)).boxed() } SchemaSource::Stream(stream) => stream .map(|sdl| { UpdateSchema(SchemaState { sdl, launch_id: None, }) }) .boxed(), SchemaSource::File { path, watch, } => { // Sanity check, does the schema file exists, if it doesn't then bail. if !path.exists() { tracing::error!( "Supergraph schema at path '{}' does not exist.", path.to_string_lossy() ); stream::empty().boxed() } else { //The schema file exists try and load it match std::fs::read_to_string(&path) { Ok(schema) => { if watch { crate::files::watch(&path) .filter_map(move |_| { let path = path.clone(); async move { match tokio::fs::read_to_string(&path).await { Ok(schema) => { let update_schema = UpdateSchema(SchemaState { sdl: schema, launch_id: None, }); Some(update_schema) } Err(err) => { tracing::error!(reason = %err, "failed to read supergraph schema"); None } } } }) .boxed() } else { let update_schema = UpdateSchema(SchemaState { sdl: schema, launch_id: None, }); stream::once(future::ready(update_schema)).boxed() } } Err(err) => { tracing::error!(reason = %err, "failed to read supergraph schema"); stream::empty().boxed() } } } } SchemaSource::Registry(uplink_config) => { stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config) .filter_map(|res| { future::ready(match res { Ok(schema) => { let update_schema = UpdateSchema(schema); Some(update_schema) } Err(e) => { tracing::error!("{}", e); None } }) }) .boxed() } SchemaSource::URLs { urls } => { futures::stream::once(async move { fetch_supergraph_from_first_viable_url(&urls).await }) .filter_map(|s| async move { s.map(Event::UpdateSchema) }) .boxed() } } .chain(stream::iter(vec![NoMoreSchema])) .boxed() } } // Encapsulates fetching the schema from the first viable url. // It will try each url in order until it finds one that works. #[allow(clippy::unwrap_used)] // TODO - existing unwrap from router code async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> { let Ok(client) = reqwest::Client::builder() .no_gzip() .timeout(Duration::from_secs(10)) .build() else { tracing::error!("failed to create HTTP client to fetch supergraph schema"); return None; }; for url in urls { match client.get(Url::parse(url.as_ref()).unwrap()).send().await { Ok(res) if res.status().is_success() => match res.text().await { Ok(schema) => { return Some(SchemaState { sdl: schema, launch_id: None, }); } Err(err) => { tracing::warn!( url.full = %url, reason = %err, "failed to fetch supergraph schema" ) } }, Ok(res) => tracing::warn!( http.response.status_code = res.status().as_u16(), url.full = %url, "failed to fetch supergraph schema" ), Err(err) => tracing::warn!( url.full = %url, reason = %err, "failed to fetch supergraph schema" ), } } tracing::error!("failed to fetch supergraph schema from all urls"); None } #[cfg(test)] mod tests { use std::env::temp_dir; use test_log::test; use tracing_futures::WithSubscriber; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; use super::*; use crate::assert_snapshot_subscriber; use crate::files::tests::create_temp_file; use crate::files::tests::write_and_flush; #[test(tokio::test)] async fn schema_by_file_watching() { let (path, mut file) = create_temp_file(); let schema = include_str!("../testdata/supergraph.graphql"); write_and_flush(&mut file, schema).await; let mut stream = SchemaSource::File { path, watch: true } .into_stream() .boxed(); // First update is guaranteed assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_))); // Need different contents, since we won't get an event if content is the same let schema_minimal = include_str!("../testdata/minimal_supergraph.graphql"); // Modify the file and try again write_and_flush(&mut file, schema_minimal).await; assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_))); } #[test(tokio::test)] async fn schema_by_file_no_watch() { let (path, mut file) = create_temp_file(); let schema = include_str!("../testdata/supergraph.graphql"); write_and_flush(&mut file, schema).await; let mut stream = SchemaSource::File { path, watch: false }.into_stream(); assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_))); assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } #[test(tokio::test)] async fn schema_by_file_missing() { let mut stream = SchemaSource::File { path: temp_dir().join("does_not_exist"), watch: true, } .into_stream(); // First update fails because the file is invalid. assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } const SCHEMA_1: &str = "schema1"; const SCHEMA_2: &str = "schema2"; #[test(tokio::test)] async fn schema_by_url() { async { let mock_server = MockServer::start().await; Mock::given(method("GET")) .and(path("/schema1")) .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1)) .mount(&mock_server) .await; Mock::given(method("GET")) .and(path("/schema2")) .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2)) .mount(&mock_server) .await; let mut stream = SchemaSource::URLs { urls: vec![ Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(), Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(), ], } .into_stream(); assert!( matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1) ); assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } .with_subscriber(assert_snapshot_subscriber!()) .await; } #[test(tokio::test)] async fn schema_by_url_fallback() { async { let mock_server = MockServer::start().await; Mock::given(method("GET")) .and(path("/schema1")) .respond_with(ResponseTemplate::new(400)) .mount(&mock_server) .await; Mock::given(method("GET")) .and(path("/schema2")) .respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2)) .mount(&mock_server) .await; let mut stream = SchemaSource::URLs { urls: vec![ Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(), Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(), ], } .into_stream(); assert!( matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2) ); assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } .with_subscriber(assert_snapshot_subscriber!({ "[].fields[\"url.full\"]" => "[url.full]" })) .await; } #[test(tokio::test)] async fn schema_by_url_all_fail() { async { let mock_server = MockServer::start().await; Mock::given(method("GET")) .and(path("/schema1")) .respond_with(ResponseTemplate::new(400)) .mount(&mock_server) .await; Mock::given(method("GET")) .and(path("/schema2")) .respond_with(ResponseTemplate::new(400)) .mount(&mock_server) .await; let mut stream = SchemaSource::URLs { urls: vec![ Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(), Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(), ], } .into_stream(); assert!(matches!(stream.next().await.unwrap(), NoMoreSchema)); } .with_subscriber(assert_snapshot_subscriber!({ "[].fields[\"url.full\"]" => "[url.full]" })) .await; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apollographql/apollo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server