//! Bridge between MCP tool surface (ListMcpResources, ReadMcpResource, McpAuth, MCP) //! and the existing McpServerManager runtime. //! //! Provides a stateful client registry that tool handlers can use to //! connect to MCP servers and invoke their capabilities. use std::collections::HashMap; use std::sync::{Arc, Mutex, OnceLock}; use crate::mcp::mcp_tool_name; use crate::mcp_stdio::McpServerManager; use serde::{Deserialize, Serialize}; /// Status of a managed MCP server connection. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum McpConnectionStatus { Disconnected, Connecting, Connected, AuthRequired, Error, } impl std::fmt::Display for McpConnectionStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Disconnected => write!(f, "disconnected"), Self::Connecting => write!(f, "connecting"), Self::Connected => write!(f, "connected"), Self::AuthRequired => write!(f, "auth_required"), Self::Error => write!(f, "error"), } } } /// Metadata about an MCP resource. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct McpResourceInfo { pub uri: String, pub name: String, pub description: Option, pub mime_type: Option, } /// Metadata about an MCP tool exposed by a server. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct McpToolInfo { pub name: String, pub description: Option, pub input_schema: Option, } /// Tracked state of an MCP server connection. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct McpServerState { pub server_name: String, pub status: McpConnectionStatus, pub tools: Vec, pub resources: Vec, pub server_info: Option, pub error_message: Option, } #[derive(Debug, Clone, Default)] pub struct McpToolRegistry { inner: Arc>>, manager: Arc>>>, } impl McpToolRegistry { #[must_use] pub fn new() -> Self { Self::default() } pub fn set_manager( &self, manager: Arc>, ) -> Result<(), Arc>> { self.manager.set(manager) } pub fn register_server( &self, server_name: &str, status: McpConnectionStatus, tools: Vec, resources: Vec, server_info: Option, ) { let mut inner = self.inner.lock().expect("mcp registry lock poisoned"); inner.insert( server_name.to_owned(), McpServerState { server_name: server_name.to_owned(), status, tools, resources, server_info, error_message: None, }, ); } pub fn get_server(&self, server_name: &str) -> Option { let inner = self.inner.lock().expect("mcp registry lock poisoned"); inner.get(server_name).cloned() } pub fn list_servers(&self) -> Vec { let inner = self.inner.lock().expect("mcp registry lock poisoned"); inner.values().cloned().collect() } pub fn list_resources(&self, server_name: &str) -> Result, String> { let inner = self.inner.lock().expect("mcp registry lock poisoned"); match inner.get(server_name) { Some(state) => { if state.status != McpConnectionStatus::Connected { return Err(format!( "server '{}' is not connected (status: {})", server_name, state.status )); } Ok(state.resources.clone()) } None => Err(format!("server '{}' not found", server_name)), } } pub fn read_resource(&self, server_name: &str, uri: &str) -> Result { let inner = self.inner.lock().expect("mcp registry lock poisoned"); let state = inner .get(server_name) .ok_or_else(|| format!("server '{}' not found", server_name))?; if state.status != McpConnectionStatus::Connected { return Err(format!( "server '{}' is not connected (status: {})", server_name, state.status )); } state .resources .iter() .find(|r| r.uri == uri) .cloned() .ok_or_else(|| format!("resource '{}' not found on server '{}'", uri, server_name)) } pub fn list_tools(&self, server_name: &str) -> Result, String> { let inner = self.inner.lock().expect("mcp registry lock poisoned"); match inner.get(server_name) { Some(state) => { if state.status != McpConnectionStatus::Connected { return Err(format!( "server '{}' is not connected (status: {})", server_name, state.status )); } Ok(state.tools.clone()) } None => Err(format!("server '{}' not found", server_name)), } } fn spawn_tool_call( manager: Arc>, qualified_tool_name: String, arguments: Option, ) -> Result { let join_handle = std::thread::Builder::new() .name(format!("mcp-tool-call-{qualified_tool_name}")) .spawn(move || { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|error| format!("failed to create MCP tool runtime: {error}"))?; runtime.block_on(async move { let response = { let mut manager = manager .lock() .map_err(|_| "mcp server manager lock poisoned".to_string())?; manager .discover_tools() .await .map_err(|error| error.to_string())?; let response = manager .call_tool(&qualified_tool_name, arguments) .await .map_err(|error| error.to_string()); let shutdown = manager.shutdown().await.map_err(|error| error.to_string()); match (response, shutdown) { (Ok(response), Ok(())) => Ok(response), (Err(error), Ok(())) | (Err(error), Err(_)) => Err(error), (Ok(_), Err(error)) => Err(error), } }?; if let Some(error) = response.error { return Err(format!( "MCP server returned JSON-RPC error for tools/call: {} ({})", error.message, error.code )); } let result = response.result.ok_or_else(|| { "MCP server returned no result for tools/call".to_string() })?; serde_json::to_value(result) .map_err(|error| format!("failed to serialize MCP tool result: {error}")) }) }) .map_err(|error| format!("failed to spawn MCP tool call thread: {error}"))?; join_handle.join().map_err(|panic_payload| { if let Some(message) = panic_payload.downcast_ref::<&str>() { format!("MCP tool call thread panicked: {message}") } else if let Some(message) = panic_payload.downcast_ref::() { format!("MCP tool call thread panicked: {message}") } else { "MCP tool call thread panicked".to_string() } })? } pub fn call_tool( &self, server_name: &str, tool_name: &str, arguments: &serde_json::Value, ) -> Result { let inner = self.inner.lock().expect("mcp registry lock poisoned"); let state = inner .get(server_name) .ok_or_else(|| format!("server '{}' not found", server_name))?; if state.status != McpConnectionStatus::Connected { return Err(format!( "server '{}' is not connected (status: {})", server_name, state.status )); } if !state.tools.iter().any(|t| t.name == tool_name) { return Err(format!( "tool '{}' not found on server '{}'", tool_name, server_name )); } drop(inner); let manager = self .manager .get() .cloned() .ok_or_else(|| "MCP server manager is not configured".to_string())?; Self::spawn_tool_call( manager, mcp_tool_name(server_name, tool_name), (!arguments.is_null()).then(|| arguments.clone()), ) } /// Set auth status for a server. pub fn set_auth_status( &self, server_name: &str, status: McpConnectionStatus, ) -> Result<(), String> { let mut inner = self.inner.lock().expect("mcp registry lock poisoned"); let state = inner .get_mut(server_name) .ok_or_else(|| format!("server '{}' not found", server_name))?; state.status = status; Ok(()) } /// Disconnect / remove a server. pub fn disconnect(&self, server_name: &str) -> Option { let mut inner = self.inner.lock().expect("mcp registry lock poisoned"); inner.remove(server_name) } /// Number of registered servers. #[must_use] pub fn len(&self) -> usize { let inner = self.inner.lock().expect("mcp registry lock poisoned"); inner.len() } #[must_use] pub fn is_empty(&self) -> bool { self.len() == 0 } } #[cfg(test)] mod tests { use std::collections::BTreeMap; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::config::{ ConfigSource, McpServerConfig, McpStdioServerConfig, ScopedMcpServerConfig, }; fn temp_dir() -> PathBuf { static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0); let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time should be after epoch") .as_nanos(); let unique_id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed); std::env::temp_dir().join(format!("runtime-mcp-tool-bridge-{nanos}-{unique_id}")) } fn cleanup_script(script_path: &Path) { if let Some(root) = script_path.parent() { let _ = fs::remove_dir_all(root); } } fn write_bridge_mcp_server_script() -> PathBuf { let root = temp_dir(); fs::create_dir_all(&root).expect("temp dir"); let script_path = root.join("bridge-mcp-server.py"); let script = [ "#!/usr/bin/env python3", "import json, os, sys", "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')", "LOG_PATH = os.environ.get('MCP_LOG_PATH')", "", "def log(method):", " if LOG_PATH:", " with open(LOG_PATH, 'a', encoding='utf-8') as handle:", " handle.write(f'{method}\\n')", "", "def read_message():", " header = b''", r" while not header.endswith(b'\r\n\r\n'):", " chunk = sys.stdin.buffer.read(1)", " if not chunk:", " return None", " header += chunk", " length = 0", r" for line in header.decode().split('\r\n'):", r" if line.lower().startswith('content-length:'):", r" length = int(line.split(':', 1)[1].strip())", " payload = sys.stdin.buffer.read(length)", " return json.loads(payload.decode())", "", "def send_message(message):", " payload = json.dumps(message).encode()", r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)", " sys.stdout.buffer.flush()", "", "while True:", " request = read_message()", " if request is None:", " break", " method = request['method']", " log(method)", " if method == 'initialize':", " send_message({", " 'jsonrpc': '2.0',", " 'id': request['id'],", " 'result': {", " 'protocolVersion': request['params']['protocolVersion'],", " 'capabilities': {'tools': {}},", " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}", " }", " })", " elif method == 'tools/list':", " send_message({", " 'jsonrpc': '2.0',", " 'id': request['id'],", " 'result': {", " 'tools': [", " {", " 'name': 'echo',", " 'description': f'Echo tool for {LABEL}',", " 'inputSchema': {", " 'type': 'object',", " 'properties': {'text': {'type': 'string'}},", " 'required': ['text']", " }", " }", " ]", " }", " })", " elif method == 'tools/call':", " args = request['params'].get('arguments') or {}", " text = args.get('text', '')", " send_message({", " 'jsonrpc': '2.0',", " 'id': request['id'],", " 'result': {", " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],", " 'structuredContent': {'server': LABEL, 'echoed': text},", " 'isError': False", " }", " })", " else:", " send_message({", " 'jsonrpc': '2.0',", " 'id': request['id'],", " 'error': {'code': -32601, 'message': f'unknown method: {method}'},", " })", "", ] .join("\n"); fs::write(&script_path, script).expect("write script"); let mut permissions = fs::metadata(&script_path).expect("metadata").permissions(); permissions.set_mode(0o755); fs::set_permissions(&script_path, permissions).expect("chmod"); script_path } fn manager_server_config( script_path: &Path, server_name: &str, log_path: &Path, ) -> ScopedMcpServerConfig { ScopedMcpServerConfig { scope: ConfigSource::Local, config: McpServerConfig::Stdio(McpStdioServerConfig { command: "python3".to_string(), args: vec![script_path.to_string_lossy().into_owned()], env: BTreeMap::from([ ("MCP_SERVER_LABEL".to_string(), server_name.to_string()), ( "MCP_LOG_PATH".to_string(), log_path.to_string_lossy().into_owned(), ), ]), tool_call_timeout_ms: Some(1_000), }), } } #[test] fn registers_and_retrieves_server() { let registry = McpToolRegistry::new(); registry.register_server( "test-server", McpConnectionStatus::Connected, vec![McpToolInfo { name: "greet".into(), description: Some("Greet someone".into()), input_schema: None, }], vec![McpResourceInfo { uri: "res://data".into(), name: "Data".into(), description: None, mime_type: Some("application/json".into()), }], Some("TestServer v1.0".into()), ); let server = registry.get_server("test-server").expect("should exist"); assert_eq!(server.status, McpConnectionStatus::Connected); assert_eq!(server.tools.len(), 1); assert_eq!(server.resources.len(), 1); } #[test] fn lists_resources_from_connected_server() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::Connected, vec![], vec![McpResourceInfo { uri: "res://alpha".into(), name: "Alpha".into(), description: None, mime_type: None, }], None, ); let resources = registry.list_resources("srv").expect("should succeed"); assert_eq!(resources.len(), 1); assert_eq!(resources[0].uri, "res://alpha"); } #[test] fn rejects_resource_listing_for_disconnected_server() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::Disconnected, vec![], vec![], None, ); assert!(registry.list_resources("srv").is_err()); } #[test] fn reads_specific_resource() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::Connected, vec![], vec![McpResourceInfo { uri: "res://data".into(), name: "Data".into(), description: Some("Test data".into()), mime_type: Some("text/plain".into()), }], None, ); let resource = registry .read_resource("srv", "res://data") .expect("should find"); assert_eq!(resource.name, "Data"); assert!(registry.read_resource("srv", "res://missing").is_err()); } #[test] fn given_connected_server_without_manager_when_calling_tool_then_it_errors() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::Connected, vec![McpToolInfo { name: "greet".into(), description: None, input_schema: None, }], vec![], None, ); let error = registry .call_tool("srv", "greet", &serde_json::json!({"name": "world"})) .expect_err("should require a configured manager"); assert!(error.contains("MCP server manager is not configured")); // Unknown tool should fail assert!(registry .call_tool("srv", "missing", &serde_json::json!({})) .is_err()); } #[test] fn given_connected_server_with_manager_when_calling_tool_then_it_returns_live_result() { let script_path = write_bridge_mcp_server_script(); let root = script_path.parent().expect("script parent"); let log_path = root.join("bridge.log"); let servers = BTreeMap::from([( "alpha".to_string(), manager_server_config(&script_path, "alpha", &log_path), )]); let manager = Arc::new(Mutex::new(McpServerManager::from_servers(&servers))); let registry = McpToolRegistry::new(); registry.register_server( "alpha", McpConnectionStatus::Connected, vec![McpToolInfo { name: "echo".into(), description: Some("Echo tool for alpha".into()), input_schema: Some(serde_json::json!({ "type": "object", "properties": {"text": {"type": "string"}}, "required": ["text"] })), }], vec![], Some("bridge test server".into()), ); registry .set_manager(Arc::clone(&manager)) .expect("manager should only be set once"); let result = registry .call_tool("alpha", "echo", &serde_json::json!({"text": "hello"})) .expect("should return live MCP result"); assert_eq!( result["structuredContent"]["server"], serde_json::json!("alpha") ); assert_eq!( result["structuredContent"]["echoed"], serde_json::json!("hello") ); assert_eq!( result["content"][0]["text"], serde_json::json!("alpha:hello") ); let log = fs::read_to_string(&log_path).expect("read log"); assert_eq!( log.lines().collect::>(), vec!["initialize", "tools/list", "tools/call"] ); cleanup_script(&script_path); } #[test] fn rejects_tool_call_on_disconnected_server() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::AuthRequired, vec![McpToolInfo { name: "greet".into(), description: None, input_schema: None, }], vec![], None, ); assert!(registry .call_tool("srv", "greet", &serde_json::json!({})) .is_err()); } #[test] fn sets_auth_and_disconnects() { let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::AuthRequired, vec![], vec![], None, ); registry .set_auth_status("srv", McpConnectionStatus::Connected) .expect("should succeed"); let state = registry.get_server("srv").unwrap(); assert_eq!(state.status, McpConnectionStatus::Connected); let removed = registry.disconnect("srv"); assert!(removed.is_some()); assert!(registry.is_empty()); } #[test] fn rejects_operations_on_missing_server() { let registry = McpToolRegistry::new(); assert!(registry.list_resources("missing").is_err()); assert!(registry.read_resource("missing", "uri").is_err()); assert!(registry.list_tools("missing").is_err()); assert!(registry .call_tool("missing", "tool", &serde_json::json!({})) .is_err()); assert!(registry .set_auth_status("missing", McpConnectionStatus::Connected) .is_err()); } #[test] fn mcp_connection_status_display_all_variants() { // given let cases = [ (McpConnectionStatus::Disconnected, "disconnected"), (McpConnectionStatus::Connecting, "connecting"), (McpConnectionStatus::Connected, "connected"), (McpConnectionStatus::AuthRequired, "auth_required"), (McpConnectionStatus::Error, "error"), ]; // when let rendered: Vec<_> = cases .into_iter() .map(|(status, expected)| (status.to_string(), expected)) .collect(); // then assert_eq!( rendered, vec![ ("disconnected".to_string(), "disconnected"), ("connecting".to_string(), "connecting"), ("connected".to_string(), "connected"), ("auth_required".to_string(), "auth_required"), ("error".to_string(), "error"), ] ); } #[test] fn list_servers_returns_all_registered() { // given let registry = McpToolRegistry::new(); registry.register_server( "alpha", McpConnectionStatus::Connected, vec![], vec![], None, ); registry.register_server( "beta", McpConnectionStatus::Connecting, vec![], vec![], None, ); // when let servers = registry.list_servers(); // then assert_eq!(servers.len(), 2); assert!(servers.iter().any(|server| server.server_name == "alpha")); assert!(servers.iter().any(|server| server.server_name == "beta")); } #[test] fn list_tools_from_connected_server() { // given let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::Connected, vec![McpToolInfo { name: "inspect".into(), description: Some("Inspect data".into()), input_schema: Some(serde_json::json!({"type": "object"})), }], vec![], None, ); // when let tools = registry.list_tools("srv").expect("tools should list"); // then assert_eq!(tools.len(), 1); assert_eq!(tools[0].name, "inspect"); } #[test] fn list_tools_rejects_disconnected_server() { // given let registry = McpToolRegistry::new(); registry.register_server( "srv", McpConnectionStatus::AuthRequired, vec![], vec![], None, ); // when let result = registry.list_tools("srv"); // then let error = result.expect_err("non-connected server should fail"); assert!(error.contains("not connected")); assert!(error.contains("auth_required")); } #[test] fn list_tools_rejects_missing_server() { // given let registry = McpToolRegistry::new(); // when let result = registry.list_tools("missing"); // then assert_eq!( result.expect_err("missing server should fail"), "server 'missing' not found" ); } #[test] fn get_server_returns_none_for_missing() { // given let registry = McpToolRegistry::new(); // when let server = registry.get_server("missing"); // then assert!(server.is_none()); } #[test] fn call_tool_payload_structure() { let script_path = write_bridge_mcp_server_script(); let root = script_path.parent().expect("script parent"); let log_path = root.join("payload.log"); let servers = BTreeMap::from([( "srv".to_string(), manager_server_config(&script_path, "srv", &log_path), )]); let registry = McpToolRegistry::new(); let arguments = serde_json::json!({"text": "world"}); registry.register_server( "srv", McpConnectionStatus::Connected, vec![McpToolInfo { name: "echo".into(), description: Some("Echo tool for srv".into()), input_schema: Some(serde_json::json!({ "type": "object", "properties": {"text": {"type": "string"}}, "required": ["text"] })), }], vec![], None, ); registry .set_manager(Arc::new(Mutex::new(McpServerManager::from_servers( &servers, )))) .expect("manager should only be set once"); let result = registry .call_tool("srv", "echo", &arguments) .expect("tool should return live payload"); assert_eq!(result["structuredContent"]["server"], "srv"); assert_eq!(result["structuredContent"]["echoed"], "world"); assert_eq!(result["content"][0]["text"], "srv:world"); cleanup_script(&script_path); } #[test] fn upsert_overwrites_existing_server() { // given let registry = McpToolRegistry::new(); registry.register_server("srv", McpConnectionStatus::Connecting, vec![], vec![], None); // when registry.register_server( "srv", McpConnectionStatus::Connected, vec![McpToolInfo { name: "inspect".into(), description: None, input_schema: None, }], vec![], Some("Inspector".into()), ); let state = registry.get_server("srv").expect("server should exist"); // then assert_eq!(state.status, McpConnectionStatus::Connected); assert_eq!(state.tools.len(), 1); assert_eq!(state.server_info.as_deref(), Some("Inspector")); } #[test] fn disconnect_missing_returns_none() { // given let registry = McpToolRegistry::new(); // when let removed = registry.disconnect("missing"); // then assert!(removed.is_none()); } #[test] fn len_and_is_empty_transitions() { // given let registry = McpToolRegistry::new(); // when registry.register_server( "alpha", McpConnectionStatus::Connected, vec![], vec![], None, ); registry.register_server("beta", McpConnectionStatus::Connected, vec![], vec![], None); let after_create = registry.len(); registry.disconnect("alpha"); let after_first_remove = registry.len(); registry.disconnect("beta"); // then assert_eq!(after_create, 2); assert_eq!(after_first_remove, 1); assert_eq!(registry.len(), 0); assert!(registry.is_empty()); } }