Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/kafka-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (`cores * 2`) |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.worker.poll.duration-ms | Worker poll duration, in millis, default is 0 |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id";
private static final String TRANSACTIONAL_PREFIX_PROP =
"iceberg.coordinator.transactional.prefix";
private static final String WORKER_POLL_DURATION_MS_PROP = "iceberg.worker.poll.duration-ms";
private static final int WORKER_POLL_DURATION_MS_DEFAULT = 0;
private static final String HADOOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";

private static final String NAME_PROP = "name";
Expand Down Expand Up @@ -235,6 +237,12 @@ private static ConfigDef newConfigDef() {
120000L,
Importance.LOW,
"config to control coordinator executor keep alive time");
configDef.define(
WORKER_POLL_DURATION_MS_PROP,
ConfigDef.Type.INT,
WORKER_POLL_DURATION_MS_DEFAULT,
Importance.MEDIUM,
"Worker poll duration, in millis");
return configDef;
}

Expand Down Expand Up @@ -396,6 +404,10 @@ public String controlGroupIdPrefix() {
return getString(CONTROL_GROUP_ID_PREFIX_PROP);
}

public int workerPollDurationMs() {
return getInt(WORKER_POLL_DURATION_MS_PROP);
}

public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Worker extends Channel {
private final IcebergSinkConfig config;
private final SinkTaskContext context;
private final SinkWriter sinkWriter;
private final Duration pollDuration;

Worker(
IcebergSinkConfig config,
Expand All @@ -59,10 +60,11 @@ class Worker extends Channel {
this.config = config;
this.context = context;
this.sinkWriter = sinkWriter;
this.pollDuration = Duration.ofMillis(config.workerPollDurationMs());
}

void process() {
consumeAvailable(Duration.ZERO);
consumeAvailable(pollDuration);
}

@Override
Expand Down