From dab16c230a12f13da5f0979cefe1ac250963fc3a Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Tue, 7 Apr 2026 15:15:42 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20b5-session-export=20=E2=80=94=20batch?= =?UTF-8?q?=205=20wave=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/crates/rusty-claude-cli/src/main.rs | 305 ++++++++++++++--------- 1 file changed, 188 insertions(+), 117 deletions(-) diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 189a65a..497a2f9 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -69,6 +69,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUILD_TARGET: Option<&str> = option_env!("TARGET"); const GIT_SHA: Option<&str> = option_env!("GIT_SHA"); const INTERNAL_PROGRESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3); +const POST_TOOL_STALL_TIMEOUT: Duration = Duration::from_secs(10); const PRIMARY_SESSION_EXTENSION: &str = "jsonl"; const LEGACY_SESSION_EXTENSION: &str = "json"; const LATEST_SESSION_REFERENCE: &str = "latest"; @@ -6288,6 +6289,7 @@ impl ApiClient for AnthropicRuntimeClient { if let Some(progress_reporter) = &self.progress_reporter { progress_reporter.mark_model_phase(); } + let is_post_tool = request_ends_with_tool_result(&request); let message_request = MessageRequest { model: self.model.clone(), max_tokens: max_tokens_for_model(&self.model), @@ -6301,46 +6303,96 @@ impl ApiClient for AnthropicRuntimeClient { }; self.runtime.block_on(async { - let mut stream = - self.client - .stream_message(&message_request) - .await - .map_err(|error| { - RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) - })?; - let mut stdout = io::stdout(); - let mut sink = io::sink(); - let out: &mut dyn Write = if self.emit_output { - &mut stdout - } else { - &mut sink - }; - let renderer = TerminalRenderer::new(); - let mut markdown_stream = MarkdownStreamState::default(); - let mut events = Vec::new(); - let mut pending_tool: Option<(String, String, String)> = None; - let mut block_has_thinking_summary = false; - let mut saw_stop = false; + // When resuming after tool execution, apply a stall timeout on the + // first stream event. If the model does not respond within the + // deadline we drop the stalled connection and re-send the request as + // a continuation nudge (one retry only). + let max_attempts: usize = if is_post_tool { 2 } else { 1 }; - while let Some(event) = stream.next_event().await.map_err(|error| { - RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) - })? { - match event { - ApiStreamEvent::MessageStart(start) => { - for block in start.message.content { - push_output_block( - block, - out, - &mut events, - &mut pending_tool, - true, - &mut block_has_thinking_summary, - )?; - } + for attempt in 1..=max_attempts { + let result = self + .consume_stream(&message_request, is_post_tool && attempt == 1) + .await; + match result { + Ok(events) => return Ok(events), + Err(error) if error.to_string().contains("post-tool stall") && attempt < max_attempts => { + // Stalled after tool completion — nudge the model by + // re-sending the same request. + continue; } - ApiStreamEvent::ContentBlockStart(start) => { + Err(error) => return Err(error), + } + } + + Err(RuntimeError::new( + "post-tool continuation nudge exhausted", + )) + }) + } +} + +impl AnthropicRuntimeClient { + /// Consume a single streaming response, optionally applying a stall + /// timeout on the first event for post-tool continuations. + #[allow(clippy::too_many_lines)] + async fn consume_stream( + &self, + message_request: &MessageRequest, + apply_stall_timeout: bool, + ) -> Result, RuntimeError> { + let mut stream = + self.client + .stream_message(message_request) + .await + .map_err(|error| { + RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) + })?; + let mut stdout = io::stdout(); + let mut sink = io::sink(); + let out: &mut dyn Write = if self.emit_output { + &mut stdout + } else { + &mut sink + }; + let renderer = TerminalRenderer::new(); + let mut markdown_stream = MarkdownStreamState::default(); + let mut events = Vec::new(); + let mut pending_tool: Option<(String, String, String)> = None; + let mut block_has_thinking_summary = false; + let mut saw_stop = false; + let mut received_any_event = false; + + loop { + let next = if apply_stall_timeout && !received_any_event { + match tokio::time::timeout(POST_TOOL_STALL_TIMEOUT, stream.next_event()).await { + Ok(inner) => inner.map_err(|error| { + RuntimeError::new(format_user_visible_api_error( + &self.session_id, + &error, + )) + })?, + Err(_elapsed) => { + return Err(RuntimeError::new( + "post-tool stall: model did not respond within timeout", + )); + } + } + } else { + stream.next_event().await.map_err(|error| { + RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) + })? + }; + + let Some(event) = next else { + break; + }; + received_any_event = true; + + match event { + ApiStreamEvent::MessageStart(start) => { + for block in start.message.content { push_output_block( - start.content_block, + block, out, &mut events, &mut pending_tool, @@ -6348,101 +6400,120 @@ impl ApiClient for AnthropicRuntimeClient { &mut block_has_thinking_summary, )?; } - ApiStreamEvent::ContentBlockDelta(delta) => match delta.delta { - ContentBlockDelta::TextDelta { text } => { - if !text.is_empty() { - if let Some(progress_reporter) = &self.progress_reporter { - progress_reporter.mark_text_phase(&text); - } - if let Some(rendered) = markdown_stream.push(&renderer, &text) { - write!(out, "{rendered}") - .and_then(|()| out.flush()) - .map_err(|error| RuntimeError::new(error.to_string()))?; - } - events.push(AssistantEvent::TextDelta(text)); - } - } - ContentBlockDelta::InputJsonDelta { partial_json } => { - if let Some((_, _, input)) = &mut pending_tool { - input.push_str(&partial_json); - } - } - ContentBlockDelta::ThinkingDelta { .. } => { - if !block_has_thinking_summary { - render_thinking_block_summary(out, None, false)?; - block_has_thinking_summary = true; - } - } - ContentBlockDelta::SignatureDelta { .. } => {} - }, - ApiStreamEvent::ContentBlockStop(_) => { - block_has_thinking_summary = false; - if let Some(rendered) = markdown_stream.flush(&renderer) { - write!(out, "{rendered}") - .and_then(|()| out.flush()) - .map_err(|error| RuntimeError::new(error.to_string()))?; - } - if let Some((id, name, input)) = pending_tool.take() { + } + ApiStreamEvent::ContentBlockStart(start) => { + push_output_block( + start.content_block, + out, + &mut events, + &mut pending_tool, + true, + &mut block_has_thinking_summary, + )?; + } + ApiStreamEvent::ContentBlockDelta(delta) => match delta.delta { + ContentBlockDelta::TextDelta { text } => { + if !text.is_empty() { if let Some(progress_reporter) = &self.progress_reporter { - progress_reporter.mark_tool_phase(&name, &input); + progress_reporter.mark_text_phase(&text); } - // Display tool call now that input is fully accumulated - writeln!(out, "\n{}", format_tool_call_start(&name, &input)) - .and_then(|()| out.flush()) - .map_err(|error| RuntimeError::new(error.to_string()))?; - events.push(AssistantEvent::ToolUse { id, name, input }); + if let Some(rendered) = markdown_stream.push(&renderer, &text) { + write!(out, "{rendered}") + .and_then(|()| out.flush()) + .map_err(|error| RuntimeError::new(error.to_string()))?; + } + events.push(AssistantEvent::TextDelta(text)); } } - ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(delta.usage.token_usage())); - } - ApiStreamEvent::MessageStop(_) => { - saw_stop = true; - if let Some(rendered) = markdown_stream.flush(&renderer) { - write!(out, "{rendered}") - .and_then(|()| out.flush()) - .map_err(|error| RuntimeError::new(error.to_string()))?; + ContentBlockDelta::InputJsonDelta { partial_json } => { + if let Some((_, _, input)) = &mut pending_tool { + input.push_str(&partial_json); } - events.push(AssistantEvent::MessageStop); + } + ContentBlockDelta::ThinkingDelta { .. } => { + if !block_has_thinking_summary { + render_thinking_block_summary(out, None, false)?; + block_has_thinking_summary = true; + } + } + ContentBlockDelta::SignatureDelta { .. } => {} + }, + ApiStreamEvent::ContentBlockStop(_) => { + block_has_thinking_summary = false; + if let Some(rendered) = markdown_stream.flush(&renderer) { + write!(out, "{rendered}") + .and_then(|()| out.flush()) + .map_err(|error| RuntimeError::new(error.to_string()))?; + } + if let Some((id, name, input)) = pending_tool.take() { + if let Some(progress_reporter) = &self.progress_reporter { + progress_reporter.mark_tool_phase(&name, &input); + } + // Display tool call now that input is fully accumulated + writeln!(out, "\n{}", format_tool_call_start(&name, &input)) + .and_then(|()| out.flush()) + .map_err(|error| RuntimeError::new(error.to_string()))?; + events.push(AssistantEvent::ToolUse { id, name, input }); } } + ApiStreamEvent::MessageDelta(delta) => { + events.push(AssistantEvent::Usage(delta.usage.token_usage())); + } + ApiStreamEvent::MessageStop(_) => { + saw_stop = true; + if let Some(rendered) = markdown_stream.flush(&renderer) { + write!(out, "{rendered}") + .and_then(|()| out.flush()) + .map_err(|error| RuntimeError::new(error.to_string()))?; + } + events.push(AssistantEvent::MessageStop); + } } + } - push_prompt_cache_record(&self.client, &mut events); + push_prompt_cache_record(&self.client, &mut events); - if !saw_stop - && events.iter().any(|event| { - matches!(event, AssistantEvent::TextDelta(text) if !text.is_empty()) - || matches!(event, AssistantEvent::ToolUse { .. }) - }) - { - events.push(AssistantEvent::MessageStop); - } + if !saw_stop + && events.iter().any(|event| { + matches!(event, AssistantEvent::TextDelta(text) if !text.is_empty()) + || matches!(event, AssistantEvent::ToolUse { .. }) + }) + { + events.push(AssistantEvent::MessageStop); + } - if events - .iter() - .any(|event| matches!(event, AssistantEvent::MessageStop)) - { - return Ok(events); - } + if events + .iter() + .any(|event| matches!(event, AssistantEvent::MessageStop)) + { + return Ok(events); + } - let response = self - .client - .send_message(&MessageRequest { - stream: false, - ..message_request.clone() - }) - .await - .map_err(|error| { - RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) - })?; - let mut events = response_to_events(response, out)?; - push_prompt_cache_record(&self.client, &mut events); - Ok(events) - }) + let response = self + .client + .send_message(&MessageRequest { + stream: false, + ..message_request.clone() + }) + .await + .map_err(|error| { + RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) + })?; + let mut events = response_to_events(response, out)?; + push_prompt_cache_record(&self.client, &mut events); + Ok(events) } } +/// Returns `true` when the conversation ends with a tool-result message, +/// meaning the model is expected to continue after tool execution. +fn request_ends_with_tool_result(request: &ApiRequest) -> bool { + request + .messages + .last() + .is_some_and(|message| message.role == MessageRole::Tool) +} + fn format_user_visible_api_error(session_id: &str, error: &api::ApiError) -> String { if error.is_context_window_failure() { format_context_window_blocked_error(session_id, error)