# feat: add embedding model continuous batching scheduler #564
base: main
Conversation
Signed-off-by: Huamin Chen <[email protected]>
Pull Request Overview
This PR introduces continuous batching for embedding generation to improve throughput under concurrent workloads. The implementation adds a batching scheduler that groups multiple concurrent requests for efficient GPU processing, achieving an 11.4x throughput improvement over the concurrent baseline while maintaining numerical accuracy.
Key Changes
- Implements continuous batching scheduler for Qwen3 embedding model with vLLM-inspired architecture
- Adds comprehensive verification and benchmarking tools in Go and Rust
- Extends FFI layer to support batched embedding operations
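For orientation, here is a minimal sketch of the continuous-batching pattern described above, assuming a channel-fed worker thread; the types, field names, batch parameters, and embedding dimension below are illustrative and not taken from this PR:

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

// Hypothetical request type: the input text plus a channel for returning the embedding.
struct EmbedRequest {
    text: String,
    respond_to: mpsc::Sender<Vec<f32>>,
}

// Collect requests until the batch is full or the timeout expires, then run one
// batched forward pass and fan the results back out to the waiting callers.
fn run_scheduler(request_rx: mpsc::Receiver<EmbedRequest>, max_batch_size: usize, batch_timeout: Duration) {
    let mut pending: Vec<EmbedRequest> = Vec::new();
    loop {
        let deadline = Instant::now() + batch_timeout;
        while pending.len() < max_batch_size {
            let remaining = deadline.saturating_duration_since(Instant::now());
            match request_rx.recv_timeout(remaining) {
                Ok(req) => pending.push(req),
                Err(mpsc::RecvTimeoutError::Timeout) => break,
                Err(mpsc::RecvTimeoutError::Disconnected) => return,
            }
        }
        if pending.is_empty() {
            continue; // nothing arrived within the timeout; keep waiting
        }
        // Placeholder for a single batched model call (here: a dummy vector per request).
        let embeddings: Vec<Vec<f32>> = pending.iter().map(|r| vec![r.text.len() as f32; 4]).collect();
        for (req, emb) in pending.drain(..).zip(embeddings) {
            let _ = req.respond_to.send(emb);
        }
    }
}

// Typical wiring: run the scheduler on a dedicated thread and share the Sender with callers.
// let (tx, rx) = mpsc::channel();
// std::thread::spawn(move || run_scheduler(rx, 32, Duration::from_millis(5)));
```

Batching amortizes per-request GPU overhead across the whole group, which is where the throughput gain under concurrency comes from.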
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `candle-binding/src/model_architectures/embedding/continuous_batch_scheduler.rs` | Core batching scheduler with dynamic request grouping and statistics tracking |
| `candle-binding/src/model_architectures/embedding/qwen3_batched.rs` | Drop-in replacement wrapper for Qwen3EmbeddingModel with batching support |
| `candle-binding/src/model_architectures/embedding/mod.rs` | Module exports for new batching components |
| `candle-binding/src/ffi/embedding.rs` | FFI functions for batched embeddings (init, inference, shutdown) |
| `candle-binding/examples/embedding_benchmark.rs` | Standalone Rust benchmark comparing baseline vs batched performance |
| `examples/candle-binding/qwen3_embedding_verification.go` | Numerical accuracy verification between baseline and batched models |
| `examples/candle-binding/qwen3_embedding_example.go` | Comprehensive embedding generation and similarity example |
| `examples/candle-binding/qwen3_embedding_benchmark.go` | Concurrent API server simulation benchmark |
| `examples/candle-binding/README.md` | Updated documentation with embedding examples and benchmark results |
| `candle-binding/Cargo.toml` | Reordered crate-type list (rlib before staticlib) |
| `README.md` | Added alt text to image |
Comments suppressed due to low confidence (1)

`examples/candle-binding/qwen3_embedding_benchmark.go:1`

```go
// Benchmark simulating a typical embedding server workload
```

The hardcoded 200ms sleep duration for the warmup retry (line 194) is a magic number without explanation. Consider using a named constant or adding a comment explaining why this specific duration is needed for CUDA context initialization.
```rust
/// Total batches processed (lock-free atomic)
pub total_batches: AtomicU64,
/// Average batch size (requires mutex for float arithmetic)
pub avg_batch_size: Mutex<f64>,
```
**Copilot AI** (Oct 31, 2025):
The Clone implementation for BatchSchedulerStats uses lock().unwrap() which can panic if the mutex is poisoned. Consider using lock().expect() with a descriptive error message or handling poison errors explicitly to provide better error context in concurrent scenarios.
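A minimal sketch of the poison-tolerant variant, restricted to the two fields visible in the snippet above (the real struct may have more fields, and the atomic ordering chosen here is an assumption):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

pub struct BatchSchedulerStats {
    pub total_batches: AtomicU64,
    pub avg_batch_size: Mutex<f64>,
}

impl Clone for BatchSchedulerStats {
    fn clone(&self) -> Self {
        // A poisoned mutex only means another thread panicked while holding it;
        // the f64 inside is still readable, so recover it instead of panicking.
        let avg = self
            .avg_batch_size
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        Self {
            total_batches: AtomicU64::new(self.total_batches.load(Ordering::Relaxed)),
            avg_batch_size: Mutex::new(*avg),
        }
    }
}
```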
```toml
license = "MIT OR Apache-2.0"

[lib]
name = "candle_semantic_router"
```
**Copilot AI** (Oct 31, 2025):
The crate-type order was changed from ['staticlib', 'cdylib', 'rlib'] to ['rlib', 'staticlib', 'cdylib']. While this works, the commit/PR description should explain the rationale for this change, as the order can affect build behavior and linking in certain scenarios.
Suggested change:

```toml
name = "candle_semantic_router"
# The crate-type order is set to ["rlib", "staticlib", "cdylib"] so that the Rust library (rlib) is built by default.
# This order ensures that consumers using Rust as a dependency get the rlib, while staticlib and cdylib are available for FFI use.
```
```rust
// Collect new requests (non-blocking)
while pending_requests.len() < config.max_batch_size {
    match request_rx.try_recv() {
        Ok(req) => {
            pending_requests.push(req);
        }
        Err(_) => break, // No more requests available
    }
}

// Decide whether to process batch now
let should_process = if pending_requests.is_empty() {
    // No requests, wait a bit and try again
    thread::sleep(Duration::from_micros(100));
    false
} else if pending_requests.len() >= config.max_batch_size {
    // Batch is full, process immediately
    if config.verbose {
        println!("📦 Batch full ({} requests)", pending_requests.len());
    }
    true
} else if last_batch_time.elapsed() >= batch_timeout {
    // Timeout reached, process what we have
    if config.verbose {
        println!("⏱️ Batch timeout ({} requests)", pending_requests.len());
    }
    true
} else {
    // Wait a bit more for batch to fill
    thread::sleep(Duration::from_micros(100));
```
**Copilot AI** (Oct 31, 2025):
The scheduler uses busy-waiting with 100μs sleep intervals when no requests are available (lines 272, 288). This approach can consume unnecessary CPU cycles. Consider using blocking channel operations (e.g., recv_timeout) or a condvar-based approach to avoid busy-waiting when idle.
Suggested change (replace the busy-wait collection above with blocking `recv_timeout` calls):

```rust
// If we have no pending requests, block until we get one or until batch_timeout
if pending_requests.is_empty() {
    match request_rx.recv_timeout(batch_timeout) {
        Ok(req) => {
            pending_requests.push(req);
            last_batch_time = Instant::now();
        }
        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
            // Timeout reached, will check if we should process batch below
        }
        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
            if config.verbose {
                println!("🛑 Request channel disconnected, shutting down");
            }
            break;
        }
    }
} else {
    // We have some pending requests, try to fill up the batch with a short timeout
    let time_since_last = last_batch_time.elapsed();
    let remaining = if batch_timeout > time_since_last {
        batch_timeout - time_since_last
    } else {
        Duration::from_millis(0)
    };
    let mut filled = false;
    while pending_requests.len() < config.max_batch_size {
        match request_rx.recv_timeout(remaining) {
            Ok(req) => {
                pending_requests.push(req);
                if pending_requests.len() >= config.max_batch_size {
                    filled = true;
                    break;
                }
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                // Timeout reached, break to process what we have
                break;
            }
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                if config.verbose {
                    println!("🛑 Request channel disconnected, shutting down");
                }
                break;
            }
        }
    }
}
// Decide whether to process batch now
let should_process = if pending_requests.is_empty() {
    false
} else if pending_requests.len() >= config.max_batch_size {
    if config.verbose {
        println!("📦 Batch full ({} requests)", pending_requests.len());
    }
    true
} else if last_batch_time.elapsed() >= batch_timeout {
    if config.verbose {
        println!("⏱️ Batch timeout ({} requests)", pending_requests.len());
    }
    true
} else {
```
- Single-threaded: 55.17 emb/s, 18.5ms P95 latency
- 8 concurrent clients: 14.90 emb/s, 601ms P95 latency (shows CUDA serialization)
- **With continuous batching: 170 emb/s, ~10ms P95 latency (11.4x faster!)**
**Copilot AI** (Oct 31, 2025):
The performance claim '11.4x faster' is inconsistent. The comparison shows 170 emb/s vs 14.90 emb/s (11.4x improvement), but the baseline single-threaded is 55.17 emb/s. The documentation should clarify this is comparing against concurrent baseline (14.90 emb/s), not single-threaded baseline (55.17 emb/s), to avoid confusion.
Suggested change:

- **With continuous batching: 170 emb/s, ~10ms P95 latency (11.4x faster than 8 concurrent clients!)**
```rust
if GLOBAL_BATCHED_MODEL.get().is_some() {
    eprintln!("Warning: batched embedding model already initialized");
    return true;
}
```
**Copilot AI** (Oct 31, 2025):
The function silently succeeds when the model is already initialized, which could mask configuration mismatches. If the new config differs from the existing one (e.g., different max_batch_size), the caller won't know their configuration was ignored. Consider returning false or providing a way to check/update configuration.
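A hedged sketch of the stricter behaviour (only `GLOBAL_BATCHED_MODEL` is taken from the snippet above; the boolean return convention is assumed to mirror the existing FFI style):

```rust
if GLOBAL_BATCHED_MODEL.get().is_some() {
    // Fail loudly instead of silently succeeding, so a caller passing a different
    // configuration (e.g. another max_batch_size) can detect that it was ignored.
    eprintln!("Error: batched embedding model already initialized; re-init config ignored");
    return false;
}
```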
```rust
times.sort();
let mean = times.iter().sum::<u128>() / times.len() as u128;
let median = times[times.len() / 2];
let p95 = times[(times.len() as f64 * 0.95) as usize];
```
**Copilot AI** (Oct 31, 2025):
The P95 calculation may be off-by-one due to floating point truncation. For a sorted array of 100 items, this calculates index 95, but P95 should typically be at index 94 (zero-indexed). Consider using times.len().saturating_sub(1).min((times.len() as f64 * 0.95).ceil() as usize) for more accurate percentile calculation.
Suggested change:

```rust
let p95_idx = times.len().saturating_sub(1).min((times.len() as f64 * 0.95).ceil() as usize);
let p95 = times[p95_idx];
```
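If the goal is the standard nearest-rank percentile, a small helper along these lines is another option (illustrative only; this function is not part of the PR):

```rust
/// Nearest-rank percentile over a sorted slice.
/// For 100 samples and p = 0.95 this selects index 94, i.e. the 95th value.
fn percentile(sorted: &[u128], p: f64) -> u128 {
    assert!(!sorted.is_empty() && (0.0..=1.0).contains(&p));
    let rank = (p * sorted.len() as f64).ceil() as usize; // 1-based rank
    sorted[rank.saturating_sub(1).min(sorted.len() - 1)]
}
```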
```rust
impl Drop for Qwen3EmbeddingModelBatched {
    fn drop(&mut self) {
        self.shutdown();
    }
}
```
**Copilot AI** (Oct 31, 2025):
The Drop implementation calls shutdown() which may block waiting for the scheduler thread. If the model is dropped in a critical section or during process termination, this could cause deadlocks or hangs. Consider using a non-blocking shutdown mechanism or documenting the blocking behavior.
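One hedged way to address this, with all names below being illustrative rather than the PR's actual types: keep `shutdown()` as the explicit, potentially blocking path, and have `Drop` only signal the worker so dropping never blocks.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;

struct SchedulerHandle {
    stop: Arc<AtomicBool>,
    worker: Option<JoinHandle<()>>,
}

impl SchedulerHandle {
    /// Explicit shutdown: signal the scheduler loop and join it (may block).
    fn shutdown(&mut self) {
        self.stop.store(true, Ordering::Release);
        if let Some(handle) = self.worker.take() {
            let _ = handle.join();
        }
    }
}

impl Drop for SchedulerHandle {
    fn drop(&mut self) {
        // Only signal here; never join. Callers that need a clean join
        // should call shutdown() explicitly before dropping the handle.
        self.stop.store(true, Ordering::Release);
    }
}
```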

What type of PR is this?
Feature addition.
What this PR does / why we need it:
Inspired by vLLM, continuous batching for embeddings fully utilizes GPU compute, improving concurrent embedding throughput by roughly 11x and substantially reducing P95 latency under load.
Which issue(s) this PR fixes:
Fixes #
Release Notes: Yes/No