From f5e94f3c92b99f55c8d287c1fd0c9dcaffce2ff2 Mon Sep 17 00:00:00 2001 From: Jobdori Date: Sat, 4 Apr 2026 00:38:35 +0900 Subject: [PATCH 1/2] feat(runtime): plugin lifecycle --- rust/crates/runtime/src/lib.rs | 11 +- rust/crates/runtime/src/plugin_lifecycle.rs | 459 ++++++++++++++++++++ 2 files changed, 467 insertions(+), 3 deletions(-) create mode 100644 rust/crates/runtime/src/plugin_lifecycle.rs diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 1c01a3f..083d8fd 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -15,6 +15,7 @@ pub mod mcp_tool_bridge; mod oauth; pub mod permission_enforcer; mod permissions; +pub mod plugin_lifecycle; mod prompt; mod remote; pub mod sandbox; @@ -80,6 +81,10 @@ pub use permissions::{ PermissionContext, PermissionMode, PermissionOutcome, PermissionOverride, PermissionPolicy, PermissionPromptDecision, PermissionPrompter, PermissionRequest, }; +pub use plugin_lifecycle::{ + DegradedMode, DiscoveryResult, PluginHealthcheck, PluginLifecycle, PluginLifecycleEvent, + PluginState, ResourceInfo, ServerHealth, ServerStatus, ToolInfo, +}; pub use prompt::{ load_system_prompt, prepend_bullets, ContextFile, ProjectContext, PromptBuildError, SystemPromptBuilder, FRONTIER_MODEL_NAME, SYSTEM_PROMPT_DYNAMIC_BOUNDARY, @@ -100,13 +105,13 @@ pub use session::{ SessionFork, }; pub use sse::{IncrementalSseParser, SseEvent}; +pub use usage::{ + format_usd, pricing_for_model, ModelPricing, TokenUsage, UsageCostEstimate, UsageTracker, +}; pub use worker_boot::{ Worker, WorkerEvent, WorkerEventKind, WorkerFailure, WorkerFailureKind, WorkerReadySnapshot, WorkerRegistry, WorkerStatus, }; -pub use usage::{ - format_usd, pricing_for_model, ModelPricing, TokenUsage, UsageCostEstimate, UsageTracker, -}; #[cfg(test)] pub(crate) fn test_env_lock() -> std::sync::MutexGuard<'static, ()> { diff --git a/rust/crates/runtime/src/plugin_lifecycle.rs b/rust/crates/runtime/src/plugin_lifecycle.rs new file mode 100644 index 0000000..4400599 --- /dev/null +++ b/rust/crates/runtime/src/plugin_lifecycle.rs @@ -0,0 +1,459 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; + +use crate::config::RuntimePluginConfig; +use crate::mcp_tool_bridge::{McpResourceInfo, McpToolInfo}; + +fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +pub type ToolInfo = McpToolInfo; +pub type ResourceInfo = McpResourceInfo; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ServerStatus { + Healthy, + Degraded, + Failed, +} + +impl std::fmt::Display for ServerStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Healthy => write!(f, "healthy"), + Self::Degraded => write!(f, "degraded"), + Self::Failed => write!(f, "failed"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ServerHealth { + pub server_name: String, + pub status: ServerStatus, + pub capabilities: Vec, + pub last_error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", tag = "state")] +pub enum PluginState { + Unconfigured, + Validated, + Starting, + Healthy, + Degraded { + healthy_servers: Vec, + failed_servers: Vec, + }, + Failed { + reason: String, + }, + ShuttingDown, + Stopped, +} + +impl PluginState { + #[must_use] + pub fn from_servers(servers: &[ServerHealth]) -> Self { + if servers.is_empty() { + return Self::Failed { + reason: "no servers available".to_string(), + }; + } + + let healthy_servers = servers + .iter() + .filter(|server| server.status == ServerStatus::Healthy) + .map(|server| server.server_name.clone()) + .collect::>(); + let failed_servers = servers + .iter() + .filter(|server| server.status != ServerStatus::Healthy) + .cloned() + .collect::>(); + + if failed_servers.is_empty() { + Self::Healthy + } else if healthy_servers.is_empty() { + Self::Failed { + reason: format!("all {} servers failed", failed_servers.len()), + } + } else { + Self::Degraded { + healthy_servers, + failed_servers, + } + } + } +} + +impl std::fmt::Display for PluginState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unconfigured => write!(f, "unconfigured"), + Self::Validated => write!(f, "validated"), + Self::Starting => write!(f, "starting"), + Self::Healthy => write!(f, "healthy"), + Self::Degraded { .. } => write!(f, "degraded"), + Self::Failed { .. } => write!(f, "failed"), + Self::ShuttingDown => write!(f, "shutting_down"), + Self::Stopped => write!(f, "stopped"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PluginHealthcheck { + pub plugin_name: String, + pub state: PluginState, + pub servers: Vec, + pub last_check: u64, +} + +impl PluginHealthcheck { + #[must_use] + pub fn new(plugin_name: impl Into, servers: Vec) -> Self { + let state = PluginState::from_servers(&servers); + Self { + plugin_name: plugin_name.into(), + state, + servers, + last_check: now_secs(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiscoveryResult { + pub tools: Vec, + pub resources: Vec, + pub partial: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DegradedMode { + pub available_tools: Vec, + pub unavailable_tools: Vec, + pub reason: String, +} + +impl DegradedMode { + #[must_use] + pub fn new( + available_tools: Vec, + unavailable_tools: Vec, + reason: impl Into, + ) -> Self { + Self { + available_tools, + unavailable_tools, + reason: reason.into(), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PluginLifecycleEvent { + ConfigValidated, + StartupHealthy, + StartupDegraded, + StartupFailed, + Shutdown, +} + +impl std::fmt::Display for PluginLifecycleEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ConfigValidated => write!(f, "config_validated"), + Self::StartupHealthy => write!(f, "startup_healthy"), + Self::StartupDegraded => write!(f, "startup_degraded"), + Self::StartupFailed => write!(f, "startup_failed"), + Self::Shutdown => write!(f, "shutdown"), + } + } +} + +pub trait PluginLifecycle { + fn validate_config(&self, config: &RuntimePluginConfig) -> Result<(), String>; + fn healthcheck(&self) -> PluginHealthcheck; + fn discover(&self) -> DiscoveryResult; + fn shutdown(&mut self) -> Result<(), String>; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, Clone)] + struct MockPluginLifecycle { + plugin_name: String, + valid_config: bool, + healthcheck: PluginHealthcheck, + discovery: DiscoveryResult, + shutdown_error: Option, + shutdown_called: bool, + } + + impl MockPluginLifecycle { + fn new( + plugin_name: &str, + valid_config: bool, + servers: Vec, + discovery: DiscoveryResult, + shutdown_error: Option, + ) -> Self { + Self { + plugin_name: plugin_name.to_string(), + valid_config, + healthcheck: PluginHealthcheck::new(plugin_name, servers), + discovery, + shutdown_error, + shutdown_called: false, + } + } + } + + impl PluginLifecycle for MockPluginLifecycle { + fn validate_config(&self, _config: &RuntimePluginConfig) -> Result<(), String> { + if self.valid_config { + Ok(()) + } else { + Err(format!( + "plugin `{}` failed configuration validation", + self.plugin_name + )) + } + } + + fn healthcheck(&self) -> PluginHealthcheck { + if self.shutdown_called { + PluginHealthcheck { + plugin_name: self.plugin_name.clone(), + state: PluginState::Stopped, + servers: self.healthcheck.servers.clone(), + last_check: now_secs(), + } + } else { + self.healthcheck.clone() + } + } + + fn discover(&self) -> DiscoveryResult { + self.discovery.clone() + } + + fn shutdown(&mut self) -> Result<(), String> { + if let Some(error) = &self.shutdown_error { + return Err(error.clone()); + } + + self.shutdown_called = true; + Ok(()) + } + } + + fn healthy_server(name: &str, capabilities: &[&str]) -> ServerHealth { + ServerHealth { + server_name: name.to_string(), + status: ServerStatus::Healthy, + capabilities: capabilities + .iter() + .map(|capability| capability.to_string()) + .collect(), + last_error: None, + } + } + + fn failed_server(name: &str, capabilities: &[&str], error: &str) -> ServerHealth { + ServerHealth { + server_name: name.to_string(), + status: ServerStatus::Failed, + capabilities: capabilities + .iter() + .map(|capability| capability.to_string()) + .collect(), + last_error: Some(error.to_string()), + } + } + + fn tool(name: &str) -> ToolInfo { + ToolInfo { + name: name.to_string(), + description: Some(format!("{name} tool")), + input_schema: None, + } + } + + fn resource(name: &str, uri: &str) -> ResourceInfo { + ResourceInfo { + uri: uri.to_string(), + name: name.to_string(), + description: Some(format!("{name} resource")), + mime_type: Some("application/json".to_string()), + } + } + + #[test] + fn full_lifecycle_happy_path() { + // given + let mut lifecycle = MockPluginLifecycle::new( + "healthy-plugin", + true, + vec![ + healthy_server("alpha", &["search", "read"]), + healthy_server("beta", &["write"]), + ], + DiscoveryResult { + tools: vec![tool("search"), tool("read"), tool("write")], + resources: vec![resource("docs", "file:///docs")], + partial: false, + }, + None, + ); + let config = RuntimePluginConfig::default(); + + // when + let validation = lifecycle.validate_config(&config); + let healthcheck = lifecycle.healthcheck(); + let discovery = lifecycle.discover(); + let shutdown = lifecycle.shutdown(); + let post_shutdown = lifecycle.healthcheck(); + + // then + assert_eq!(validation, Ok(())); + assert_eq!(healthcheck.state, PluginState::Healthy); + assert_eq!(healthcheck.plugin_name, "healthy-plugin"); + assert_eq!(discovery.tools.len(), 3); + assert_eq!(discovery.resources.len(), 1); + assert!(!discovery.partial); + assert_eq!(shutdown, Ok(())); + assert_eq!(post_shutdown.state, PluginState::Stopped); + } + + #[test] + fn degraded_startup_when_one_of_three_servers_fails() { + // given + let lifecycle = MockPluginLifecycle::new( + "degraded-plugin", + true, + vec![ + healthy_server("alpha", &["search"]), + failed_server("beta", &["write"], "connection refused"), + healthy_server("gamma", &["read"]), + ], + DiscoveryResult { + tools: vec![tool("search"), tool("read")], + resources: vec![resource("alpha-docs", "file:///alpha")], + partial: true, + }, + None, + ); + + // when + let healthcheck = lifecycle.healthcheck(); + let discovery = lifecycle.discover(); + let degraded_mode = DegradedMode::new( + discovery + .tools + .iter() + .map(|tool| tool.name.clone()) + .collect(), + vec!["write".to_string()], + "server beta failed during startup", + ); + + // then + match healthcheck.state { + PluginState::Degraded { + healthy_servers, + failed_servers, + } => { + assert_eq!( + healthy_servers, + vec!["alpha".to_string(), "gamma".to_string()] + ); + assert_eq!(failed_servers.len(), 1); + assert_eq!(failed_servers[0].server_name, "beta"); + assert_eq!( + failed_servers[0].last_error.as_deref(), + Some("connection refused") + ); + } + other => panic!("expected degraded state, got {other:?}"), + } + assert!(discovery.partial); + assert_eq!( + degraded_mode.available_tools, + vec!["search".to_string(), "read".to_string()] + ); + assert_eq!(degraded_mode.unavailable_tools, vec!["write".to_string()]); + assert_eq!(degraded_mode.reason, "server beta failed during startup"); + } + + #[test] + fn complete_failure_when_all_servers_fail() { + // given + let lifecycle = MockPluginLifecycle::new( + "failed-plugin", + true, + vec![ + failed_server("alpha", &["search"], "timeout"), + failed_server("beta", &["read"], "handshake failed"), + ], + DiscoveryResult { + tools: Vec::new(), + resources: Vec::new(), + partial: true, + }, + None, + ); + + // when + let healthcheck = lifecycle.healthcheck(); + let discovery = lifecycle.discover(); + + // then + match healthcheck.state { + PluginState::Failed { reason } => { + assert_eq!(reason, "all 2 servers failed"); + } + other => panic!("expected failed state, got {other:?}"), + } + assert!(discovery.partial); + assert!(discovery.tools.is_empty()); + assert!(discovery.resources.is_empty()); + } + + #[test] + fn graceful_shutdown() { + // given + let mut lifecycle = MockPluginLifecycle::new( + "shutdown-plugin", + true, + vec![healthy_server("alpha", &["search"])], + DiscoveryResult { + tools: vec![tool("search")], + resources: Vec::new(), + partial: false, + }, + None, + ); + + // when + let shutdown = lifecycle.shutdown(); + let post_shutdown = lifecycle.healthcheck(); + + // then + assert_eq!(shutdown, Ok(())); + assert_eq!(PluginLifecycleEvent::Shutdown.to_string(), "shutdown"); + assert_eq!(post_shutdown.state, PluginState::Stopped); + } +} From 18340b561ee8657e6c2617ca58df56d04af15226 Mon Sep 17 00:00:00 2001 From: Jobdori Date: Sat, 4 Apr 2026 00:41:51 +0900 Subject: [PATCH 2/2] feat(runtime): first-class plugin lifecycle contract with degraded-mode support --- rust/crates/runtime/src/mcp_tool_bridge.rs | 9 +- rust/crates/runtime/src/plugin_lifecycle.rs | 105 +++++++++++++++++--- 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/rust/crates/runtime/src/mcp_tool_bridge.rs b/rust/crates/runtime/src/mcp_tool_bridge.rs index f15a102..fa7cbb6 100644 --- a/rust/crates/runtime/src/mcp_tool_bridge.rs +++ b/rust/crates/runtime/src/mcp_tool_bridge.rs @@ -184,7 +184,10 @@ impl McpToolRegistry { let mut manager = manager .lock() .map_err(|_| "mcp server manager lock poisoned".to_string())?; - manager.discover_tools().await.map_err(|error| error.to_string())?; + manager + .discover_tools() + .await + .map_err(|error| error.to_string())?; let response = manager .call_tool(&qualified_tool_name, arguments) .await @@ -827,7 +830,9 @@ mod tests { None, ); registry - .set_manager(Arc::new(Mutex::new(McpServerManager::from_servers(&servers)))) + .set_manager(Arc::new(Mutex::new(McpServerManager::from_servers( + &servers, + )))) .expect("manager should only be set once"); let result = registry diff --git a/rust/crates/runtime/src/plugin_lifecycle.rs b/rust/crates/runtime/src/plugin_lifecycle.rs index 4400599..b67837a 100644 --- a/rust/crates/runtime/src/plugin_lifecycle.rs +++ b/rust/crates/runtime/src/plugin_lifecycle.rs @@ -70,16 +70,19 @@ impl PluginState { let healthy_servers = servers .iter() - .filter(|server| server.status == ServerStatus::Healthy) + .filter(|server| server.status != ServerStatus::Failed) .map(|server| server.server_name.clone()) .collect::>(); let failed_servers = servers .iter() - .filter(|server| server.status != ServerStatus::Healthy) + .filter(|server| server.status == ServerStatus::Failed) .cloned() .collect::>(); + let has_degraded_server = servers + .iter() + .any(|server| server.status == ServerStatus::Degraded); - if failed_servers.is_empty() { + if failed_servers.is_empty() && !has_degraded_server { Self::Healthy } else if healthy_servers.is_empty() { Self::Failed { @@ -128,6 +131,32 @@ impl PluginHealthcheck { last_check: now_secs(), } } + + #[must_use] + pub fn degraded_mode(&self, discovery: &DiscoveryResult) -> Option { + match &self.state { + PluginState::Degraded { + healthy_servers, + failed_servers, + } => Some(DegradedMode { + available_tools: discovery + .tools + .iter() + .map(|tool| tool.name.clone()) + .collect(), + unavailable_tools: failed_servers + .iter() + .flat_map(|server| server.capabilities.iter().cloned()) + .collect(), + reason: format!( + "{} servers healthy, {} servers failed", + healthy_servers.len(), + failed_servers.len() + ), + }), + _ => None, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -284,6 +313,18 @@ mod tests { } } + fn degraded_server(name: &str, capabilities: &[&str], error: &str) -> ServerHealth { + ServerHealth { + server_name: name.to_string(), + status: ServerStatus::Degraded, + capabilities: capabilities + .iter() + .map(|capability| capability.to_string()) + .collect(), + last_error: Some(error.to_string()), + } + } + fn tool(name: &str) -> ToolInfo { ToolInfo { name: name.to_string(), @@ -360,15 +401,9 @@ mod tests { // when let healthcheck = lifecycle.healthcheck(); let discovery = lifecycle.discover(); - let degraded_mode = DegradedMode::new( - discovery - .tools - .iter() - .map(|tool| tool.name.clone()) - .collect(), - vec!["write".to_string()], - "server beta failed during startup", - ); + let degraded_mode = healthcheck + .degraded_mode(&discovery) + .expect("degraded startup should expose degraded mode"); // then match healthcheck.state { @@ -395,7 +430,44 @@ mod tests { vec!["search".to_string(), "read".to_string()] ); assert_eq!(degraded_mode.unavailable_tools, vec!["write".to_string()]); - assert_eq!(degraded_mode.reason, "server beta failed during startup"); + assert_eq!(degraded_mode.reason, "2 servers healthy, 1 servers failed"); + } + + #[test] + fn degraded_server_status_keeps_server_usable() { + // given + let lifecycle = MockPluginLifecycle::new( + "soft-degraded-plugin", + true, + vec![ + healthy_server("alpha", &["search"]), + degraded_server("beta", &["write"], "high latency"), + ], + DiscoveryResult { + tools: vec![tool("search"), tool("write")], + resources: Vec::new(), + partial: true, + }, + None, + ); + + // when + let healthcheck = lifecycle.healthcheck(); + + // then + match healthcheck.state { + PluginState::Degraded { + healthy_servers, + failed_servers, + } => { + assert_eq!( + healthy_servers, + vec!["alpha".to_string(), "beta".to_string()] + ); + assert!(failed_servers.is_empty()); + } + other => panic!("expected degraded state, got {other:?}"), + } } #[test] @@ -411,7 +483,7 @@ mod tests { DiscoveryResult { tools: Vec::new(), resources: Vec::new(), - partial: true, + partial: false, }, None, ); @@ -421,15 +493,16 @@ mod tests { let discovery = lifecycle.discover(); // then - match healthcheck.state { + match &healthcheck.state { PluginState::Failed { reason } => { assert_eq!(reason, "all 2 servers failed"); } other => panic!("expected failed state, got {other:?}"), } - assert!(discovery.partial); + assert!(!discovery.partial); assert!(discovery.tools.is_empty()); assert!(discovery.resources.is_empty()); + assert!(healthcheck.degraded_mode(&discovery).is_none()); } #[test]