turn gemini to streaming #12
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
Caution: Review failed. The pull request is closed.

📝 Walkthrough

This PR introduces streaming multi-turn prompt functionality for agents, adds tracing-based structured logging throughout, and integrates a web search tool into prompt review workflows. New dependencies for async streaming and distributed tracing are added to support these features.
Sequence Diagram

sequenceDiagram
actor User
participant Agent as Agent
participant Model as CompletionModel
participant Stream as StreamingResult
participant Tool as Tool
participant History as ChatHistory
User->>Agent: multi_turn_prompt(prompt, history)
loop Outer Loop (until no tools invoked)
Agent->>Model: stream completions
activate Stream
Model-->>Stream: stream Text/ToolCall/Reasoning
deactivate Stream
loop Stream Iteration
Stream->>Stream: next()
alt Text Output
Stream->>User: yield streamed text
Stream->>History: accumulate content
else ToolCall Event
Stream->>Tool: invoke tool
Tool-->>Stream: tool result
Stream->>History: add ToolCall message
Stream->>History: add ToolResult message
else Reasoning
Stream->>Stream: collect snippet
else Error
Stream->>User: propagate StreamingError
end
end
Agent->>History: update prompt from accumulated content
alt Tools invoked?
Note over Agent: continue loop
else No tools invoked
Note over Agent: break loop
end
end
Agent-->>User: return final StreamingResult
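The control flow above can be sketched in simplified Rust. This is a minimal sketch of the outer/inner loop only: ModelEvent, call_model, and run_tool are hypothetical stand-ins, not the actual rig types used by multi_turn_prompt.

```rust
// Minimal control-flow sketch of the diagram above. ModelEvent, call_model,
// and run_tool are hypothetical stand-ins; only the loop shape mirrors the
// real multi_turn_prompt.
use async_stream::stream;
use futures::{Stream, StreamExt};

enum ModelEvent {
    Text(String),
    ToolCall { name: String, args: String },
}

// Stand-in for one streamed model turn emitting Text / ToolCall events.
fn call_model(_prompt: String, _history: Vec<String>) -> impl Stream<Item = ModelEvent> {
    futures::stream::iter(vec![ModelEvent::Text("example chunk".into())])
}

// Stand-in for tool execution.
async fn run_tool(name: &str, args: &str) -> String {
    format!("result of {name}({args})")
}

fn multi_turn_prompt_sketch(
    mut prompt: String,
    mut history: Vec<String>,
) -> impl Stream<Item = String> {
    stream! {
        // Outer loop: keep prompting until a turn invokes no tools.
        loop {
            let mut tools_invoked = false;
            let mut accumulated = String::new();
            let mut turn = Box::pin(call_model(prompt.clone(), history.clone()));
            // Inner loop: drain this turn's streamed events.
            while let Some(event) = turn.next().await {
                match event {
                    ModelEvent::Text(chunk) => {
                        accumulated.push_str(&chunk);
                        yield chunk; // forward text to the caller as it arrives
                    }
                    ModelEvent::ToolCall { name, args } => {
                        tools_invoked = true;
                        let result = run_tool(&name, &args).await;
                        history.push(format!("tool {name} -> {result}"));
                    }
                }
            }
            // Update the prompt from the accumulated content, as in the diagram.
            history.push(accumulated.clone());
            if !tools_invoked {
                break;
            }
            prompt = accumulated;
        }
    }
}
```

A caller consumes the returned stream with StreamExt::next in a while-let loop, which is the same pattern the review comments below suggest centralizing in a shared helper.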
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Code Review Completed! 🔥 The code review was successfully completed based on your current configurations.
use rig::providers::gemini::Client;
use rig::prelude::*;
use rig::completion::Prompt;
use futures::StreamExt;
use rig::client::CompletionClient;
use crate::agents::multi_turn_prompt;
use rig::tool::Tool;
pub async fn optimizer(prompt: Intent) -> Result<Artifact> {
    require_env("GEMINI_API_KEY")?;
    let client = Client::new(require_env("GEMINI_API_KEY")?)?;
    let system_prompt_json = include_str!("../../data/optimizer.json");
    let artifact: Artifact = serde_json::from_str(system_prompt_json)
        .map_err(|e| ScribeError::Validation(format!("Failed to parse embedded optimizer.json: {}", e)))?;
    let system_prompt = artifact.system_prompt;
// In a shared module like src/utilities.rs
use crate::agents::StreamingResult;
use crate::error::{Result, ScribeError};
use futures::StreamExt;
use std::io::Write;
pub async fn consume_stream_to_string(mut stream: StreamingResult) -> Result<String> {
    let mut full_response = String::new();
    while let Some(res) = stream.next().await {
        match res {
            Ok(text) => {
                print!("{}", text.text);
                let _ = std::io::stdout().flush();
                full_response.push_str(&text.text);
            }
            Err(e) => {
                tracing::error!("Streaming error: {}", e);
                return Err(ScribeError::ProtocolViolation(e.to_string()));
            }
        }
    }
    println!(); // Final newline for clean output
    Ok(full_response)
}

// In optimizer.rs, deconstructor.rs, etc.
let stream = multi_turn_prompt(prompt_officer, input, Vec::new()).await;
let optimized_prompt = crate::utilities::consume_stream_to_string(stream).await?;

The code contains duplicated logic for consuming streams and writing to stdout across multiple files (optimizer.rs, deconstructor.rs, prompt_reviewer.rs). This violates the DRY principle and can lead to inconsistent behavior and maintenance challenges.
This issue appears in multiple locations:
- src/agents/optimizer.rs: Lines 5-18
- src/tools/deconstructor.rs: Lines 49-59
- src/tools/prompt_reviewer.rs: Lines 63-74
Extract the stream consumption logic into a shared utility function that handles all printing and error logging consistently, ensuring thread-safe operations and removing code duplication.
});
}

// Add tool results to chat history
// Add tool results to chat history
if !tool_results.is_empty() {
    let user_contents: Vec<UserContent> = tool_results
        .into_iter()
        .map(|(id, call_id, tool_result)| {
            let tool_result_content = OneOrMany::one(ToolResultContent::text(tool_result));
            if let Some(call_id) = call_id {
                UserContent::tool_result_with_call_id(id, call_id, tool_result_content)
            } else {
                UserContent::tool_result(id, tool_result_content)
            }
        })
        .collect();
    chat_history.push(Message::User {
        content: OneOrMany::many(user_contents).expect("tool_results is not empty"),
    });
}

When a model turn results in multiple tool calls, the current implementation adds each tool's result as a separate User message to the chat history. This is semantically incorrect, as all results from a single turn should be grouped into one message.
The issue is made worse by the logic that follows, which pops only the very last message from the history to use as the prompt for the next turn. As a result, the agent will only consider the result of the final tool call and ignore all others from that turn, leading to incomplete or incorrect final responses.
To fix this, collect all tool results into a single Vec<UserContent> and then push one consolidated Message::User to the chat history. This ensures the conversational context is accurate and that the agent processes all available information.
// Log tool definitions for verbose output
let deconstructor_def = Deconstructor.definition("".to_string()).await;
tracing::info!("Tool Definition - Deconstructor: {:?}", deconstructor_def);

let prompt_reviewer_def = PromptReviewer.definition("".to_string()).await;
tracing::info!("Tool Definition - PromptReviewer: {:?}", prompt_reviewer_def);

let web_searcher_def = WebSearcher.definition("".to_string()).await;
tracing::info!("Tool Definition - WebSearcher: {:?}", web_searcher_def);
// The agent builder below will handle tool definitions internally.
// The explicit, awaited calls here were redundant and added unnecessary latency.

The code makes three separate async calls to .definition() for each tool, awaits the result, logs it, and then discards the result. This introduces unnecessary latency on every call to the optimizer function because these async operations are performed sequentially just for logging.
The rig agent framework, when initialized with .tool(Deconstructor), will internally call .definition() when it needs the tool's schema. Therefore, these explicit calls are redundant. The values are computed and then immediately discarded without being used in the core logic, representing dead computation that slows down the request path.
To fix this, you can remove these blocks entirely. If logging the definitions is a critical debugging requirement, it should be done conditionally or through a more efficient mechanism that doesn't impact the performance of every request.
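If the definition logs are still wanted for debugging, one option (a sketch, not this PR's code) is to gate them behind the active filter with tracing::enabled!, so the awaits only run when debug output is actually enabled:

```rust
// Sketch only: skip the .definition() awaits unless debug logging is enabled.
// Assumes Deconstructor implements rig::tool::Tool as in this PR.
if tracing::enabled!(tracing::Level::DEBUG) {
    let deconstructor_def = Deconstructor.definition("".to_string()).await;
    tracing::debug!("Tool Definition - Deconstructor: {:?}", deconstructor_def);
}
```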
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

pub fn init_logging() -> WorkerGuard {
pub fn init_logging() {
    let file_appender = tracing_appender::rolling::daily("logs", "rigscribe.log");
    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
    let console_layer = fmt::layer()
        .with_target(true) // Include context (target)
        .with_thread_ids(false)
        .with_level(true)
        .with_file(true)
        .with_line_number(true)
        .compact(); // Use a more compact format for console if desired, or pretty()
    let file_layer = fmt::layer()
        .with_writer(non_blocking)
        .with_target(true)
        .with_thread_ids(true)
        .with_level(true)
        .with_file(true)
        .with_line_number(true)
        .with_ansi(false); // Disable colors for file
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info,rigscribe=debug"));
    tracing_subscriber::registry()
        .with(filter)
        .with(console_layer)
        .with(file_layer)
        .init();
    // The guard must be held for the lifetime of the application to prevent
    // the logging thread from shutting down. By forgetting it, we leak it,
    // ensuring it lives forever and file logging remains active.
    std::mem::forget(guard);
}

The WorkerGuard returned by tracing_appender::non_blocking must be kept alive for the entire duration of the application. If it is dropped, the background thread that writes logs to the file will be shut down, and any subsequent log messages will be silently lost.
The current init_logging function returns this guard, but if the caller doesn't explicitly store it in a variable, the guard will be dropped immediately after the function call, effectively disabling all file logging. This is a common pitfall that leads to silent failures.
To ensure file logging works reliably, the guard's lifetime should be tied to the application's lifetime. A common and robust pattern for this is to leak the guard, preventing it from ever being dropped and thus keeping the logging thread alive permanently. This ensures logs are always captured, with the minor trade-off of not flushing the final buffered logs on a clean shutdown (which is often acceptable for long-running services).
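An alternative to leaking the guard is to keep the WorkerGuard return type and hold the guard at the top of main, so buffered logs are also flushed on clean shutdown. A minimal caller-side sketch (the async runtime attribute is an assumption about this project's entry point):

```rust
// Sketch: keep init_logging() -> WorkerGuard and bind the guard in main.
// The guard is dropped only when main returns, flushing any buffered logs.
#[tokio::main]
async fn main() {
    let _guard = init_logging(); // an underscore-prefixed binding still keeps the value alive
    tracing::info!("rigscribe starting");
    // ... run the application ...
}
```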

This pull request introduces streaming capabilities for AI responses throughout the rigscribe application, particularly for the prompt optimization process and within its agentic tools. Key changes include:

- A new streaming multi_turn_prompt function that leverages the async-stream and futures crates.
- The multi_turn_prompt function improves the agent's ability to handle multi-turn interactions, especially when tools are involved. It processes tool calls within the streaming loop, executes them, and integrates their results into the chat history for subsequent AI turns.
- The PromptReviewer agent is now explicitly configured to utilize the WebSearcher tool for research during its prompt refinement process, with updated instructions to guide this behavior (a builder sketch follows below).
- Adoption of the tracing crate to provide structured logging for better debugging and operational insights. This replaces previous eprintln! and println! calls with info! and debug! messages, and sets up daily rolling log files.
- New dependencies on async-stream, futures, tracing, tracing-appender, and tracing-subscriber to support the new streaming and logging functionalities.
- A STREAMING_DEBUG_REPORT.md file has been added, providing detailed explanations of streaming concepts in Rust and documenting the debugging process during development.
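For context on the WebSearcher integration, attaching a tool to a rig agent typically goes through the agent builder. A minimal sketch, with a placeholder model id and preamble (only WebSearcher, require_env, Result, and the Gemini client usage come from this PR):

```rust
use rig::client::CompletionClient;
use rig::providers::gemini::Client;

// Sketch: build a reviewer agent with the WebSearcher tool registered.
// The model id and preamble text are placeholders, not the PR's values.
async fn build_reviewer_agent() -> Result<()> {
    let client = Client::new(require_env("GEMINI_API_KEY")?)?;
    let reviewer = client
        .agent("gemini-2.0-flash") // placeholder model id
        .preamble("Refine the user's prompt; call the web search tool when research is needed.")
        .tool(WebSearcher)
        .build();
    // reviewer can now be prompted; rig resolves the tool's definition internally.
    let _ = reviewer;
    Ok(())
}
```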
multi_turn_promptfunction that leverages theasync-streamandfuturescrates.multi_turn_promptfunction improves the agent's ability to handle multi-turn interactions, especially when tools are involved. It processes tool calls within the streaming loop, executes them, and integrates their results into the chat history for subsequent AI turns.PromptRevieweragent is now explicitly configured to utilize theWebSearchertool for research during its prompt refinement process, with updated instructions to guide this behavior.tracingcrate to provide structured logging for better debugging and operational insights. This replaces previouseprintln!andprintln!calls withinfo!anddebug!messages, and sets up daily rolling log files.async-stream,futures,tracing,tracing-appender, andtracing-subscriberto support the new streaming and logging functionalities.STREAMING_DEBUG_REPORT.mdfile has been added, providing detailed explanations of streaming concepts in Rust and documenting the debugging process during development.