Skip to content

Commit d6f0d30

Browse files
Enable real-time agent tool call streaming for all providers (#4279)
* WIP agentic tool call streaming - OpenAI - Anthropic - Azure OpenAI * WIP rest of providers EXCLUDES Bedrock and GenericOpenAI * patch untooled complete/streaming to use chatCallback provider from provider class and not assume OpenAI client struct example: Ollama * modify ollama to function with its own overrides normalize completion/stream outputs across providers/untooled * dev build * fix message sanitization for anthropic agent streaming * wip fix anthropic agentic streaming sanitization * patch gemini, webgenui, generic aibitat providers + disable providers unable to test * refactor anthropic aibitat provider for empty message and tool call formatting * Add frontend missing prop check update Azure for streaming support update Gemini to streaming support on gemini-* models generic OpenAI disable streaming verify localAI support verify NVIDIA Nim support * DPAIS, remove temp from call, support streaming * remove 0 temp to remove possibility of bad temp error/500s/400s * Patch condition where model is non-streamable and no tools are present or called resulting in the provider `handleFunctionCallChat` being called - which returns a string. This would then fail in Untooled.complete since response would be a string and not the expected `response.choices?.[0]?.message` Modified this line to handle both conditions for stream/non-streaming and tool presence or lack thereof * Allow generic OpenAI to be streamable since using untooled it should work fine honor disabled streaming for provider where that concern may apply for regular chats * rename function and move gemini-specific function to gemini provider * add comments for readability .complete on azure should be non-streaming as this is the sync response * migrate CometAPI, but disable as we cannot test --------- Co-authored-by: shatfield4 <[email protected]>
1 parent d24f9c3 commit d6f0d30

File tree

36 files changed

+1727
-1275
lines changed

36 files changed

+1727
-1275
lines changed

.github/workflows/dev-build.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ concurrency:
66

77
on:
88
push:
9-
branches: ['3999-chromium-flags'] # put your current branch to create a build. Core team only.
9+
branches: ['agentic-streaming'] # put your current branch to create a build. Core team only.
1010
paths-ignore:
1111
- '**.md'
1212
- 'cloud-deployments/*'

frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/HistoricalMessage/index.jsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ const RenderChatContent = memo(
226226
);
227227
let thoughtChain = null;
228228
let msgToRender = message;
229+
if (!message) return null;
229230

230231
// If the message is a perfect thought chain, we can render it directly
231232
// Complete == open and close tags match perfectly.

frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/StatusResponse/index.jsx

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@ import { CaretDown } from "@phosphor-icons/react";
44
import AgentAnimation from "@/media/animations/agent-animation.webm";
55
import AgentStatic from "@/media/animations/agent-static.png";
66

7-
export default function StatusResponse({
8-
messages = [],
9-
isThinking = false,
10-
showCheckmark = false,
11-
}) {
7+
export default function StatusResponse({ messages = [], isThinking = false }) {
128
const [isExpanded, setIsExpanded] = useState(false);
139
const currentThought = messages[messages.length - 1];
1410
const previousThoughts = messages.slice(0, -1);

frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,6 @@ export default function ChatHistory({
174174
key={`status-group-${index}`}
175175
messages={item}
176176
isThinking={!hasSubsequentMessages && lastMessageInfo.isAnimating}
177-
showCheckmark={
178-
hasSubsequentMessages ||
179-
(!lastMessageInfo.isAnimating && !lastMessageInfo.isStatusResponse)
180-
}
181177
/>
182178
);
183179
},

frontend/src/components/WorkspaceChat/ChatContainer/index.jsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ export default function ChatContainer({ workspace, knownHistory = [] }) {
234234
const socket = new WebSocket(
235235
`${websocketURI()}/api/agent-invocation/${socketId}`
236236
);
237+
socket.supportsAgentStreaming = false;
237238

238239
window.addEventListener(ABORT_STREAM_EVENT, () => {
239240
window.dispatchEvent(new CustomEvent(AGENT_SESSION_END));
@@ -243,7 +244,7 @@ export default function ChatContainer({ workspace, knownHistory = [] }) {
243244
socket.addEventListener("message", (event) => {
244245
setLoadingResponse(true);
245246
try {
246-
handleSocketResponse(event, setChatHistory);
247+
handleSocketResponse(socket, event, setChatHistory);
247248
} catch (e) {
248249
console.error("Failed to parse data");
249250
window.dispatchEvent(new CustomEvent(AGENT_SESSION_END));

frontend/src/utils/chat/agent.js

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const handledEvents = [
1212
"awaitingFeedback",
1313
"wssFailure",
1414
"rechartVisualize",
15+
// Streaming events
16+
"reportStreamEvent",
1517
];
1618

1719
export function websocketURI() {
@@ -20,13 +22,13 @@ export function websocketURI() {
2022
return `${wsProtocol}//${new URL(import.meta.env.VITE_API_BASE).host}`;
2123
}
2224

23-
export default function handleSocketResponse(event, setChatHistory) {
25+
export default function handleSocketResponse(socket, event, setChatHistory) {
2426
const data = safeJsonParse(event.data, null);
2527
if (data === null) return;
2628

2729
// No message type is defined then this is a generic message
2830
// that we need to print to the user as a system response
29-
if (!data.hasOwnProperty("type")) {
31+
if (!data.hasOwnProperty("type") && !socket.supportsAgentStreaming) {
3032
return setChatHistory((prev) => {
3133
return [
3234
...prev.filter((msg) => !!msg.content),
@@ -46,6 +48,90 @@ export default function handleSocketResponse(event, setChatHistory) {
4648

4749
if (!handledEvents.includes(data.type) || !data.content) return;
4850

51+
if (data.type === "reportStreamEvent") {
52+
// Enable agent streaming for the next message so we can handle streaming or non-streaming responses
53+
// If we get this message we know the provider supports agentic streaming
54+
socket.supportsAgentStreaming = true;
55+
56+
return setChatHistory((prev) => {
57+
if (data.content.type === "removeStatusResponse")
58+
return [...prev.filter((msg) => msg.uuid !== data.content.uuid)];
59+
60+
const knownMessage = data.content.uuid
61+
? prev.find((msg) => msg.uuid === data.content.uuid)
62+
: null;
63+
if (!knownMessage) {
64+
if (data.content.type === "fullTextResponse") {
65+
return [
66+
...prev.filter((msg) => !!msg.content),
67+
{
68+
uuid: data.content.uuid,
69+
type: "textResponse",
70+
content: data.content.content,
71+
role: "assistant",
72+
sources: [],
73+
closed: true,
74+
error: null,
75+
animate: false,
76+
pending: false,
77+
},
78+
];
79+
}
80+
81+
return [
82+
...prev.filter((msg) => !!msg.content),
83+
{
84+
uuid: data.content.uuid,
85+
type: "statusResponse",
86+
content: data.content.content,
87+
role: "assistant",
88+
sources: [],
89+
closed: true,
90+
error: null,
91+
animate: false,
92+
pending: false,
93+
},
94+
];
95+
} else {
96+
const { type, content, uuid } = data.content;
97+
// For tool call invocations, we need to update the existing message entirely since it is accumulated
98+
// and we dont know if the function will have arguments or not while streaming - so replace the existing message entirely
99+
if (type === "toolCallInvocation") {
100+
const knownMessage = prev.find((msg) => msg.uuid === uuid);
101+
if (!knownMessage)
102+
return [...prev, { uuid, type: "toolCallInvocation", content }]; // If the message is not known, add it to the end of the list
103+
return [
104+
...prev.filter((msg) => msg.uuid !== uuid),
105+
{ ...knownMessage, content },
106+
]; // If the message is known, replace it with the new content
107+
}
108+
109+
if (type === "textResponseChunk") {
110+
return prev
111+
.map((msg) =>
112+
msg.uuid === uuid
113+
? {
114+
...msg,
115+
type: "textResponse",
116+
content: msg.content + content,
117+
}
118+
: msg?.content
119+
? msg
120+
: null
121+
)
122+
.filter((msg) => !!msg);
123+
}
124+
125+
// Generic text response - will be put in the agent thought bubble
126+
return prev.map((msg) =>
127+
msg.uuid === data.content.uuid
128+
? { ...msg, content: msg.content + data.content.content }
129+
: msg
130+
);
131+
}
132+
});
133+
}
134+
49135
if (data.type === "fileDownload") {
50136
saveAs(data.content.b64Content, data.content.filename ?? "unknown.txt");
51137
return;

0 commit comments

Comments
 (0)