diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index aa8ce77936a..50206761a19 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -31,7 +31,7 @@ Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和
## 示例
-从 Postgres 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+从 Postgres 读取数据同步到 Fluss 的 Pipeline 可以定义如下:
```yaml
source:
@@ -41,19 +41,23 @@ source:
port: 5432
username: admin
password: pass
- tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+      # 需要确保所有的表来自同一个数据库
+ tables: adb.\.*.\.*
decoding.plugin.name: pgoutput
slot.name: pgtest
sink:
- type: doris
- name: Doris Sink
- fenodes: 127.0.0.1:8030
- username: root
- password: pass
+ type: fluss
+ name: Fluss Sink
+ bootstrap.servers: localhost:9123
+ # Security-related properties for the Fluss client
+ properties.client.security.protocol: sasl
+ properties.client.security.sasl.mechanism: PLAIN
+ properties.client.security.sasl.username: developer
+ properties.client.security.sasl.password: developer-pass
pipeline:
- name: Postgres to Doris Pipeline
+ name: Postgres to Fluss Pipeline
parallelism: 4
```
@@ -105,8 +109,9 @@ pipeline:
(none) |
String |
需要监视的 Postgres 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
- 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
- 例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.* |
+ 需要确保所有的表来自同一个数据库。
+ 需要注意的是,点号(.)被视为数据库、模式和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
+ 例如,bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*
| slot.name |
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 1a51486f0f6..03dd4e5a314 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -32,29 +32,33 @@ Note: Since the Postgres WAL log cannot parse table structure change records, Po
## Example
-An example of the pipeline for reading data from Postgres and sink to Doris can be defined as follows:
+An example of the pipeline for reading data from Postgres and sink to Fluss can be defined as follows:
```yaml
source:
- type: posgtres
+ type: postgres
name: Postgres Source
hostname: 127.0.0.1
port: 5432
username: admin
password: pass
- tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+      # make sure all the tables share the same database.
+ tables: adb.\.*.\.*
decoding.plugin.name: pgoutput
slot.name: pgtest
sink:
- type: doris
- name: Doris Sink
- fenodes: 127.0.0.1:8030
- username: root
- password: pass
+ type: fluss
+ name: Fluss Sink
+ bootstrap.servers: localhost:9123
+ # Security-related properties for the Fluss client
+ properties.client.security.protocol: sasl
+ properties.client.security.sasl.mechanism: PLAIN
+ properties.client.security.sasl.username: developer
+ properties.client.security.sasl.password: developer-pass
pipeline:
- name: Postgres to Doris Pipeline
+ name: Postgres to Fluss Pipeline
parallelism: 4
```
@@ -106,9 +110,10 @@ pipeline:
(none) |
String |
Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions.
- It is important to note that the dot (.) is treated as a delimiter for database and table names.
+      All the tables are required to share the same database.
+ It is important to note that the dot (.) is treated as a delimiter for database, schema and table names.
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.
- 例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.* |
+      For example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*
| slot.name |
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java
index afa730667e0..65b6292399e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java
@@ -19,6 +19,7 @@
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.IncrementalSource;
+import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@@ -26,6 +27,7 @@
import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -46,7 +48,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
protected final Duration connectTimeout;
protected final int connectMaxRetries;
protected final int connectionPoolSize;
- protected final String chunkKeyColumn;
+    protected final Map<ObjectPath, String> chunkKeyColumns;
public JdbcSourceConfig(
StartupOptions startupOptions,
@@ -71,7 +73,7 @@ public JdbcSourceConfig(
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
- String chunkKeyColumn,
+            Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
boolean assignUnboundedChunkFirst) {
@@ -101,7 +103,7 @@ public JdbcSourceConfig(
this.connectTimeout = connectTimeout;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
- this.chunkKeyColumn = chunkKeyColumn;
+ this.chunkKeyColumns = chunkKeyColumns;
}
public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
@@ -154,8 +156,8 @@ public int getConnectionPoolSize() {
return connectionPoolSize;
}
- public String getChunkKeyColumn() {
- return chunkKeyColumn;
+    public Map<ObjectPath, String> getChunkKeyColumns() {
+ return chunkKeyColumns;
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 82034f48822..3a587357354 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -22,10 +22,13 @@
import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/** A {@link Factory} to provide {@link SourceConfig} of JDBC data source. */
@@ -55,7 +58,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfig> {
-    protected String chunkKeyColumn;
+    protected Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
protected boolean skipSnapshotBackfill =
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
protected boolean scanNewlyAddedTableEnabled =
@@ -198,8 +201,17 @@ public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
* The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
* key column when read the snapshot of table.
*/
- public JdbcSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
- this.chunkKeyColumn = chunkKeyColumn;
+ public JdbcSourceConfigFactory chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
+ this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
+ return this;
+ }
+
+ /**
+ * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
+ * key column when read the snapshot of table.
+ */
+    public JdbcSourceConfigFactory chunkKeyColumn(Map<ObjectPath, String> chunkKeyColumns) {
+ this.chunkKeyColumns.putAll(chunkKeyColumns);
return this;
}
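
For reviewers, a minimal usage sketch of the two new `chunkKeyColumn` overloads. This is not part of the patch: it assumes `PostgresSourceConfigFactory` as the concrete `JdbcSourceConfigFactory` subclass, and the schema, table, and column names are illustrative only.

```java
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.HashMap;
import java.util.Map;

public class ChunkKeyColumnConfigSketch {
    public static void main(String[] args) {
        JdbcSourceConfigFactory factory = new PostgresSourceConfigFactory();

        // Register the chunk key for a single table: schema "public", table "orders".
        factory.chunkKeyColumn(new ObjectPath("public", "orders"), "order_id");

        // Or register several tables at once through the Map overload.
        Map<ObjectPath, String> chunkKeys = new HashMap<>();
        chunkKeys.put(new ObjectPath("public", "customers"), "customer_id");
        chunkKeys.put(new ObjectPath("inventory", "products"), "product_id");
        factory.chunkKeyColumn(chunkKeys);

        // Tables without an entry fall back to their primary key when chunks are split.
    }
}
```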
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 545db56f8b1..44d7a59a4c0 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -24,6 +24,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
@@ -253,11 +254,13 @@ protected double calculateDistributionFactor(
* Get the column which is seen as chunk key.
*
* @param table table identity.
- * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
- * primary key instead. @Column the column which is seen as chunk key.
+     * @param chunkKeyColumns map from table path to the column used as chunk key; if null or no
+     *     entry matches the table, the primary key is used instead.
+ * @return the column which is seen as chunk key.
*/
- protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
- return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+ protected Column getSplitColumn(
+            Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
+ return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumns);
}
/** ChunkEnd less than or equal to max. */
@@ -360,7 +363,7 @@ private void analyzeTable(TableId tableId) {
try {
currentSchema = dialect.queryTableSchema(jdbcConnection, tableId);
currentSplittingTable = Objects.requireNonNull(currentSchema).getTable();
- splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumn());
+ splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitType = getSplitType(splitColumn);
minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn);
approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 3892229d472..25b9c11dd93 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -37,6 +37,8 @@
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Collection;
@@ -47,7 +49,7 @@
/** The context for fetch task that fetching data of snapshot split from JDBC data source. */
@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
-
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
protected final JdbcSourceConfig sourceConfig;
protected final JdbcDataSourceDialect dataSourceDialect;
protected CommonConnectorConfig dbzConnectorConfig;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
index b6de3d211c7..87862b189b9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java
@@ -18,15 +18,21 @@
package org.apache.flink.cdc.connectors.base.source.utils;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -35,6 +41,8 @@
/** Utilities to split chunks of table. */
public class JdbcChunkUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcChunkUtils.class);
+
/**
* Query the maximum and minimum value of the column in the table. e.g. query string
* SELECT MIN(%s) FROM %s WHERE %s > ?
@@ -100,15 +108,55 @@ public static Object queryMin(
});
}
+    // Local copy of the table filter, kept here to avoid a dependency on DebeziumUtils.
+    private static Tables.TableFilter createTableFilter(String schemaName, String tableName) {
+        return new Tables.TableFilter() {
+            @Override
+            public boolean isIncluded(TableId tableId) {
+                final String schema = tableId.schema();
+                final String table = tableId.table();
+
+                if (schemaName != null && !schemaName.equalsIgnoreCase(schema)) {
+                    return false;
+                }
+
+                if (tableName != null && !tableName.equalsIgnoreCase(table)) {
+                    return false;
+                }
+
+                return true;
+            }
+        };
+    }
+
+    @Nullable
+    private static String findChunkKeyColumn(
+            TableId tableId, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
+        if (chunkKeyColumns == null) {
+            return null;
+        }
+
+        for (Map.Entry<ObjectPath, String> entry : chunkKeyColumns.entrySet()) {
+            ObjectPath table = entry.getKey();
+            // For Postgres the ObjectPath carries (schema, table), so match its database part
+            // against the schema of the Debezium TableId.
+            Tables.TableFilter filter =
+                    createTableFilter(table.getDatabaseName(), table.getObjectName());
+            if (filter.isIncluded(tableId)) {
+                return entry.getValue();
+            }
+        }
+
+        return null;
+    }
+
/**
* Get the column which is seen as chunk key.
*
* @param table table identity.
- * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
+     * @param chunkKeyColumns map from table path to chunk key column; if no entry matches, use
* primary key instead. @Column the column which is seen as chunk key.
*/
- public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
+ public static Column getSplitColumn(
+            Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
         List<Column> primaryKeys = table.primaryKeyColumns();
+ String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
throw new ValidationException(
"To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
index 2755e0c3712..133975b3cda 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
@@ -28,6 +28,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
@@ -43,6 +45,7 @@
/** Utility class to deal record. */
public class SourceRecordUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceRecordUtils.class);
private SourceRecordUtils() {}
@@ -131,15 +134,86 @@ public static TableId getTableId(SourceRecord dataRecord) {
public static Object[] getSplitKey(
RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
- // the split key field contains single field now
String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
- Struct key = (Struct) dataRecord.key();
- return new Object[] {key.get(splitFieldName)};
+
+ // Try primary key struct first (for backward compatibility)
+ Struct keyStruct = (Struct) dataRecord.key();
+ if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
+ return new Object[] {keyStruct.get(splitFieldName)};
+ }
+        LOG.debug(
+                "Split key field '{}' not found in key struct, using value struct of record {}.",
+                splitFieldName,
+                dataRecord);
+ // For non-primary key chunk keys, use value-based approach
+ return getSplitKeyFromValue(dataRecord, splitFieldName);
+ }
+
+ /** Extract chunk key from value struct (AFTER/BEFORE) for non-primary key chunk keys. */
+ private static Object[] getSplitKeyFromValue(SourceRecord dataRecord, String splitFieldName) {
+ Struct value = (Struct) dataRecord.value();
+ if (value == null) {
+ return null; // No value struct available
+ }
+
+ String op = value.getString(Envelope.FieldName.OPERATION);
+ Struct targetStruct = null;
+
+ if (op == null) {
+ // READ operation (snapshot)
+ targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+ } else {
+ switch (op) {
+ case "c": // CREATE
+ case "r": // READ
+ targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+ break;
+ case "u": // UPDATE - prefer AFTER for current state
+ targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+ if (targetStruct == null
+ || targetStruct.schema().field(splitFieldName) == null) {
+ // Fallback to BEFORE if AFTER doesn't have the field
+ targetStruct = value.getStruct(Envelope.FieldName.BEFORE);
+ }
+ break;
+ case "d": // DELETE - use BEFORE, but fallback if missing
+ targetStruct = value.getStruct(Envelope.FieldName.BEFORE);
+ if (targetStruct == null
+ || targetStruct.schema().field(splitFieldName) == null) {
+ // For DELETE with missing chunk key, return null to indicate "emit without
+ // filtering"
+ return null;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown operation: " + op);
+ }
+ }
+
+ if (targetStruct == null || targetStruct.schema().field(splitFieldName) == null) {
+ // Chunk key field not found in value struct
+ // This could happen with schema changes or configuration issues
+ LOG.debug(
+ "Chunk key field '{}' not found in record, emitting without filtering. Table: {}, Operation: {}",
+ splitFieldName,
+ getTableId(dataRecord),
+ dataRecord.value() != null
+ ? ((Struct) dataRecord.value()).getString(Envelope.FieldName.OPERATION)
+ : "unknown");
+ return null;
+ }
+
+ return new Object[] {targetStruct.get(splitFieldName)};
}
/** Returns the specific key contains in the split key range or not. */
public static boolean splitKeyRangeContains(
Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+        // A null key means the chunk key field could not be resolved (e.g., a DELETE event with a
+        // non-primary-key chunk key); emit the record without filtering to prevent data loss.
+ if (key == null) {
+ LOG.debug("Chunk key is null, emitting record without filtering");
+ return true;
+ }
+
// for all range
if (splitKeyStart == null && splitKeyEnd == null) {
return true;
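
To illustrate the new value-based fallback, here is a self-contained sketch that builds a Debezium-style update envelope with Kafka Connect `Struct`s and reads the chunk key from the `after` image, which is what `getSplitKeyFromValue` does when the chunk key is not part of the record key. The schema names and field names are made up, and the real method additionally handles the `c`/`r`/`d` operations and the null fallbacks shown above.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ValueBasedSplitKeySketch {
    public static void main(String[] args) {
        // Row schema containing the non-primary-key chunk key "order_id".
        Schema rowSchema =
                SchemaBuilder.struct()
                        .name("sketch.public.orders.Value")
                        .field("id", Schema.INT64_SCHEMA)
                        .field("order_id", Schema.INT64_SCHEMA)
                        .optional()
                        .build();

        // Debezium-style envelope: op plus before/after images (simplified).
        Schema envelopeSchema =
                SchemaBuilder.struct()
                        .name("sketch.public.orders.Envelope")
                        .field("op", Schema.STRING_SCHEMA)
                        .field("before", rowSchema)
                        .field("after", rowSchema)
                        .build();

        Struct after = new Struct(rowSchema).put("id", 1L).put("order_id", 42L);
        Struct value = new Struct(envelopeSchema).put("op", "u").put("after", after);

        // The record key only carries the primary key, so the chunk key comes from "after".
        String chunkKeyField = "order_id";
        Struct image = value.getStruct("after");
        Object splitKey =
                image != null && image.schema().field(chunkKeyField) != null
                        ? image.get(chunkKeyField)
                        : null;
        System.out.println("split key = " + splitKey); // split key = 42
    }
}
```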
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index bbd65b5e9ab..236bfcf55a7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -42,9 +42,12 @@
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.relational.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
@@ -60,6 +63,8 @@ public class PostgresSourceBuilder {
private final PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
     private DebeziumDeserializationSchema<T> deserializer;
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceBuilder.class);
+
private PostgresSourceBuilder() {}
/**
@@ -219,8 +224,8 @@ public PostgresSourceBuilder startupOptions(StartupOptions startupOptions) {
* The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
* key column when read the snapshot of table.
*/
-    public PostgresSourceBuilder<T> chunkKeyColumn(String chunkKeyColumn) {
-        this.configFactory.chunkKeyColumn(chunkKeyColumn);
+    public PostgresSourceBuilder<T> chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
+        this.configFactory.chunkKeyColumn(objectPath, chunkKeyColumn);
return this;
}
@@ -313,8 +318,12 @@ public PostgresSourceBuilder includePartitionedTables(boolean includePartitio
     public PostgresIncrementalSource<T> build() {
PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
PostgresDialect dialect = new PostgresDialect(configFactory.create(0));
- return new PostgresIncrementalSource<>(
- configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+
+        PostgresIncrementalSource<T> source =
+ new PostgresIncrementalSource<>(
+ configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+
+ return source;
}
public PostgresSourceConfigFactory getConfigFactory() {
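
End to end, the DataStream-level builder change would be used roughly as follows. This is a sketch only: the surrounding builder calls mirror the existing postgres-cdc incremental source examples, and the host, table, and chunk key names are invented.

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.table.catalog.ObjectPath;

public class PostgresPerTableChunkKeySketch {
    public static void main(String[] args) {
        PostgresIncrementalSource<String> source =
                PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.orders", "public.customers")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink")
                        .decodingPluginName("pgoutput")
                        // Per-table chunk keys: "orders" is split by order_id,
                        // "customers" falls back to its primary key.
                        .chunkKeyColumn(new ObjectPath("public", "orders"), "order_id")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
        // Hand "source" to StreamExecutionEnvironment#fromSource(...) as usual.
    }
}
```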
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 30271612800..c19c0598657 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -19,6 +19,7 @@
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
@@ -27,6 +28,7 @@
import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
@@ -64,7 +66,7 @@ public PostgresSourceConfig(
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
- @Nullable String chunkKeyColumn,
+            @Nullable Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
@@ -93,10 +95,11 @@ public PostgresSourceConfig(
connectTimeout,
connectMaxRetries,
connectionPoolSize,
- chunkKeyColumn,
+ chunkKeyColumns,
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled,
assignUnboundedChunkFirst);
+
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
@@ -120,6 +123,18 @@ public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}
+    public Map<ObjectPath, String> getChunkKeyColumns() {
+        return chunkKeyColumns;
+    }
+
/**
* Returns {@code includePartitionedTables} value.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 670d4f37a56..74deca5bcb8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -19,6 +19,7 @@
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
+import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
@@ -26,7 +27,9 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -52,6 +55,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private int lsnCommitCheckpointsDelay;
+    private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
+
private boolean includePartitionedTables;
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@@ -107,6 +112,7 @@ public PostgresSourceConfig create(int subtaskId) {
props.setProperty("snapshot.mode", "never");
Configuration dbzConfiguration = Configuration.from(props);
+
return new PostgresSourceConfig(
subtaskId,
startupOptions,
@@ -131,7 +137,7 @@ public PostgresSourceConfig create(int subtaskId) {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
- chunkKeyColumn,
+ chunkKeyColumns,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
@@ -185,6 +191,21 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
+ /**
+ * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
+ * key column when read the snapshot of table.
+ */
+ public PostgresSourceConfigFactory chunkKeyColumn(
+ ObjectPath objectPath, String chunkKeyColumn) {
+ this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
+ return this;
+ }
+
+    public PostgresSourceConfigFactory chunkKeyColumn(Map<ObjectPath, String> chunkKeyColumns) {
+ this.chunkKeyColumns.putAll(chunkKeyColumns);
+ return this;
+ }
+
/** Enable include partitioned table. */
public void setIncludePartitionedTables(boolean includePartitionedTables) {
this.includePartitionedTables = includePartitionedTables;
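
If the pipeline connector later exposes per-table chunk keys as a single option string, the `Map` overload gives it a natural target. The option value and the `schema.table:column` syntax below are assumptions for illustration only; this patch does not define such an option.

```java
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.HashMap;
import java.util.Map;

public class ChunkKeyOptionParsingSketch {
    public static void main(String[] args) {
        // Hypothetical option value: comma-separated "schema.table:column" pairs.
        String option = "public.orders:order_id,inventory.products:product_id";

        Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
        for (String pair : option.split(",")) {
            String[] tableAndColumn = pair.trim().split(":");
            String[] schemaAndTable = tableAndColumn[0].split("\\.");
            chunkKeyColumns.put(
                    new ObjectPath(schemaAndTable[0], schemaAndTable[1]), tableAndColumn[1]);
        }

        PostgresSourceConfigFactory factory = new PostgresSourceConfigFactory();
        factory.chunkKeyColumn(chunkKeyColumns);
    }
}
```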
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 35568d2ce93..773b515a8c5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -25,6 +25,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
@@ -288,7 +289,8 @@ public PostgresSchema getDatabaseSchema() {
@Override
public RowType getSplitType(Table table) {
- Column splitColumn = ChunkUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
+ Column splitColumn =
+ JdbcChunkUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumns());
return ChunkUtils.getSplitType(splitColumn);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index 7ebb43f9b4e..9e700035f96 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -27,6 +27,7 @@
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -226,7 +227,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.connectMaxRetries(connectMaxRetries)
.connectionPoolSize(connectionPoolSize)
.startupOptions(startupOptions)
- .chunkKeyColumn(chunkKeyColumn)
+ .chunkKeyColumn(new ObjectPath(schemaName, tableName), chunkKeyColumn)
.heartbeatInterval(heartbeatInterval)
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkyColumnsConfigTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkyColumnsConfigTest.java
new file mode 100644
index 00000000000..d3f4772aaad
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkyColumnsConfigTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for chunk key column configuration functionality. */
+public class PostgresChunkyColumnsConfigTest {
+
+ @Test
+ public void testChunkKeyColumnConfiguration() {
+ // Create a config factory to test chunk key column functionality
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Test basic configuration
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test chunk key column configuration
+ ObjectPath tablePath = new ObjectPath("public", "customers");
+ configFactory.chunkKeyColumn(tablePath, "customer_id");
+
+ // Verify the factory is properly configured
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testMultipleChunkKeyColumns() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Configure multiple chunk key columns for different tables
+ configFactory.chunkKeyColumn(new ObjectPath("public", "customers"), "customer_id");
+ configFactory.chunkKeyColumn(new ObjectPath("public", "orders"), "order_id");
+ configFactory.chunkKeyColumn(new ObjectPath("public", "products"), "product_id");
+
+ // Verify the factory accepts multiple chunk key columns
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithDifferentSchemas() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Configure chunk key columns for tables in different schemas
+ configFactory.chunkKeyColumn(new ObjectPath("public", "customers"), "customer_id");
+ configFactory.chunkKeyColumn(new ObjectPath("inventory", "products"), "product_id");
+ configFactory.chunkKeyColumn(new ObjectPath("sales", "orders"), "order_id");
+
+ // Verify the factory handles different schemas
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithSpecialCharacters() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Configure chunk key columns with special characters
+ configFactory.chunkKeyColumn(new ObjectPath("public", "user_profiles"), "user_profile_id");
+ configFactory.chunkKeyColumn(new ObjectPath("public", "order_items"), "order_item_id");
+
+ // Verify the factory handles special characters
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithSplitSizeConfiguration() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Configure split size along with chunk key column
+ configFactory.splitSize(1000);
+ configFactory.chunkKeyColumn(new ObjectPath("public", "large_table"), "id");
+
+ // Verify the factory handles both split size and chunk key column
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithEmptyTableName() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test with empty table name (should handle gracefully)
+ try {
+ ObjectPath emptyTablePath = new ObjectPath("public", "");
+ configFactory.chunkKeyColumn(emptyTablePath, "id");
+ } catch (Exception e) {
+ // Expected to handle empty table names gracefully
+ assertThat(e).isInstanceOf(IllegalArgumentException.class);
+ }
+
+ // Verify the factory still exists
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithEmptyColumnName() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test with empty column name (should handle gracefully)
+ ObjectPath tablePath = new ObjectPath("public", "customers");
+ configFactory.chunkKeyColumn(tablePath, "");
+
+ // Verify the factory handles empty column names
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithNullValues() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test with null values (should handle gracefully)
+ try {
+ configFactory.chunkKeyColumn(null, "id");
+ } catch (Exception e) {
+ // Expected to handle null values gracefully
+ assertThat(e).isInstanceOf(NullPointerException.class);
+ }
+
+ try {
+ configFactory.chunkKeyColumn(new ObjectPath("public", "customers"), null);
+ } catch (Exception e) {
+ // Expected to handle null values gracefully
+ assertThat(e).isInstanceOf(NullPointerException.class);
+ }
+
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithLongTableName() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test with very long table name
+ String longTableName =
+ "a_very_long_table_name_that_exceeds_normal_length_limits_for_testing_purposes";
+ configFactory.chunkKeyColumn(new ObjectPath("public", longTableName), "id");
+
+ // Verify the factory handles long table names
+ assertThat(configFactory).isNotNull();
+ }
+
+ @Test
+ public void testChunkKeyColumnWithLongColumnName() {
+ PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+
+ // Configure basic settings
+ configFactory.hostname("localhost");
+ configFactory.port(5432);
+ configFactory.database("testdb");
+ configFactory.username("user");
+ configFactory.password("password");
+
+ // Test with very long column name
+ String longColumnName =
+ "a_very_long_column_name_that_exceeds_normal_length_limits_for_testing_purposes";
+ configFactory.chunkKeyColumn(new ObjectPath("public", "customers"), longColumnName);
+
+ // Verify the factory handles long column names
+ assertThat(configFactory).isNotNull();
+ }
+}