Prevent worker prompts from outrunning boot readiness

Add a foundational worker_boot control plane and tool surface for
reliable startup. The new registry tracks trust gates, ready-for-prompt
handshakes, prompt delivery attempts, and shell misdelivery recovery so
callers can coordinate worker boot above raw terminal transport.

Constraint: Current main has no tmux-backed worker control API to extend directly
Constraint: First slice must stay deterministic and fully testable in-process
Rejected: Wire the first implementation straight to tmux panes | would couple transport details to unfinished state semantics
Rejected: Ship parser helpers without control tools | would not enforce the ready-before-prompt contract end to end
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Treat WorkerObserve heuristics as a temporary transport adapter and replace them with typed runtime events before widening automation policy
Tested: cargo test -p runtime worker_boot
Tested: cargo test -p tools worker_tools
Tested: cargo check -p runtime -p tools
Not-tested: Real tmux/TTY trust prompts and live worker boot on an actual coding session
Not-tested: Full cargo clippy -p runtime -p tools --all-targets -- -D warnings (fails on pre-existing warnings outside this slice)
This commit is contained in:
Yeachan-Heo
2026-04-03 15:04:52 +00:00
parent 56ee33e057
commit f76311f9d6
3 changed files with 1090 additions and 4 deletions

View File

@@ -18,6 +18,7 @@ use runtime::{
read_file,
task_registry::TaskRegistry,
team_cron_registry::{CronRegistry, TeamRegistry},
worker_boot::{WorkerReadySnapshot, WorkerRegistry},
write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock,
ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode,
PermissionPolicy, PromptCacheEvent, RuntimeError, Session, ToolError, ToolExecutor,
@@ -56,6 +57,12 @@ fn global_task_registry() -> &'static TaskRegistry {
REGISTRY.get_or_init(TaskRegistry::new)
}
fn global_worker_registry() -> &'static WorkerRegistry {
use std::sync::OnceLock;
static REGISTRY: OnceLock<WorkerRegistry> = OnceLock::new();
REGISTRY.get_or_init(WorkerRegistry::new)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolManifestEntry {
pub name: String,
@@ -806,6 +813,117 @@ pub fn mvp_tool_specs() -> Vec<ToolSpec> {
}),
required_permission: PermissionMode::ReadOnly,
},
ToolSpec {
name: "WorkerCreate",
description: "Create a coding worker boot session with trust-gate and prompt-delivery guards.",
input_schema: json!({
"type": "object",
"properties": {
"cwd": { "type": "string" },
"trusted_roots": {
"type": "array",
"items": { "type": "string" }
},
"auto_recover_prompt_misdelivery": { "type": "boolean" }
},
"required": ["cwd"],
"additionalProperties": false
}),
required_permission: PermissionMode::DangerFullAccess,
},
ToolSpec {
name: "WorkerGet",
description: "Fetch the current worker boot state, last error, and event history.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::ReadOnly,
},
ToolSpec {
name: "WorkerObserve",
description: "Feed a terminal snapshot into worker boot detection to resolve trust gates, ready handshakes, and prompt misdelivery.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" },
"screen_text": { "type": "string" }
},
"required": ["worker_id", "screen_text"],
"additionalProperties": false
}),
required_permission: PermissionMode::ReadOnly,
},
ToolSpec {
name: "WorkerResolveTrust",
description: "Resolve a detected trust prompt so worker boot can continue.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::DangerFullAccess,
},
ToolSpec {
name: "WorkerAwaitReady",
description: "Return the current ready-handshake verdict for a coding worker.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::ReadOnly,
},
ToolSpec {
name: "WorkerSendPrompt",
description: "Send a task prompt only after the worker reaches ready_for_prompt; can replay a recovered prompt.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" },
"prompt": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::DangerFullAccess,
},
ToolSpec {
name: "WorkerRestart",
description: "Restart worker boot state after a failed or stale startup.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::DangerFullAccess,
},
ToolSpec {
name: "WorkerTerminate",
description: "Terminate a worker and mark the lane finished from the control plane.",
input_schema: json!({
"type": "object",
"properties": {
"worker_id": { "type": "string" }
},
"required": ["worker_id"],
"additionalProperties": false
}),
required_permission: PermissionMode::DangerFullAccess,
},
ToolSpec {
name: "TeamCreate",
description: "Create a team of sub-agents for parallel task execution.",
@@ -1059,6 +1177,18 @@ fn execute_tool_with_enforcer(
"TaskStop" => from_value::<TaskIdInput>(input).and_then(run_task_stop),
"TaskUpdate" => from_value::<TaskUpdateInput>(input).and_then(run_task_update),
"TaskOutput" => from_value::<TaskIdInput>(input).and_then(run_task_output),
"WorkerCreate" => from_value::<WorkerCreateInput>(input).and_then(run_worker_create),
"WorkerGet" => from_value::<WorkerIdInput>(input).and_then(run_worker_get),
"WorkerObserve" => from_value::<WorkerObserveInput>(input).and_then(run_worker_observe),
"WorkerResolveTrust" => {
from_value::<WorkerIdInput>(input).and_then(run_worker_resolve_trust)
}
"WorkerAwaitReady" => from_value::<WorkerIdInput>(input).and_then(run_worker_await_ready),
"WorkerSendPrompt" => {
from_value::<WorkerSendPromptInput>(input).and_then(run_worker_send_prompt)
}
"WorkerRestart" => from_value::<WorkerIdInput>(input).and_then(run_worker_restart),
"WorkerTerminate" => from_value::<WorkerIdInput>(input).and_then(run_worker_terminate),
"TeamCreate" => from_value::<TeamCreateInput>(input).and_then(run_team_create),
"TeamDelete" => from_value::<TeamDeleteInput>(input).and_then(run_team_delete),
"CronCreate" => from_value::<CronCreateInput>(input).and_then(run_cron_create),
@@ -1232,6 +1362,60 @@ fn run_task_output(input: TaskIdInput) -> Result<String, String> {
}
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_create(input: WorkerCreateInput) -> Result<String, String> {
let worker = global_worker_registry().create(
&input.cwd,
&input.trusted_roots,
input.auto_recover_prompt_misdelivery,
);
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_get(input: WorkerIdInput) -> Result<String, String> {
global_worker_registry().get(&input.worker_id).map_or_else(
|| Err(format!("worker not found: {}", input.worker_id)),
to_pretty_json,
)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_observe(input: WorkerObserveInput) -> Result<String, String> {
let worker = global_worker_registry().observe(&input.worker_id, &input.screen_text)?;
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_resolve_trust(input: WorkerIdInput) -> Result<String, String> {
let worker = global_worker_registry().resolve_trust(&input.worker_id)?;
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_await_ready(input: WorkerIdInput) -> Result<String, String> {
let snapshot: WorkerReadySnapshot = global_worker_registry().await_ready(&input.worker_id)?;
to_pretty_json(snapshot)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_send_prompt(input: WorkerSendPromptInput) -> Result<String, String> {
let worker = global_worker_registry().send_prompt(&input.worker_id, input.prompt.as_deref())?;
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_restart(input: WorkerIdInput) -> Result<String, String> {
let worker = global_worker_registry().restart(&input.worker_id)?;
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_worker_terminate(input: WorkerIdInput) -> Result<String, String> {
let worker = global_worker_registry().terminate(&input.worker_id)?;
to_pretty_json(worker)
}
#[allow(clippy::needless_pass_by_value)]
fn run_team_create(input: TeamCreateInput) -> Result<String, String> {
let task_ids: Vec<String> = input
@@ -1799,6 +1983,37 @@ struct TaskUpdateInput {
message: String,
}
#[derive(Debug, Deserialize)]
struct WorkerCreateInput {
cwd: String,
#[serde(default)]
trusted_roots: Vec<String>,
#[serde(default = "default_auto_recover_prompt_misdelivery")]
auto_recover_prompt_misdelivery: bool,
}
#[derive(Debug, Deserialize)]
struct WorkerIdInput {
worker_id: String,
}
#[derive(Debug, Deserialize)]
struct WorkerObserveInput {
worker_id: String,
screen_text: String,
}
#[derive(Debug, Deserialize)]
struct WorkerSendPromptInput {
worker_id: String,
#[serde(default)]
prompt: Option<String>,
}
const fn default_auto_recover_prompt_misdelivery() -> bool {
true
}
#[derive(Debug, Deserialize)]
struct TeamCreateInput {
name: String,
@@ -4623,6 +4838,10 @@ mod tests {
assert!(names.contains(&"StructuredOutput"));
assert!(names.contains(&"REPL"));
assert!(names.contains(&"PowerShell"));
assert!(names.contains(&"WorkerCreate"));
assert!(names.contains(&"WorkerObserve"));
assert!(names.contains(&"WorkerAwaitReady"));
assert!(names.contains(&"WorkerSendPrompt"));
}
#[test]
@@ -4631,6 +4850,139 @@ mod tests {
assert!(error.contains("unsupported tool"));
}
#[test]
fn worker_tools_gate_prompt_delivery_until_ready_and_support_auto_trust() {
let created = execute_tool(
"WorkerCreate",
&json!({
"cwd": "/tmp/worktree/repo",
"trusted_roots": ["/tmp/worktree"]
}),
)
.expect("WorkerCreate should succeed");
let created_output: serde_json::Value = serde_json::from_str(&created).expect("json");
let worker_id = created_output["worker_id"]
.as_str()
.expect("worker id")
.to_string();
assert_eq!(created_output["status"], "spawning");
assert_eq!(created_output["trust_auto_resolve"], true);
let gated = execute_tool(
"WorkerSendPrompt",
&json!({
"worker_id": worker_id,
"prompt": "ship the change"
}),
)
.expect_err("prompt delivery before ready should fail");
assert!(gated.contains("not ready for prompt delivery"));
let observed = execute_tool(
"WorkerObserve",
&json!({
"worker_id": created_output["worker_id"],
"screen_text": "Do you trust the files in this folder?\n1. Yes, proceed\n2. No"
}),
)
.expect("WorkerObserve should auto-resolve trust");
let observed_output: serde_json::Value = serde_json::from_str(&observed).expect("json");
assert_eq!(observed_output["status"], "spawning");
assert_eq!(observed_output["trust_gate_cleared"], true);
let ready = execute_tool(
"WorkerObserve",
&json!({
"worker_id": created_output["worker_id"],
"screen_text": "Ready for your input\n>"
}),
)
.expect("WorkerObserve should mark worker ready");
let ready_output: serde_json::Value = serde_json::from_str(&ready).expect("json");
assert_eq!(ready_output["status"], "ready_for_prompt");
let await_ready = execute_tool(
"WorkerAwaitReady",
&json!({
"worker_id": created_output["worker_id"]
}),
)
.expect("WorkerAwaitReady should succeed");
let await_ready_output: serde_json::Value =
serde_json::from_str(&await_ready).expect("json");
assert_eq!(await_ready_output["ready"], true);
let accepted = execute_tool(
"WorkerSendPrompt",
&json!({
"worker_id": created_output["worker_id"],
"prompt": "ship the change"
}),
)
.expect("WorkerSendPrompt should succeed after ready");
let accepted_output: serde_json::Value = serde_json::from_str(&accepted).expect("json");
assert_eq!(accepted_output["status"], "prompt_accepted");
assert_eq!(accepted_output["prompt_delivery_attempts"], 1);
}
#[test]
fn worker_tools_detect_misdelivery_and_arm_prompt_replay() {
let created = execute_tool(
"WorkerCreate",
&json!({
"cwd": "/tmp/repo/worker-misdelivery"
}),
)
.expect("WorkerCreate should succeed");
let created_output: serde_json::Value = serde_json::from_str(&created).expect("json");
let worker_id = created_output["worker_id"]
.as_str()
.expect("worker id")
.to_string();
execute_tool(
"WorkerObserve",
&json!({
"worker_id": worker_id,
"screen_text": "Ready for input\n>"
}),
)
.expect("worker should become ready");
execute_tool(
"WorkerSendPrompt",
&json!({
"worker_id": worker_id,
"prompt": "Investigate flaky boot"
}),
)
.expect("prompt send should succeed");
let recovered = execute_tool(
"WorkerObserve",
&json!({
"worker_id": worker_id,
"screen_text": "% Investigate flaky boot\nzsh: command not found: Investigate"
}),
)
.expect("misdelivery observe should succeed");
let recovered_output: serde_json::Value = serde_json::from_str(&recovered).expect("json");
assert_eq!(recovered_output["status"], "ready_for_prompt");
assert_eq!(recovered_output["last_error"]["kind"], "prompt_delivery");
assert_eq!(recovered_output["replay_prompt"], "Investigate flaky boot");
let replayed = execute_tool(
"WorkerSendPrompt",
&json!({
"worker_id": worker_id
}),
)
.expect("WorkerSendPrompt should replay recovered prompt");
let replayed_output: serde_json::Value = serde_json::from_str(&replayed).expect("json");
assert_eq!(replayed_output["status"], "prompt_accepted");
assert_eq!(replayed_output["prompt_delivery_attempts"], 2);
}
#[test]
fn global_tool_registry_denies_blocked_tool_before_dispatch() {
// given
@@ -6326,10 +6678,7 @@ printf 'pwsh:%s' "$1"
fs::write(&file, "content\n").expect("write test file");
let registry = read_only_registry();
let result = registry.execute(
"read_file",
&json!({ "path": file.display().to_string() }),
);
let result = registry.execute("read_file", &json!({ "path": file.display().to_string() }));
assert!(result.is_ok(), "read_file should be allowed: {result:?}");
let _ = fs::remove_dir_all(root);