From 6d8299e8eaaa63e74b7c1477680f9236e9ef1f8f Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Thu, 13 Nov 2025 11:59:57 +0800 Subject: [PATCH 1/7] [Fix][Connector-doris] DorisStreamLoad loading state mismanagement causes RecordBuffer infinite loop during shutdown --- .../doris/sink/writer/DorisSinkWriter.java | 22 ++++++++++------ .../doris/sink/writer/DorisStreamLoad.java | 25 +++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 7d1637e381c..5380ecd9ec4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -255,14 +255,20 @@ private void checkLoadException() { @Override public void close() throws IOException { - if (!dorisSinkConfig.getEnable2PC()) { - flush(); - } - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - if (dorisStreamLoad != null) { - dorisStreamLoad.close(); + try { + if (!dorisSinkConfig.getEnable2PC()) { + flush(); + } + } catch (Exception e) { + log.error("Flush data failed when close doris writer.", e); + throw e; + } finally { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + if (dorisStreamLoad != null) { + dorisStreamLoad.close(); + } } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 406e7d95ff0..c626fbc530a 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -83,6 +83,7 @@ public class DorisStreamLoad implements Serializable { private final ExecutorService executorService; private volatile boolean loadBatchFirstRecord; private volatile boolean loading = false; + private volatile boolean flushing = false; private String label; @Getter private long recordCount = 0; @@ -192,18 +193,19 @@ public void writeRecord(byte[] record) throws IOException { } public String getLoadFailedMsg() { - if (!loading) { - return null; - } - if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) { - String errorMessage; - try { - errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage(); - } catch (Exception e) { - errorMessage = ExceptionUtils.getMessage(e); + if (flushing || loading) { + if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) { + String errorMessage; + try { + errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage(); + } catch (Exception e) { + errorMessage = ExceptionUtils.getMessage(e); + } + recordStream.setErrorMessageByStreamLoad(errorMessage); + return errorMessage; + } else { + return null; } - recordStream.setErrorMessageByStreamLoad(errorMessage); - return errorMessage; } else { return null; } @@ -222,6 +224,7 @@ private RespContent handlePreCommitResponse(CloseableHttpResponse response) thro public RespContent stopLoad() throws IOException { loading = false; + flushing = true; if (pendingLoadFuture != null) { log.info("stream load stopped."); recordStream.endInput(); From b1dd23430c80e5268a0698bcba961b58f1c075e5 Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Fri, 14 Nov 2025 23:24:34 +0800 Subject: [PATCH 2/7] Add e2e test case --- .../e2e/connector/doris/DorisIT.java | 62 +++++++++++++++++++ ...doris_source_and_sink_with_cast_error.conf | 53 ++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index d817c7743b4..558df8eaea0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -60,6 +60,7 @@ @Slf4j public class DorisIT extends AbstractDorisIT { private static final String UNIQUE_TABLE = "doris_e2e_unique_table"; + private static final String CAST_ERROR_SINK_TABLE = "doris_e2e_cast_error_sink_table"; private static final String DUPLICATE_TABLE = "doris_duplicate_table"; private static final String sourceDB = "e2e_source"; private static final String sinkDB = "e2e_sink"; @@ -207,11 +208,25 @@ public void testNoSchemaDoris(TestContainer container) throws IOException, InterruptedException { initializeJdbcTable(); batchInsertUniqueTableData(); + // Test that the task can terminate normally instead of being blocked when Doris parsing error occurs + // Verify that when source and target table data types are incompatible (e.g., varchar to bitmap), + // the task should fail gracefully and exit rather than hang indefinitely Container.ExecResult execResult1 = container.executeJob("/doris_source_no_schema.conf"); Assertions.assertEquals(0, execResult1.getExitCode()); checkSinkData(); } + @TestTemplate + public void testDorisCastError(TestContainer container) + throws IOException, InterruptedException { + initializeJdbcTable(); + batchInsertUniqueTableData(); + Container.ExecResult execResult = + container.executeJob("/doris_source_and_sink_with_cast_error.conf"); + Assertions.assertEquals(1, execResult.getExitCode()); + Assertions.assertTrue(execResult.getStderr().contains("can not cast from origin type")); + } + private void checkAllTypeSinkData() { try { assertHasData(sourceDB, DUPLICATE_TABLE); @@ -422,6 +437,8 @@ protected void initializeJdbcTable() { // create source and sink table statement.execute(createUniqueTableForTest(sourceDB)); statement.execute(createDuplicateTableForTest(sourceDB)); + + statement.execute(createTypeCastErrorSinkTableForTest(sinkDB)); log.info("create source and sink table succeed"); } catch (SQLException e) { throw new RuntimeException("Initializing table failed!", e); @@ -477,6 +494,51 @@ private String createUniqueTableForTest(String db) { return String.format(createTableSql, db, UNIQUE_TABLE); } + private String createTypeCastErrorSinkTableForTest(String db) { + // The type of column MAP_VARCHAR_STRING in sink table bitmap, source table is varchar type. + // In this case, doris will report an error. After seatunnel receives the error msg from doris, + // it should stop the task normally instead of being blocked and unable to terminate. + String createTableSql = + "create table if not exists `%s`.`%s`(\n" + + "F_ID bigint null,\n" + + "F_INT int null,\n" + + "F_BIGINT bigint null,\n" + + "F_TINYINT tinyint null,\n" + + "F_SMALLINT smallint null,\n" + + "F_DECIMAL decimal(18,6) null,\n" + + "F_LARGEINT largeint null,\n" + + "F_BOOLEAN boolean null,\n" + + "F_DOUBLE double null,\n" + + "F_FLOAT float null,\n" + + "F_CHAR char null,\n" + + "F_VARCHAR_11 ARRAY,\n" + + "F_STRING string null,\n" + + "F_DATETIME_P datetime(6),\n" + + "F_DATETIME datetime,\n" + + "F_DATE date,\n" + + "MAP_VARCHAR_BOOLEAN map,\n" + + "MAP_CHAR_TINYINT MAP,\n" + + "MAP_STRING_SMALLINT MAP,\n" + + "MAP_INT_INT MAP,\n" + + "MAP_TINYINT_BIGINT MAP,\n" + + "MAP_SMALLINT_LARGEINT MAP,\n" + + "MAP_BIGINT_FLOAT MAP,\n" + + "MAP_LARGEINT_DOUBLE MAP,\n" + + "MAP_STRING_DECIMAL MAP,\n" + + "MAP_DECIMAL_DATE MAP,\n" + + "MAP_DATE_DATETIME MAP,\n" + + "MAP_DATETIME_CHAR MAP,\n" + + "MAP_CHAR_VARCHAR MAP,\n" + + "MAP_VARCHAR_STRING bitmap NOT NULL\n" + + ")\n" + + "UNIQUE KEY(`F_ID`)\n" + + "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1\n" + + "properties(\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"" + + ");"; + return String.format(createTableSql, db, CAST_ERROR_SINK_TABLE); + } + private String createDuplicateTableForTest(String db) { String createDuplicateTableSql = "create table if not exists `%s`.`%s`(\n" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf new file mode 100644 index 00000000000..939bc8407d2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_unique_table" + doris.batch.size = 10 + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table.identifier = "e2e_sink.doris_e2e_cast_error_sink_table" + sink.enable-2pc = "false" + sink.label-prefix = "test_json" + sink.check-interval = 20000 + doris.batch.size = 10 + sink.buffer-count = 2 + sink.buffer-size = 5120 + doris.config = { + format="json" + read_json_by_line="true" + } + } + } \ No newline at end of file From cd5156a2b08ddf77082d098a1c96249b15db1a95 Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Sat, 15 Nov 2025 10:09:47 +0800 Subject: [PATCH 3/7] add e2e test case --- .../connectors/doris/sink/writer/DorisStreamLoad.java | 1 + .../apache/seatunnel/e2e/connector/doris/DorisIT.java | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index c626fbc530a..b1ab298e36f 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -233,6 +233,7 @@ public RespContent stopLoad() throws IOException { } catch (Exception e) { throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); } finally { + flushing = false; pendingLoadFuture = null; } } else { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index 558df8eaea0..ff494a1521c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -208,9 +208,6 @@ public void testNoSchemaDoris(TestContainer container) throws IOException, InterruptedException { initializeJdbcTable(); batchInsertUniqueTableData(); - // Test that the task can terminate normally instead of being blocked when Doris parsing error occurs - // Verify that when source and target table data types are incompatible (e.g., varchar to bitmap), - // the task should fail gracefully and exit rather than hang indefinitely Container.ExecResult execResult1 = container.executeJob("/doris_source_no_schema.conf"); Assertions.assertEquals(0, execResult1.getExitCode()); checkSinkData(); @@ -221,6 +218,9 @@ public void testDorisCastError(TestContainer container) throws IOException, InterruptedException { initializeJdbcTable(); batchInsertUniqueTableData(); + // Test that the task can terminate normally instead of being blocked + // when Doris parsing error occurs(e.g., ANALYSIS_ERROR), + // the task should fail gracefully and exit rather than hang indefinitely. Container.ExecResult execResult = container.executeJob("/doris_source_and_sink_with_cast_error.conf"); Assertions.assertEquals(1, execResult.getExitCode()); @@ -496,7 +496,8 @@ private String createUniqueTableForTest(String db) { private String createTypeCastErrorSinkTableForTest(String db) { // The type of column MAP_VARCHAR_STRING in sink table bitmap, source table is varchar type. - // In this case, doris will report an error. After seatunnel receives the error msg from doris, + // In this case, doris will report an error. After seatunnel receives the error msg from + // doris, // it should stop the task normally instead of being blocked and unable to terminate. String createTableSql = "create table if not exists `%s`.`%s`(\n" From a8420c4e6aab1f8f8042bc2681237fdc0efb544b Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Sat, 15 Nov 2025 22:48:09 +0800 Subject: [PATCH 4/7] Compatible with spark2 --- .../doris/sink/writer/DorisSinkWriter.java | 19 +++++-- .../e2e/connector/doris/DorisIT.java | 9 +++- ...rce_and_sink_with_cast_error_2pc_true.conf | 53 +++++++++++++++++++ 3 files changed, 75 insertions(+), 6 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 5380ecd9ec4..7dd384a0b07 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -225,11 +225,22 @@ private void startLoad(String label) { @Override public void abortPrepare() { - if (dorisSinkConfig.getEnable2PC()) { - try { + try { + if (dorisSinkConfig.getEnable2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); - } catch (Exception e) { - throw new RuntimeException(e); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + if (dorisStreamLoad != null) { + try { + dorisStreamLoad.close(); + } catch (IOException e) { + log.warn("Error closing doris stream load during abort prepare", e); + } } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index ff494a1521c..73dc0400c3f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -219,12 +219,17 @@ public void testDorisCastError(TestContainer container) initializeJdbcTable(); batchInsertUniqueTableData(); // Test that the task can terminate normally instead of being blocked - // when Doris parsing error occurs(e.g., ANALYSIS_ERROR), - // the task should fail gracefully and exit rather than hang indefinitely. + // when Doris return parsing error(e.g., ANALYSIS_ERROR), + // the seatunnel task should fail gracefully and exit rather than hang indefinitely. Container.ExecResult execResult = container.executeJob("/doris_source_and_sink_with_cast_error.conf"); Assertions.assertEquals(1, execResult.getExitCode()); Assertions.assertTrue(execResult.getStderr().contains("can not cast from origin type")); + + Container.ExecResult execResult2 = + container.executeJob("/doris_source_and_sink_with_cast_error_2pc_true.conf"); + Assertions.assertEquals(1, execResult2.getExitCode()); + Assertions.assertTrue(execResult2.getStderr().contains("can not cast from origin type")); } private void checkAllTypeSinkData() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf new file mode 100644 index 00000000000..9d1cc4b2862 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_unique_table" + doris.batch.size = 10 + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table.identifier = "e2e_sink.doris_e2e_cast_error_sink_table" + sink.enable-2pc = "true" + sink.label-prefix = "test_json" + sink.check-interval = 20000 + doris.batch.size = 10 + sink.buffer-count = 2 + sink.buffer-size = 5120 + doris.config = { + format="json" + read_json_by_line="true" + } + } + } \ No newline at end of file From ab4b85de74cbf29bfd611c7d95fc65d818365eca Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Tue, 18 Nov 2025 21:46:34 +0800 Subject: [PATCH 5/7] opt code --- .../doris/sink/writer/DorisStreamLoad.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index b1ab298e36f..b3fc4f1fba4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -83,7 +83,6 @@ public class DorisStreamLoad implements Serializable { private final ExecutorService executorService; private volatile boolean loadBatchFirstRecord; private volatile boolean loading = false; - private volatile boolean flushing = false; private String label; @Getter private long recordCount = 0; @@ -193,19 +192,18 @@ public void writeRecord(byte[] record) throws IOException { } public String getLoadFailedMsg() { - if (flushing || loading) { - if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) { - String errorMessage; - try { - errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage(); - } catch (Exception e) { - errorMessage = ExceptionUtils.getMessage(e); - } - recordStream.setErrorMessageByStreamLoad(errorMessage); - return errorMessage; - } else { - return null; + if (!loading) { + return null; + } + if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) { + String errorMessage; + try { + errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage(); + } catch (Exception e) { + errorMessage = ExceptionUtils.getMessage(e); } + recordStream.setErrorMessageByStreamLoad(errorMessage); + return errorMessage; } else { return null; } @@ -223,8 +221,6 @@ private RespContent handlePreCommitResponse(CloseableHttpResponse response) thro } public RespContent stopLoad() throws IOException { - loading = false; - flushing = true; if (pendingLoadFuture != null) { log.info("stream load stopped."); recordStream.endInput(); @@ -233,7 +229,7 @@ public RespContent stopLoad() throws IOException { } catch (Exception e) { throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); } finally { - flushing = false; + loading = false; pendingLoadFuture = null; } } else { From 19d092c3d61e8ca60debcfe5bb63f2f5eb75bded Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Thu, 20 Nov 2025 20:48:07 +0800 Subject: [PATCH 6/7] close doris sink writer after spark data writer aborted --- .../doris/sink/writer/DorisSinkWriter.java | 19 ++++------------- .../spark/sink/writer/SparkDataWriter.java | 21 ++++++++++++------- .../sink/write/SeaTunnelSparkDataWriter.java | 4 +++- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 7dd384a0b07..5380ecd9ec4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -225,22 +225,11 @@ private void startLoad(String label) { @Override public void abortPrepare() { - try { - if (dorisSinkConfig.getEnable2PC()) { + if (dorisSinkConfig.getEnable2PC()) { + try { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - if (dorisStreamLoad != null) { - try { - dorisStreamLoad.close(); - } catch (IOException e) { - log.warn("Error closing doris stream load during abort prepare", e); - } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java index c34a0783eb1..455f4780b5c 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java @@ -114,15 +114,22 @@ public WriterCommitMessage commit() throws IOException { @Override public void abort() throws IOException { - sinkWriter.abortPrepare(); - if (sinkCommitter != null) { - if (latestCommitInfoT == null) { - sinkCommitter.abort(Collections.emptyList()); - } else { - sinkCommitter.abort(Collections.singletonList(latestCommitInfoT)); + try { + sinkWriter.abortPrepare(); + if (sinkCommitter != null) { + if (latestCommitInfoT == null) { + sinkCommitter.abort(Collections.emptyList()); + } else { + sinkCommitter.abort(Collections.singletonList(latestCommitInfoT)); + } } + cleanCommitInfo(); + } catch (Throwable e) { + log.error("---> Abort spark writer throw error", e); + throw e; + } finally { + sinkWriter.close(); } - cleanCommitInfo(); } private void cleanCommitInfo() { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java index 1a97f6e618b..6ea04adbaa5 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java @@ -122,5 +122,7 @@ private void cleanCommitInfo() { } @Override - public void close() throws IOException {} + public void close() throws IOException { + sinkWriter.close(); + } } From 4dc6c6216e7b5b58a9e63b7d33b6d99db6981bc5 Mon Sep 17 00:00:00 2001 From: ShengHuang Date: Fri, 21 Nov 2025 11:38:44 +0800 Subject: [PATCH 7/7] opt sink close --- .../translation/spark/sink/write/SeaTunnelSparkDataWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java index 6ea04adbaa5..c07e1f08278 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java @@ -92,8 +92,6 @@ public WriterCommitMessage commit() throws IOException { SeaTunnelSparkWriterCommitMessage seaTunnelSparkWriterCommitMessage = new SeaTunnelSparkWriterCommitMessage<>(latestCommitInfoT); cleanCommitInfo(); - sinkWriter.close(); - context.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); @@ -124,5 +122,6 @@ private void cleanCommitInfo() { @Override public void close() throws IOException { sinkWriter.close(); + context.getEventListener().onEvent(new WriterCloseEvent()); } }