Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ license = "MIT"
repository = "https://github.com/hghalebi/rigscribe"

[dependencies]
async-stream = "0.3.6"
futures = "0.3.31"

rig-core = {version="0.26.0",features=["all"]}
schemars ={version= "1.1.0"}
Expand All @@ -16,3 +18,6 @@ serpscraper = "0.1.3"
termimad = "0.34.1"
thiserror = "2.0.17"
tokio = {version= "1.48.0",features=["full"]}
tracing = "0.1.44"
tracing-appender = "0.2.4"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt"] }
181 changes: 181 additions & 0 deletions STREAMING_DEBUG_REPORT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Debugging Report & Streaming Guide

## Debugging Summary

During the review of the `rigscribe` repository, several issues were identified and resolved to ensure the project compiles and runs correctly.

### 1. Missing Dependency (`async-stream`)
* **Issue:** The code in `src/agents/optimizer.rs` utilized the `async_stream::stream!` macro, but the `async-stream` crate was not declared in `Cargo.toml`.
* **Fix:** Added `async-stream = "0.3.6"` to `Cargo.toml`.

### 2. Invalid Trait Syntax (`Example` Trait)
* **Issue:** The code contained invalid type definitions like `<std::io::Error as Example>::Result`. The trait `Example` did not exist and seemed to be a placeholder or syntax error.
* **Fix:** Updated the `StreamingResult` type alias and function signatures to use standard Rust `Result` types:
```rust
type StreamingResult = Pin<Box<dyn Stream<Item = std::result::Result<Text, StreamingError>> + Send>>;
```

### 3. Type Mismatch with `stream_to_stdout`
* **Issue:** The `rig` library's helper function `stream_to_stdout` expected a specific stream type that didn't match the custom `StreamingResult` returned by `multi_turn_prompt`. Additionally, the original code wasn't capturing the final output string needed to create the `Artifact`.
* **Fix:** Replaced the library call with a manual consumption loop. This allows us to both print to stdout in real-time *and* build the final string:
```rust
while let Some(res) = stream.next().await {
match res {
Ok(text) => {
print!("{}", text.text); // Stream to console
optimized_prompt.push_str(&text.text); // Capture for logic
}
// ... error handling
}
}
```

### 4. Code Cleanup
* **Issue:** Several unused imports (`schemars`, `serde`, various `rig` items) and unused functions (`custom_stream_to_stdout`, `map_provider_error`) were causing compiler warnings.
* **Fix:** Removed these unused elements to achieve a clean compilation.

---

## Understanding Streaming in Rust

Streaming is a powerful concept in Rust, especially for AI applications where responses are generated token-by-token.

### What is a Stream?

In synchronous Rust, we have `Iterator`, which yields a sequence of values:
```rust
// Sync
let iter = vec![1, 2, 3].into_iter();
for item in iter { ... }
```

In asynchronous Rust, a **`Stream`** is the async equivalent of an Iterator. Instead of returning the next item immediately, it returns a `Future` that resolves to the next item.

### The `async-stream` Crate

Creating a Stream manually involves implementing the `Stream` trait, which can be complex (handling `Poll`, `Context`, and `Pin`).

The `async-stream` crate simplifies this by allowing you to write streams using generator syntax, similar to Python. It provides the `stream!` macro and the `yield` keyword.

```rust
// Example from your code
async_stream::stream! {
// ... do some work ...
yield Ok(Text { text: "Hello" });
// ... await something ...
yield Ok(Text { text: " World" });
}
```

### The Return Type: `Pin<Box<dyn Stream...>>`

You will often see this return type:
```rust
type StreamingResult = Pin<Box<dyn Stream<Item = Result<Text, Error>> + Send>>;
```

1. **`dyn Stream`**: We are returning a "Trait Object". We don't care about the specific concrete type of the stream, just that it implements `Stream`.
2. **`Box`**: Because `dyn Stream` has an unknown size at compile time, we put it on the heap.
3. **`Pin`**: Async blocks and streams often contain self-referential references. `Pin` ensures the data isn't moved in memory, which is required for safety when polling futures.
4. **`Send`**: Ensures the stream can be sent across threads (required by most async runtimes like Tokio).

### Consuming a Stream

To use a stream, you typically use a `while let` loop with `.next().await`:

```rust
use futures::StreamExt; // Required for .next()

let mut stream = my_async_stream();

while let Some(item) = stream.next().await {
match item {
Ok(content) => println!("Received: {}", content),
Err(e) => eprintln!("Error: {}", e),
}
}
```

This pattern allows your application to remain responsive. In `rigscribe`, this is used to show the AI's "thought process" or generated text to the user immediately, rather than waiting for the entire generation to finish.

---

## Deep Dive: Why `async-stream` and `Pin`?

While the sections above cover *how* to use them, this section explains *why* they exist and what problems they solve.

### 1. `async-stream`: The Magic of Generators

Rust does not yet have native syntax for `yield` in functions (often called Generators or Coroutines) in the stable channel.

Without `async-stream`, if you wanted to create a stream that emits values over time, you would have to manually implement the `Stream` trait. This requires building a state machine by hand:

**The Hard Way (Manual State Machine):**
```rust
struct MyStream {
state: State,
count: usize,
}
enum State { Start, Waiting, Done }

impl Stream for MyStream {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.state {
State::Start => {
// Return a value and switch state
self.state = State::Waiting;
Poll::Ready(Some(1))
}
State::Waiting => {
// Check if async task is done...
// ... complex polling logic ...
Poll::Pending
}
State::Done => Poll::Ready(None),
}
}
}
```

**The Easy Way (`async-stream`):**
The `async_stream::stream!` macro essentially compiles your code block into an anonymous struct that implements `Stream` (like the manual example above) but manages the state machine for you automatically.

```rust
stream! {
yield 1; // The macro handles saving state here
some_async_work().await; // And restoring it here
yield 2;
}
```

### 2. `Pin`: Solving the "Self-Referential" Problem

To understand `Pin`, you must understand **Moves** and **Self-References**.

1. **Moves:** In Rust, values are "moved" (copied to a new memory address) frequently (e.g., passing by value, resizing a `Vec`). Usually, this is fine because types like `i32` or `String` don't care where they live in memory.
2. **Self-References:** `Future`s generated by `async` blocks are different. They often store pointers to their *own* internal variables.

**Example:**
Imagine an async block:
```rust
async {
let x = [0; 1024]; // A large array
let y = &x; // A reference to x (internal pointer!)
some_await().await; // Yield execution
println!("{:?}", y); // Use y when we wake up
}
```
When this async block compiles, it becomes a struct. `y` is a pointer pointing to `x` *inside the same struct*.
If we **Move** this struct to a new memory location (e.g., `Box`ing it or passing it to a function), `x` moves to the new address, but `y` **still points to the old address**. This is a dangling pointer, which causes undefined behavior (crashes).

**The Solution:**
`Pin<P>` is a wrapper around a pointer `P` (like `Box<T>` or `&mut T`). It effectively says:
> "The value pointed to by `P` will **never move** in memory again until it is dropped."

By returning `Pin<Box<dyn Stream...>>`, we are promising the compiler:
1. We put the Stream on the heap (`Box`).
2. We `Pin`ned it there.
3. Therefore, it is safe to poll it, even if it has internal self-references (which `async` blocks almost always do).

This is why you almost always see `Pin` when dealing with manually managed Futures or Streams.
Loading