feat: b5-session-export — batch 5 wave 2

This commit is contained in:
YeonGyu-Kim
2026-04-07 15:15:42 +09:00
parent a46711779c
commit dab16c230a

View File

@@ -69,6 +69,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");
const BUILD_TARGET: Option<&str> = option_env!("TARGET"); const BUILD_TARGET: Option<&str> = option_env!("TARGET");
const GIT_SHA: Option<&str> = option_env!("GIT_SHA"); const GIT_SHA: Option<&str> = option_env!("GIT_SHA");
const INTERNAL_PROGRESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3); const INTERNAL_PROGRESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
const POST_TOOL_STALL_TIMEOUT: Duration = Duration::from_secs(10);
const PRIMARY_SESSION_EXTENSION: &str = "jsonl"; const PRIMARY_SESSION_EXTENSION: &str = "jsonl";
const LEGACY_SESSION_EXTENSION: &str = "json"; const LEGACY_SESSION_EXTENSION: &str = "json";
const LATEST_SESSION_REFERENCE: &str = "latest"; const LATEST_SESSION_REFERENCE: &str = "latest";
@@ -6288,6 +6289,7 @@ impl ApiClient for AnthropicRuntimeClient {
if let Some(progress_reporter) = &self.progress_reporter { if let Some(progress_reporter) = &self.progress_reporter {
progress_reporter.mark_model_phase(); progress_reporter.mark_model_phase();
} }
let is_post_tool = request_ends_with_tool_result(&request);
let message_request = MessageRequest { let message_request = MessageRequest {
model: self.model.clone(), model: self.model.clone(),
max_tokens: max_tokens_for_model(&self.model), max_tokens: max_tokens_for_model(&self.model),
@@ -6301,46 +6303,96 @@ impl ApiClient for AnthropicRuntimeClient {
}; };
self.runtime.block_on(async { self.runtime.block_on(async {
let mut stream = // When resuming after tool execution, apply a stall timeout on the
self.client // first stream event. If the model does not respond within the
.stream_message(&message_request) // deadline we drop the stalled connection and re-send the request as
.await // a continuation nudge (one retry only).
.map_err(|error| { let max_attempts: usize = if is_post_tool { 2 } else { 1 };
RuntimeError::new(format_user_visible_api_error(&self.session_id, &error))
})?;
let mut stdout = io::stdout();
let mut sink = io::sink();
let out: &mut dyn Write = if self.emit_output {
&mut stdout
} else {
&mut sink
};
let renderer = TerminalRenderer::new();
let mut markdown_stream = MarkdownStreamState::default();
let mut events = Vec::new();
let mut pending_tool: Option<(String, String, String)> = None;
let mut block_has_thinking_summary = false;
let mut saw_stop = false;
while let Some(event) = stream.next_event().await.map_err(|error| { for attempt in 1..=max_attempts {
RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) let result = self
})? { .consume_stream(&message_request, is_post_tool && attempt == 1)
match event { .await;
ApiStreamEvent::MessageStart(start) => { match result {
for block in start.message.content { Ok(events) => return Ok(events),
push_output_block( Err(error) if error.to_string().contains("post-tool stall") && attempt < max_attempts => {
block, // Stalled after tool completion — nudge the model by
out, // re-sending the same request.
&mut events, continue;
&mut pending_tool,
true,
&mut block_has_thinking_summary,
)?;
}
} }
ApiStreamEvent::ContentBlockStart(start) => { Err(error) => return Err(error),
}
}
Err(RuntimeError::new(
"post-tool continuation nudge exhausted",
))
})
}
}
impl AnthropicRuntimeClient {
/// Consume a single streaming response, optionally applying a stall
/// timeout on the first event for post-tool continuations.
#[allow(clippy::too_many_lines)]
async fn consume_stream(
&self,
message_request: &MessageRequest,
apply_stall_timeout: bool,
) -> Result<Vec<AssistantEvent>, RuntimeError> {
let mut stream =
self.client
.stream_message(message_request)
.await
.map_err(|error| {
RuntimeError::new(format_user_visible_api_error(&self.session_id, &error))
})?;
let mut stdout = io::stdout();
let mut sink = io::sink();
let out: &mut dyn Write = if self.emit_output {
&mut stdout
} else {
&mut sink
};
let renderer = TerminalRenderer::new();
let mut markdown_stream = MarkdownStreamState::default();
let mut events = Vec::new();
let mut pending_tool: Option<(String, String, String)> = None;
let mut block_has_thinking_summary = false;
let mut saw_stop = false;
let mut received_any_event = false;
loop {
let next = if apply_stall_timeout && !received_any_event {
match tokio::time::timeout(POST_TOOL_STALL_TIMEOUT, stream.next_event()).await {
Ok(inner) => inner.map_err(|error| {
RuntimeError::new(format_user_visible_api_error(
&self.session_id,
&error,
))
})?,
Err(_elapsed) => {
return Err(RuntimeError::new(
"post-tool stall: model did not respond within timeout",
));
}
}
} else {
stream.next_event().await.map_err(|error| {
RuntimeError::new(format_user_visible_api_error(&self.session_id, &error))
})?
};
let Some(event) = next else {
break;
};
received_any_event = true;
match event {
ApiStreamEvent::MessageStart(start) => {
for block in start.message.content {
push_output_block( push_output_block(
start.content_block, block,
out, out,
&mut events, &mut events,
&mut pending_tool, &mut pending_tool,
@@ -6348,101 +6400,120 @@ impl ApiClient for AnthropicRuntimeClient {
&mut block_has_thinking_summary, &mut block_has_thinking_summary,
)?; )?;
} }
ApiStreamEvent::ContentBlockDelta(delta) => match delta.delta { }
ContentBlockDelta::TextDelta { text } => { ApiStreamEvent::ContentBlockStart(start) => {
if !text.is_empty() { push_output_block(
if let Some(progress_reporter) = &self.progress_reporter { start.content_block,
progress_reporter.mark_text_phase(&text); out,
} &mut events,
if let Some(rendered) = markdown_stream.push(&renderer, &text) { &mut pending_tool,
write!(out, "{rendered}") true,
.and_then(|()| out.flush()) &mut block_has_thinking_summary,
.map_err(|error| RuntimeError::new(error.to_string()))?; )?;
} }
events.push(AssistantEvent::TextDelta(text)); ApiStreamEvent::ContentBlockDelta(delta) => match delta.delta {
} ContentBlockDelta::TextDelta { text } => {
} if !text.is_empty() {
ContentBlockDelta::InputJsonDelta { partial_json } => {
if let Some((_, _, input)) = &mut pending_tool {
input.push_str(&partial_json);
}
}
ContentBlockDelta::ThinkingDelta { .. } => {
if !block_has_thinking_summary {
render_thinking_block_summary(out, None, false)?;
block_has_thinking_summary = true;
}
}
ContentBlockDelta::SignatureDelta { .. } => {}
},
ApiStreamEvent::ContentBlockStop(_) => {
block_has_thinking_summary = false;
if let Some(rendered) = markdown_stream.flush(&renderer) {
write!(out, "{rendered}")
.and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?;
}
if let Some((id, name, input)) = pending_tool.take() {
if let Some(progress_reporter) = &self.progress_reporter { if let Some(progress_reporter) = &self.progress_reporter {
progress_reporter.mark_tool_phase(&name, &input); progress_reporter.mark_text_phase(&text);
} }
// Display tool call now that input is fully accumulated if let Some(rendered) = markdown_stream.push(&renderer, &text) {
writeln!(out, "\n{}", format_tool_call_start(&name, &input)) write!(out, "{rendered}")
.and_then(|()| out.flush()) .and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?; .map_err(|error| RuntimeError::new(error.to_string()))?;
events.push(AssistantEvent::ToolUse { id, name, input }); }
events.push(AssistantEvent::TextDelta(text));
} }
} }
ApiStreamEvent::MessageDelta(delta) => { ContentBlockDelta::InputJsonDelta { partial_json } => {
events.push(AssistantEvent::Usage(delta.usage.token_usage())); if let Some((_, _, input)) = &mut pending_tool {
} input.push_str(&partial_json);
ApiStreamEvent::MessageStop(_) => {
saw_stop = true;
if let Some(rendered) = markdown_stream.flush(&renderer) {
write!(out, "{rendered}")
.and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?;
} }
events.push(AssistantEvent::MessageStop); }
ContentBlockDelta::ThinkingDelta { .. } => {
if !block_has_thinking_summary {
render_thinking_block_summary(out, None, false)?;
block_has_thinking_summary = true;
}
}
ContentBlockDelta::SignatureDelta { .. } => {}
},
ApiStreamEvent::ContentBlockStop(_) => {
block_has_thinking_summary = false;
if let Some(rendered) = markdown_stream.flush(&renderer) {
write!(out, "{rendered}")
.and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?;
}
if let Some((id, name, input)) = pending_tool.take() {
if let Some(progress_reporter) = &self.progress_reporter {
progress_reporter.mark_tool_phase(&name, &input);
}
// Display tool call now that input is fully accumulated
writeln!(out, "\n{}", format_tool_call_start(&name, &input))
.and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?;
events.push(AssistantEvent::ToolUse { id, name, input });
} }
} }
ApiStreamEvent::MessageDelta(delta) => {
events.push(AssistantEvent::Usage(delta.usage.token_usage()));
}
ApiStreamEvent::MessageStop(_) => {
saw_stop = true;
if let Some(rendered) = markdown_stream.flush(&renderer) {
write!(out, "{rendered}")
.and_then(|()| out.flush())
.map_err(|error| RuntimeError::new(error.to_string()))?;
}
events.push(AssistantEvent::MessageStop);
}
} }
}
push_prompt_cache_record(&self.client, &mut events); push_prompt_cache_record(&self.client, &mut events);
if !saw_stop if !saw_stop
&& events.iter().any(|event| { && events.iter().any(|event| {
matches!(event, AssistantEvent::TextDelta(text) if !text.is_empty()) matches!(event, AssistantEvent::TextDelta(text) if !text.is_empty())
|| matches!(event, AssistantEvent::ToolUse { .. }) || matches!(event, AssistantEvent::ToolUse { .. })
}) })
{ {
events.push(AssistantEvent::MessageStop); events.push(AssistantEvent::MessageStop);
} }
if events if events
.iter() .iter()
.any(|event| matches!(event, AssistantEvent::MessageStop)) .any(|event| matches!(event, AssistantEvent::MessageStop))
{ {
return Ok(events); return Ok(events);
} }
let response = self let response = self
.client .client
.send_message(&MessageRequest { .send_message(&MessageRequest {
stream: false, stream: false,
..message_request.clone() ..message_request.clone()
}) })
.await .await
.map_err(|error| { .map_err(|error| {
RuntimeError::new(format_user_visible_api_error(&self.session_id, &error)) RuntimeError::new(format_user_visible_api_error(&self.session_id, &error))
})?; })?;
let mut events = response_to_events(response, out)?; let mut events = response_to_events(response, out)?;
push_prompt_cache_record(&self.client, &mut events); push_prompt_cache_record(&self.client, &mut events);
Ok(events) Ok(events)
})
} }
} }
/// Returns `true` when the conversation ends with a tool-result message,
/// meaning the model is expected to continue after tool execution.
fn request_ends_with_tool_result(request: &ApiRequest) -> bool {
    // An empty conversation trivially does not end with a tool result.
    match request.messages.last() {
        Some(final_message) => final_message.role == MessageRole::Tool,
        None => false,
    }
}
fn format_user_visible_api_error(session_id: &str, error: &api::ApiError) -> String { fn format_user_visible_api_error(session_id: &str, error: &api::ApiError) -> String {
if error.is_context_window_failure() { if error.is_context_window_failure() {
format_context_window_blocked_error(session_id, error) format_context_window_blocked_error(session_id, error)