pub mod event;
mod schema_stream;
use std::convert::Infallible;
use std::str::FromStr;
use std::path::PathBuf;
use std::pin::Pin;
use std::time::Duration;
use crate::uplink::UplinkConfig;
use crate::uplink::stream_from_uplink;
use derive_more::Display;
use derive_more::From;
use educe::Educe;
use event::Event;
use event::Event::{NoMoreSchema, UpdateSchema};
use futures::prelude::*;
pub(crate) use schema_stream::SupergraphSdlQuery;
use url::Url;
/// Represents the new state of a schema after an update.
#[derive(Eq, PartialEq, Debug)]
pub struct SchemaState {
pub sdl: String,
pub(crate) launch_id: Option<String>,
}
impl FromStr for SchemaState {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
sdl: s.to_string(),
launch_id: None,
})
}
}
type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;
/// The user supplied schema. Either a static string or a stream for hot reloading.
#[derive(From, Display, Educe)]
#[educe(Debug)]
#[non_exhaustive]
pub enum SchemaSource {
/// A static schema.
#[display("String")]
Static { schema_sdl: String },
/// A stream of schema.
#[display("Stream")]
Stream(#[educe(Debug(ignore))] SchemaStream),
/// A YAML file that may be watched for changes.
#[display("File")]
File {
/// The path of the schema file.
path: PathBuf,
/// `true` to watch the file for changes and hot apply them.
watch: bool,
},
/// Apollo managed federation.
#[display("Registry")]
Registry(UplinkConfig),
/// A list of URLs to fetch the schema from.
#[display("URLs")]
URLs {
/// The URLs to fetch the schema from.
urls: Vec<Url>,
},
}
impl From<&'_ str> for SchemaSource {
fn from(s: &'_ str) -> Self {
Self::Static {
schema_sdl: s.to_owned(),
}
}
}
impl SchemaSource {
/// Convert this schema into a stream regardless of if is static or not. Allows for unified handling later.
pub fn into_stream(self) -> impl Stream<Item = Event> {
match self {
SchemaSource::Static { schema_sdl: schema } => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
SchemaSource::Stream(stream) => stream
.map(|sdl| {
UpdateSchema(SchemaState {
sdl,
launch_id: None,
})
})
.boxed(),
SchemaSource::File {
path,
watch,
} => {
// Sanity check, does the schema file exists, if it doesn't then bail.
if !path.exists() {
tracing::error!(
"Supergraph schema at path '{}' does not exist.",
path.to_string_lossy()
);
stream::empty().boxed()
} else {
//The schema file exists try and load it
match std::fs::read_to_string(&path) {
Ok(schema) => {
if watch {
crate::files::watch(&path)
.filter_map(move |_| {
let path = path.clone();
async move {
match tokio::fs::read_to_string(&path).await {
Ok(schema) => {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
Some(update_schema)
}
Err(err) => {
tracing::error!(reason = %err, "failed to read supergraph schema");
None
}
}
}
})
.boxed()
} else {
let update_schema = UpdateSchema(SchemaState {
sdl: schema,
launch_id: None,
});
stream::once(future::ready(update_schema)).boxed()
}
}
Err(err) => {
tracing::error!(reason = %err, "failed to read supergraph schema");
stream::empty().boxed()
}
}
}
}
SchemaSource::Registry(uplink_config) => {
stream_from_uplink::<SupergraphSdlQuery, SchemaState>(uplink_config)
.filter_map(|res| {
future::ready(match res {
Ok(schema) => {
let update_schema = UpdateSchema(schema);
Some(update_schema)
}
Err(e) => {
tracing::error!("{}", e);
None
}
})
})
.boxed()
}
SchemaSource::URLs { urls } => {
futures::stream::once(async move {
fetch_supergraph_from_first_viable_url(&urls).await
})
.filter_map(|s| async move { s.map(Event::UpdateSchema) })
.boxed()
}
}
.chain(stream::iter(vec![NoMoreSchema]))
.boxed()
}
}
// Encapsulates fetching the schema from the first viable url.
// It will try each url in order until it finds one that works.
#[allow(clippy::unwrap_used)] // TODO - existing unwrap from router code
async fn fetch_supergraph_from_first_viable_url(urls: &[Url]) -> Option<SchemaState> {
let Ok(client) = reqwest::Client::builder()
.no_gzip()
.timeout(Duration::from_secs(10))
.build()
else {
tracing::error!("failed to create HTTP client to fetch supergraph schema");
return None;
};
for url in urls {
match client.get(Url::parse(url.as_ref()).unwrap()).send().await {
Ok(res) if res.status().is_success() => match res.text().await {
Ok(schema) => {
return Some(SchemaState {
sdl: schema,
launch_id: None,
});
}
Err(err) => {
tracing::warn!(
url.full = %url,
reason = %err,
"failed to fetch supergraph schema"
)
}
},
Ok(res) => tracing::warn!(
http.response.status_code = res.status().as_u16(),
url.full = %url,
"failed to fetch supergraph schema"
),
Err(err) => tracing::warn!(
url.full = %url,
reason = %err,
"failed to fetch supergraph schema"
),
}
}
tracing::error!("failed to fetch supergraph schema from all urls");
None
}
#[cfg(test)]
mod tests {
use std::env::temp_dir;
use test_log::test;
use tracing_futures::WithSubscriber;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
use super::*;
use crate::assert_snapshot_subscriber;
use crate::files::tests::create_temp_file;
use crate::files::tests::write_and_flush;
#[test(tokio::test)]
async fn schema_by_file_watching() {
let (path, mut file) = create_temp_file();
let schema = include_str!("../testdata/supergraph.graphql");
write_and_flush(&mut file, schema).await;
let mut stream = SchemaSource::File { path, watch: true }
.into_stream()
.boxed();
// First update is guaranteed
assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
// Need different contents, since we won't get an event if content is the same
let schema_minimal = include_str!("../testdata/minimal_supergraph.graphql");
// Modify the file and try again
write_and_flush(&mut file, schema_minimal).await;
assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
}
#[test(tokio::test)]
async fn schema_by_file_no_watch() {
let (path, mut file) = create_temp_file();
let schema = include_str!("../testdata/supergraph.graphql");
write_and_flush(&mut file, schema).await;
let mut stream = SchemaSource::File { path, watch: false }.into_stream();
assert!(matches!(stream.next().await.unwrap(), UpdateSchema(_)));
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
#[test(tokio::test)]
async fn schema_by_file_missing() {
let mut stream = SchemaSource::File {
path: temp_dir().join("does_not_exist"),
watch: true,
}
.into_stream();
// First update fails because the file is invalid.
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
const SCHEMA_1: &str = "schema1";
const SCHEMA_2: &str = "schema2";
#[test(tokio::test)]
async fn schema_by_url() {
async {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/schema1"))
.respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_1))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/schema2"))
.respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
.mount(&mock_server)
.await;
let mut stream = SchemaSource::URLs {
urls: vec![
Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
],
}
.into_stream();
assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_1)
);
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
.with_subscriber(assert_snapshot_subscriber!())
.await;
}
#[test(tokio::test)]
async fn schema_by_url_fallback() {
async {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/schema1"))
.respond_with(ResponseTemplate::new(400))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/schema2"))
.respond_with(ResponseTemplate::new(200).set_body_string(SCHEMA_2))
.mount(&mock_server)
.await;
let mut stream = SchemaSource::URLs {
urls: vec![
Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
],
}
.into_stream();
assert!(
matches!(stream.next().await.unwrap(), UpdateSchema(schema) if schema.sdl == SCHEMA_2)
);
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
.with_subscriber(assert_snapshot_subscriber!({
"[].fields[\"url.full\"]" => "[url.full]"
}))
.await;
}
#[test(tokio::test)]
async fn schema_by_url_all_fail() {
async {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/schema1"))
.respond_with(ResponseTemplate::new(400))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/schema2"))
.respond_with(ResponseTemplate::new(400))
.mount(&mock_server)
.await;
let mut stream = SchemaSource::URLs {
urls: vec![
Url::parse(&format!("http://{}/schema1", mock_server.address())).unwrap(),
Url::parse(&format!("http://{}/schema2", mock_server.address())).unwrap(),
],
}
.into_stream();
assert!(matches!(stream.next().await.unwrap(), NoMoreSchema));
}
.with_subscriber(assert_snapshot_subscriber!({
"[].fields[\"url.full\"]" => "[url.full]"
}))
.await;
}
}