Conversation

@christian-bromann
Member

This PR adds a new `streamTimeout` option to `RunnableConfig` that provides a streaming inactivity timeout. Unlike the existing `timeout` option, which aborts after total elapsed time, `streamTimeout` aborts only if no chunks are received within the specified period; the timer resets each time a new chunk arrives.

Closes #9088

Problem

When using the streaming API with the `timeout` option, the stream is aborted once the total elapsed time exceeds the limit, regardless of whether chunks are still actively arriving:

// Current behavior: aborts after 3 seconds TOTAL, even if chunks are still coming
await llm.stream("Hello", { timeout: 3000 });

This is problematic for:

  • Long-running responses from slow models
  • Load balancing between providers (need to detect stalled streams, not slow ones)
  • Interactive applications that need reasonable timeout limits

Solution

Add a new `streamTimeout` option that aborts only if the stream becomes inactive:

// New behavior: aborts only if no chunk arrives for 3 seconds
await llm.stream("Hello", { streamTimeout: 3000 });

// Can be combined with regular timeout
await llm.stream("Hello", {
  timeout: 60000,       // 60s maximum total time
  streamTimeout: 5000   // 5s maximum inactivity
});
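
When the inactivity limit is hit, the abort surfaces to the consumer as an error thrown from iteration (the reproduction scripts below rely on the same pattern). A minimal consumption sketch, assuming an `llm` instance is already configured:

// A stall surfaces as an error thrown from the for-await loop
// (the exact error type and message here are illustrative)
try {
  const stream = await llm.stream("Hello", { streamTimeout: 5000 });
  for await (const chunk of stream) {
    process.stdout.write(String(chunk.content));
  }
} catch (e) {
  console.error("Stream aborted:", e instanceof Error ? e.message : e);
}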

How It Works

  1. `streamTimeout` is added to `RunnableConfig` alongside `timeout`
  2. In `ensureConfig()`, the value is stored in metadata as `streamTimeoutMs`
  3. `AsyncGeneratorWithSetup` reads this value and creates an `AbortController`
  4. Each time a chunk is received, the timeout timer is reset
  5. If no chunk arrives within the timeout period, the stream is aborted (see the sketch below)
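
Conceptually, the inactivity timer behaves like the sketch below. This illustrates the pattern only; it is not the actual `AsyncGeneratorWithSetup` code, and the helper name `withStreamTimeout` is hypothetical:

// Illustrative sketch of the inactivity-timeout pattern (hypothetical
// helper; the real implementation lives in AsyncGeneratorWithSetup)
async function* withStreamTimeout<T>(
  source: AsyncIterable<T>,
  streamTimeoutMs: number
): AsyncGenerator<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;

  // (Re)start the inactivity window; if it fires, the stream is aborted
  const resetTimer = () => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => {
      controller.abort(new Error(`No chunk received for ${streamTimeoutMs}ms`));
    }, streamTimeoutMs);
  };

  const iterator = source[Symbol.asyncIterator]();
  resetTimer();
  try {
    while (true) {
      // Race each pull against the abort signal so a stalled next()
      // cannot block forever; the listener is removed once a chunk lands
      const result = await new Promise<IteratorResult<T>>((resolve, reject) => {
        const onAbort = () => reject(controller.signal.reason);
        if (controller.signal.aborted) return onAbort();
        controller.signal.addEventListener("abort", onAbort, { once: true });
        iterator.next().then((r) => {
          controller.signal.removeEventListener("abort", onAbort);
          resolve(r);
        }, reject);
      });
      if (result.done) return result.value;
      resetTimer(); // a chunk arrived: restart the inactivity window
      yield result.value;
    }
  } finally {
    if (timer !== undefined) clearTimeout(timer);
    // Best-effort cleanup; not awaited in case the source itself is stalled
    void iterator.return?.(undefined);
  }
}

Resetting the timer on every chunk is what distinguishes this from `timeout`: a steady stream keeps the window open indefinitely, while total elapsed time stays unbounded.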

Testing

Here are some reproduction scripts to use:

Stream Hangs

This example shows that if a stream hangs, we have to wait for `timeout` to trigger.

import * as http from "http";
import { ChatOpenAI } from "@langchain/openai";

// Create a mock server that sends 3 chunks then STALLS (never completes)
const server = http.createServer((req, res) => {
  if (req.method === "POST" && req.url?.includes("/chat/completions")) {
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });

    const words = ["Hello", "from", "server..."];
    let i = 0;

    const send = () => {
      if (i < words.length) {
        const chunk = {
          id: "test",
          object: "chat.completion.chunk",
          created: Date.now(),
          model: "gpt-4",
          choices: [
            {
              index: 0,
              delta: { content: `${words[i]} ` },
              finish_reason: null,
            },
          ],
        };
        res.write(`data: ${JSON.stringify(chunk)}\n\n`);
        i++;
        setTimeout(send, 100);
      } else {
        // STALL - stop sending, keep connection open
        console.log("   [Server stalled - no more chunks]");
      }
    };
    send();
  }
});

server.listen(0, async () => {
  const port = (server.address() as { port: number }).port;
  console.log("=".repeat(50));
  console.log("ISSUE #9088: streamTimeout detects stalled streams");
  console.log("=".repeat(50));
  console.log();
  console.log("Server sends 3 chunks then STALLS (simulates hung connection)");
  console.log("streamTimeout is set to 500ms");
  console.log();
  console.log("ON MAIN: Will HANG forever (streamTimeout ignored)");
  console.log("WITH FIX: Will abort after 500ms of inactivity");
  console.log();

  const llm = new ChatOpenAI({
    model: "gpt-4",
    apiKey: "sk-mock",
    configuration: { baseURL: `http://localhost:${port}/v1` },
    maxRetries: 0,
  });

  // Safety net: force-exit if neither streamTimeout nor timeout ends the run
  const safetyTimer = setTimeout(() => {
    console.log("\n\n❌ Still hanging after 15s - forcing exit");
    server.close();
    process.exit(1);
  }, 15000);

  try {
    const start = Date.now();
    const stream = await llm.stream("Hello", {
      // @ts-expect-error streamTimeout is a new option
      streamTimeout: 500,
      timeout: 10000,
    });

    for await (const chunk of stream) {
      const elapsed = ((Date.now() - start) / 1000).toFixed(1);
      process.stdout.write(`[${elapsed}s] ${chunk.content}`);
    }

    clearTimeout(safetyTimer);
    console.log("\n\n✅ Stream completed (unexpected - server stalled!)");
  } catch (e) {
    clearTimeout(safetyTimer);
    console.log(
      `\n\n✅ Correctly aborted: ${e instanceof Error ? e.message : e}`
    );
    console.log("\n^ streamTimeout detected the stall!");
  }

  server.close();
  process.exit(0);
});

Stream Aborts

If the timeout is low, the stream aborts even though tokens are still arriving.

import * as http from "http";
import { ChatOpenAI } from "@langchain/openai";

// Create a mock server that sends chunks every 200ms
const server = http.createServer((req, res) => {
  if (req.method === "POST" && req.url?.includes("/chat/completions")) {
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });

    const words =
      "This is a slow response that takes about 4 seconds total".split(" ");
    let i = 0;

    const send = () => {
      if (i < words.length) {
        const chunk = {
          id: "test",
          object: "chat.completion.chunk",
          created: Date.now(),
          model: "gpt-4",
          choices: [
            {
              index: 0,
              delta: { content: `${words[i]} ` },
              finish_reason: null,
            },
          ],
        };
        res.write(`data: ${JSON.stringify(chunk)}\n\n`);
        i++;
        setTimeout(send, 200); // 200ms between chunks, 11 words, ~2.2s total
      } else {
        res.write(
          `data: ${JSON.stringify({
            choices: [{ delta: {}, finish_reason: "stop" }],
          })}\n\n`
        );
        res.write("data: [DONE]\n\n");
        res.end();
      }
    };
    send();
  }
});

server.listen(0, async () => {
  const port = (server.address() as { port: number }).port;
  console.log("=".repeat(50));
  console.log("ISSUE #9088: timeout aborts active streams");
  console.log("=".repeat(50));
  console.log();
  console.log("Server sends chunks every 200ms (~2.4s total)");
  console.log("timeout is set to 2000ms (2 seconds)");
  console.log();
  console.log("EXPECTED: Stream aborts after 2s even though chunks are coming");
  console.log();

  const llm = new ChatOpenAI({
    model: "gpt-4",
    apiKey: "sk-mock",
    configuration: { baseURL: `http://localhost:${port}/v1` },
    maxRetries: 0,
  });

  try {
    const start = Date.now();
    const stream = await llm.stream("Hello", { timeout: 2000 });

    for await (const chunk of stream) {
      const elapsed = ((Date.now() - start) / 1000).toFixed(1);
      process.stdout.write(`[${elapsed}s] ${chunk.content}`);
    }

    console.log("\n\n✅ Stream completed (unexpected!)");
  } catch (e) {
    console.log(`\n\n❌ Aborted: ${e instanceof Error ? e.message : e}`);
    console.log("\n^ This is the PROBLEM - chunks were still arriving!");
  }

  server.close();
  process.exit(0);
});

@changeset-bot

changeset-bot bot commented Dec 12, 2025

⚠️ No Changeset found

Latest commit: 9c05c39

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.
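
For reference, a changeset is a small markdown file under `.changeset/`; for this PR it might look like the following (the package name and bump type are assumptions):

---
"@langchain/core": minor
---

Add `streamTimeout` option to `RunnableConfig` for stream inactivity timeouts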


@hntrl marked this pull request as draft on December 13, 2025 01:23