diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 7d1637e381c..5380ecd9ec4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -255,14 +255,20 @@ private void checkLoadException() { @Override public void close() throws IOException { - if (!dorisSinkConfig.getEnable2PC()) { - flush(); - } - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - if (dorisStreamLoad != null) { - dorisStreamLoad.close(); + try { + if (!dorisSinkConfig.getEnable2PC()) { + flush(); + } + } catch (Exception e) { + log.error("Flush data failed when close doris writer.", e); + throw e; + } finally { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + if (dorisStreamLoad != null) { + dorisStreamLoad.close(); + } } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 406e7d95ff0..b3fc4f1fba4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -221,7 +221,6 @@ private RespContent handlePreCommitResponse(CloseableHttpResponse response) thro } public RespContent stopLoad() throws IOException { - loading = false; if (pendingLoadFuture != null) { log.info("stream load stopped."); recordStream.endInput(); @@ -230,6 +229,7 @@ public RespContent stopLoad() throws IOException { } catch (Exception e) { throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); } finally { + loading = false; pendingLoadFuture = null; } } else { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index d817c7743b4..73dc0400c3f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -60,6 +60,7 @@ @Slf4j public class DorisIT extends AbstractDorisIT { private static final String UNIQUE_TABLE = "doris_e2e_unique_table"; + private static final String CAST_ERROR_SINK_TABLE = "doris_e2e_cast_error_sink_table"; private static final String DUPLICATE_TABLE = "doris_duplicate_table"; private static final String sourceDB = "e2e_source"; private static final String sinkDB = "e2e_sink"; @@ -212,6 +213,25 @@ public void testNoSchemaDoris(TestContainer container) checkSinkData(); } + @TestTemplate + public void testDorisCastError(TestContainer container) + throws IOException, InterruptedException { + initializeJdbcTable(); + batchInsertUniqueTableData(); + // Test that the task can terminate normally instead of being blocked + // when Doris return parsing error(e.g., ANALYSIS_ERROR), + // the seatunnel task should fail gracefully and exit rather than hang indefinitely. + Container.ExecResult execResult = + container.executeJob("/doris_source_and_sink_with_cast_error.conf"); + Assertions.assertEquals(1, execResult.getExitCode()); + Assertions.assertTrue(execResult.getStderr().contains("can not cast from origin type")); + + Container.ExecResult execResult2 = + container.executeJob("/doris_source_and_sink_with_cast_error_2pc_true.conf"); + Assertions.assertEquals(1, execResult2.getExitCode()); + Assertions.assertTrue(execResult2.getStderr().contains("can not cast from origin type")); + } + private void checkAllTypeSinkData() { try { assertHasData(sourceDB, DUPLICATE_TABLE); @@ -422,6 +442,8 @@ protected void initializeJdbcTable() { // create source and sink table statement.execute(createUniqueTableForTest(sourceDB)); statement.execute(createDuplicateTableForTest(sourceDB)); + + statement.execute(createTypeCastErrorSinkTableForTest(sinkDB)); log.info("create source and sink table succeed"); } catch (SQLException e) { throw new RuntimeException("Initializing table failed!", e); @@ -477,6 +499,52 @@ private String createUniqueTableForTest(String db) { return String.format(createTableSql, db, UNIQUE_TABLE); } + private String createTypeCastErrorSinkTableForTest(String db) { + // The type of column MAP_VARCHAR_STRING in sink table bitmap, source table is varchar type. + // In this case, doris will report an error. After seatunnel receives the error msg from + // doris, + // it should stop the task normally instead of being blocked and unable to terminate. + String createTableSql = + "create table if not exists `%s`.`%s`(\n" + + "F_ID bigint null,\n" + + "F_INT int null,\n" + + "F_BIGINT bigint null,\n" + + "F_TINYINT tinyint null,\n" + + "F_SMALLINT smallint null,\n" + + "F_DECIMAL decimal(18,6) null,\n" + + "F_LARGEINT largeint null,\n" + + "F_BOOLEAN boolean null,\n" + + "F_DOUBLE double null,\n" + + "F_FLOAT float null,\n" + + "F_CHAR char null,\n" + + "F_VARCHAR_11 ARRAY,\n" + + "F_STRING string null,\n" + + "F_DATETIME_P datetime(6),\n" + + "F_DATETIME datetime,\n" + + "F_DATE date,\n" + + "MAP_VARCHAR_BOOLEAN map,\n" + + "MAP_CHAR_TINYINT MAP,\n" + + "MAP_STRING_SMALLINT MAP,\n" + + "MAP_INT_INT MAP,\n" + + "MAP_TINYINT_BIGINT MAP,\n" + + "MAP_SMALLINT_LARGEINT MAP,\n" + + "MAP_BIGINT_FLOAT MAP,\n" + + "MAP_LARGEINT_DOUBLE MAP,\n" + + "MAP_STRING_DECIMAL MAP,\n" + + "MAP_DECIMAL_DATE MAP,\n" + + "MAP_DATE_DATETIME MAP,\n" + + "MAP_DATETIME_CHAR MAP,\n" + + "MAP_CHAR_VARCHAR MAP,\n" + + "MAP_VARCHAR_STRING bitmap NOT NULL\n" + + ")\n" + + "UNIQUE KEY(`F_ID`)\n" + + "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1\n" + + "properties(\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"" + + ");"; + return String.format(createTableSql, db, CAST_ERROR_SINK_TABLE); + } + private String createDuplicateTableForTest(String db) { String createDuplicateTableSql = "create table if not exists `%s`.`%s`(\n" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf new file mode 100644 index 00000000000..939bc8407d2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_unique_table" + doris.batch.size = 10 + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table.identifier = "e2e_sink.doris_e2e_cast_error_sink_table" + sink.enable-2pc = "false" + sink.label-prefix = "test_json" + sink.check-interval = 20000 + doris.batch.size = 10 + sink.buffer-count = 2 + sink.buffer-size = 5120 + doris.config = { + format="json" + read_json_by_line="true" + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf new file mode 100644 index 00000000000..9d1cc4b2862 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_cast_error_2pc_true.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_unique_table" + doris.batch.size = 10 + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table.identifier = "e2e_sink.doris_e2e_cast_error_sink_table" + sink.enable-2pc = "true" + sink.label-prefix = "test_json" + sink.check-interval = 20000 + doris.batch.size = 10 + sink.buffer-count = 2 + sink.buffer-size = 5120 + doris.config = { + format="json" + read_json_by_line="true" + } + } + } \ No newline at end of file diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java index c34a0783eb1..455f4780b5c 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java @@ -114,15 +114,22 @@ public WriterCommitMessage commit() throws IOException { @Override public void abort() throws IOException { - sinkWriter.abortPrepare(); - if (sinkCommitter != null) { - if (latestCommitInfoT == null) { - sinkCommitter.abort(Collections.emptyList()); - } else { - sinkCommitter.abort(Collections.singletonList(latestCommitInfoT)); + try { + sinkWriter.abortPrepare(); + if (sinkCommitter != null) { + if (latestCommitInfoT == null) { + sinkCommitter.abort(Collections.emptyList()); + } else { + sinkCommitter.abort(Collections.singletonList(latestCommitInfoT)); + } } + cleanCommitInfo(); + } catch (Throwable e) { + log.error("---> Abort spark writer throw error", e); + throw e; + } finally { + sinkWriter.close(); } - cleanCommitInfo(); } private void cleanCommitInfo() { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java index 1a97f6e618b..c07e1f08278 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java @@ -92,8 +92,6 @@ public WriterCommitMessage commit() throws IOException { SeaTunnelSparkWriterCommitMessage seaTunnelSparkWriterCommitMessage = new SeaTunnelSparkWriterCommitMessage<>(latestCommitInfoT); cleanCommitInfo(); - sinkWriter.close(); - context.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); @@ -122,5 +120,8 @@ private void cleanCommitInfo() { } @Override - public void close() throws IOException {} + public void close() throws IOException { + sinkWriter.close(); + context.getEventListener().onEvent(new WriterCloseEvent()); + } }