Skip to content

Conversation

@Larborator
Copy link
Contributor

@Larborator Larborator commented Nov 19, 2025

Purpose of this pull request

Problem

The stopBufferData() method in RecordBuffer can block indefinitely in a while loop waiting for a buffer from writeQueue.poll(100ms) to send an EOF marker. The checkErrorMessageByStreamLoad() check inside the loop never receives an error signal to interrupt the wait, even when Doris FE has already returned an error response.

flink jstack
image

Root Cause

  1. Buffer pool exhaustion: All buffers are moved from writeQueue to readQueue by the write thread, but the read thread (HTTP upload) doesn't consume them (e.g., due to Doris node restart or network issues). Without consumption, no buffers are recycled back to writeQueue.

  2. Error message not propagated: In DorisStreamLoad.stopLoad(), loading = false is set immediately, which prevents getLoadFailedMsg() from checking pendingLoadFuture and setting the error message. The endInput() call blocks before pendingLoadFuture.get() can be reached to parse the error response.

  3. Circular dependency: Need EOF to finish → need buffer for EOF → buffer unavailable → need error to interrupt → error not propagated → infinite wait.

Solution

checkDone() before calling stopLoad()

Does this PR introduce any user-facing change?

no

How was this patch tested?

Check list

… interrupt blocking poll() in RecordBuffer
@zhangshenghang
Copy link
Member

thanks @Larborator , I think this PR might solve your problem. Delay the assignment operation of loading=false until after recordStream. endInput(). @Mrhs121 Please confirm it.

@Larborator
Copy link
Contributor Author

thanks @Larborator , I think this PR might solve your problem. Delay the assignment operation of loading=false until after recordStream. endInput(). @Mrhs121 Please confirm it.

great work, I'll close this PR.

@Larborator Larborator closed this Nov 20, 2025
@Mrhs121
Copy link

Mrhs121 commented Nov 20, 2025

thanks @Larborator , I think this PR might solve your problem. Delay the assignment operation of loading=false until after recordStream. endInput(). @Mrhs121 Please confirm it.

Confirm that it is the same issue.
FYI, additionally, you need to pay attention to the close method of DorisSinkWriter. If an exception is thrown during the flush process, it may prevent the scheduledExecutorService from shutting down properly, which could cause the SeaTunnel job to fail to terminate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants