Skip to main content
Glama
sse_e2e.rs16.5 kB
/// End-to-end integration tests for SSE MCP server /// These tests start the actual server with SSE transport and test real MCP protocol communication use tokio::time::{timeout, Duration, sleep}; use serde_json::{json, Value}; use reqwest; mod common; /// Helper to send a message to SSE server via POST and receive response via SSE stream async fn send_sse_message( client: &reqwest::Client, endpoint_url: &str, message: Value, ) -> Result<(), Box<dyn std::error::Error>> { let response = client .post(endpoint_url) .json(&message) .send() .await?; let status = response.status(); if !status.is_success() { let body = response.text().await.unwrap_or_default(); eprintln!("Response body: {}", body); return Err(format!("Failed to send message: {}", status).into()); } Ok(()) } /// Holds an active SSE session with background task to drain the stream struct SseSession { endpoint_url: String, _task_handle: tokio::task::JoinHandle<()>, _client: reqwest::Client, } impl SseSession { /// Get the endpoint URL for this session fn endpoint(&self) -> &str { &self.endpoint_url } } /// Helper to establish SSE connection and extract session endpoint /// Returns an SseSession that keeps the connection alive async fn get_session_endpoint( base_url: &str, ) -> Result<SseSession, Box<dyn std::error::Error>> { // Create client with longer timeouts and connection settings to prevent premature closures let client = reqwest::Client::builder() .timeout(Duration::from_secs(60)) .pool_max_idle_per_host(0) // Disable connection pooling .tcp_keepalive(Duration::from_secs(10)) .build()?; let sse_url = format!("{}/sse", base_url); let mut response = client .get(&sse_url) .send() .await?; let mut buffer = String::new(); // Read from response until we get the endpoint event while let Ok(Some(chunk)) = timeout(Duration::from_secs(2), response.chunk()).await? { buffer.push_str(&String::from_utf8_lossy(&chunk)); // Parse the endpoint from the SSE event // Format: "event: endpoint\ndata: /message?sessionId=...\n\n" for line in buffer.lines() { if line.starts_with("data: ") { let endpoint = line.strip_prefix("data: ").unwrap_or(""); if endpoint.contains("sessionId=") { let endpoint_url = format!("{}{}", base_url, endpoint); // Spawn background task to continuously drain the SSE stream // This is critical - if we don't read from the stream, the server's // write buffer will fill up and it won't be able to send more data let task_handle = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(10)); loop { tokio::select! { chunk_result = response.chunk() => { match chunk_result { Ok(Some(_)) => { // Successfully drained a chunk continue; } Ok(None) => { // Stream ended break; } Err(_) => { // Error reading break; } } } _ = interval.tick() => { // Continue draining } } } }); // Give the background task time to start sleep(Duration::from_millis(100)).await; return Ok(SseSession { endpoint_url, _task_handle: task_handle, _client: client, }); } } } // If buffer gets too large, something is wrong if buffer.len() > 1024 { break; } } Err("Failed to extract session endpoint".into()) } /// Helper structure to manage SSE server process struct SseServer { child: Option<tokio::process::Child>, base_url: String, } impl SseServer { /// Start a new SSE MCP server for testing async fn start(port: u16, heap_dir: &str) -> Result<Self, Box<dyn std::error::Error>> { use tokio::process::Command; use std::process::Stdio; let child = Command::new(env!("CARGO")) .args(&["run", "--", "--directory-path", heap_dir, "--sse-port", &port.to_string()]) .stdin(Stdio::null()) .stdout(Stdio::inherit()) // Show server stdout .stderr(Stdio::inherit()) // Show server stderr .spawn()?; let base_url = format!("http://127.0.0.1:{}", port); // Give server time to start sleep(Duration::from_millis(1000)).await; Ok(SseServer { child: Some(child), base_url, }) } /// Stop the server async fn stop(&mut self) { if let Some(mut child) = self.child.take() { let _ = child.kill().await; // Wait for the process to actually terminate and release the port let _ = child.wait().await; // Give the OS time to release the port (longer wait for SSE server cleanup) // Increased to 1500ms to ensure ports are fully released between sequential tests sleep(Duration::from_millis(1500)).await; } } } impl Drop for SseServer { fn drop(&mut self) { // Kill the child process synchronously if it still exists if let Some(child) = &mut self.child { // Use blocking kill - this is in Drop so we can't use async let _ = child.start_kill(); } } } /// Test SSE server starts and accepts connections #[tokio::test] async fn test_sse_server_startup() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9001; let mut server = SseServer::start(port, &heap_dir).await?; // Try to connect to SSE endpoint let client = reqwest::Client::new(); let sse_url = format!("{}/sse", server.base_url); let response = timeout( Duration::from_secs(2), client.get(&sse_url).send() ).await; // Server should respond (even if we can't fully process the SSE stream) assert!(response.is_ok(), "Should be able to connect to SSE endpoint"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test sending POST message to SSE server #[tokio::test] async fn test_sse_post_message() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9002; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::new(); // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Send initialize message let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); let result = timeout( Duration::from_secs(2), send_sse_message(&client, session.endpoint(), initialize_msg) ).await; assert!(result.is_ok(), "Should be able to send message to SSE server"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test MCP initialize handshake via SSE #[tokio::test] async fn test_sse_initialize_handshake() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9003; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::builder() .build()?; // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Send initialize request let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); send_sse_message(&client, session.endpoint(), initialize_msg).await?; // Note: This is a basic test - full SSE message parsing would require // proper SSE client implementation assert!(true, "SSE handshake test completed"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test run_js tool execution via SSE #[tokio::test] async fn test_sse_run_js_execution() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9004; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::builder() .build()?; // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Initialize let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); send_sse_message(&client, session.endpoint(), initialize_msg).await?; sleep(Duration::from_millis(200)).await; // Call run_js tool let tool_call_msg = json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "run_js", "arguments": { "code": "1 + 1", "heap": "sse-test-heap" } } }); let result = send_sse_message(&client, session.endpoint(), tool_call_msg).await; assert!(result.is_ok(), "Should be able to call run_js tool"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test heap persistence across multiple SSE calls #[tokio::test] async fn test_sse_heap_persistence() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9005; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::builder() .build()?; // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Initialize let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); send_sse_message(&client, session.endpoint(), initialize_msg).await?; sleep(Duration::from_millis(500)).await; // Set and read a variable in one call to test heap persistence let test_msg = json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "run_js", "arguments": { "code": "var sseValue = 100; sseValue", "heap": "sse-persistence-heap" } } }); let result = send_sse_message(&client, session.endpoint(), test_msg).await; assert!(result.is_ok(), "Should be able to execute JavaScript and use heap"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test error handling for invalid JavaScript via SSE #[tokio::test] async fn test_sse_invalid_javascript_error() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9006; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::builder() .build()?; // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Initialize let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); send_sse_message(&client, session.endpoint(), initialize_msg).await?; sleep(Duration::from_millis(200)).await; // Send invalid JavaScript let invalid_js_msg = json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "run_js", "arguments": { "code": "this is not valid javascript at all!!!", "heap": "sse-error-heap" } } }); // Server should accept the message (error will be in SSE response) let result = send_sse_message(&client, session.endpoint(), invalid_js_msg).await; assert!(result.is_ok(), "Server should accept message even with invalid JS"); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test multiple sequential operations via SSE #[tokio::test] async fn test_sse_sequential_operations() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9007; let mut server = SseServer::start(port, &heap_dir).await?; let client = reqwest::Client::builder() .build()?; // Get the session endpoint from the SSE stream let session = get_session_endpoint(&server.base_url).await?; // Initialize let initialize_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "sse-e2e-test", "version": "1.0.0" } } }); send_sse_message(&client, session.endpoint(), initialize_msg).await?; sleep(Duration::from_millis(500)).await; // Test a single sequential operation let msg = json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "run_js", "arguments": { "code": "var counter = 0; counter = counter + 5; counter", "heap": "sse-sequential-heap" } } }); send_sse_message(&client, session.endpoint(), msg).await?; server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) } /// Test SSE keep-alive behavior #[tokio::test] async fn test_sse_keepalive() -> Result<(), Box<dyn std::error::Error>> { let heap_dir = common::create_temp_heap_dir(); let port = 9008; let mut server = SseServer::start(port, &heap_dir).await?; // Connect to SSE endpoint let sse_url = format!("{}/sse", server.base_url); let connection_task = tokio::spawn(async move { reqwest::Client::new() .get(&sse_url) .timeout(Duration::from_secs(20)) .send() .await }); // Wait for keep-alive duration (15 seconds + buffer) // The connection should remain open due to keep-alive messages sleep(Duration::from_secs(3)).await; // If we get here without the connection closing, keep-alive is working assert!(true, "SSE connection should remain open with keep-alive"); connection_task.abort(); server.stop().await; common::cleanup_heap_dir(&heap_dir); Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/r33drichards/mcp-js'

If you have feedback or need assistance with the MCP directory API, please join our Discord server