Merge remote-tracking branch 'origin/rcc/hook-pipeline' into integration/dori-cleanroom

# Conflicts:
#	rust/crates/runtime/src/config.rs
#	rust/crates/runtime/src/conversation.rs
#	rust/crates/runtime/src/hooks.rs
#	rust/crates/runtime/src/lib.rs
#	rust/crates/rusty-claude-cli/src/main.rs
#	rust/crates/rusty-claude-cli/src/render.rs
This commit is contained in:
YeonGyu-Kim
2026-04-02 11:05:03 +09:00
8 changed files with 1504 additions and 181 deletions

View File

@@ -10,9 +10,9 @@ use std::io::{self, Read, Write};
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use api::{
@@ -1080,6 +1080,61 @@ struct LiveCli {
session: SessionHandle,
}
struct HookAbortMonitor {
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
}
impl HookAbortMonitor {
fn spawn(abort_signal: runtime::HookAbortSignal) -> Self {
Self::spawn_with_waiter(abort_signal, move |stop_rx, abort_signal| {
let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
else {
return;
};
runtime.block_on(async move {
let wait_for_stop = tokio::task::spawn_blocking(move || {
let _ = stop_rx.recv();
});
tokio::select! {
result = tokio::signal::ctrl_c() => {
if result.is_ok() {
abort_signal.abort();
}
}
_ = wait_for_stop => {}
}
});
})
}
fn spawn_with_waiter<F>(abort_signal: runtime::HookAbortSignal, wait_for_interrupt: F) -> Self
where
F: FnOnce(Receiver<()>, runtime::HookAbortSignal) + Send + 'static,
{
let (stop_tx, stop_rx) = mpsc::channel();
let join_handle = thread::spawn(move || wait_for_interrupt(stop_rx, abort_signal));
Self {
stop_tx: Some(stop_tx),
join_handle: Some(join_handle),
}
}
fn stop(mut self) {
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(());
}
if let Some(join_handle) = self.join_handle.take() {
let _ = join_handle.join();
}
}
}
impl LiveCli {
fn new(
model: String,
@@ -1136,7 +1191,35 @@ impl LiveCli {
)
}
fn prepare_turn_runtime(
&self,
emit_output: bool,
) -> Result<
(
ConversationRuntime<AnthropicRuntimeClient, CliToolExecutor>,
HookAbortMonitor,
),
Box<dyn std::error::Error>,
> {
let hook_abort_signal = runtime::HookAbortSignal::new();
let runtime = build_runtime(
self.runtime.session().clone(),
self.model.clone(),
self.system_prompt.clone(),
true,
emit_output,
self.allowed_tools.clone(),
self.permission_mode,
None,
)?
.with_hook_abort_signal(hook_abort_signal.clone());
let hook_abort_monitor = HookAbortMonitor::spawn(hook_abort_signal);
Ok((runtime, hook_abort_monitor))
}
fn run_turn(&mut self, input: &str) -> Result<(), Box<dyn std::error::Error>> {
let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(true)?;
let mut spinner = Spinner::new();
let mut stdout = io::stdout();
spinner.tick(
@@ -1145,7 +1228,9 @@ impl LiveCli {
&mut stdout,
)?;
let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode);
let result = self.runtime.run_turn(input, Some(&mut permission_prompter));
let result = runtime.run_turn(input, Some(&mut permission_prompter));
hook_abort_monitor.stop();
self.runtime = runtime;
match result {
Ok(summary) => {
spinner.finish(
@@ -1186,19 +1271,11 @@ impl LiveCli {
}
fn run_prompt_json(&mut self, input: &str) -> Result<(), Box<dyn std::error::Error>> {
let session = self.runtime.session().clone();
let mut runtime = build_runtime(
session,
self.model.clone(),
self.system_prompt.clone(),
true,
false,
self.allowed_tools.clone(),
self.permission_mode,
None,
)?;
let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(false)?;
let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode);
let summary = runtime.run_turn(input, Some(&mut permission_prompter))?;
let result = runtime.run_turn(input, Some(&mut permission_prompter));
hook_abort_monitor.stop();
let summary = result?;
self.runtime = runtime;
self.persist_session()?;
println!(
@@ -2948,7 +3025,7 @@ fn build_runtime(
progress_reporter: Option<InternalPromptProgressReporter>,
) -> Result<ConversationRuntime<AnthropicRuntimeClient, CliToolExecutor>, Box<dyn std::error::Error>> {
let (feature_config, tool_registry) = build_runtime_plugin_state()?;
Ok(ConversationRuntime::new_with_features(
let mut runtime = ConversationRuntime::new_with_features(
session,
AnthropicRuntimeClient::new(
model,
@@ -2959,10 +3036,47 @@ fn build_runtime(
progress_reporter,
)?,
CliToolExecutor::new(allowed_tools.clone(), emit_output, tool_registry.clone()),
permission_policy(permission_mode, &tool_registry),
permission_policy(permission_mode, &feature_config, &tool_registry),
system_prompt,
feature_config,
))
);
if emit_output {
runtime = runtime.with_hook_progress_reporter(Box::new(CliHookProgressReporter));
}
Ok(runtime)
}
struct CliHookProgressReporter;
impl runtime::HookProgressReporter for CliHookProgressReporter {
fn on_event(&mut self, event: &runtime::HookProgressEvent) {
match event {
runtime::HookProgressEvent::Started {
event,
tool_name,
command,
} => eprintln!(
"[hook {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
runtime::HookProgressEvent::Completed {
event,
tool_name,
command,
} => eprintln!(
"[hook done {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
runtime::HookProgressEvent::Cancelled {
event,
tool_name,
command,
} => eprintln!(
"[hook cancelled {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
}
}
}
struct CliPermissionPrompter {
@@ -2985,6 +3099,9 @@ impl runtime::PermissionPrompter for CliPermissionPrompter {
println!(" Tool {}", request.tool_name);
println!(" Current mode {}", self.current_mode.as_str());
println!(" Required mode {}", request.required_mode.as_str());
if let Some(reason) = &request.reason {
println!(" Reason {reason}");
}
println!(" Input {}", request.input);
print!("Approve this tool call? [y/N]: ");
let _ = io::stdout().flush();
@@ -3817,9 +3934,13 @@ impl ToolExecutor for CliToolExecutor {
}
}
fn permission_policy(mode: PermissionMode, tool_registry: &GlobalToolRegistry) -> PermissionPolicy {
fn permission_policy(
mode: PermissionMode,
feature_config: &runtime::RuntimeFeatureConfig,
tool_registry: &GlobalToolRegistry,
) -> PermissionPolicy {
tool_registry.permission_specs(None).into_iter().fold(
PermissionPolicy::new(mode),
PermissionPolicy::new(mode).with_permission_rules(feature_config.permission_rules()),
|policy, (name, required_permission)| {
policy.with_tool_requirement(name, required_permission)
},
@@ -3975,13 +4096,14 @@ mod tests {
push_output_block, render_config_report, render_diff_report, render_memory_report,
render_repl_help, resolve_model_alias, response_to_events,
resume_supported_slash_commands, run_resume_command, status_context, CliAction,
CliOutputFormat, InternalPromptProgressEvent, InternalPromptProgressState, SlashCommand,
StatusUsage, DEFAULT_MODEL,
CliOutputFormat, HookAbortMonitor, InternalPromptProgressEvent,
InternalPromptProgressState, SlashCommand, StatusUsage, DEFAULT_MODEL,
};
use api::{MessageResponse, OutputContentBlock, Usage};
use plugins::{PluginTool, PluginToolDefinition, PluginToolPermission};
use runtime::{
AssistantEvent, ContentBlock, ConversationMessage, MessageRole, PermissionMode, Session,
AssistantEvent, ContentBlock, ConversationMessage, HookAbortSignal, MessageRole,
PermissionMode, Session,
};
use serde_json::json;
use std::fs;
@@ -4050,6 +4172,7 @@ mod tests {
std::env::set_current_dir(previous).expect("cwd should restore");
result
}
use std::sync::mpsc;
#[test]
fn defaults_to_repl_when_no_args() {
@@ -5101,4 +5224,43 @@ mod sandbox_report_tests {
assert!(report.contains("Filesystem mode"));
assert!(report.contains("Fallback reason"));
}
#[test]
fn hook_abort_monitor_stops_without_aborting() {
let abort_signal = HookAbortSignal::new();
let (ready_tx, ready_rx) = mpsc::channel();
let monitor = HookAbortMonitor::spawn_with_waiter(
abort_signal.clone(),
move |stop_rx, abort_signal| {
ready_tx.send(()).expect("ready signal");
let _ = stop_rx.recv();
assert!(!abort_signal.is_aborted());
},
);
ready_rx.recv().expect("waiter should be ready");
monitor.stop();
assert!(!abort_signal.is_aborted());
}
#[test]
fn hook_abort_monitor_propagates_interrupt() {
let abort_signal = HookAbortSignal::new();
let (done_tx, done_rx) = mpsc::channel();
let monitor = HookAbortMonitor::spawn_with_waiter(
abort_signal.clone(),
move |_stop_rx, abort_signal| {
abort_signal.abort();
done_tx.send(()).expect("done signal");
},
);
done_rx
.recv_timeout(Duration::from_secs(1))
.expect("interrupt should complete");
monitor.stop();
assert!(abort_signal.is_aborted());
}
}