diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java
index 8cfa33e4..7617f109 100644
--- a/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java
+++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraClient.java
@@ -42,6 +42,7 @@
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
@@ -155,8 +156,8 @@ public CqlIdentifier[] buildPrimaryKeyClause(TableMetadata tableMetadata) {
/**
* Build a SELECT prepared statement for the first pkLength primary key columns.
- * @param keyspaceName
- * @param tableName
+ * @param keyspaceName keyspace name
+ * @param tableName table name
* @param projection columns
* @param pk primary key columns
* @param pkLength primary key length
@@ -166,7 +167,9 @@ public PreparedStatement prepareSelect(String keyspaceName, String tableName,
CqlIdentifier[] projection,
CqlIdentifier[] pk,
int pkLength) {
- Select query = selectFrom(keyspaceName, tableName).columns(projection);
+ // select columns according to projection array length
+ Select query = selectFrom(keyspaceName, tableName)
+ .columns(projection.length != 0 ? projection : pk);
for (int i = 0; i < pkLength; i++)
query = query.whereColumn(pk[i]).isEqualTo(bindMarker());
query.limit(1);
diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
index 21716a59..75b2ecd1 100644
--- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
+++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
@@ -70,15 +70,7 @@
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -346,16 +338,29 @@ void initCassandraClient() throws InvocationTargetException, NoSuchMethodExcepti
setValueConverterAndQuery(tuple._1, tuple._2);
}
+ /**
+ * Check if the table has only primary key columns.
+ * @param tableMetadata the table metadata
+ * @return true if the table has only primary key columns, false otherwise
+ */
+ private boolean isPrimaryKeyOnlyTable(TableMetadata tableMetadata) {
+ // if the table has no columns other than the primary key, we can skip the value converter
+ return tableMetadata.getColumns().size() == tableMetadata.getPrimaryKey().size() &&
+ new HashSet<>(tableMetadata.getPrimaryKey()).containsAll(tableMetadata.getColumns().values());
+ }
+
synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata tableMetadata) {
try {
+ boolean isPrimaryKeyOnlyTable = isPrimaryKeyOnlyTable(tableMetadata);
List columns = tableMetadata.getColumns().values().stream()
// include primary keys in the json only output format options
// TODO: PERF: Infuse the key values instead of reading from DB https://github.com/datastax/cdc-apache-cassandra/issues/84
- .filter(c -> config.isJsonOnlyOutputFormat() ? true : !tableMetadata.getPrimaryKey().contains(c))
+ // If primary key only table, then add all the columns into the value schema.
+ .filter(c -> config.isJsonOnlyOutputFormat() || isPrimaryKeyOnlyTable || !tableMetadata.getPrimaryKey().contains(c))
.filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
.collect(Collectors.toList());
List staticColumns = tableMetadata.getColumns().values().stream()
- .filter(c -> c.isStatic())
+ .filter(ColumnMetadata::isStatic)
.filter(c -> !tableMetadata.getPrimaryKey().contains(c))
.filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
.collect(Collectors.toList());
@@ -379,9 +384,9 @@ synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata
/**
* Build the CQL prepared statement for the specified where clause length.
- * NOTE: The prepared statement cannot be build from the schema listener thread to avoid a possible deadlock.
+ * NOTE: The prepared statement cannot be built from the schema listener thread to avoid a possible deadlock.
*
- * @param valueConverterAndQuery
+ * @param valueConverterAndQuery the converter and query parameters
* @param whereClauseLength the number of columns in the where clause
* @return preparedStatement
*/
@@ -392,7 +397,8 @@ synchronized PreparedStatement getSelectStatement(ConverterAndQuery valueConvert
valueConverterAndQuery.tableName,
valueConverterAndQuery.getProjectionClause(whereClauseLength),
valueConverterAndQuery.primaryKeyClause,
- k));
+ k
+ ));
}
Class> getKeyConverterClass() {
diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java
index 286e0ce7..ad115c77 100644
--- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java
+++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java
@@ -180,6 +180,11 @@ public void testClusteringKey() throws InterruptedException, IOException {
public void testCompoundPk() throws InterruptedException, IOException {
testCompoundPk("ks1");
}
+
+ @Test
+ public void testOnlyPk() throws InterruptedException, IOException {
+ testOnlyPk("ks1");
+ }
@Test
public void testSchema() throws InterruptedException, IOException {
@@ -536,6 +541,67 @@ public void testCompoundPk(String ksName) throws InterruptedException, IOExcepti
}
}
+ public void testOnlyPk(String ksName) throws InterruptedException, IOException {
+ try {
+ try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
+ cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
+ " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
+ cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table6 (a text, b int, PRIMARY KEY(a,b)) WITH cdc=true");
+ cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('1',1)");
+ cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('2',2)");
+ cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('3',3)");
+ }
+ deployConnector(ksName, "table6");
+ try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
+ try (Consumer consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
+ .topic(String.format(Locale.ROOT, "data-%s.table6", ksName))
+ .subscriptionName("sub1")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
+ Message msg;
+ int receivedCount = 1;
+ while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
+ receivedCount < 5) {
+ GenericRecord record = msg.getValue();
+ assertEquals(this.schemaType, record.getSchemaType());
+ Object key = getKey(msg);
+ GenericRecord value = getValue(record);
+ // assert key fields
+ assertEquals(Integer.toString(receivedCount) , getAndAssertKeyFieldAsString(key, "a"));
+ assertEquals(receivedCount, getAndAssertKeyFieldAsInt(key, "b"));
+ // assert value fields
+ assertEquals(Integer.toString(receivedCount), value.getField("a"));
+ assertEquals(receivedCount, value.getField("b"));
+ consumer.acknowledge(msg);
+ receivedCount++;
+ }
+
+ // delete a row
+ try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
+ cqlSession.execute("DELETE FROM " + ksName + ".table6 WHERE a = '1' AND b = 1");
+ }
+ while ((msg = consumer.receive(30, TimeUnit.SECONDS)) != null &&
+ receivedCount < 6) {
+ GenericRecord record = msg.getValue();
+ assertEquals(this.schemaType, record.getSchemaType());
+ Object key = getKey(msg);
+ GenericRecord value = getValue(record);
+ assertEquals("1", getAndAssertKeyFieldAsString(key,"a"));
+ assertEquals(1, getAndAssertKeyFieldAsInt(key, "b"));
+ assertNullValue(value);
+ consumer.acknowledge(msg);
+ receivedCount++;
+ }
+ }
+ }
+ } finally {
+ dumpFunctionLogs("cassandra-source-" + ksName + "-table6");
+ undeployConnector(ksName, "table6");
+ }
+ }
+
// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table3/cassandra-source-ks1-table3-0.log
public void testSchema(String ksName) throws InterruptedException, IOException {
try {