From c0a2665a60ab53feac6a1e47214da910b13f9701 Mon Sep 17 00:00:00 2001
From: Pritam Kumar Mishra
Date: Sat, 9 Aug 2025 11:27:29 +0530
Subject: [PATCH 1/5] added metadata and data path in case of dynamic routing

---
 committer/build.gradle                        |  36 ++
 .../java/org/apache/iceberg/Coordinator.java  | 315 ++++++++++++++++++
 .../main/java/org/apache/iceberg/Main.java    |   7 +
 .../iceberg/connect/IcebergSinkConfig.java    |  34 +-
 .../iceberg/connect/TableSinkConfig.java      |  13 +-
 .../connect/data/IcebergWriterFactory.java    |  11 +-
 6 files changed, 413 insertions(+), 3 deletions(-)
 create mode 100644 committer/build.gradle
 create mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java
 create mode 100644 committer/src/main/java/org/apache/iceberg/Main.java

diff --git a/committer/build.gradle b/committer/build.gradle
new file mode 100644
index 000000000000..0de61912d850
--- /dev/null
+++ b/committer/build.gradle
@@ -0,0 +1,36 @@
+plugins {
+  id 'java-library'
+}
+
+group = 'org.apache.iceberg'
+version = '1.7.0-SNAPSHOT'
+
+repositories {
+  mavenCentral()
+}
+
+dependencies {
+  api project(':iceberg-api')
+  implementation project(':iceberg-kafka-connect')
+  implementation project(':iceberg-core')
+  implementation project(':iceberg-common')
+  implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+  implementation project(':iceberg-data')
+  implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events')
+  implementation platform(libs.jackson.bom)
+  implementation libs.jackson.core
+  implementation libs.jackson.databind
+  implementation libs.avro.avro
+
+  compileOnly libs.kafka.clients
+  compileOnly libs.kafka.connect.api
+  compileOnly libs.kafka.connect.json
+
+  testImplementation libs.hadoop3.client
+  testRuntimeOnly project(':iceberg-parquet')
+  testRuntimeOnly project(':iceberg-orc')
+}
+
+test {
+  useJUnitPlatform()
+}
diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java
new file mode 100644
index 000000000000..6a260d91bb80
--- /dev/null
+++ b/committer/src/main/java/org/apache/iceberg/Coordinator.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.Channel; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.channel.CommitState; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Coordinator { + + private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; + private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; + private static final Duration POLL_DURATION = Duration.ofSeconds(1); + + private final Catalog catalog; + private final IcebergSinkConfig config; + private final int totalPartitionCount; + private final String snapshotOffsetsProp; + private final ExecutorService exec; + private final CommitState commitState; + private volatile boolean terminated; + + Coordinator( + Catalog catalog, + IcebergSinkConfig config, + Collection members, + KafkaClientFactory clientFactory, + SinkTaskContext context) { + // pass consumer group ID to which we commit low watermark offsets + super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); + + this.catalog = catalog; + this.config = config; + this.totalPartitionCount = + members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); + this.snapshotOffsetsProp = + String.format( + "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); + this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); + this.commitState = new CommitState(config); + } + + void process() { + if (commitState.isCommitIntervalReached()) { + // send out begin commit + commitState.startNewCommit(); + Event event = + new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); + send(event); + LOG.info("Commit {} initiated", commitState.currentCommitId()); + } + + consumeAvailable(POLL_DURATION); + + if (commitState.isCommitTimedOut()) { + commit(true); + } + } + + @Override + protected boolean 
receive(Envelope envelope) { + switch (envelope.event().payload().type()) { + case DATA_WRITTEN: + commitState.addResponse(envelope); + return true; + case DATA_COMPLETE: + commitState.addReady(envelope); + if (commitState.isCommitReady(totalPartitionCount)) { + commit(false); + } + return true; + } + return false; + } + + private void commit(boolean partialCommit) { + try { + doCommit(partialCommit); + } catch (Exception e) { + LOG.warn("Commit failed, will try again next cycle", e); + } finally { + commitState.endCurrentCommit(); + } + } + + private void doCommit(boolean partialCommit) { + Map> commitMap = commitState.tableCommitMap(); + + String offsetsJson = offsetsJson(); + OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); + + Tasks.foreach(commitMap.entrySet()) + .executeWith(exec) + .stopOnFailure() + .run( + entry -> { + commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); + }); + + // we should only get here if all tables committed successfully... + commitConsumerOffsets(); + commitState.clearResponses(); + + Event event = + new Event( + config.connectGroupId(), + new CommitComplete(commitState.currentCommitId(), validThroughTs)); + send(event); + + LOG.info( + "Commit {} complete, committed to {} table(s), valid-through {}", + commitState.currentCommitId(), + commitMap.size(), + validThroughTs); + } + + private String offsetsJson() { + try { + return MAPPER.writeValueAsString(controlTopicOffsets()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void commitToTable( + TableReference tableReference, + List envelopeList, + String offsetsJson, + OffsetDateTime validThroughTs) { + TableIdentifier tableIdentifier = tableReference.identifier(); + Table table; + try { + table = catalog.loadTable(tableIdentifier); + } catch (NoSuchTableException e) { + LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); + return; + } + + String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); + + Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + List payloads = + envelopeList.stream() + .filter( + envelope -> { + Long minOffset = committedOffsets.get(envelope.partition()); + return minOffset == null || envelope.offset() >= minOffset; + }) + .map(envelope -> (DataWritten) envelope.event().payload()) + .collect(Collectors.toList()); + + List dataFiles = + payloads.stream() + .filter(payload -> payload.dataFiles() != null) + .flatMap(payload -> payload.dataFiles().stream()) + .filter(dataFile -> dataFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + List deleteFiles = + payloads.stream() + .filter(payload -> payload.deleteFiles() != null) + .flatMap(payload -> payload.deleteFiles().stream()) + .filter(deleteFile -> deleteFile.recordCount() > 0) + .filter(distinctByKey(ContentFile::location)) + .collect(Collectors.toList()); + + if (terminated) { + throw new ConnectException("Coordinator is terminated, commit aborted"); + } + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); + } else { + if (deleteFiles.isEmpty()) { + AppendFiles appendOp = table.newAppend(); + if (branch != null) { + appendOp.toBranch(branch); + } + appendOp.set(snapshotOffsetsProp, offsetsJson); + appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, 
validThroughTs.toString()); + } + dataFiles.forEach(appendOp::appendFile); + appendOp.commit(); + } else { + RowDelta deltaOp = table.newRowDelta(); + if (branch != null) { + deltaOp.toBranch(branch); + } + deltaOp.set(snapshotOffsetsProp, offsetsJson); + deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); + if (validThroughTs != null) { + deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); + } + dataFiles.forEach(deltaOp::addRows); + deleteFiles.forEach(deltaOp::addDeletes); + deltaOp.commit(); + } + + Long snapshotId = latestSnapshot(table, branch).snapshotId(); + Event event = + new Event( + config.connectGroupId(), + new CommitToTable( + commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); + send(event); + + LOG.info( + "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", + tableIdentifier, + snapshotId, + commitState.currentCommitId(), + validThroughTs); + } + } + + private Predicate distinctByKey(Function keyExtractor) { + Map seen = Maps.newConcurrentMap(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } + + private Snapshot latestSnapshot(Table table, String branch) { + if (branch == null) { + return table.currentSnapshot(); + } + return table.snapshot(branch); + } + + private Map lastCommittedOffsetsForTable(Table table, String branch) { + Snapshot snapshot = latestSnapshot(table, branch); + while (snapshot != null) { + Map summary = snapshot.summary(); + String value = summary.get(snapshotOffsetsProp); + if (value != null) { + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? 
table.snapshot(parentSnapshotId) : null; + } + return ImmutableMap.of(); + } + + void terminate() { + this.terminated = true; + + exec.shutdownNow(); + + // wait for coordinator termination, else cause the sink task to fail + try { + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { + throw new ConnectException("Timed out waiting for coordinator shutdown"); + } + } catch (InterruptedException e) { + throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); + } + } +} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java new file mode 100644 index 000000000000..3a6b1ae495ae --- /dev/null +++ b/committer/src/main/java/org/apache/iceberg/Main.java @@ -0,0 +1,7 @@ +package org.apache.iceberg; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -56,6 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -235,6 +237,13 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); + configDef.define( + DYNAMIC_ROUTE_DATA_METADATA_PREFIX, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -375,7 +384,20 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); + String metadataPath = "", dataPath = ""; + + if (dynamicTablesEnabled()) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + if (originalProps.containsKey("iceberg.catalog.warehouse")) { + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } else { + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + dataPath = tableConfig.getOrDefault("write.data.path", 
defaultDataPath(tableName)); + } + } + + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -388,6 +410,16 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } + private String defaultDataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + } + + private String defaultMetadataPath(String tableName) { + TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + } + public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 0ecde1f7dd0b..28fdd4f36d15 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,13 +27,24 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; + private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; + this.dataPath = dataPath; + this.metadataPath = metadataPath; + } + + public String getDataPath() { + return dataPath; + } + + public String getMetadataPath() { + return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 92f5af2d7a87..5de1039e77b0 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -89,6 +90,14 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); + + Map tableAutoCreateProps = config.autoCreateProps(); + + if (config.dynamicTablesEnabled()) { + tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); + } + PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -110,7 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, 
schema, partitionSpec, config.autoCreateProps())); + identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 67619ec140a65c7768af074178c95408d76e605d Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 11:28:07 +0530 Subject: [PATCH 2/5] spotless --- .../iceberg/connect/IcebergSinkConfig.java | 44 +++++++++++++++---- .../iceberg/connect/TableSinkConfig.java | 7 ++- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..8339cf7f89ac 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,7 +57,8 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = + "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -242,8 +243,7 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); + "prefix for creation of metadata path and data path in case of dynamic routing"); return configDef; } @@ -389,15 +389,27 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; + metadataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; + dataPath = + originalProps.get("iceberg.catalog.warehouse") + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = + tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig( + routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -412,12 +424,26 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + 
"/" + tableIdentifier.name() + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + + "/" + + connectorName() + + "/" + + tableIdentifier.namespace() + + "/" + + tableIdentifier.name() + + "/metadata"; } public String controlTopic() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..36879bf6929a 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -30,7 +30,12 @@ public class TableSinkConfig { private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, + List idColumns, + List partitionBy, + String commitBranch, + String dataPath, + String metadataPath) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..df87e8d2a958 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -94,7 +94,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) { Map tableAutoCreateProps = config.autoCreateProps(); if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); + tableAutoCreateProps.put( + "write.metadata.path", config.tableConfig(tableName).getMetadataPath()); tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); } @@ -118,8 +119,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { notUsed -> { try { result.set( - catalog.createTable( - identifier, schema, partitionSpec, tableAutoCreateProps)); + catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps)); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From 6b15ae402deded39a34843f85f5342dbb9ae28a0 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Sat, 9 Aug 2025 17:54:24 +0530 Subject: [PATCH 3/5] Revert "spotless" This reverts commit 67619ec140a65c7768af074178c95408d76e605d. 
--- .../iceberg/connect/IcebergSinkConfig.java | 44 ++++--------------- .../iceberg/connect/TableSinkConfig.java | 7 +-- .../connect/data/IcebergWriterFactory.java | 6 +-- 3 files changed, 13 insertions(+), 44 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 8339cf7f89ac..f6dd7cf3c0a6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -57,8 +57,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = - "iceberg.dynamic-route-data-metadata-prefix"; + private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -243,7 +242,8 @@ private static ConfigDef newConfigDef() { ConfigDef.Type.STRING, "", Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing"); + "prefix for creation of metadata path and data path in case of dynamic routing" + ); return configDef; } @@ -389,27 +389,15 @@ public TableSinkConfig tableConfig(String tableName) { if (dynamicTablesEnabled()) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/metadata"; - dataPath = - originalProps.get("iceberg.catalog.warehouse") - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; + dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } else { - metadataPath = - tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); + metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); } } - return new TableSinkConfig( - routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch, dataPath, metadataPath); }); } @@ -424,26 +412,12 @@ static List stringToList(String value, String regex) { private String defaultDataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) - + "/" - + connectorName() - + "/" - + tableIdentifier.namespace() - + "/" - + tableIdentifier.name() - + "/data"; + return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; } private String defaultMetadataPath(String tableName) { TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return 
getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX)
-        + "/"
-        + connectorName()
-        + "/"
-        + tableIdentifier.namespace()
-        + "/"
-        + tableIdentifier.name()
-        + "/metadata";
+    return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata";
   }
 
   public String controlTopic() {
     return getString(CONTROL_TOPIC_PROP);
   }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java
index 36879bf6929a..28fdd4f36d15 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java
@@ -30,12 +30,7 @@ public class TableSinkConfig {
   private final String dataPath, metadataPath;
 
   public TableSinkConfig(
-      Pattern routeRegex,
-      List<String> idColumns,
-      List<String> partitionBy,
-      String commitBranch,
-      String dataPath,
-      String metadataPath) {
+      Pattern routeRegex, List<String> idColumns, List<String> partitionBy, String commitBranch, String dataPath, String metadataPath) {
     this.routeRegex = routeRegex;
     this.idColumns = idColumns;
     this.partitionBy = partitionBy;
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index df87e8d2a958..5de1039e77b0 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -94,8 +94,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
     Map<String, String> tableAutoCreateProps = config.autoCreateProps();
 
     if (config.dynamicTablesEnabled()) {
-      tableAutoCreateProps.put(
-          "write.metadata.path", config.tableConfig(tableName).getMetadataPath());
+      tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath());
       tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath());
     }
 
@@ -119,7 +118,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
         notUsed -> {
           try {
             result.set(
-                catalog.createTable(identifier, schema, partitionSpec, tableAutoCreateProps));
+                catalog.createTable(
+                    identifier, schema, partitionSpec, tableAutoCreateProps));
           } catch (AlreadyExistsException e) {
             result.set(catalog.loadTable(identifier));
           }

From 8398e4c2675f5e319850c6fe1555007c153ec143 Mon Sep 17 00:00:00 2001
From: Pritam Kumar Mishra
Date: Sat, 9 Aug 2025 17:54:36 +0530
Subject: [PATCH 4/5] Revert "added metadata and data path in case of dynamic routing"

This reverts commit c0a2665a60ab53feac6a1e47214da910b13f9701.
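
For context on the change being reverted: patch 1 of this series (reformatted by patch 2) made the sink derive per-table storage locations when dynamic routing is enabled, by setting the write.data.path and write.metadata.path table properties at auto-create time. As a rough illustration only (the prefix s3://warehouse/kc, connector name events-sink, and table db.events below are hypothetical, not taken from the patches), the prefix-based fallback composes paths like:

    iceberg.dynamic-route-data-metadata-prefix = s3://warehouse/kc
    write.data.path     -> s3://warehouse/kc/events-sink/db/events/data
    write.metadata.path -> s3://warehouse/kc/events-sink/db/events/metadata

When iceberg.catalog.warehouse is set, the paths are built from the warehouse location and the table identifier instead; otherwise an explicit write.data.path or write.metadata.path in the per-table config takes precedence over these prefix-derived defaults.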
--- committer/build.gradle | 36 -- .../java/org/apache/iceberg/Coordinator.java | 315 ------------------ .../main/java/org/apache/iceberg/Main.java | 7 - .../iceberg/connect/IcebergSinkConfig.java | 34 +- .../iceberg/connect/TableSinkConfig.java | 13 +- .../connect/data/IcebergWriterFactory.java | 11 +- 6 files changed, 3 insertions(+), 413 deletions(-) delete mode 100644 committer/build.gradle delete mode 100644 committer/src/main/java/org/apache/iceberg/Coordinator.java delete mode 100644 committer/src/main/java/org/apache/iceberg/Main.java diff --git a/committer/build.gradle b/committer/build.gradle deleted file mode 100644 index 0de61912d850..000000000000 --- a/committer/build.gradle +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - id 'java-library' -} - -group = 'org.apache.iceberg' -version = '1.7.0-SNAPSHOT' - -repositories { - mavenCentral() -} - -dependencies { - api project(':iceberg-api') - implementation project(':iceberg-kafka-connect') - implementation project(':iceberg-core') - implementation project(':iceberg-common') - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - implementation project(':iceberg-data') - implementation project(':iceberg-kafka-connect:iceberg-kafka-connect-events') - implementation platform(libs.jackson.bom) - implementation libs.jackson.core - implementation libs.jackson.databind - implementation libs.avro.avro - - compileOnly libs.kafka.clients - compileOnly libs.kafka.connect.api - compileOnly libs.kafka.connect.json - - testImplementation libs.hadoop3.client - testRuntimeOnly project(':iceberg-parquet') - testRuntimeOnly project(':iceberg-orc') -} - -test { - useJUnitPlatform() -} diff --git a/committer/src/main/java/org/apache/iceberg/Coordinator.java b/committer/src/main/java/org/apache/iceberg/Coordinator.java deleted file mode 100644 index 6a260d91bb80..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Coordinator.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.channels.Channel; -import java.time.Duration; -import java.time.OffsetDateTime; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.channel.CommitState; -import org.apache.iceberg.connect.events.CommitComplete; -import org.apache.iceberg.connect.events.CommitToTable; -import org.apache.iceberg.connect.events.DataWritten; -import org.apache.iceberg.connect.events.Event; -import org.apache.iceberg.connect.events.StartCommit; -import org.apache.iceberg.connect.events.TableReference; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkTaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class Coordinator { - - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id"; - private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts"; - private static final Duration POLL_DURATION = Duration.ofSeconds(1); - - private final Catalog catalog; - private final IcebergSinkConfig config; - private final int totalPartitionCount; - private final String snapshotOffsetsProp; - private final ExecutorService exec; - private final CommitState commitState; - private volatile boolean terminated; - - Coordinator( - Catalog catalog, - IcebergSinkConfig config, - Collection members, - KafkaClientFactory clientFactory, - SinkTaskContext context) { - // pass consumer group ID to which we commit low watermark offsets - super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context); - - this.catalog = catalog; - this.config = config; - this.totalPartitionCount = - members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum(); - this.snapshotOffsetsProp = - String.format( - "kafka.connect.offsets.%s.%s", config.controlTopic(), config.connectGroupId()); - this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", config.commitThreads()); - this.commitState = new CommitState(config); - } - - void process() { - if (commitState.isCommitIntervalReached()) { - // send out begin commit - commitState.startNewCommit(); - Event event = - new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId())); - send(event); - LOG.info("Commit {} initiated", commitState.currentCommitId()); - } - - consumeAvailable(POLL_DURATION); - - if (commitState.isCommitTimedOut()) { - commit(true); - } - } - - @Override - protected boolean 
receive(Envelope envelope) { - switch (envelope.event().payload().type()) { - case DATA_WRITTEN: - commitState.addResponse(envelope); - return true; - case DATA_COMPLETE: - commitState.addReady(envelope); - if (commitState.isCommitReady(totalPartitionCount)) { - commit(false); - } - return true; - } - return false; - } - - private void commit(boolean partialCommit) { - try { - doCommit(partialCommit); - } catch (Exception e) { - LOG.warn("Commit failed, will try again next cycle", e); - } finally { - commitState.endCurrentCommit(); - } - } - - private void doCommit(boolean partialCommit) { - Map> commitMap = commitState.tableCommitMap(); - - String offsetsJson = offsetsJson(); - OffsetDateTime validThroughTs = commitState.validThroughTs(partialCommit); - - Tasks.foreach(commitMap.entrySet()) - .executeWith(exec) - .stopOnFailure() - .run( - entry -> { - commitToTable(entry.getKey(), entry.getValue(), offsetsJson, validThroughTs); - }); - - // we should only get here if all tables committed successfully... - commitConsumerOffsets(); - commitState.clearResponses(); - - Event event = - new Event( - config.connectGroupId(), - new CommitComplete(commitState.currentCommitId(), validThroughTs)); - send(event); - - LOG.info( - "Commit {} complete, committed to {} table(s), valid-through {}", - commitState.currentCommitId(), - commitMap.size(), - validThroughTs); - } - - private String offsetsJson() { - try { - return MAPPER.writeValueAsString(controlTopicOffsets()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void commitToTable( - TableReference tableReference, - List envelopeList, - String offsetsJson, - OffsetDateTime validThroughTs) { - TableIdentifier tableIdentifier = tableReference.identifier(); - Table table; - try { - table = catalog.loadTable(tableIdentifier); - } catch (NoSuchTableException e) { - LOG.warn("Table not found, skipping commit: {}", tableIdentifier, e); - return; - } - - String branch = config.tableConfig(tableIdentifier.toString()).commitBranch(); - - Map committedOffsets = lastCommittedOffsetsForTable(table, branch); - - List payloads = - envelopeList.stream() - .filter( - envelope -> { - Long minOffset = committedOffsets.get(envelope.partition()); - return minOffset == null || envelope.offset() >= minOffset; - }) - .map(envelope -> (DataWritten) envelope.event().payload()) - .collect(Collectors.toList()); - - List dataFiles = - payloads.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> payload.dataFiles().stream()) - .filter(dataFile -> dataFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - List deleteFiles = - payloads.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> payload.deleteFiles().stream()) - .filter(deleteFile -> deleteFile.recordCount() > 0) - .filter(distinctByKey(ContentFile::location)) - .collect(Collectors.toList()); - - if (terminated) { - throw new ConnectException("Coordinator is terminated, commit aborted"); - } - - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - LOG.info("Nothing to commit to table {}, skipping", tableIdentifier); - } else { - if (deleteFiles.isEmpty()) { - AppendFiles appendOp = table.newAppend(); - if (branch != null) { - appendOp.toBranch(branch); - } - appendOp.set(snapshotOffsetsProp, offsetsJson); - appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, 
validThroughTs.toString()); - } - dataFiles.forEach(appendOp::appendFile); - appendOp.commit(); - } else { - RowDelta deltaOp = table.newRowDelta(); - if (branch != null) { - deltaOp.toBranch(branch); - } - deltaOp.set(snapshotOffsetsProp, offsetsJson); - deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); - if (validThroughTs != null) { - deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString()); - } - dataFiles.forEach(deltaOp::addRows); - deleteFiles.forEach(deltaOp::addDeletes); - deltaOp.commit(); - } - - Long snapshotId = latestSnapshot(table, branch).snapshotId(); - Event event = - new Event( - config.connectGroupId(), - new CommitToTable( - commitState.currentCommitId(), tableReference, snapshotId, validThroughTs)); - send(event); - - LOG.info( - "Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", - tableIdentifier, - snapshotId, - commitState.currentCommitId(), - validThroughTs); - } - } - - private Predicate distinctByKey(Function keyExtractor) { - Map seen = Maps.newConcurrentMap(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } - - private Snapshot latestSnapshot(Table table, String branch) { - if (branch == null) { - return table.currentSnapshot(); - } - return table.snapshot(branch); - } - - private Map lastCommittedOffsetsForTable(Table table, String branch) { - Snapshot snapshot = latestSnapshot(table, branch); - while (snapshot != null) { - Map summary = snapshot.summary(); - String value = summary.get(snapshotOffsetsProp); - if (value != null) { - TypeReference> typeRef = new TypeReference>() {}; - try { - return MAPPER.readValue(value, typeRef); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - Long parentSnapshotId = snapshot.parentId(); - snapshot = parentSnapshotId != null ? 
table.snapshot(parentSnapshotId) : null; - } - return ImmutableMap.of(); - } - - void terminate() { - this.terminated = true; - - exec.shutdownNow(); - - // wait for coordinator termination, else cause the sink task to fail - try { - if (!exec.awaitTermination(1, TimeUnit.MINUTES)) { - throw new ConnectException("Timed out waiting for coordinator shutdown"); - } - } catch (InterruptedException e) { - throw new ConnectException("Interrupted while waiting for coordinator shutdown", e); - } - } -} diff --git a/committer/src/main/java/org/apache/iceberg/Main.java b/committer/src/main/java/org/apache/iceberg/Main.java deleted file mode 100644 index 3a6b1ae495ae..000000000000 --- a/committer/src/main/java/org/apache/iceberg/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.iceberg; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index f6dd7cf3c0a6..9650ce16270c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,7 +28,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -57,7 +56,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ID_COLUMNS = "id-columns"; private static final String PARTITION_BY = "partition-by"; private static final String COMMIT_BRANCH = "commit-branch"; - private static final String DYNAMIC_ROUTE_DATA_METADATA_PREFIX = "iceberg.dynamic-route-data-metadata-prefix"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; @@ -237,13 +235,6 @@ private static ConfigDef newConfigDef() { 120000L, Importance.LOW, "config to control coordinator executor keep alive time"); - configDef.define( - DYNAMIC_ROUTE_DATA_METADATA_PREFIX, - ConfigDef.Type.STRING, - "", - Importance.HIGH, - "prefix for creation of metadata path and data path in case of dynamic routing" - ); return configDef; } @@ -384,20 +375,7 @@ public TableSinkConfig tableConfig(String tableName) { String commitBranch = tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch()); - String metadataPath = "", dataPath = ""; - - if (dynamicTablesEnabled()) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - if (originalProps.containsKey("iceberg.catalog.warehouse")) { - metadataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - dataPath = originalProps.get("iceberg.catalog.warehouse") + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } else { - metadataPath = tableConfig.getOrDefault("write.metadata.path", defaultMetadataPath(tableName)); - dataPath = tableConfig.getOrDefault("write.data.path", defaultDataPath(tableName)); - } - } - - return new TableSinkConfig(routeRegex, idColumns, 
partitionBy, commitBranch, dataPath, metadataPath); + return new TableSinkConfig(routeRegex, idColumns, partitionBy, commitBranch); }); } @@ -410,16 +388,6 @@ static List stringToList(String value, String regex) { return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } - private String defaultDataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/data"; - } - - private String defaultMetadataPath(String tableName) { - TableIdentifier tableIdentifier = TableIdentifier.parse(tableName); - return getString(DYNAMIC_ROUTE_DATA_METADATA_PREFIX) + "/" + connectorName() + "/" + tableIdentifier.namespace() + "/" + tableIdentifier.name() + "/metadata"; - } - public String controlTopic() { return getString(CONTROL_TOPIC_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java index 28fdd4f36d15..0ecde1f7dd0b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/TableSinkConfig.java @@ -27,24 +27,13 @@ public class TableSinkConfig { private final List idColumns; private final List partitionBy; private final String commitBranch; - private final String dataPath, metadataPath; public TableSinkConfig( - Pattern routeRegex, List idColumns, List partitionBy, String commitBranch, String dataPath, String metadataPath) { + Pattern routeRegex, List idColumns, List partitionBy, String commitBranch) { this.routeRegex = routeRegex; this.idColumns = idColumns; this.partitionBy = partitionBy; this.commitBranch = commitBranch; - this.dataPath = dataPath; - this.metadataPath = metadataPath; - } - - public String getDataPath() { - return dataPath; - } - - public String getMetadataPath() { - return metadataPath; } public Pattern routeRegex() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index 5de1039e77b0..92f5af2d7a87 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -90,14 +89,6 @@ Table autoCreateTable(String tableName, SinkRecord sample) { createNamespaceIfNotExist(catalog, identifier.namespace()); List partitionBy = config.tableConfig(tableName).partitionBy(); - - Map tableAutoCreateProps = config.autoCreateProps(); - - if (config.dynamicTablesEnabled()) { - tableAutoCreateProps.put("write.metadata.path", config.tableConfig(tableName).getMetadataPath()); - tableAutoCreateProps.put("write.data.path", config.tableConfig(tableName).getDataPath()); - } - PartitionSpec spec; try { spec = SchemaUtils.createPartitionSpec(schema, partitionBy); @@ -119,7 +110,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { try { result.set( catalog.createTable( - identifier, schema, 
partitionSpec, tableAutoCreateProps)); + identifier, schema, partitionSpec, config.autoCreateProps())); } catch (AlreadyExistsException e) { result.set(catalog.loadTable(identifier)); } From f0817526b14d16c854d054652c613ee056360852 Mon Sep 17 00:00:00 2001 From: Pritam Kumar Mishra Date: Fri, 12 Dec 2025 08:11:25 +0530 Subject: [PATCH 5/5] Add option to add required column --- .../iceberg/connect/data/RecordConverter.java | 8 +- .../iceberg/connect/data/SchemaUpdate.java | 12 +- .../iceberg/connect/data/SchemaUtils.java | 8 +- .../connect/data/TestRecordConverter.java | 122 ++++++++++++++++++ .../connect/data/TestSchemaUpdate.java | 18 ++- .../iceberg/connect/data/TestSchemaUtils.java | 48 ++++++- 6 files changed, 205 insertions(+), 11 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java index 1a57a6444870..914325c3956e 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -183,7 +183,7 @@ private GenericRecord convertToStruct( if (type != null) { String parentFieldName = structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId); - schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type); + schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type, true); } } } else { @@ -218,7 +218,11 @@ private GenericRecord convertToStruct( String parentFieldName = structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId); Type type = SchemaUtils.toIcebergType(recordField.schema(), config); - schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type); + schemaUpdateConsumer.addColumn( + parentFieldName, + recordField.name(), + type, + config.schemaForceOptional() || recordField.schema().isOptional()); } } else { boolean hasSchemaUpdates = false; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java index 809bea84dcc2..0b9f9fd28ea5 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java @@ -47,8 +47,8 @@ boolean empty() { return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty(); } - void addColumn(String parentName, String name, Type type) { - AddColumn addCol = new AddColumn(parentName, name, type); + void addColumn(String parentName, String name, Type type, boolean isOptional) { + AddColumn addCol = new AddColumn(parentName, name, type, isOptional); addColumns.put(addCol.key(), addCol); } @@ -65,11 +65,13 @@ static class AddColumn extends SchemaUpdate { private final String parentName; private final String name; private final Type type; + private final boolean isOptional; - AddColumn(String parentName, String name, Type type) { + AddColumn(String parentName, String name, Type type, boolean isOptional) { this.parentName = parentName; this.name = name; this.type = type; + this.isOptional = isOptional; } String parentName() { @@ -87,6 +89,10 @@ String key() { Type type() { return type; } + + boolean isOptional() { + return isOptional; + } } static class UpdateType extends SchemaUpdate { diff --git 
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java index b0dd56b45d67..3694387c36ed 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java @@ -123,7 +123,13 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat // apply the updates UpdateSchema updateSchema = table.updateSchema(); addColumns.forEach( - update -> updateSchema.addColumn(update.parentName(), update.name(), update.type())); + update -> { + if (update.isOptional()) { + updateSchema.addColumn(update.parentName(), update.name(), update.type()); + } else { + updateSchema.addRequiredColumn(update.parentName(), update.name(), update.type()); + } + }); updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type())); makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name())); updateSchema.commit(); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java index 45d07f69591b..4abe3d321f65 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java @@ -817,6 +817,128 @@ private void assertTypesAddedFromStruct(Function fn) { assertThat(fn.apply("ma")).isInstanceOf(MapType.class); } + @Test + public void testAddColumnOptionalityForMapInferredColumns() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = ImmutableMap.of("new_col", "test_value"); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.name()).isEqualTo("new_col"); + assertThat(addCol.isOptional()).isTrue(); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAddColumnOptionalityForStructWithRequiredFields(boolean forceOptional) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + when(config.schemaForceOptional()).thenReturn(forceOptional); + RecordConverter converter = new RecordConverter(table, config); + + Schema valueSchema = SchemaBuilder.struct().field("new_col", Schema.STRING_SCHEMA).build(); + Struct data = new Struct(valueSchema).put("new_col", "test_value"); + + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.name()).isEqualTo("new_col"); + assertThat(addCol.isOptional()).isEqualTo(forceOptional); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAddColumnOptionalityForStructWithOptionalFields(boolean forceOptional) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + when(config.schemaForceOptional()).thenReturn(forceOptional); + RecordConverter converter = new 
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
index 45d07f69591b..4abe3d321f65 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
@@ -817,6 +817,128 @@ private void assertTypesAddedFromStruct(Function fn) {
     assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
   }
 
+  @Test
+  public void testAddColumnOptionalityForMapInferredColumns() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map data = ImmutableMap.of("new_col", "test_value");
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isTrue();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testAddColumnOptionalityForStructWithRequiredFields(boolean forceOptional) {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(forceOptional);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema = SchemaBuilder.struct().field("new_col", Schema.STRING_SCHEMA).build();
+    Struct data = new Struct(valueSchema).put("new_col", "test_value");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isEqualTo(forceOptional);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testAddColumnOptionalityForStructWithOptionalFields(boolean forceOptional) {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(forceOptional);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema =
+        SchemaBuilder.struct().field("new_col", Schema.OPTIONAL_STRING_SCHEMA).build();
+    Struct data = new Struct(valueSchema).put("new_col", "test_value");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnOptionalityForStructRequiredFieldWithNoForceOptional() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(false);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("req_col", Schema.STRING_SCHEMA)
+            .field("opt_col", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+    Struct data = new Struct(valueSchema).put("req_col", "test1").put("opt_col", "test2");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(2);
+    Map colMap = Maps.newHashMap();
+    addCols.forEach(col -> colMap.put(col.name(), col));
+
+    assertThat(colMap.get("req_col").isOptional()).isFalse();
+    assertThat(colMap.get("opt_col").isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnOptionalityForNestedStructFields() {
+    org.apache.iceberg.Schema baseSchema =
+        new org.apache.iceberg.Schema(NestedField.required(1, "base", IntegerType.get()));
+
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(baseSchema);
+    when(config.schemaForceOptional()).thenReturn(false);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema nestedSchema =
+        SchemaBuilder.struct()
+            .field("req_nested", Schema.STRING_SCHEMA)
+            .field("opt_nested", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+    Schema valueSchema =
+        SchemaBuilder.struct().field("base", Schema.INT32_SCHEMA).field("nested", nestedSchema);
+
+    Struct nestedStruct =
+        new Struct(nestedSchema).put("req_nested", "test1").put("opt_nested", "test2");
+    Struct data = new Struct(valueSchema).put("base", 1).put("nested", nestedStruct);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn nestedCol = addCols.iterator().next();
+    assertThat(nestedCol.name()).isEqualTo("nested");
+    assertThat(nestedCol.type()).isInstanceOf(StructType.class);
+
+    StructType nestedType = nestedCol.type().asStructType();
+    assertThat(nestedType.field("req_nested").isRequired()).isTrue();
+    assertThat(nestedType.field("opt_nested").isOptional()).isTrue();
+  }
+
   @Test
   public void testEvolveTypeDetectionStruct() {
     org.apache.iceberg.Schema tableSchema =
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
index 22b3c6d53537..e1342e467dd2 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
@@ -28,7 +28,7 @@ public class TestSchemaUpdate {
   @Test
   public void testAddColumn() {
     SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
-    updateConsumer.addColumn("parent", "name", Types.StringType.get());
+    updateConsumer.addColumn("parent", "name", Types.StringType.get(), true);
     assertThat(updateConsumer.addColumns()).hasSize(1);
     assertThat(updateConsumer.updateTypes()).isEmpty();
     assertThat(updateConsumer.makeOptionals()).isEmpty();
@@ -37,6 +37,22 @@ public void testAddColumn() {
     assertThat(addColumn.parentName()).isEqualTo("parent");
     assertThat(addColumn.name()).isEqualTo("name");
     assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnRequired() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    updateConsumer.addColumn("parent", "name", Types.StringType.get(), false);
+    assertThat(updateConsumer.addColumns()).hasSize(1);
+    assertThat(updateConsumer.updateTypes()).isEmpty();
+    assertThat(updateConsumer.makeOptionals()).isEmpty();
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.parentName()).isEqualTo("parent");
+    assertThat(addColumn.name()).isEqualTo("name");
+    assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.isOptional()).isFalse();
   }
 
   @Test
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
index bde2452128b9..768c8133f27b 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
@@ -103,11 +103,11 @@ public void testApplySchemaUpdates() {
 
     // the updates to "i" should be ignored as it already exists and is the same type
     SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
-    consumer.addColumn(null, "i", IntegerType.get());
+    consumer.addColumn(null, "i", IntegerType.get(), true);
     consumer.updateType("i", IntegerType.get());
     consumer.makeOptional("i");
     consumer.updateType("f", DoubleType.get());
-    consumer.addColumn(null, "s", StringType.get());
+    consumer.addColumn(null, "s", StringType.get(), true);
 
     SchemaUtils.applySchemaUpdates(table, consumer);
     verify(table).refresh();
@@ -133,11 +133,11 @@ public void testApplyNestedSchemaUpdates() {
 
     // the updates to "st.i" should be ignored as it already exists and is the same type
     SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
-    consumer.addColumn("st", "i", IntegerType.get());
+    consumer.addColumn("st", "i", IntegerType.get(), true);
     consumer.updateType("st.i", IntegerType.get());
     consumer.makeOptional("st.i");
     consumer.updateType("st.f", DoubleType.get());
-    consumer.addColumn("st", "s", StringType.get());
+    consumer.addColumn("st", "s", StringType.get(), true);
 
     SchemaUtils.applySchemaUpdates(table, consumer);
     verify(table).refresh();
@@ -168,6 +168,46 @@ public void testApplySchemaUpdatesNoUpdates() {
     verify(table, times(0)).updateSchema();
   }
 
+  @Test
+  public void testApplySchemaUpdatesWithRequiredColumns() {
+    UpdateSchema updateSchema = mock(UpdateSchema.class);
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(SIMPLE_SCHEMA);
+    when(table.updateSchema()).thenReturn(updateSchema);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn(null, "s1", StringType.get(), true);
+    consumer.addColumn(null, "s2", StringType.get(), false);
+
+    SchemaUtils.applySchemaUpdates(table, consumer);
+    verify(table).refresh();
+    verify(table).updateSchema();
+
+    verify(updateSchema).addColumn(isNull(), eq("s1"), isA(StringType.class));
+    verify(updateSchema).addRequiredColumn(isNull(), eq("s2"), isA(StringType.class));
+    verify(updateSchema).commit();
+  }
+
+  @Test
+  public void testApplyNestedSchemaUpdatesWithRequiredColumns() {
+    UpdateSchema updateSchema = mock(UpdateSchema.class);
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(NESTED_SCHEMA);
+    when(table.updateSchema()).thenReturn(updateSchema);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn("st", "s1", StringType.get(), true);
+    consumer.addColumn("st", "s2", StringType.get(), false);
+
+    SchemaUtils.applySchemaUpdates(table, consumer);
+    verify(table).refresh();
+    verify(table).updateSchema();
+
+    verify(updateSchema).addColumn(eq("st"), eq("s1"), isA(StringType.class));
+    verify(updateSchema).addRequiredColumn(eq("st"), eq("s2"), isA(StringType.class));
+    verify(updateSchema).commit();
+  }
+
   @Test
   public void testNeedsDataTypeUpdate() {
     // valid updates