From 905bd169fbb3f656b03b2021f0da519426cf2842 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 06:08:39 +0000 Subject: [PATCH 01/15] initial implementation --- .../examples/mcp_config_web_search.yaml | 50 +++++ sgl-router/src/routers/openai/mcp.rs | 16 +- sgl-router/src/routers/openai/mod.rs | 1 + sgl-router/src/routers/openai/responses.rs | 90 ++++++-- sgl-router/src/routers/openai/router.rs | 37 +++- sgl-router/src/routers/openai/streaming.rs | 12 +- sgl-router/src/routers/openai/web_search.rs | 202 ++++++++++++++++++ 7 files changed, 379 insertions(+), 29 deletions(-) create mode 100644 sgl-router/examples/mcp_config_web_search.yaml create mode 100644 sgl-router/src/routers/openai/web_search.rs diff --git a/sgl-router/examples/mcp_config_web_search.yaml b/sgl-router/examples/mcp_config_web_search.yaml new file mode 100644 index 00000000000..4f572cc42a3 --- /dev/null +++ b/sgl-router/examples/mcp_config_web_search.yaml @@ -0,0 +1,50 @@ +# Example MCP Configuration with Web Search Preview +# +# This configuration demonstrates how to set up the web_search_preview tool +# for the OpenAI router. 
+# +# Usage: +# export SGLANG_MCP_CONFIG=/path/to/this/mcp_config_web_search.yaml +# export WEB_SEARCH_MCP_URL="https://your-search-server.com/sse" +# export WEB_SEARCH_MCP_TOKEN="your-token" + +# Global proxy configuration (optional) +# proxy: +# http: "http://proxy:8080" +# https: "http://proxy:8080" +# no_proxy: "localhost,127.0.0.1,*.internal" + +# Connection pool settings +pool: + max_connections: 100 + idle_timeout: 300 + +# Tool inventory settings +inventory: + enable_refresh: true + tool_ttl: 300 # 5 minutes + refresh_interval: 60 # 1 minute + refresh_on_error: true + +# MCP Servers +servers: + # Web Search Preview Server + # This server provides web search capabilities to the LLM + - name: "web_search_preview" + protocol: sse + url: http://localhost:8001/sse + # token: "${WEB_SEARCH_MCP_TOKEN}" + required: true # Fail startup if this server is unavailable + + # Example: Additional MCP server + # - name: "another-mcp-server" + # protocol: stdio + # command: "node" + # args: ["./mcp-server.js"] + # required: false + +# Pre-warm connections at startup (optional) +# warmup: +# - url: "${WEB_SEARCH_MCP_URL}" +# label: "web_search_preview" +# token: "${WEB_SEARCH_MCP_TOKEN}" diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 4504de1bba0..b3566bee4e0 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -275,6 +275,7 @@ pub(super) async fn execute_streaming_tool_calls( pub(super) fn prepare_mcp_payload_for_streaming( payload: &mut Value, active_mcp: &Arc, + is_web_search: bool, ) { if let Some(obj) = payload.as_object_mut() { // Remove any non-function tools from outgoing payload @@ -289,10 +290,21 @@ pub(super) fn prepare_mcp_payload_for_streaming( } } - // Build function tools for all discovered MCP tools + // Build function tools for discovered MCP tools let mut tools_json = Vec::new(); let tools = active_mcp.list_tools(); - for t in tools { + + // Filter tools based on context + let 
filtered_tools: Vec<_> = if is_web_search { + // Only include tools from web_search_preview server + tools.into_iter() + .filter(|t| t.server == "web_search_preview") + .collect() + } else { + tools + }; + + for t in filtered_tools { let parameters = t.parameters.clone().unwrap_or(serde_json::json!({ "type": "object", "properties": {}, diff --git a/sgl-router/src/routers/openai/mod.rs b/sgl-router/src/routers/openai/mod.rs index e01af396580..a845d312ac3 100644 --- a/sgl-router/src/routers/openai/mod.rs +++ b/sgl-router/src/routers/openai/mod.rs @@ -13,6 +13,7 @@ mod responses; mod router; mod streaming; mod utils; +pub(crate) mod web_search; // Re-export the main router type for external use pub use router::OpenAIRouter; diff --git a/sgl-router/src/routers/openai/responses.rs b/sgl-router/src/routers/openai/responses.rs index 336df9ea9d8..c8ea797565c 100644 --- a/sgl-router/src/routers/openai/responses.rs +++ b/sgl-router/src/routers/openai/responses.rs @@ -269,43 +269,87 @@ pub(super) fn rewrite_streaming_block( Some(rebuilt_lines.join("\n")) } +/// Transform mcp_call items to web_search_call items if web_search_preview was used +pub(super) fn transform_web_search_output_items(response: &mut Value) { + if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) { + for item in output_array.iter_mut() { + if item.get("type").and_then(|t| t.as_str()) == Some("mcp_call") { + // Check if this mcp_call is from web_search_preview server + let server_label = item.get("server_label").and_then(|s| s.as_str()); + + if server_label == Some("web_search_preview") { + // Transform to web_search_call (minimal, status only) + let ws_item = crate::routers::openai::web_search::mcp_call_to_web_search_call(item); + *item = ws_item; + } + } + } + } +} + /// Mask function tools as MCP tools in response for client pub(super) fn mask_tools_as_mcp(resp: &mut Value, original_body: &ResponsesRequest) { + // Check for MCP tool let mcp_tool = 
original_body.tools.as_ref().and_then(|tools| { tools .iter() .find(|t| matches!(t.r#type, ResponseToolType::Mcp) && t.server_url.is_some()) }); - let Some(t) = mcp_tool else { - return; - }; - let mut m = serde_json::Map::new(); - m.insert("type".to_string(), Value::String("mcp".to_string())); - if let Some(label) = &t.server_label { - m.insert("server_label".to_string(), Value::String(label.clone())); - } - if let Some(url) = &t.server_url { - m.insert("server_url".to_string(), Value::String(url.clone())); - } - if let Some(desc) = &t.server_description { - m.insert( - "server_description".to_string(), - Value::String(desc.clone()), - ); + // Check for web_search_preview tool + let has_web_search = original_body + .tools + .as_ref() + .map(|tools| crate::routers::openai::web_search::has_web_search_preview_tool(tools)) + .unwrap_or(false); + + // If neither MCP nor web_search_preview, return early + if mcp_tool.is_none() && !has_web_search { + return; } - if let Some(req) = &t.require_approval { - m.insert("require_approval".to_string(), Value::String(req.clone())); + + let mut response_tools = Vec::new(); + + // Add MCP tool if present + if let Some(t) = mcp_tool { + let mut m = serde_json::Map::new(); + m.insert("type".to_string(), Value::String("mcp".to_string())); + if let Some(label) = &t.server_label { + m.insert("server_label".to_string(), Value::String(label.clone())); + } + if let Some(url) = &t.server_url { + m.insert("server_url".to_string(), Value::String(url.clone())); + } + if let Some(desc) = &t.server_description { + m.insert( + "server_description".to_string(), + Value::String(desc.clone()), + ); + } + if let Some(req) = &t.require_approval { + m.insert("require_approval".to_string(), Value::String(req.clone())); + } + if let Some(allowed) = &t.allowed_tools { + m.insert( + "allowed_tools".to_string(), + Value::Array(allowed.iter().map(|s| Value::String(s.clone())).collect()), + ); + } + response_tools.push(Value::Object(m)); } - if let 
Some(allowed) = &t.allowed_tools { - m.insert( - "allowed_tools".to_string(), - Value::Array(allowed.iter().map(|s| Value::String(s.clone())).collect()), + + // Add web_search_preview tool if present + if has_web_search { + let mut ws = serde_json::Map::new(); + ws.insert( + "type".to_string(), + Value::String("web_search_preview".to_string()), ); + response_tools.push(Value::Object(ws)); } if let Some(obj) = resp.as_object_mut() { - obj.insert("tools".to_string(), Value::Array(vec![Value::Object(m)])); + obj.insert("tools".to_string(), Value::Array(response_tools)); obj.entry("tool_choice") .or_insert(Value::String("auto".to_string())); } diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index 51126e4beea..c9f94b83be3 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -33,7 +33,9 @@ use super::{ ensure_request_mcp_client, execute_tool_loop, prepare_mcp_payload_for_streaming, McpLoopConfig, }, - responses::{mask_tools_as_mcp, patch_streaming_response_json}, + responses::{ + mask_tools_as_mcp, patch_streaming_response_json, transform_web_search_output_items, + }, streaming::handle_streaming_response, utils::{apply_provider_headers, extract_auth_header, probe_endpoint_for_model}, }; @@ -248,6 +250,7 @@ impl OpenAIRouter { mut payload: Value, original_body: &ResponsesRequest, original_previous_response_id: Option, + is_web_search: bool, ) -> Response { // Check if MCP is active for this request // Ensure dynamic client is created if needed @@ -269,7 +272,7 @@ impl OpenAIRouter { let config = McpLoopConfig::default(); // Transform MCP tools to function tools - prepare_mcp_payload_for_streaming(&mut payload, mcp); + prepare_mcp_payload_for_streaming(&mut payload, mcp, is_web_search); match execute_tool_loop( &self.client, @@ -341,6 +344,7 @@ impl OpenAIRouter { } // Patch response with metadata + transform_web_search_output_items(&mut response_json); mask_tools_as_mcp(&mut 
response_json, original_body); patch_streaming_response_json( &mut response_json, @@ -695,6 +699,33 @@ impl crate::routers::RouterTrait for OpenAIRouter { let url = format!("{}/v1/responses", base_url); + // Detect web_search_preview tool and verify MCP server availability + let has_web_search = if let Some(ref tools) = body.tools { + crate::routers::openai::web_search::has_web_search_preview_tool(tools) + } else { + false + }; + + if has_web_search { + // Check if web_search_preview MCP server is available + if !crate::routers::openai::web_search::is_web_search_mcp_available(&self.mcp_manager) + .await + { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": { + "message": "Web search preview requested but MCP server 'web_search_preview' is not available. Please configure the MCP server.", + "type": "invalid_request_error", + "param": "tools", + "code": "mcp_server_unavailable" + } + })), + ) + .into_response(); + } + } + // Validate mutually exclusive params: previous_response_id and conversation // TODO: this validation logic should move the right place, also we need a proper error message module if body.previous_response_id.is_some() && body.conversation.is_some() { @@ -962,6 +993,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { payload, body, original_previous_response_id, + has_web_search, ) .await } else { @@ -971,6 +1003,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { payload, body, original_previous_response_id, + has_web_search, ) .await } diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index bebda8eba51..7901d78943c 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -29,7 +29,10 @@ use super::{ inject_mcp_metadata_streaming, prepare_mcp_payload_for_streaming, send_mcp_list_tools_events, McpLoopConfig, ToolLoopState, }, - responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block}, + 
responses::{ + mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block, + transform_web_search_output_items, + }, utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction}, }; use crate::{ @@ -928,6 +931,7 @@ pub(super) fn send_final_response_event( inject_mcp_metadata_streaming(&mut final_response, state, mcp, server_label); } + transform_web_search_output_items(&mut final_response); mask_tools_as_mcp(&mut final_response, original_request); patch_streaming_response_json(&mut final_response, original_request, previous_response_id); @@ -1137,9 +1141,10 @@ pub(super) async fn handle_streaming_with_tool_interception( original_body: &ResponsesRequest, original_previous_response_id: Option, active_mcp: &Arc, + is_web_search: bool, ) -> Response { // Transform MCP tools to function tools in payload - prepare_mcp_payload_for_streaming(&mut payload, active_mcp); + prepare_mcp_payload_for_streaming(&mut payload, active_mcp, is_web_search); let (tx, rx) = mpsc::unbounded_channel::>(); let should_store = original_body.store.unwrap_or(false); @@ -1384,6 +1389,7 @@ pub(super) async fn handle_streaming_with_tool_interception( server_label, ); + transform_web_search_output_items(&mut response_json); mask_tools_as_mcp(&mut response_json, &original_request); patch_streaming_response_json( &mut response_json, @@ -1498,6 +1504,7 @@ pub(super) async fn handle_streaming_response( payload: Value, original_body: &ResponsesRequest, original_previous_response_id: Option, + is_web_search: bool, ) -> Response { // Check if MCP is active for this request // Ensure dynamic client is created if needed @@ -1545,6 +1552,7 @@ pub(super) async fn handle_streaming_response( original_body, original_previous_response_id, active_mcp, + is_web_search, ) .await } diff --git a/sgl-router/src/routers/openai/web_search.rs b/sgl-router/src/routers/openai/web_search.rs new file mode 100644 index 00000000000..64351f807ff --- /dev/null +++ 
b/sgl-router/src/routers/openai/web_search.rs @@ -0,0 +1,202 @@ +//! Web Search Preview Integration (MVP - Minimal) +//! +//! This module handles transformation between OpenAI's web_search_preview format +//! and MCP-based web search tool calls. +//! +//! MVP Scope: +//! - Detect web_search_preview tool in requests +//! - Check MCP server availability +//! - Transform to/from function calls +//! - Build minimal web_search_call output items (status only) +//! +//! Future: search_context_size, user_location, results exposure + +use std::sync::Arc; + +use serde_json::{json, Value}; + +use crate::mcp::McpManager; +use crate::protocols::responses::{ResponseTool, ResponseToolType}; + +use super::utils::generate_id; + +// ============================================================================ +// Tool Detection & Transformation +// ============================================================================ + +/// Detect if request has web_search_preview tool +pub fn has_web_search_preview_tool(tools: &[ResponseTool]) -> bool { + tools + .iter() + .any(|t| matches!(t.r#type, ResponseToolType::WebSearchPreview)) +} + +/// Check if MCP server "web_search_preview" is available +pub async fn is_web_search_mcp_available(mcp_manager: &Arc) -> bool { + mcp_manager + .get_client("web_search_preview") + .await + .is_some() +} + +/// Transform web_search_preview tool to MCP function tools +/// Returns function tools from the "web_search_preview" MCP server +pub fn transform_web_search_to_mcp_functions(mcp_manager: &Arc) -> Vec { + // Get tools from the "web_search_preview" MCP server + let tools = mcp_manager.list_tools(); + + tools + .iter() + .filter(|t| t.server == "web_search_preview") + .map(|t| { + json!({ + "type": "function", + "name": t.name, + "description": t.description, + "parameters": t.parameters.clone().unwrap_or_else(|| json!({ + "type": "object", + "properties": {}, + "additionalProperties": false + })) + }) + }) + .collect() +} + +// 
============================================================================ +// Output Item Builders (MVP - Status Only) +// ============================================================================ + +/// Build a web_search_call output item (MVP - status only) +/// +/// The MCP search results are passed to the LLM internally via function_call_output, +/// but we don't expose them in the web_search_call item to the client. +pub fn build_web_search_call_item() -> Value { + json!({ + "id": generate_id("ws"), + "type": "web_search_call", + "status": "completed", + "action": { + "type": "search" + } + }) +} + +/// Build a failed web_search_call output item +pub fn build_web_search_call_item_failed(error: &str) -> Value { + json!({ + "id": generate_id("ws"), + "type": "web_search_call", + "status": "failed", + "action": { + "type": "search" + }, + "error": error + }) +} + +/// Convert mcp_call item to web_search_call item (MVP - minimal) +pub fn mcp_call_to_web_search_call(mcp_call_item: &Value) -> Value { + let status = mcp_call_item + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or("completed"); + + if status != "completed" { + // Return failed web_search_call + let error = mcp_call_item + .get("error") + .and_then(|v| v.as_str()) + .unwrap_or("Unknown error"); + return build_web_search_call_item_failed(error); + } + + // Return successful web_search_call (status only, no results) + build_web_search_call_item() +} + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_has_web_search_preview_tool() { + let tools = vec![ResponseTool { + r#type: ResponseToolType::WebSearchPreview, + server_url: None, + authorization: None, + server_label: None, + server_description: None, + require_approval: None, + allowed_tools: None, + }]; + assert!(has_web_search_preview_tool(&tools)); + + 
let empty_tools: Vec = vec![]; + assert!(!has_web_search_preview_tool(&empty_tools)); + + let other_tools = vec![ResponseTool { + r#type: ResponseToolType::Mcp, + server_url: Some("http://example.com".to_string()), + authorization: None, + server_label: None, + server_description: None, + require_approval: None, + allowed_tools: None, + }]; + assert!(!has_web_search_preview_tool(&other_tools)); + } + + #[test] + fn test_build_web_search_call_item() { + let item = build_web_search_call_item(); + assert_eq!(item["type"], "web_search_call"); + assert_eq!(item["status"], "completed"); + assert_eq!(item["action"]["type"], "search"); + assert!(item.get("results").is_none()); // No results in MVP + assert!(item.get("id").is_some()); + } + + #[test] + fn test_build_web_search_call_item_failed() { + let item = build_web_search_call_item_failed("Test error"); + assert_eq!(item["type"], "web_search_call"); + assert_eq!(item["status"], "failed"); + assert_eq!(item["error"], "Test error"); + assert_eq!(item["action"]["type"], "search"); + } + + #[test] + fn test_mcp_call_to_web_search_call_success() { + let mcp_call = json!({ + "type": "mcp_call", + "status": "completed", + "server_label": "web_search_preview", + "output": "search results here" + }); + + let ws_call = mcp_call_to_web_search_call(&mcp_call); + assert_eq!(ws_call["type"], "web_search_call"); + assert_eq!(ws_call["status"], "completed"); + assert!(ws_call.get("results").is_none()); // MVP: no results + } + + #[test] + fn test_mcp_call_to_web_search_call_failed() { + let mcp_call = json!({ + "type": "mcp_call", + "status": "failed", + "server_label": "web_search_preview", + "error": "Search failed" + }); + + let ws_call = mcp_call_to_web_search_call(&mcp_call); + assert_eq!(ws_call["type"], "web_search_call"); + assert_eq!(ws_call["status"], "failed"); + assert_eq!(ws_call["error"], "Search failed"); + } +} From 594ce0d9ed23e51bebf270b7f1bbf0833661c3fe Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 
06:08:55 +0000 Subject: [PATCH 02/15] add utils change --- sgl-router/src/routers/openai/utils.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sgl-router/src/routers/openai/utils.rs b/sgl-router/src/routers/openai/utils.rs index cdf36bad9ec..697c2fe5278 100644 --- a/sgl-router/src/routers/openai/utils.rs +++ b/sgl-router/src/routers/openai/utils.rs @@ -32,12 +32,17 @@ pub(crate) mod event_types { pub const MCP_LIST_TOOLS_IN_PROGRESS: &str = "response.mcp_list_tools.in_progress"; pub const MCP_LIST_TOOLS_COMPLETED: &str = "response.mcp_list_tools.completed"; + // Web search events + pub const WEB_SEARCH_CALL_IN_PROGRESS: &str = "response.web_search_call.in_progress"; + pub const WEB_SEARCH_CALL_COMPLETED: &str = "response.web_search_call.completed"; + // Item types pub const ITEM_TYPE_FUNCTION_CALL: &str = "function_call"; pub const ITEM_TYPE_FUNCTION_TOOL_CALL: &str = "function_tool_call"; pub const ITEM_TYPE_MCP_CALL: &str = "mcp_call"; pub const ITEM_TYPE_FUNCTION: &str = "function"; pub const ITEM_TYPE_MCP_LIST_TOOLS: &str = "mcp_list_tools"; + pub const ITEM_TYPE_WEB_SEARCH_CALL: &str = "web_search_call"; } // ============================================================================ From 9aa372a5cb98f65bdeb832b3c8db0e019418ad89 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 16:57:56 +0000 Subject: [PATCH 03/15] fix compile error --- sgl-router/src/routers/openai/mcp.rs | 9 +++++--- sgl-router/src/routers/openai/web_search.rs | 23 ++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 92bd1d83446..ad1a4542e0b 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -294,19 +294,22 @@ pub(super) fn prepare_mcp_payload_for_streaming( // Build function tools for discovered MCP tools let mut tools_json = Vec::new(); - let tools = active_mcp.list_tools(); + + // Get tools with server names 
from inventory + // Returns Vec<(tool_name, server_name, Tool)> + let tools = active_mcp.inventory().list_tools(); // Filter tools based on context let filtered_tools: Vec<_> = if is_web_search { // Only include tools from web_search_preview server tools.into_iter() - .filter(|t| t.server == "web_search_preview") + .filter(|(_, server_name, _)| server_name == "web_search_preview") .collect() } else { tools }; - for t in filtered_tools { + for (_, _, t) in filtered_tools { let parameters = Value::Object((*t.input_schema).clone()); let tool = serde_json::json!({ "type": event_types::ITEM_TYPE_FUNCTION, diff --git a/sgl-router/src/routers/openai/web_search.rs b/sgl-router/src/routers/openai/web_search.rs index 64351f807ff..f49216e4eca 100644 --- a/sgl-router/src/routers/openai/web_search.rs +++ b/sgl-router/src/routers/openai/web_search.rs @@ -16,9 +16,7 @@ use std::sync::Arc; use serde_json::{json, Value}; use crate::mcp::McpManager; -use crate::protocols::responses::{ResponseTool, ResponseToolType}; - -use super::utils::generate_id; +use crate::protocols::responses::{generate_id, ResponseTool, ResponseToolType}; // ============================================================================ // Tool Detection & Transformation @@ -42,22 +40,19 @@ pub async fn is_web_search_mcp_available(mcp_manager: &Arc) -> bool /// Transform web_search_preview tool to MCP function tools /// Returns function tools from the "web_search_preview" MCP server pub fn transform_web_search_to_mcp_functions(mcp_manager: &Arc) -> Vec { - // Get tools from the "web_search_preview" MCP server - let tools = mcp_manager.list_tools(); + // Get tools from inventory with server names + // Returns Vec<(tool_name, server_name, Tool)> + let tools = mcp_manager.inventory().list_tools(); tools .iter() - .filter(|t| t.server == "web_search_preview") - .map(|t| { + .filter(|(_, server_name, _)| server_name == "web_search_preview") + .map(|(_, _, tool)| { json!({ "type": "function", - "name": t.name, - 
"description": t.description, - "parameters": t.parameters.clone().unwrap_or_else(|| json!({ - "type": "object", - "properties": {}, - "additionalProperties": false - })) + "name": tool.name, + "description": tool.description, + "parameters": tool.input_schema.clone() }) }) .collect() From 3158a6eb0d7a37ccc433c5ba59f16ff241693fb0 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 18:16:19 +0000 Subject: [PATCH 04/15] refactoring --- sgl-router/src/mcp/manager.rs | 20 ++++ sgl-router/src/routers/openai/mcp.rs | 104 ++++++++++++++------ sgl-router/src/routers/openai/responses.rs | 18 ---- sgl-router/src/routers/openai/router.rs | 4 +- sgl-router/src/routers/openai/streaming.rs | 9 +- sgl-router/src/routers/openai/web_search.rs | 85 ++-------------- 6 files changed, 109 insertions(+), 131 deletions(-) diff --git a/sgl-router/src/mcp/manager.rs b/sgl-router/src/mcp/manager.rs index 1a1bb5511b0..ca23fb4c9c4 100644 --- a/sgl-router/src/mcp/manager.rs +++ b/sgl-router/src/mcp/manager.rs @@ -237,11 +237,24 @@ impl McpManager { // Convert args with type coercion based on schema let tool_schema = Some(serde_json::Value::Object((*tool_info.input_schema).clone())); + + debug!( + "Tool '{}' schema: {}", + tool_name, + serde_json::to_string_pretty(&tool_schema).unwrap_or_default() + ); + let args_map = args .into() .into_map(tool_schema.as_ref()) .map_err(McpError::InvalidArguments)?; + debug!( + "Tool '{}' arguments after type coercion: {}", + tool_name, + serde_json::to_string_pretty(&args_map).unwrap_or_default() + ); + // Get client for that server let client = self .get_client(&server_name) @@ -254,6 +267,13 @@ impl McpManager { arguments: args_map, }; + debug!( + "Sending MCP request to server '{}': tool={}, args={}", + server_name, + tool_name, + serde_json::to_string(&request.arguments).unwrap_or_default() + ); + client .call_tool(request) .await diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 
ad1a4542e0b..d044634cc4e 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -202,6 +202,7 @@ pub(super) async fn execute_streaming_tool_calls( state: &mut ToolLoopState, server_label: &str, sequence_number: &mut u64, + is_web_search: bool, ) -> bool { // Execute all pending tool calls (sequential, as PR3 is skipped) for call in pending_calls { @@ -258,6 +259,7 @@ pub(super) async fn execute_streaming_tool_calls( success, error_msg.as_deref(), sequence_number, + is_web_search, ) { // Client disconnected, no point continuing tool execution return false; @@ -489,6 +491,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( success: bool, error_msg: Option<&str>, sequence_number: &mut u64, + is_web_search: bool, ) -> bool { let effective_output_index = call.effective_output_index(); @@ -500,6 +503,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( server_label, success, error_msg, + is_web_search, ); // Get the mcp_call item_id @@ -553,28 +557,40 @@ pub(super) fn inject_mcp_metadata_streaming( state: &ToolLoopState, mcp: &Arc, server_label: &str, + is_web_search: bool, ) { if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) { output_array.retain(|item| { item.get("type").and_then(|t| t.as_str()) != Some(event_types::ITEM_TYPE_MCP_LIST_TOOLS) }); - let list_tools_item = build_mcp_list_tools_item(mcp, server_label); - output_array.insert(0, list_tools_item); + let mut insert_pos = 0; + + // Only add mcp_list_tools for non-web-search cases + if !is_web_search { + let list_tools_item = build_mcp_list_tools_item(mcp, server_label); + output_array.insert(0, list_tools_item); + insert_pos = 1; + } let mcp_call_items = - build_executed_mcp_call_items(&state.conversation_history, server_label); - let mut insert_pos = 1; + build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); for item in mcp_call_items { output_array.insert(insert_pos, item); 
insert_pos += 1; } } else if let Some(obj) = response.as_object_mut() { let mut output_items = Vec::new(); - output_items.push(build_mcp_list_tools_item(mcp, server_label)); + + // Only add mcp_list_tools for non-web-search cases + if !is_web_search { + output_items.push(build_mcp_list_tools_item(mcp, server_label)); + } + output_items.extend(build_executed_mcp_call_items( &state.conversation_history, server_label, + is_web_search, )); obj.insert("output".to_string(), Value::Array(output_items)); } @@ -593,6 +609,7 @@ pub(super) async fn execute_tool_loop( original_body: &ResponsesRequest, active_mcp: &Arc, config: &McpLoopConfig, + is_web_search: bool, ) -> Result { let mut state = ToolLoopState::new(original_body.input.clone()); @@ -673,6 +690,7 @@ pub(super) async fn execute_tool_loop( "max_tool_calls", active_mcp, original_body, + is_web_search, ); } @@ -731,22 +749,25 @@ pub(super) async fn execute_tool_loop( }) .unwrap_or("mcp"); - // Build mcp_list_tools item - let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); - // Insert at beginning of output array if let Some(output_array) = response_json .get_mut("output") .and_then(|v| v.as_array_mut()) { - output_array.insert(0, list_tools_item); + let mut insert_pos = 0; - // Build mcp_call items using helper function + // Only add mcp_list_tools for non-web-search cases + if !is_web_search { + let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); + output_array.insert(0, list_tools_item); + insert_pos = 1; + } + + // Build mcp_call items (will be web_search_call for web search tools) let mcp_call_items = - build_executed_mcp_call_items(&state.conversation_history, server_label); + build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); - // Insert mcp_call items after mcp_list_tools using mutable position - let mut insert_pos = 1; + // Insert call items after mcp_list_tools (if present) for item in mcp_call_items { 
output_array.insert(insert_pos, item); insert_pos += 1; @@ -766,6 +787,7 @@ pub(super) fn build_incomplete_response( reason: &str, active_mcp: &Arc, original_body: &ResponsesRequest, + is_web_search: bool, ) -> Result { let obj = response .as_object_mut() @@ -814,6 +836,7 @@ pub(super) fn build_incomplete_response( server_label, false, // Not successful Some("Not executed - response stopped due to limit"), + is_web_search, ); mcp_call_items.push(mcp_call_item); } @@ -821,20 +844,25 @@ pub(super) fn build_incomplete_response( // Add mcp_list_tools and executed mcp_call items at the beginning if state.total_calls > 0 || !mcp_call_items.is_empty() { - let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); - output_array.insert(0, list_tools_item); + let mut insert_pos = 0; + + // Only add mcp_list_tools for non-web-search cases + if !is_web_search { + let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); + output_array.insert(0, list_tools_item); + insert_pos = 1; + } - // Add mcp_call items for executed calls using helper + // Add mcp_call items for executed calls (will be web_search_call for web search) let executed_items = - build_executed_mcp_call_items(&state.conversation_history, server_label); + build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); - let mut insert_pos = 1; for item in executed_items { output_array.insert(insert_pos, item); insert_pos += 1; } - // Add incomplete mcp_call items + // Add incomplete mcp_call items (will be web_search_call for web search) for item in mcp_call_items { output_array.insert(insert_pos, item); insert_pos += 1; @@ -899,24 +927,39 @@ pub(super) fn build_mcp_call_item( server_label: &str, success: bool, error: Option<&str>, + is_web_search: bool, ) -> Value { - json!({ - "id": generate_id("mcp"), - "type": event_types::ITEM_TYPE_MCP_CALL, - "status": if success { "completed" } else { "failed" }, - "approval_request_id": Value::Null, - "arguments": 
arguments, - "error": error, - "name": tool_name, - "output": output, - "server_label": server_label - }) + // Check if this is a web_search_preview context - if so, build web_search_call format + if is_web_search { + // Build web_search_call item (MVP - status only, no results) + if success { + crate::routers::openai::web_search::build_web_search_call_item() + } else { + crate::routers::openai::web_search::build_web_search_call_item_failed( + error.unwrap_or("Tool execution failed"), + ) + } + } else { + // Regular mcp_call item + json!({ + "id": generate_id("mcp"), + "type": event_types::ITEM_TYPE_MCP_CALL, + "status": if success { "completed" } else { "failed" }, + "approval_request_id": Value::Null, + "arguments": arguments, + "error": error, + "name": tool_name, + "output": output, + "server_label": server_label + }) + } } /// Helper function to build mcp_call items from executed tool calls in conversation history pub(super) fn build_executed_mcp_call_items( conversation_history: &[Value], server_label: &str, + is_web_search: bool, ) -> Vec { let mut mcp_call_items = Vec::new(); @@ -955,6 +998,7 @@ pub(super) fn build_executed_mcp_call_items( } else { None }, + is_web_search, ); mcp_call_items.push(mcp_call_item); } diff --git a/sgl-router/src/routers/openai/responses.rs b/sgl-router/src/routers/openai/responses.rs index c8ea797565c..bbc30e6c8b0 100644 --- a/sgl-router/src/routers/openai/responses.rs +++ b/sgl-router/src/routers/openai/responses.rs @@ -269,24 +269,6 @@ pub(super) fn rewrite_streaming_block( Some(rebuilt_lines.join("\n")) } -/// Transform mcp_call items to web_search_call items if web_search_preview was used -pub(super) fn transform_web_search_output_items(response: &mut Value) { - if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) { - for item in output_array.iter_mut() { - if item.get("type").and_then(|t| t.as_str()) == Some("mcp_call") { - // Check if this mcp_call is from web_search_preview server - let 
server_label = item.get("server_label").and_then(|s| s.as_str()); - - if server_label == Some("web_search_preview") { - // Transform to web_search_call (minimal, status only) - let ws_item = crate::routers::openai::web_search::mcp_call_to_web_search_call(item); - *item = ws_item; - } - } - } - } -} - /// Mask function tools as MCP tools in response for client pub(super) fn mask_tools_as_mcp(resp: &mut Value, original_body: &ResponsesRequest) { // Check for MCP tool diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index 4e499664d0f..a8b40a67626 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -34,7 +34,7 @@ use super::{ McpLoopConfig, }, responses::{ - mask_tools_as_mcp, patch_streaming_response_json, transform_web_search_output_items, + mask_tools_as_mcp, patch_streaming_response_json, }, streaming::handle_streaming_response, utils::{apply_provider_headers, extract_auth_header, probe_endpoint_for_model}, @@ -282,6 +282,7 @@ impl OpenAIRouter { original_body, mcp, &config, + is_web_search, ) .await { @@ -344,7 +345,6 @@ impl OpenAIRouter { } // Patch response with metadata - transform_web_search_output_items(&mut response_json); mask_tools_as_mcp(&mut response_json, original_body); patch_streaming_response_json( &mut response_json, diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index 7901d78943c..2d6aa7318f3 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -31,7 +31,6 @@ use super::{ }, responses::{ mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block, - transform_web_search_output_items, }, utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction}, }; @@ -912,6 +911,7 @@ pub(super) fn send_final_response_event( original_request: &ResponsesRequest, previous_response_id: Option<&str>, server_label: &str, + 
is_web_search: bool, ) -> bool { let mut final_response = match handler.snapshot_final_response() { Some(resp) => resp, @@ -928,10 +928,9 @@ pub(super) fn send_final_response_event( } if let Some(mcp) = active_mcp { - inject_mcp_metadata_streaming(&mut final_response, state, mcp, server_label); + inject_mcp_metadata_streaming(&mut final_response, state, mcp, server_label, is_web_search); } - transform_web_search_output_items(&mut final_response); mask_tools_as_mcp(&mut final_response, original_request); patch_streaming_response_json(&mut final_response, original_request, previous_response_id); @@ -1366,6 +1365,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &original_request, previous_response_id.as_deref(), server_label, + is_web_search, ) { return; } @@ -1387,9 +1387,9 @@ pub(super) async fn handle_streaming_with_tool_interception( &state, &active_mcp_clone, server_label, + is_web_search, ); - transform_web_search_output_items(&mut response_json); mask_tools_as_mcp(&mut response_json, &original_request); patch_streaming_response_json( &mut response_json, @@ -1449,6 +1449,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &mut state, server_label, &mut sequence_number, + is_web_search, ) .await { diff --git a/sgl-router/src/routers/openai/web_search.rs b/sgl-router/src/routers/openai/web_search.rs index f49216e4eca..239be1b8ccb 100644 --- a/sgl-router/src/routers/openai/web_search.rs +++ b/sgl-router/src/routers/openai/web_search.rs @@ -1,13 +1,15 @@ //! Web Search Preview Integration (MVP - Minimal) //! -//! This module handles transformation between OpenAI's web_search_preview format -//! and MCP-based web search tool calls. +//! This module handles the web_search_preview tool type, which provides a simplified +//! interface for web search capabilities via MCP servers. //! -//! MVP Scope: +//! Key responsibilities: //! - Detect web_search_preview tool in requests //! - Check MCP server availability -//! 
- Transform to/from function calls -//! - Build minimal web_search_call output items (status only) +//! - Build web_search_call output items (status only, MVP) +//! +//! The actual transformation logic (MCP tools → function tools, mcp_call → web_search_call) +//! happens in other modules (prepare_mcp_payload_for_streaming, build_mcp_call_item). //! //! Future: search_context_size, user_location, results exposure @@ -19,7 +21,7 @@ use crate::mcp::McpManager; use crate::protocols::responses::{generate_id, ResponseTool, ResponseToolType}; // ============================================================================ -// Tool Detection & Transformation +// Tool Detection // ============================================================================ /// Detect if request has web_search_preview tool @@ -37,27 +39,6 @@ pub async fn is_web_search_mcp_available(mcp_manager: &Arc) -> bool .is_some() } -/// Transform web_search_preview tool to MCP function tools -/// Returns function tools from the "web_search_preview" MCP server -pub fn transform_web_search_to_mcp_functions(mcp_manager: &Arc) -> Vec { - // Get tools from inventory with server names - // Returns Vec<(tool_name, server_name, Tool)> - let tools = mcp_manager.inventory().list_tools(); - - tools - .iter() - .filter(|(_, server_name, _)| server_name == "web_search_preview") - .map(|(_, _, tool)| { - json!({ - "type": "function", - "name": tool.name, - "description": tool.description, - "parameters": tool.input_schema.clone() - }) - }) - .collect() -} - // ============================================================================ // Output Item Builders (MVP - Status Only) // ============================================================================ @@ -90,26 +71,6 @@ pub fn build_web_search_call_item_failed(error: &str) -> Value { }) } -/// Convert mcp_call item to web_search_call item (MVP - minimal) -pub fn mcp_call_to_web_search_call(mcp_call_item: &Value) -> Value { - let status = mcp_call_item - 
.get("status") - .and_then(|v| v.as_str()) - .unwrap_or("completed"); - - if status != "completed" { - // Return failed web_search_call - let error = mcp_call_item - .get("error") - .and_then(|v| v.as_str()) - .unwrap_or("Unknown error"); - return build_web_search_call_item_failed(error); - } - - // Return successful web_search_call (status only, no results) - build_web_search_call_item() -} - // ============================================================================ // Tests // ============================================================================ @@ -164,34 +125,4 @@ mod tests { assert_eq!(item["error"], "Test error"); assert_eq!(item["action"]["type"], "search"); } - - #[test] - fn test_mcp_call_to_web_search_call_success() { - let mcp_call = json!({ - "type": "mcp_call", - "status": "completed", - "server_label": "web_search_preview", - "output": "search results here" - }); - - let ws_call = mcp_call_to_web_search_call(&mcp_call); - assert_eq!(ws_call["type"], "web_search_call"); - assert_eq!(ws_call["status"], "completed"); - assert!(ws_call.get("results").is_none()); // MVP: no results - } - - #[test] - fn test_mcp_call_to_web_search_call_failed() { - let mcp_call = json!({ - "type": "mcp_call", - "status": "failed", - "server_label": "web_search_preview", - "error": "Search failed" - }); - - let ws_call = mcp_call_to_web_search_call(&mcp_call); - assert_eq!(ws_call["type"], "web_search_call"); - assert_eq!(ws_call["status"], "failed"); - assert_eq!(ws_call["error"], "Search failed"); - } } From 757582ad1e2afc6d7de64c18c7ee3fec9558376e Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 18:16:51 +0000 Subject: [PATCH 05/15] clean code --- sgl-router/src/routers/openai/utils.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sgl-router/src/routers/openai/utils.rs b/sgl-router/src/routers/openai/utils.rs index dcbba7ba98c..aa1a80b259e 100644 --- a/sgl-router/src/routers/openai/utils.rs +++ b/sgl-router/src/routers/openai/utils.rs @@ 
-32,17 +32,12 @@ pub(crate) mod event_types { pub const MCP_LIST_TOOLS_IN_PROGRESS: &str = "response.mcp_list_tools.in_progress"; pub const MCP_LIST_TOOLS_COMPLETED: &str = "response.mcp_list_tools.completed"; - // Web search events - pub const WEB_SEARCH_CALL_IN_PROGRESS: &str = "response.web_search_call.in_progress"; - pub const WEB_SEARCH_CALL_COMPLETED: &str = "response.web_search_call.completed"; - // Item types pub const ITEM_TYPE_FUNCTION_CALL: &str = "function_call"; pub const ITEM_TYPE_FUNCTION_TOOL_CALL: &str = "function_tool_call"; pub const ITEM_TYPE_MCP_CALL: &str = "mcp_call"; pub const ITEM_TYPE_FUNCTION: &str = "function"; pub const ITEM_TYPE_MCP_LIST_TOOLS: &str = "mcp_list_tools"; - pub const ITEM_TYPE_WEB_SEARCH_CALL: &str = "web_search_call"; } // ============================================================================ From 09168c8a0c841e3086575ee60f9a6127927e828f Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 20:49:50 +0000 Subject: [PATCH 06/15] refactoring the code for better readability --- .../examples/mcp_config_web_search.yaml | 2 +- sgl-router/src/routers/openai/mcp.rs | 104 ++++++++++---- sgl-router/src/routers/openai/mod.rs | 1 - sgl-router/src/routers/openai/responses.rs | 2 +- sgl-router/src/routers/openai/router.rs | 32 +++-- sgl-router/src/routers/openai/streaming.rs | 20 +-- sgl-router/src/routers/openai/utils.rs | 42 ++++++ sgl-router/src/routers/openai/web_search.rs | 128 ------------------ 8 files changed, 146 insertions(+), 185 deletions(-) delete mode 100644 sgl-router/src/routers/openai/web_search.rs diff --git a/sgl-router/examples/mcp_config_web_search.yaml b/sgl-router/examples/mcp_config_web_search.yaml index 4f572cc42a3..acc18f1d76a 100644 --- a/sgl-router/examples/mcp_config_web_search.yaml +++ b/sgl-router/examples/mcp_config_web_search.yaml @@ -34,7 +34,7 @@ servers: protocol: sse url: http://localhost:8001/sse # token: "${WEB_SEARCH_MCP_TOKEN}" - required: true # Don't fail startup if unavailable + 
required: false # Don't fail startup if unavailable # Example: Additional MCP server # - name: "another-mcp-server" diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index d044634cc4e..c31814ec5eb 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -3,10 +3,11 @@ //! This module contains all MCP-related functionality for the OpenAI router: //! - Tool loop state management for multi-turn tool calling //! - MCP tool execution and result handling -//! - Output item builders for MCP-specific response formats +//! - Output item builders for MCP-specific response formats (including web_search_call) //! - SSE event generation for streaming MCP operations //! - Payload transformation for MCP tool interception //! - Metadata injection for MCP operations +//! - Web search preview tool handling (simplified MCP interface) use std::{io, sync::Arc}; @@ -16,7 +17,7 @@ use serde_json::{json, to_value, Value}; use tokio::sync::mpsc; use tracing::{debug, info, warn}; -use super::utils::event_types; +use super::utils::{event_types, web_search_constants, ToolContext}; use crate::{ mcp, protocols::responses::{ @@ -202,7 +203,7 @@ pub(super) async fn execute_streaming_tool_calls( state: &mut ToolLoopState, server_label: &str, sequence_number: &mut u64, - is_web_search: bool, + tool_context: ToolContext, ) -> bool { // Execute all pending tool calls (sequential, as PR3 is skipped) for call in pending_calls { @@ -259,7 +260,7 @@ pub(super) async fn execute_streaming_tool_calls( success, error_msg.as_deref(), sequence_number, - is_web_search, + tool_context, ) { // Client disconnected, no point continuing tool execution return false; @@ -279,7 +280,7 @@ pub(super) async fn execute_streaming_tool_calls( pub(super) fn prepare_mcp_payload_for_streaming( payload: &mut Value, active_mcp: &Arc, - is_web_search: bool, + tool_context: ToolContext, ) { if let Some(obj) = payload.as_object_mut() { // Remove any 
non-function tools from outgoing payload @@ -302,7 +303,7 @@ pub(super) fn prepare_mcp_payload_for_streaming( let tools = active_mcp.inventory().list_tools(); // Filter tools based on context - let filtered_tools: Vec<_> = if is_web_search { + let filtered_tools: Vec<_> = if tool_context.is_web_search() { // Only include tools from web_search_preview server tools.into_iter() .filter(|(_, server_name, _)| server_name == "web_search_preview") @@ -491,7 +492,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( success: bool, error_msg: Option<&str>, sequence_number: &mut u64, - is_web_search: bool, + tool_context: ToolContext, ) -> bool { let effective_output_index = call.effective_output_index(); @@ -503,7 +504,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( server_label, success, error_msg, - is_web_search, + tool_context, ); // Get the mcp_call item_id @@ -557,7 +558,7 @@ pub(super) fn inject_mcp_metadata_streaming( state: &ToolLoopState, mcp: &Arc, server_label: &str, - is_web_search: bool, + tool_context: ToolContext, ) { if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) { output_array.retain(|item| { @@ -567,14 +568,14 @@ pub(super) fn inject_mcp_metadata_streaming( let mut insert_pos = 0; // Only add mcp_list_tools for non-web-search cases - if !is_web_search { + if !tool_context.is_web_search() { let list_tools_item = build_mcp_list_tools_item(mcp, server_label); output_array.insert(0, list_tools_item); insert_pos = 1; } let mcp_call_items = - build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); + build_executed_mcp_call_items(&state.conversation_history, server_label, tool_context); for item in mcp_call_items { output_array.insert(insert_pos, item); insert_pos += 1; @@ -583,14 +584,14 @@ pub(super) fn inject_mcp_metadata_streaming( let mut output_items = Vec::new(); // Only add mcp_list_tools for non-web-search cases - if !is_web_search { + if 
!tool_context.is_web_search() { output_items.push(build_mcp_list_tools_item(mcp, server_label)); } output_items.extend(build_executed_mcp_call_items( &state.conversation_history, server_label, - is_web_search, + tool_context, )); obj.insert("output".to_string(), Value::Array(output_items)); } @@ -609,7 +610,7 @@ pub(super) async fn execute_tool_loop( original_body: &ResponsesRequest, active_mcp: &Arc, config: &McpLoopConfig, - is_web_search: bool, + tool_context: ToolContext, ) -> Result { let mut state = ToolLoopState::new(original_body.input.clone()); @@ -690,7 +691,7 @@ pub(super) async fn execute_tool_loop( "max_tool_calls", active_mcp, original_body, - is_web_search, + tool_context, ); } @@ -757,7 +758,7 @@ pub(super) async fn execute_tool_loop( let mut insert_pos = 0; // Only add mcp_list_tools for non-web-search cases - if !is_web_search { + if !tool_context.is_web_search() { let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); output_array.insert(0, list_tools_item); insert_pos = 1; @@ -765,7 +766,7 @@ pub(super) async fn execute_tool_loop( // Build mcp_call items (will be web_search_call for web search tools) let mcp_call_items = - build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); + build_executed_mcp_call_items(&state.conversation_history, server_label, tool_context); // Insert call items after mcp_list_tools (if present) for item in mcp_call_items { @@ -787,7 +788,7 @@ pub(super) fn build_incomplete_response( reason: &str, active_mcp: &Arc, original_body: &ResponsesRequest, - is_web_search: bool, + tool_context: ToolContext, ) -> Result { let obj = response .as_object_mut() @@ -836,7 +837,7 @@ pub(super) fn build_incomplete_response( server_label, false, // Not successful Some("Not executed - response stopped due to limit"), - is_web_search, + tool_context, ); mcp_call_items.push(mcp_call_item); } @@ -847,7 +848,7 @@ pub(super) fn build_incomplete_response( let mut insert_pos = 0; // Only add 
mcp_list_tools for non-web-search cases - if !is_web_search { + if !tool_context.is_web_search() { let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); output_array.insert(0, list_tools_item); insert_pos = 1; @@ -855,7 +856,7 @@ pub(super) fn build_incomplete_response( // Add mcp_call items for executed calls (will be web_search_call for web search) let executed_items = - build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search); + build_executed_mcp_call_items(&state.conversation_history, server_label, tool_context); for item in executed_items { output_array.insert(insert_pos, item); @@ -890,6 +891,53 @@ pub(super) fn build_incomplete_response( Ok(response) } +// ============================================================================ +// Web Search Preview Helpers +// ============================================================================ + +/// Detect if request has web_search_preview tool +pub(super) fn has_web_search_preview_tool(tools: &[ResponseTool]) -> bool { + tools + .iter() + .any(|t| matches!(t.r#type, ResponseToolType::WebSearchPreview)) +} + +/// Check if MCP server "web_search_preview" is available +pub(super) async fn is_web_search_mcp_available(mcp_manager: &Arc) -> bool { + mcp_manager + .get_client(web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME) + .await + .is_some() +} + +/// Build a web_search_call output item (MVP - status only) +/// +/// The MCP search results are passed to the LLM internally via function_call_output, +/// but we don't expose them in the web_search_call item to the client. 
+fn build_web_search_call_item() -> Value { + json!({ + "id": generate_id("ws"), + "type": event_types::ITEM_TYPE_WEB_SEARCH_CALL, + "status": web_search_constants::STATUS_COMPLETED, + "action": { + "type": web_search_constants::ACTION_TYPE_SEARCH + } + }) +} + +/// Build a failed web_search_call output item +fn build_web_search_call_item_failed(error: &str) -> Value { + json!({ + "id": generate_id("ws"), + "type": event_types::ITEM_TYPE_WEB_SEARCH_CALL, + "status": web_search_constants::STATUS_FAILED, + "action": { + "type": web_search_constants::ACTION_TYPE_SEARCH + }, + "error": error + }) +} + // ============================================================================ // Output Item Builders // ============================================================================ @@ -927,17 +975,15 @@ pub(super) fn build_mcp_call_item( server_label: &str, success: bool, error: Option<&str>, - is_web_search: bool, + tool_context: ToolContext, ) -> Value { // Check if this is a web_search_preview context - if so, build web_search_call format - if is_web_search { + if tool_context.is_web_search() { // Build web_search_call item (MVP - status only, no results) if success { - crate::routers::openai::web_search::build_web_search_call_item() + build_web_search_call_item() } else { - crate::routers::openai::web_search::build_web_search_call_item_failed( - error.unwrap_or("Tool execution failed"), - ) + build_web_search_call_item_failed(error.unwrap_or("Tool execution failed")) } } else { // Regular mcp_call item @@ -959,7 +1005,7 @@ pub(super) fn build_mcp_call_item( pub(super) fn build_executed_mcp_call_items( conversation_history: &[Value], server_label: &str, - is_web_search: bool, + tool_context: ToolContext, ) -> Vec { let mut mcp_call_items = Vec::new(); @@ -998,7 +1044,7 @@ pub(super) fn build_executed_mcp_call_items( } else { None }, - is_web_search, + tool_context, ); mcp_call_items.push(mcp_call_item); } diff --git a/sgl-router/src/routers/openai/mod.rs 
b/sgl-router/src/routers/openai/mod.rs index a845d312ac3..e01af396580 100644 --- a/sgl-router/src/routers/openai/mod.rs +++ b/sgl-router/src/routers/openai/mod.rs @@ -13,7 +13,6 @@ mod responses; mod router; mod streaming; mod utils; -pub(crate) mod web_search; // Re-export the main router type for external use pub use router::OpenAIRouter; diff --git a/sgl-router/src/routers/openai/responses.rs b/sgl-router/src/routers/openai/responses.rs index bbc30e6c8b0..7fe3a262956 100644 --- a/sgl-router/src/routers/openai/responses.rs +++ b/sgl-router/src/routers/openai/responses.rs @@ -282,7 +282,7 @@ pub(super) fn mask_tools_as_mcp(resp: &mut Value, original_body: &ResponsesReque let has_web_search = original_body .tools .as_ref() - .map(|tools| crate::routers::openai::web_search::has_web_search_preview_tool(tools)) + .map(|tools| crate::routers::openai::mcp::has_web_search_preview_tool(tools)) .unwrap_or(false); // If neither MCP nor web_search_preview, return early diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index a8b40a67626..c55b737ff0b 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -30,14 +30,14 @@ use super::conversations::{ }; use super::{ mcp::{ - ensure_request_mcp_client, execute_tool_loop, prepare_mcp_payload_for_streaming, - McpLoopConfig, + ensure_request_mcp_client, execute_tool_loop, has_web_search_preview_tool, + is_web_search_mcp_available, prepare_mcp_payload_for_streaming, McpLoopConfig, }, responses::{ mask_tools_as_mcp, patch_streaming_response_json, }, streaming::handle_streaming_response, - utils::{apply_provider_headers, extract_auth_header, probe_endpoint_for_model}, + utils::{apply_provider_headers, extract_auth_header, probe_endpoint_for_model, ToolContext}, }; use crate::{ core::{CircuitBreaker, CircuitBreakerConfig as CoreCircuitBreakerConfig}, @@ -250,7 +250,7 @@ impl OpenAIRouter { mut payload: Value, original_body: &ResponsesRequest, 
original_previous_response_id: Option, - is_web_search: bool, + tool_context: ToolContext, ) -> Response { // Check if MCP is active for this request // Ensure dynamic client is created if needed @@ -272,7 +272,7 @@ impl OpenAIRouter { let config = McpLoopConfig::default(); // Transform MCP tools to function tools - prepare_mcp_payload_for_streaming(&mut payload, mcp, is_web_search); + prepare_mcp_payload_for_streaming(&mut payload, mcp, tool_context); match execute_tool_loop( &self.client, @@ -282,7 +282,7 @@ impl OpenAIRouter { original_body, mcp, &config, - is_web_search, + tool_context, ) .await { @@ -700,17 +700,19 @@ impl crate::routers::RouterTrait for OpenAIRouter { let url = format!("{}/v1/responses", base_url); // Detect web_search_preview tool and verify MCP server availability - let has_web_search = if let Some(ref tools) = body.tools { - crate::routers::openai::web_search::has_web_search_preview_tool(tools) + let tool_context = if let Some(ref tools) = body.tools { + if has_web_search_preview_tool(tools) { + ToolContext::WebSearchPreview + } else { + ToolContext::Regular + } } else { - false + ToolContext::Regular }; - if has_web_search { + if tool_context.is_web_search() { // Check if web_search_preview MCP server is available - if !crate::routers::openai::web_search::is_web_search_mcp_available(&self.mcp_manager) - .await - { + if !is_web_search_mcp_available(&self.mcp_manager).await { return ( StatusCode::BAD_REQUEST, Json(json!({ @@ -993,7 +995,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { payload, body, original_previous_response_id, - has_web_search, + tool_context, ) .await } else { @@ -1003,7 +1005,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { payload, body, original_previous_response_id, - has_web_search, + tool_context, ) .await } diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index 2d6aa7318f3..c2f4226cc58 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ 
b/sgl-router/src/routers/openai/streaming.rs @@ -32,7 +32,7 @@ use super::{ responses::{ mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block, }, - utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction}, + utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction, ToolContext}, }; use crate::{ data_connector::{ConversationItemStorage, ConversationStorage, ResponseStorage}, @@ -911,7 +911,7 @@ pub(super) fn send_final_response_event( original_request: &ResponsesRequest, previous_response_id: Option<&str>, server_label: &str, - is_web_search: bool, + tool_context: ToolContext, ) -> bool { let mut final_response = match handler.snapshot_final_response() { Some(resp) => resp, @@ -928,7 +928,7 @@ pub(super) fn send_final_response_event( } if let Some(mcp) = active_mcp { - inject_mcp_metadata_streaming(&mut final_response, state, mcp, server_label, is_web_search); + inject_mcp_metadata_streaming(&mut final_response, state, mcp, server_label, tool_context); } mask_tools_as_mcp(&mut final_response, original_request); @@ -1140,10 +1140,10 @@ pub(super) async fn handle_streaming_with_tool_interception( original_body: &ResponsesRequest, original_previous_response_id: Option, active_mcp: &Arc, - is_web_search: bool, + tool_context: ToolContext, ) -> Response { // Transform MCP tools to function tools in payload - prepare_mcp_payload_for_streaming(&mut payload, active_mcp, is_web_search); + prepare_mcp_payload_for_streaming(&mut payload, active_mcp, tool_context); let (tx, rx) = mpsc::unbounded_channel::>(); let should_store = original_body.store.unwrap_or(false); @@ -1365,7 +1365,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &original_request, previous_response_id.as_deref(), server_label, - is_web_search, + tool_context, ) { return; } @@ -1387,7 +1387,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &state, &active_mcp_clone, server_label, - is_web_search, + tool_context, ); 
mask_tools_as_mcp(&mut response_json, &original_request); @@ -1449,7 +1449,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &mut state, server_label, &mut sequence_number, - is_web_search, + tool_context, ) .await { @@ -1505,7 +1505,7 @@ pub(super) async fn handle_streaming_response( payload: Value, original_body: &ResponsesRequest, original_previous_response_id: Option, - is_web_search: bool, + tool_context: ToolContext, ) -> Response { // Check if MCP is active for this request // Ensure dynamic client is created if needed @@ -1553,7 +1553,7 @@ pub(super) async fn handle_streaming_response( original_body, original_previous_response_id, active_mcp, - is_web_search, + tool_context, ) .await } diff --git a/sgl-router/src/routers/openai/utils.rs b/sgl-router/src/routers/openai/utils.rs index aa1a80b259e..53d9f237a82 100644 --- a/sgl-router/src/routers/openai/utils.rs +++ b/sgl-router/src/routers/openai/utils.rs @@ -38,6 +38,48 @@ pub(crate) mod event_types { pub const ITEM_TYPE_MCP_CALL: &str = "mcp_call"; pub const ITEM_TYPE_FUNCTION: &str = "function"; pub const ITEM_TYPE_MCP_LIST_TOOLS: &str = "mcp_list_tools"; + pub const ITEM_TYPE_WEB_SEARCH_CALL: &str = "web_search_call"; +} + +// ============================================================================ +// Web Search Constants +// ============================================================================ + +/// Constants for web search preview feature +pub(crate) mod web_search_constants { + /// MCP server name for web search preview + pub const WEB_SEARCH_PREVIEW_SERVER_NAME: &str = "web_search_preview"; + + /// Status constants + pub const STATUS_COMPLETED: &str = "completed"; + pub const STATUS_FAILED: &str = "failed"; + + /// Action type for web search + pub const ACTION_TYPE_SEARCH: &str = "search"; +} + +// ============================================================================ +// Tool Context Enum +// 
============================================================================ + +/// Represents the context for tool handling strategy +/// +/// This enum replaces boolean flags for better type safety and clarity. +/// It makes the code more maintainable and easier to extend with new +/// tool handling strategies in the future. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ToolContext { + /// Regular MCP tool handling with full mcp_call and mcp_list_tools items + Regular, + /// Web search preview handling with simplified web_search_call items + WebSearchPreview, +} + +impl ToolContext { + /// Check if this is web search preview context + pub fn is_web_search(&self) -> bool { + matches!(self, ToolContext::WebSearchPreview) + } } // ============================================================================ diff --git a/sgl-router/src/routers/openai/web_search.rs b/sgl-router/src/routers/openai/web_search.rs deleted file mode 100644 index 239be1b8ccb..00000000000 --- a/sgl-router/src/routers/openai/web_search.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Web Search Preview Integration (MVP - Minimal) -//! -//! This module handles the web_search_preview tool type, which provides a simplified -//! interface for web search capabilities via MCP servers. -//! -//! Key responsibilities: -//! - Detect web_search_preview tool in requests -//! - Check MCP server availability -//! - Build web_search_call output items (status only, MVP) -//! -//! The actual transformation logic (MCP tools → function tools, mcp_call → web_search_call) -//! happens in other modules (prepare_mcp_payload_for_streaming, build_mcp_call_item). -//! -//! 
Future: search_context_size, user_location, results exposure - -use std::sync::Arc; - -use serde_json::{json, Value}; - -use crate::mcp::McpManager; -use crate::protocols::responses::{generate_id, ResponseTool, ResponseToolType}; - -// ============================================================================ -// Tool Detection -// ============================================================================ - -/// Detect if request has web_search_preview tool -pub fn has_web_search_preview_tool(tools: &[ResponseTool]) -> bool { - tools - .iter() - .any(|t| matches!(t.r#type, ResponseToolType::WebSearchPreview)) -} - -/// Check if MCP server "web_search_preview" is available -pub async fn is_web_search_mcp_available(mcp_manager: &Arc) -> bool { - mcp_manager - .get_client("web_search_preview") - .await - .is_some() -} - -// ============================================================================ -// Output Item Builders (MVP - Status Only) -// ============================================================================ - -/// Build a web_search_call output item (MVP - status only) -/// -/// The MCP search results are passed to the LLM internally via function_call_output, -/// but we don't expose them in the web_search_call item to the client. 
-pub fn build_web_search_call_item() -> Value { - json!({ - "id": generate_id("ws"), - "type": "web_search_call", - "status": "completed", - "action": { - "type": "search" - } - }) -} - -/// Build a failed web_search_call output item -pub fn build_web_search_call_item_failed(error: &str) -> Value { - json!({ - "id": generate_id("ws"), - "type": "web_search_call", - "status": "failed", - "action": { - "type": "search" - }, - "error": error - }) -} - -// ============================================================================ -// Tests -// ============================================================================ - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_has_web_search_preview_tool() { - let tools = vec![ResponseTool { - r#type: ResponseToolType::WebSearchPreview, - server_url: None, - authorization: None, - server_label: None, - server_description: None, - require_approval: None, - allowed_tools: None, - }]; - assert!(has_web_search_preview_tool(&tools)); - - let empty_tools: Vec = vec![]; - assert!(!has_web_search_preview_tool(&empty_tools)); - - let other_tools = vec![ResponseTool { - r#type: ResponseToolType::Mcp, - server_url: Some("http://example.com".to_string()), - authorization: None, - server_label: None, - server_description: None, - require_approval: None, - allowed_tools: None, - }]; - assert!(!has_web_search_preview_tool(&other_tools)); - } - - #[test] - fn test_build_web_search_call_item() { - let item = build_web_search_call_item(); - assert_eq!(item["type"], "web_search_call"); - assert_eq!(item["status"], "completed"); - assert_eq!(item["action"]["type"], "search"); - assert!(item.get("results").is_none()); // No results in MVP - assert!(item.get("id").is_some()); - } - - #[test] - fn test_build_web_search_call_item_failed() { - let item = build_web_search_call_item_failed("Test error"); - assert_eq!(item["type"], "web_search_call"); - assert_eq!(item["status"], "failed"); - assert_eq!(item["error"], "Test error"); - 
assert_eq!(item["action"]["type"], "search"); - } -} From 3ddb5bbe39959da35d206b6cb80d4e2b13c0eaf0 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 20:59:17 +0000 Subject: [PATCH 07/15] clean code --- .../examples/mcp_config_web_search.yaml | 50 ------------------- sgl-router/src/mcp/manager.rs | 19 ------- 2 files changed, 69 deletions(-) delete mode 100644 sgl-router/examples/mcp_config_web_search.yaml diff --git a/sgl-router/examples/mcp_config_web_search.yaml b/sgl-router/examples/mcp_config_web_search.yaml deleted file mode 100644 index acc18f1d76a..00000000000 --- a/sgl-router/examples/mcp_config_web_search.yaml +++ /dev/null @@ -1,50 +0,0 @@ -# Example MCP Configuration with Web Search Preview -# -# This configuration demonstrates how to set up the web_search_preview tool -# for the OpenAI router. -# -# Usage: -# export SGLANG_MCP_CONFIG=/path/to/this/mcp_config_web_search.yaml -# export WEB_SEARCH_MCP_URL="https://your-search-server.com/sse" -# export WEB_SEARCH_MCP_TOKEN="your-token" - -# Global proxy configuration (optional) -# proxy: -# http: "http://proxy:8080" -# https: "http://proxy:8080" -# no_proxy: "localhost,127.0.0.1,*.internal" - -# Connection pool settings -pool: - max_connections: 100 - idle_timeout: 300 - -# Tool inventory settings -inventory: - enable_refresh: true - tool_ttl: 300 # 5 minutes - refresh_interval: 60 # 1 minute - refresh_on_error: true - -# MCP Servers -servers: - # Web Search Preview Server - # This server provides web search capabilities to the LLM - - name: "web_search_preview" - protocol: sse - url: http://localhost:8001/sse - # token: "${WEB_SEARCH_MCP_TOKEN}" - required: false # Don't fail startup if unavailable - - # Example: Additional MCP server - # - name: "another-mcp-server" - # protocol: stdio - # command: "node" - # args: ["./mcp-server.js"] - # required: false - -# Pre-warm connections at startup (optional) -# warmup: -# - url: "${WEB_SEARCH_MCP_URL}" -# label: "web_search_preview" -# token: 
"${WEB_SEARCH_MCP_TOKEN}" diff --git a/sgl-router/src/mcp/manager.rs b/sgl-router/src/mcp/manager.rs index ca23fb4c9c4..f6f5ca8ea93 100644 --- a/sgl-router/src/mcp/manager.rs +++ b/sgl-router/src/mcp/manager.rs @@ -238,23 +238,11 @@ impl McpManager { // Convert args with type coercion based on schema let tool_schema = Some(serde_json::Value::Object((*tool_info.input_schema).clone())); - debug!( - "Tool '{}' schema: {}", - tool_name, - serde_json::to_string_pretty(&tool_schema).unwrap_or_default() - ); - let args_map = args .into() .into_map(tool_schema.as_ref()) .map_err(McpError::InvalidArguments)?; - debug!( - "Tool '{}' arguments after type coercion: {}", - tool_name, - serde_json::to_string_pretty(&args_map).unwrap_or_default() - ); - // Get client for that server let client = self .get_client(&server_name) @@ -267,13 +255,6 @@ impl McpManager { arguments: args_map, }; - debug!( - "Sending MCP request to server '{}': tool={}, args={}", - server_name, - tool_name, - serde_json::to_string(&request.arguments).unwrap_or_default() - ); - client .call_tool(request) .await From b36c82ccf844186cbcfbfc158fca7c8d3c61448d Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 21:10:32 +0000 Subject: [PATCH 08/15] fix clippy --- sgl-router/src/routers/openai/mcp.rs | 30 +++++++++++++++------- sgl-router/src/routers/openai/router.rs | 10 ++++---- sgl-router/src/routers/openai/streaming.rs | 4 +-- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index c31814ec5eb..9f06e221b95 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -37,11 +37,16 @@ pub(crate) struct McpLoopConfig { /// Maximum iterations as safety limit (internal only, default: 10) /// Prevents infinite loops when max_tool_calls is not set pub max_iterations: usize, + /// Tool context for handling web_search_preview vs regular tools + pub tool_context: ToolContext, } impl 
Default for McpLoopConfig { fn default() -> Self { - Self { max_iterations: 10 } + Self { + max_iterations: 10, + tool_context: ToolContext::Regular, + } } } @@ -305,7 +310,8 @@ pub(super) fn prepare_mcp_payload_for_streaming( // Filter tools based on context let filtered_tools: Vec<_> = if tool_context.is_web_search() { // Only include tools from web_search_preview server - tools.into_iter() + tools + .into_iter() .filter(|(_, server_name, _)| server_name == "web_search_preview") .collect() } else { @@ -484,6 +490,7 @@ pub(super) fn send_mcp_list_tools_events( /// Send mcp_call completion events after tool execution /// Returns false if client disconnected +#[allow(clippy::too_many_arguments)] pub(super) fn send_mcp_call_completion_events_with_error( tx: &mpsc::UnboundedSender>, call: &FunctionCallInProgress, @@ -610,7 +617,6 @@ pub(super) async fn execute_tool_loop( original_body: &ResponsesRequest, active_mcp: &Arc, config: &McpLoopConfig, - tool_context: ToolContext, ) -> Result { let mut state = ToolLoopState::new(original_body.input.clone()); @@ -691,7 +697,7 @@ pub(super) async fn execute_tool_loop( "max_tool_calls", active_mcp, original_body, - tool_context, + config.tool_context, ); } @@ -758,15 +764,18 @@ pub(super) async fn execute_tool_loop( let mut insert_pos = 0; // Only add mcp_list_tools for non-web-search cases - if !tool_context.is_web_search() { + if !config.tool_context.is_web_search() { let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label); output_array.insert(0, list_tools_item); insert_pos = 1; } // Build mcp_call items (will be web_search_call for web search tools) - let mcp_call_items = - build_executed_mcp_call_items(&state.conversation_history, server_label, tool_context); + let mcp_call_items = build_executed_mcp_call_items( + &state.conversation_history, + server_label, + config.tool_context, + ); // Insert call items after mcp_list_tools (if present) for item in mcp_call_items { @@ -855,8 +864,11 @@ pub(super) fn 
build_incomplete_response( } // Add mcp_call items for executed calls (will be web_search_call for web search) - let executed_items = - build_executed_mcp_call_items(&state.conversation_history, server_label, tool_context); + let executed_items = build_executed_mcp_call_items( + &state.conversation_history, + server_label, + tool_context, + ); for item in executed_items { output_array.insert(insert_pos, item); diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index c55b737ff0b..22ebdf2e21a 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -33,9 +33,7 @@ use super::{ ensure_request_mcp_client, execute_tool_loop, has_web_search_preview_tool, is_web_search_mcp_available, prepare_mcp_payload_for_streaming, McpLoopConfig, }, - responses::{ - mask_tools_as_mcp, patch_streaming_response_json, - }, + responses::{mask_tools_as_mcp, patch_streaming_response_json}, streaming::handle_streaming_response, utils::{apply_provider_headers, extract_auth_header, probe_endpoint_for_model, ToolContext}, }; @@ -269,7 +267,10 @@ impl OpenAIRouter { // If MCP is active, execute tool loop if let Some(mcp) = active_mcp { - let config = McpLoopConfig::default(); + let config = McpLoopConfig { + tool_context, + ..Default::default() + }; // Transform MCP tools to function tools prepare_mcp_payload_for_streaming(&mut payload, mcp, tool_context); @@ -282,7 +283,6 @@ impl OpenAIRouter { original_body, mcp, &config, - tool_context, ) .await { diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index c2f4226cc58..ede0a81cf50 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -29,9 +29,7 @@ use super::{ inject_mcp_metadata_streaming, prepare_mcp_payload_for_streaming, send_mcp_list_tools_events, McpLoopConfig, ToolLoopState, }, - responses::{ - mask_tools_as_mcp, patch_streaming_response_json, 
rewrite_streaming_block, - }, + responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block}, utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction, ToolContext}, }; use crate::{ From 54da29103286b6091341bf27177b9756db95b230 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 21:12:30 +0000 Subject: [PATCH 09/15] remove unrelated changes --- sgl-router/src/mcp/manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/sgl-router/src/mcp/manager.rs b/sgl-router/src/mcp/manager.rs index f6f5ca8ea93..1a1bb5511b0 100644 --- a/sgl-router/src/mcp/manager.rs +++ b/sgl-router/src/mcp/manager.rs @@ -237,7 +237,6 @@ impl McpManager { // Convert args with type coercion based on schema let tool_schema = Some(serde_json::Value::Object((*tool_info.input_schema).clone())); - let args_map = args .into() .into_map(tool_schema.as_ref()) From 8286c9a9543ed715a36a5917a880a49fc6ff88a2 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 21:24:18 +0000 Subject: [PATCH 10/15] update error message --- sgl-router/src/routers/openai/mcp.rs | 10 ++++++++++ sgl-router/src/routers/openai/router.rs | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 9f06e221b95..56e9143f5de 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -164,6 +164,16 @@ pub async fn ensure_request_mcp_client( .server_label .clone() .unwrap_or_else(|| "request-mcp".to_string()); + + // Validate that web_search_preview is not used as it's a reserved name + if name == web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME { + warn!( + "Rejecting request MCP with reserved server name: {}", + name + ); + return None; + } + let token = tool.authorization.clone(); let transport = if server_url.contains("/sse") { mcp::McpTransport::Sse { diff --git a/sgl-router/src/routers/openai/router.rs 
b/sgl-router/src/routers/openai/router.rs index 22ebdf2e21a..aa4f4887d96 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -717,10 +717,10 @@ impl crate::routers::RouterTrait for OpenAIRouter { StatusCode::BAD_REQUEST, Json(json!({ "error": { - "message": "Web search preview requested but MCP server 'web_search_preview' is not available. Please configure the MCP server.", + "message": "Web search preview is currently unavailable. Please contact your server administrator.", "type": "invalid_request_error", "param": "tools", - "code": "mcp_server_unavailable" + "code": "web_search_unavailable" } })), ) From a16911acd6ee34486020a38f093daeabb5cfd8ba Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 22:01:03 +0000 Subject: [PATCH 11/15] add query field for web search --- sgl-router/src/routers/openai/mcp.rs | 39 +++++++++++++++++++++------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 56e9143f5de..6a0009822f7 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -936,26 +936,40 @@ pub(super) async fn is_web_search_mcp_available(mcp_manager: &Arc Value { +fn build_web_search_call_item(query: Option) -> Value { + let mut action = serde_json::Map::new(); + action.insert( + "type".to_string(), + Value::String(web_search_constants::ACTION_TYPE_SEARCH.to_string()), + ); + if let Some(q) = query { + action.insert("query".to_string(), Value::String(q)); + } + json!({ "id": generate_id("ws"), "type": event_types::ITEM_TYPE_WEB_SEARCH_CALL, "status": web_search_constants::STATUS_COMPLETED, - "action": { - "type": web_search_constants::ACTION_TYPE_SEARCH - } + "action": action }) } /// Build a failed web_search_call output item -fn build_web_search_call_item_failed(error: &str) -> Value { +fn build_web_search_call_item_failed(error: &str, query: Option) -> Value { + let mut 
action = serde_json::Map::new(); + action.insert( + "type".to_string(), + Value::String(web_search_constants::ACTION_TYPE_SEARCH.to_string()), + ); + if let Some(q) = query { + action.insert("query".to_string(), Value::String(q)); + } + json!({ "id": generate_id("ws"), "type": event_types::ITEM_TYPE_WEB_SEARCH_CALL, "status": web_search_constants::STATUS_FAILED, - "action": { - "type": web_search_constants::ACTION_TYPE_SEARCH - }, + "action": action, "error": error }) } @@ -1001,11 +1015,16 @@ pub(super) fn build_mcp_call_item( ) -> Value { // Check if this is a web_search_preview context - if so, build web_search_call format if tool_context.is_web_search() { + // Extract query from arguments for web_search_call + let query = serde_json::from_str::(arguments) + .ok() + .and_then(|v| v.get("query").and_then(|q| q.as_str().map(|s| s.to_string()))); + // Build web_search_call item (MVP - status only, no results) if success { - build_web_search_call_item() + build_web_search_call_item(query) } else { - build_web_search_call_item_failed(error.unwrap_or("Tool execution failed")) + build_web_search_call_item_failed(error.unwrap_or("Tool execution failed"), query) } } else { // Regular mcp_call item From f9e8a03197520e70f42c6040fe15f51f3f95e8f9 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 22:52:42 +0000 Subject: [PATCH 12/15] fix streaming format --- sgl-router/src/routers/openai/mcp.rs | 14 +- sgl-router/src/routers/openai/streaming.rs | 190 ++++++++++++++------- sgl-router/src/routers/openai/utils.rs | 5 + 3 files changed, 148 insertions(+), 61 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 6a0009822f7..eaf4182df26 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -524,15 +524,21 @@ pub(super) fn send_mcp_call_completion_events_with_error( tool_context, ); - // Get the mcp_call item_id + // Get the item_id let item_id = mcp_call_item .get("id") 
.and_then(|v| v.as_str()) .unwrap_or(""); - // Event 1: response.mcp_call.completed + // Event 1: response.{web_search_call|mcp_call}.completed + let completed_event_type = if tool_context.is_web_search() { + event_types::WEB_SEARCH_CALL_COMPLETED + } else { + event_types::MCP_CALL_COMPLETED + }; + let completed_payload = json!({ - "type": event_types::MCP_CALL_COMPLETED, + "type": completed_event_type, "sequence_number": *sequence_number, "output_index": effective_output_index, "item_id": item_id @@ -541,7 +547,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( let completed_event = format!( "event: {}\ndata: {}\n\n", - event_types::MCP_CALL_COMPLETED, + completed_event_type, completed_payload ); if tx.send(Ok(Bytes::from(completed_event))).is_err() { diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index ede0a81cf50..d5d87324eb3 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -553,6 +553,7 @@ pub(super) fn apply_event_transformations_inplace( server_label: &str, original_request: &ResponsesRequest, previous_response_id: Option<&str>, + tool_context: ToolContext, ) -> bool { let mut changed = false; @@ -598,23 +599,34 @@ pub(super) fn apply_event_transformations_inplace( // Mask tools from function to MCP format (optimized without cloning) if response_obj.get("tools").is_some() { - let requested_mcp = original_request - .tools - .as_ref() - .map(|tools| { - tools - .iter() - .any(|t| matches!(t.r#type, ResponseToolType::Mcp)) - }) - .unwrap_or(false); - - if requested_mcp { - if let Some(mcp_tools) = build_mcp_tools_value(original_request) { - response_obj.insert("tools".to_string(), mcp_tools); - response_obj - .entry("tool_choice".to_string()) - .or_insert(Value::String("auto".to_string())); - changed = true; + // For web_search_preview, always use simplified tool format + if tool_context.is_web_search() { + let web_search_tool = 
json!([{"type": "web_search_preview"}]); + response_obj.insert("tools".to_string(), web_search_tool); + response_obj + .entry("tool_choice".to_string()) + .or_insert(Value::String("auto".to_string())); + changed = true; + } else { + // Regular MCP tools - only if requested + let requested_mcp = original_request + .tools + .as_ref() + .map(|tools| { + tools + .iter() + .any(|t| matches!(t.r#type, ResponseToolType::Mcp)) + }) + .unwrap_or(false); + + if requested_mcp { + if let Some(mcp_tools) = build_mcp_tools_value(original_request) { + response_obj.insert("tools".to_string(), mcp_tools); + response_obj + .entry("tool_choice".to_string()) + .or_insert(Value::String("auto".to_string())); + changed = true; + } } } } @@ -629,13 +641,26 @@ pub(super) fn apply_event_transformations_inplace( if item_type == event_types::ITEM_TYPE_FUNCTION_CALL || item_type == event_types::ITEM_TYPE_FUNCTION_TOOL_CALL { - item["type"] = json!(event_types::ITEM_TYPE_MCP_CALL); - item["server_label"] = json!(server_label); + // Use web_search_call for web_search_preview, mcp_call for regular MCP + if tool_context.is_web_search() { + item["type"] = json!(event_types::ITEM_TYPE_WEB_SEARCH_CALL); + // Don't include server_label for web_search_call + // Remove internal implementation fields + if let Some(obj) = item.as_object_mut() { + obj.remove("arguments"); + obj.remove("call_id"); + obj.remove("name"); + } + } else { + item["type"] = json!(event_types::ITEM_TYPE_MCP_CALL); + item["server_label"] = json!(server_label); + } - // Transform ID from fc_* to mcp_* + // Transform ID from fc_* to ws_* or mcp_* if let Some(id) = item.get("id").and_then(|v| v.as_str()) { if let Some(stripped) = id.strip_prefix("fc_") { - let new_id = format!("mcp_{}", stripped); + let prefix = if tool_context.is_web_search() { "ws" } else { "mcp" }; + let new_id = format!("{}_{}", prefix, stripped); item["id"] = json!(new_id); } } @@ -693,6 +718,7 @@ pub(super) fn forward_streaming_event( original_request: 
&ResponsesRequest, previous_response_id: Option<&str>, sequence_number: &mut u64, + tool_context: ToolContext, ) -> bool { // Skip individual function_call_arguments.delta events - we'll send them as one if event_name == Some(event_types::FUNCTION_CALL_ARGUMENTS_DELTA) { @@ -757,37 +783,40 @@ pub(super) fn forward_streaming_event( }; // Emit a synthetic MCP arguments delta event before the done event - let mut delta_event = json!({ - "type": event_types::MCP_CALL_ARGUMENTS_DELTA, - "sequence_number": *sequence_number, - "output_index": assigned_index, - "item_id": mcp_item_id, - "delta": arguments_value, - }); - - if let Some(obfuscation) = call.last_obfuscation.as_ref() { - if let Some(obj) = delta_event.as_object_mut() { - obj.insert( - "obfuscation".to_string(), - Value::String(obfuscation.clone()), - ); + // Skip for web_search_preview - we don't expose tool call arguments + if !tool_context.is_web_search() { + let mut delta_event = json!({ + "type": event_types::MCP_CALL_ARGUMENTS_DELTA, + "sequence_number": *sequence_number, + "output_index": assigned_index, + "item_id": mcp_item_id, + "delta": arguments_value, + }); + + if let Some(obfuscation) = call.last_obfuscation.as_ref() { + if let Some(obj) = delta_event.as_object_mut() { + obj.insert( + "obfuscation".to_string(), + Value::String(obfuscation.clone()), + ); + } + } else if let Some(obfuscation) = parsed_data.get("obfuscation").cloned() { + if let Some(obj) = delta_event.as_object_mut() { + obj.insert("obfuscation".to_string(), obfuscation); + } } - } else if let Some(obfuscation) = parsed_data.get("obfuscation").cloned() { - if let Some(obj) = delta_event.as_object_mut() { - obj.insert("obfuscation".to_string(), obfuscation); + + let delta_block = format!( + "event: {}\ndata: {}\n\n", + event_types::MCP_CALL_ARGUMENTS_DELTA, + delta_event + ); + if tx.send(Ok(Bytes::from(delta_block))).is_err() { + return false; } - } - let delta_block = format!( - "event: {}\ndata: {}\n\n", - 
event_types::MCP_CALL_ARGUMENTS_DELTA, - delta_event - ); - if tx.send(Ok(Bytes::from(delta_block))).is_err() { - return false; + *sequence_number += 1; } - - *sequence_number += 1; } } } @@ -813,6 +842,7 @@ pub(super) fn forward_streaming_event( server_label, original_request, previous_response_id, + tool_context, ); if let Some(response_obj) = parsed_data @@ -844,16 +874,23 @@ pub(super) fn forward_streaming_event( let mut final_block = String::new(); if let Some(evt) = event_name { // Update event name for function_call_arguments events - if evt == event_types::FUNCTION_CALL_ARGUMENTS_DELTA { + // Skip for web_search_preview - we don't expose tool call arguments + if evt == event_types::FUNCTION_CALL_ARGUMENTS_DELTA && !tool_context.is_web_search() { final_block.push_str(&format!( "event: {}\n", event_types::MCP_CALL_ARGUMENTS_DELTA )); - } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE { + } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE && !tool_context.is_web_search() { final_block.push_str(&format!( "event: {}\n", event_types::MCP_CALL_ARGUMENTS_DONE )); + } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DELTA && tool_context.is_web_search() { + // Skip this event entirely for web_search_preview + return true; + } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE && tool_context.is_web_search() { + // Skip this event entirely for web_search_preview + return true; } else { final_block.push_str(&format!("event: {}\n", evt)); } @@ -865,30 +902,63 @@ pub(super) fn forward_streaming_event( return false; } - // After sending output_item.added for mcp_call, inject mcp_call.in_progress event + // After sending output_item.added for mcp_call/web_search_call, inject in_progress event if event_name == Some(event_types::OUTPUT_ITEM_ADDED) { if let Some(item) = parsed_data.get("item") { - if item.get("type").and_then(|v| v.as_str()) == Some(event_types::ITEM_TYPE_MCP_CALL) { - // Already transformed to mcp_call + let item_type = 
item.get("type").and_then(|v| v.as_str()); + + // Check if it's an mcp_call or web_search_call + let is_mcp_or_web_search = item_type == Some(event_types::ITEM_TYPE_MCP_CALL) + || item_type == Some(event_types::ITEM_TYPE_WEB_SEARCH_CALL); + + if is_mcp_or_web_search { if let (Some(item_id), Some(output_index)) = ( item.get("id").and_then(|v| v.as_str()), parsed_data.get("output_index").and_then(|v| v.as_u64()), ) { + // Choose event type based on tool_context + let in_progress_event_type = if tool_context.is_web_search() { + event_types::WEB_SEARCH_CALL_IN_PROGRESS + } else { + event_types::MCP_CALL_IN_PROGRESS + }; + let in_progress_event = json!({ - "type": event_types::MCP_CALL_IN_PROGRESS, + "type": in_progress_event_type, "sequence_number": *sequence_number, "output_index": output_index, "item_id": item_id }); *sequence_number += 1; + let in_progress_block = format!( "event: {}\ndata: {}\n\n", - event_types::MCP_CALL_IN_PROGRESS, + in_progress_event_type, in_progress_event ); if tx.send(Ok(Bytes::from(in_progress_block))).is_err() { return false; } + + // For web_search_call, also send a "searching" event + if tool_context.is_web_search() { + let searching_event = json!({ + "type": event_types::WEB_SEARCH_CALL_SEARCHING, + "sequence_number": *sequence_number, + "output_index": output_index, + "item_id": item_id + }); + *sequence_number += 1; + + let searching_block = format!( + "event: {}\ndata: {}\n\n", + event_types::WEB_SEARCH_CALL_SEARCHING, + searching_event + ); + if tx.send(Ok(Bytes::from(searching_block))).is_err() { + return false; + } + } } } } @@ -1158,7 +1228,10 @@ pub(super) async fn handle_streaming_with_tool_interception( // Spawn the streaming loop task tokio::spawn(async move { let mut state = ToolLoopState::new(original_request.input.clone()); - let loop_config = McpLoopConfig::default(); + let loop_config = McpLoopConfig { + tool_context, + ..Default::default() + }; let max_tool_calls = original_request.max_tool_calls.map(|n| n as usize); 
let tools_json = payload_clone.get("tools").cloned().unwrap_or(json!([])); let base_payload = payload_clone.clone(); @@ -1277,6 +1350,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &original_request, previous_response_id.as_deref(), &mut sequence_number, + loop_config.tool_context, ) { // Client disconnected return; @@ -1292,7 +1366,8 @@ pub(super) async fn handle_streaming_with_tool_interception( == Some(event_types::RESPONSE_IN_PROGRESS) { seen_in_progress = true; - if !mcp_list_tools_sent { + // Skip mcp_list_tools for web_search_preview + if !mcp_list_tools_sent && !loop_config.tool_context.is_web_search() { let list_tools_index = handler.allocate_synthetic_output_index(); if !send_mcp_list_tools_events( @@ -1325,6 +1400,7 @@ pub(super) async fn handle_streaming_with_tool_interception( &original_request, previous_response_id.as_deref(), &mut sequence_number, + loop_config.tool_context, ) { // Client disconnected return; diff --git a/sgl-router/src/routers/openai/utils.rs b/sgl-router/src/routers/openai/utils.rs index 53d9f237a82..95a3b3e94cb 100644 --- a/sgl-router/src/routers/openai/utils.rs +++ b/sgl-router/src/routers/openai/utils.rs @@ -32,6 +32,11 @@ pub(crate) mod event_types { pub const MCP_LIST_TOOLS_IN_PROGRESS: &str = "response.mcp_list_tools.in_progress"; pub const MCP_LIST_TOOLS_COMPLETED: &str = "response.mcp_list_tools.completed"; + // Web Search Call events (for web_search_preview) + pub const WEB_SEARCH_CALL_IN_PROGRESS: &str = "response.web_search_call.in_progress"; + pub const WEB_SEARCH_CALL_SEARCHING: &str = "response.web_search_call.searching"; + pub const WEB_SEARCH_CALL_COMPLETED: &str = "response.web_search_call.completed"; + // Item types pub const ITEM_TYPE_FUNCTION_CALL: &str = "function_call"; pub const ITEM_TYPE_FUNCTION_TOOL_CALL: &str = "function_tool_call"; From 33819aecea1cb15bde3b2867a83b3ffc9a43b2a4 Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 22:56:26 +0000 Subject: [PATCH 13/15] fix fmt --- 
sgl-router/src/routers/openai/mcp.rs | 15 +++++-------- sgl-router/src/routers/openai/streaming.rs | 26 +++++++++++++--------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index eaf4182df26..3e287760e09 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -167,10 +167,7 @@ pub async fn ensure_request_mcp_client( // Validate that web_search_preview is not used as it's a reserved name if name == web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME { - warn!( - "Rejecting request MCP with reserved server name: {}", - name - ); + warn!("Rejecting request MCP with reserved server name: {}", name); return None; } @@ -547,8 +544,7 @@ pub(super) fn send_mcp_call_completion_events_with_error( let completed_event = format!( "event: {}\ndata: {}\n\n", - completed_event_type, - completed_payload + completed_event_type, completed_payload ); if tx.send(Ok(Bytes::from(completed_event))).is_err() { return false; @@ -1022,9 +1018,10 @@ pub(super) fn build_mcp_call_item( // Check if this is a web_search_preview context - if so, build web_search_call format if tool_context.is_web_search() { // Extract query from arguments for web_search_call - let query = serde_json::from_str::(arguments) - .ok() - .and_then(|v| v.get("query").and_then(|q| q.as_str().map(|s| s.to_string()))); + let query = serde_json::from_str::(arguments).ok().and_then(|v| { + v.get("query") + .and_then(|q| q.as_str().map(|s| s.to_string())) + }); // Build web_search_call item (MVP - status only, no results) if success { diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index d5d87324eb3..e646e5285c6 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -659,7 +659,11 @@ pub(super) fn apply_event_transformations_inplace( // Transform ID from fc_* to ws_* or mcp_* if let 
Some(id) = item.get("id").and_then(|v| v.as_str()) { if let Some(stripped) = id.strip_prefix("fc_") { - let prefix = if tool_context.is_web_search() { "ws" } else { "mcp" }; + let prefix = if tool_context.is_web_search() { + "ws" + } else { + "mcp" + }; let new_id = format!("{}_{}", prefix, stripped); item["id"] = json!(new_id); } @@ -880,16 +884,17 @@ pub(super) fn forward_streaming_event( "event: {}\n", event_types::MCP_CALL_ARGUMENTS_DELTA )); - } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE && !tool_context.is_web_search() { + } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE && !tool_context.is_web_search() + { final_block.push_str(&format!( "event: {}\n", event_types::MCP_CALL_ARGUMENTS_DONE )); - } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DELTA && tool_context.is_web_search() { - // Skip this event entirely for web_search_preview - return true; - } else if evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE && tool_context.is_web_search() { - // Skip this event entirely for web_search_preview + } else if (evt == event_types::FUNCTION_CALL_ARGUMENTS_DELTA + || evt == event_types::FUNCTION_CALL_ARGUMENTS_DONE) + && tool_context.is_web_search() + { + // Skip these events entirely for web_search_preview return true; } else { final_block.push_str(&format!("event: {}\n", evt)); @@ -933,8 +938,7 @@ pub(super) fn forward_streaming_event( let in_progress_block = format!( "event: {}\ndata: {}\n\n", - in_progress_event_type, - in_progress_event + in_progress_event_type, in_progress_event ); if tx.send(Ok(Bytes::from(in_progress_block))).is_err() { return false; @@ -1367,7 +1371,9 @@ pub(super) async fn handle_streaming_with_tool_interception( { seen_in_progress = true; // Skip mcp_list_tools for web_search_preview - if !mcp_list_tools_sent && !loop_config.tool_context.is_web_search() { + if !mcp_list_tools_sent + && !loop_config.tool_context.is_web_search() + { let list_tools_index = handler.allocate_synthetic_output_index(); if 
!send_mcp_list_tools_events( From 2b089e12eabd8e60404bccf65b4a529b3867ac8c Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 23:26:36 +0000 Subject: [PATCH 14/15] resolve comments --- sgl-router/src/routers/openai/mcp.rs | 15 ++++++++++++--- sgl-router/src/routers/openai/responses.rs | 4 ++-- sgl-router/src/routers/openai/streaming.rs | 8 ++++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 3e287760e09..16efa2987cd 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -319,7 +319,9 @@ pub(super) fn prepare_mcp_payload_for_streaming( // Only include tools from web_search_preview server tools .into_iter() - .filter(|(_, server_name, _)| server_name == "web_search_preview") + .filter(|(_, server_name, _)| { + server_name == web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME + }) .collect() } else { tools @@ -816,7 +818,10 @@ pub(super) fn build_incomplete_response( .ok_or_else(|| "response not an object".to_string())?; // Set status to completed (not failed - partial success) - obj.insert("status".to_string(), Value::String("completed".to_string())); + obj.insert( + "status".to_string(), + Value::String(web_search_constants::STATUS_COMPLETED.to_string()), + ); // Set incomplete_details obj.insert( @@ -1034,7 +1039,11 @@ pub(super) fn build_mcp_call_item( json!({ "id": generate_id("mcp"), "type": event_types::ITEM_TYPE_MCP_CALL, - "status": if success { "completed" } else { "failed" }, + "status": if success { + web_search_constants::STATUS_COMPLETED + } else { + web_search_constants::STATUS_FAILED + }, "approval_request_id": Value::Null, "arguments": arguments, "error": error, diff --git a/sgl-router/src/routers/openai/responses.rs b/sgl-router/src/routers/openai/responses.rs index 7fe3a262956..45dcc73365f 100644 --- a/sgl-router/src/routers/openai/responses.rs +++ b/sgl-router/src/routers/openai/responses.rs @@ 
-5,7 +5,7 @@ use std::collections::HashMap; use serde_json::{json, Value}; use tracing::warn; -use super::utils::event_types; +use super::utils::{event_types, web_search_constants}; use crate::{ data_connector::{ResponseId, StoredResponse}, protocols::responses::{ResponseToolType, ResponsesRequest}, @@ -325,7 +325,7 @@ pub(super) fn mask_tools_as_mcp(resp: &mut Value, original_body: &ResponsesReque let mut ws = serde_json::Map::new(); ws.insert( "type".to_string(), - Value::String("web_search_preview".to_string()), + Value::String(web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME.to_string()), ); response_tools.push(Value::Object(ws)); } diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index e646e5285c6..1c3c0090833 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -30,7 +30,10 @@ use super::{ send_mcp_list_tools_events, McpLoopConfig, ToolLoopState, }, responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block}, - utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction, ToolContext}, + utils::{ + event_types, web_search_constants, FunctionCallInProgress, OutputIndexMapper, + StreamAction, ToolContext, + }, }; use crate::{ data_connector::{ConversationItemStorage, ConversationStorage, ResponseStorage}, @@ -601,7 +604,8 @@ pub(super) fn apply_event_transformations_inplace( if response_obj.get("tools").is_some() { // For web_search_preview, always use simplified tool format if tool_context.is_web_search() { - let web_search_tool = json!([{"type": "web_search_preview"}]); + let web_search_tool = + json!([{"type": web_search_constants::WEB_SEARCH_PREVIEW_SERVER_NAME}]); response_obj.insert("tools".to_string(), web_search_tool); response_obj .entry("tool_choice".to_string()) From 951853fac88de1d217301d54c9e1a53d0fe5e7db Mon Sep 17 00:00:00 2001 From: key4ng Date: Tue, 28 Oct 2025 23:28:01 +0000 Subject: [PATCH 
15/15] fix format --- sgl-router/src/routers/openai/streaming.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index 1c3c0090833..8b1bfc3e235 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -31,8 +31,8 @@ use super::{ }, responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block}, utils::{ - event_types, web_search_constants, FunctionCallInProgress, OutputIndexMapper, - StreamAction, ToolContext, + event_types, web_search_constants, FunctionCallInProgress, OutputIndexMapper, StreamAction, + ToolContext, }, }; use crate::{