Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions sgl-router/examples/mcp_config_web_search.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Example MCP Configuration with Web Search Preview
#
# This configuration demonstrates how to set up the web_search_preview tool
# for the OpenAI router.
#
# Usage:
# export SGLANG_MCP_CONFIG=/path/to/this/mcp_config_web_search.yaml
# export WEB_SEARCH_MCP_URL="https://your-search-server.com/sse"
# export WEB_SEARCH_MCP_TOKEN="your-token"

# Global proxy configuration (optional)
# proxy:
# http: "http://proxy:8080"
# https: "http://proxy:8080"
# no_proxy: "localhost,127.0.0.1,*.internal"

# Connection pool settings
pool:
max_connections: 100
idle_timeout: 300

# Tool inventory settings
inventory:
enable_refresh: true
tool_ttl: 300 # 5 minutes
refresh_interval: 60 # 1 minute
refresh_on_error: true

# MCP Servers
servers:
# Web Search Preview Server
# This server provides web search capabilities to the LLM
- name: "web_search_preview"
protocol: sse
url: http://localhost:8001/sse
# token: "${WEB_SEARCH_MCP_TOKEN}"
required: true # Don't fail startup if unavailable

# Example: Additional MCP server
# - name: "another-mcp-server"
# protocol: stdio
# command: "node"
# args: ["./mcp-server.js"]
# required: false

# Pre-warm connections at startup (optional)
# warmup:
# - url: "${WEB_SEARCH_MCP_URL}"
# label: "web_search_preview"
# token: "${WEB_SEARCH_MCP_TOKEN}"
20 changes: 20 additions & 0 deletions sgl-router/src/mcp/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,24 @@ impl McpManager {

// Convert args with type coercion based on schema
let tool_schema = Some(serde_json::Value::Object((*tool_info.input_schema).clone()));

debug!(
"Tool '{}' schema: {}",
tool_name,
serde_json::to_string_pretty(&tool_schema).unwrap_or_default()
);

let args_map = args
.into()
.into_map(tool_schema.as_ref())
.map_err(McpError::InvalidArguments)?;

debug!(
"Tool '{}' arguments after type coercion: {}",
tool_name,
serde_json::to_string_pretty(&args_map).unwrap_or_default()
);

// Get client for that server
let client = self
.get_client(&server_name)
Expand All @@ -254,6 +267,13 @@ impl McpManager {
arguments: args_map,
};

debug!(
"Sending MCP request to server '{}': tool={}, args={}",
server_name,
tool_name,
serde_json::to_string(&request.arguments).unwrap_or_default()
);

client
.call_tool(request)
.await
Expand Down
125 changes: 92 additions & 33 deletions sgl-router/src/routers/openai/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub(super) async fn execute_streaming_tool_calls(
state: &mut ToolLoopState,
server_label: &str,
sequence_number: &mut u64,
is_web_search: bool,
) -> bool {
// Execute all pending tool calls (sequential, as PR3 is skipped)
for call in pending_calls {
Expand Down Expand Up @@ -258,6 +259,7 @@ pub(super) async fn execute_streaming_tool_calls(
success,
error_msg.as_deref(),
sequence_number,
is_web_search,
) {
// Client disconnected, no point continuing tool execution
return false;
Expand All @@ -277,6 +279,7 @@ pub(super) async fn execute_streaming_tool_calls(
pub(super) fn prepare_mcp_payload_for_streaming(
payload: &mut Value,
active_mcp: &Arc<mcp::McpManager>,
is_web_search: bool,
) {
if let Some(obj) = payload.as_object_mut() {
// Remove any non-function tools from outgoing payload
Expand All @@ -291,10 +294,24 @@ pub(super) fn prepare_mcp_payload_for_streaming(
}
}

// Build function tools for all discovered MCP tools
// Build function tools for discovered MCP tools
let mut tools_json = Vec::new();
let tools = active_mcp.list_tools();
for t in tools {

// Get tools with server names from inventory
// Returns Vec<(tool_name, server_name, Tool)>
let tools = active_mcp.inventory().list_tools();

// Filter tools based on context
let filtered_tools: Vec<_> = if is_web_search {
// Only include tools from web_search_preview server
tools.into_iter()
.filter(|(_, server_name, _)| server_name == "web_search_preview")
.collect()
} else {
tools
};

for (_, _, t) in filtered_tools {
let parameters = Value::Object((*t.input_schema).clone());
let tool = serde_json::json!({
"type": event_types::ITEM_TYPE_FUNCTION,
Expand Down Expand Up @@ -474,6 +491,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
success: bool,
error_msg: Option<&str>,
sequence_number: &mut u64,
is_web_search: bool,
) -> bool {
let effective_output_index = call.effective_output_index();

Expand All @@ -485,6 +503,7 @@ pub(super) fn send_mcp_call_completion_events_with_error(
server_label,
success,
error_msg,
is_web_search,
);

// Get the mcp_call item_id
Expand Down Expand Up @@ -538,28 +557,40 @@ pub(super) fn inject_mcp_metadata_streaming(
state: &ToolLoopState,
mcp: &Arc<mcp::McpManager>,
server_label: &str,
is_web_search: bool,
) {
if let Some(output_array) = response.get_mut("output").and_then(|v| v.as_array_mut()) {
output_array.retain(|item| {
item.get("type").and_then(|t| t.as_str()) != Some(event_types::ITEM_TYPE_MCP_LIST_TOOLS)
});

let list_tools_item = build_mcp_list_tools_item(mcp, server_label);
output_array.insert(0, list_tools_item);
let mut insert_pos = 0;

// Only add mcp_list_tools for non-web-search cases
if !is_web_search {
let list_tools_item = build_mcp_list_tools_item(mcp, server_label);
output_array.insert(0, list_tools_item);
insert_pos = 1;
}

let mcp_call_items =
build_executed_mcp_call_items(&state.conversation_history, server_label);
let mut insert_pos = 1;
build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search);
for item in mcp_call_items {
output_array.insert(insert_pos, item);
insert_pos += 1;
}
} else if let Some(obj) = response.as_object_mut() {
let mut output_items = Vec::new();
output_items.push(build_mcp_list_tools_item(mcp, server_label));

// Only add mcp_list_tools for non-web-search cases
if !is_web_search {
output_items.push(build_mcp_list_tools_item(mcp, server_label));
}

output_items.extend(build_executed_mcp_call_items(
&state.conversation_history,
server_label,
is_web_search,
));
obj.insert("output".to_string(), Value::Array(output_items));
}
Expand All @@ -578,6 +609,7 @@ pub(super) async fn execute_tool_loop(
original_body: &ResponsesRequest,
active_mcp: &Arc<mcp::McpManager>,
config: &McpLoopConfig,
is_web_search: bool,
) -> Result<Value, String> {
let mut state = ToolLoopState::new(original_body.input.clone());

Expand Down Expand Up @@ -658,6 +690,7 @@ pub(super) async fn execute_tool_loop(
"max_tool_calls",
active_mcp,
original_body,
is_web_search,
);
}

Expand Down Expand Up @@ -716,22 +749,25 @@ pub(super) async fn execute_tool_loop(
})
.unwrap_or("mcp");

// Build mcp_list_tools item
let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label);

// Insert at beginning of output array
if let Some(output_array) = response_json
.get_mut("output")
.and_then(|v| v.as_array_mut())
{
output_array.insert(0, list_tools_item);
let mut insert_pos = 0;

// Build mcp_call items using helper function
// Only add mcp_list_tools for non-web-search cases
if !is_web_search {
let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label);
output_array.insert(0, list_tools_item);
insert_pos = 1;
}

// Build mcp_call items (will be web_search_call for web search tools)
let mcp_call_items =
build_executed_mcp_call_items(&state.conversation_history, server_label);
build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search);

// Insert mcp_call items after mcp_list_tools using mutable position
let mut insert_pos = 1;
// Insert call items after mcp_list_tools (if present)
for item in mcp_call_items {
output_array.insert(insert_pos, item);
insert_pos += 1;
Expand All @@ -751,6 +787,7 @@ pub(super) fn build_incomplete_response(
reason: &str,
active_mcp: &Arc<mcp::McpManager>,
original_body: &ResponsesRequest,
is_web_search: bool,
) -> Result<Value, String> {
let obj = response
.as_object_mut()
Expand Down Expand Up @@ -799,27 +836,33 @@ pub(super) fn build_incomplete_response(
server_label,
false, // Not successful
Some("Not executed - response stopped due to limit"),
is_web_search,
);
mcp_call_items.push(mcp_call_item);
}
}

// Add mcp_list_tools and executed mcp_call items at the beginning
if state.total_calls > 0 || !mcp_call_items.is_empty() {
let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label);
output_array.insert(0, list_tools_item);
let mut insert_pos = 0;

// Add mcp_call items for executed calls using helper
// Only add mcp_list_tools for non-web-search cases
if !is_web_search {
let list_tools_item = build_mcp_list_tools_item(active_mcp, server_label);
output_array.insert(0, list_tools_item);
insert_pos = 1;
}

// Add mcp_call items for executed calls (will be web_search_call for web search)
let executed_items =
build_executed_mcp_call_items(&state.conversation_history, server_label);
build_executed_mcp_call_items(&state.conversation_history, server_label, is_web_search);

let mut insert_pos = 1;
for item in executed_items {
output_array.insert(insert_pos, item);
insert_pos += 1;
}

// Add incomplete mcp_call items
// Add incomplete mcp_call items (will be web_search_call for web search)
for item in mcp_call_items {
output_array.insert(insert_pos, item);
insert_pos += 1;
Expand Down Expand Up @@ -884,24 +927,39 @@ pub(super) fn build_mcp_call_item(
server_label: &str,
success: bool,
error: Option<&str>,
is_web_search: bool,
) -> Value {
json!({
"id": generate_id("mcp"),
"type": event_types::ITEM_TYPE_MCP_CALL,
"status": if success { "completed" } else { "failed" },
"approval_request_id": Value::Null,
"arguments": arguments,
"error": error,
"name": tool_name,
"output": output,
"server_label": server_label
})
// Check if this is a web_search_preview context - if so, build web_search_call format
if is_web_search {
// Build web_search_call item (MVP - status only, no results)
if success {
crate::routers::openai::web_search::build_web_search_call_item()
} else {
crate::routers::openai::web_search::build_web_search_call_item_failed(
error.unwrap_or("Tool execution failed"),
)
}
} else {
// Regular mcp_call item
json!({
"id": generate_id("mcp"),
"type": event_types::ITEM_TYPE_MCP_CALL,
"status": if success { "completed" } else { "failed" },
"approval_request_id": Value::Null,
"arguments": arguments,
"error": error,
"name": tool_name,
"output": output,
"server_label": server_label
})
}
}

/// Helper function to build mcp_call items from executed tool calls in conversation history
pub(super) fn build_executed_mcp_call_items(
conversation_history: &[Value],
server_label: &str,
is_web_search: bool,
) -> Vec<Value> {
let mut mcp_call_items = Vec::new();

Expand Down Expand Up @@ -940,6 +998,7 @@ pub(super) fn build_executed_mcp_call_items(
} else {
None
},
is_web_search,
);
mcp_call_items.push(mcp_call_item);
}
Expand Down
1 change: 1 addition & 0 deletions sgl-router/src/routers/openai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod responses;
mod router;
mod streaming;
mod utils;
pub(crate) mod web_search;

// Re-export the main router type for external use
pub use router::OpenAIRouter;
Loading
Loading