@@ -42,6 +42,7 @@
 import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
 import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
 import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
 import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
 import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
 import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
@@ -155,8 +156,8 @@ public CqlIdentifier[] buildPrimaryKeyClause(TableMetadata tableMetadata) {

     /**
      * Build a SELECT prepared statement for the first <i>pkLength</i> primary key columns.
-     * @param keyspaceName
-     * @param tableName
+     * @param keyspaceName keyspace name
+     * @param tableName table name
      * @param projection columns
      * @param pk primary key columns
      * @param pkLength primary key length
@@ -166,7 +167,9 @@ public PreparedStatement prepareSelect(String keyspaceName, String tableName,
                                            CqlIdentifier[] projection,
                                            CqlIdentifier[] pk,
                                            int pkLength) {
-        Select query = selectFrom(keyspaceName, tableName).columns(projection);
+        // select the projection columns, falling back to the primary key columns when the projection is empty
+        Select query = selectFrom(keyspaceName, tableName)
+                .columns(projection.length != 0 ? projection : pk);
         for (int i = 0; i < pkLength; i++)
            query = query.whereColumn(pk[i]).isEqualTo(bindMarker());
        query.limit(1);
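
For context, here is a minimal self-contained sketch (not part of the PR) of what the updated builder chain produces. The table ks1.table6 with PRIMARY KEY (a, b) is hypothetical, borrowed from the test further down:

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.querybuilder.select.Select;

public class PrepareSelectSketch {
    public static void main(String[] args) {
        // hypothetical table ks1.table6 with PRIMARY KEY (a, b) and an empty projection
        CqlIdentifier[] pk = {CqlIdentifier.fromCql("a"), CqlIdentifier.fromCql("b")};
        CqlIdentifier[] projection = new CqlIdentifier[0];

        // an empty projection now falls back to the primary key columns
        Select query = selectFrom("ks1", "table6")
                .columns(projection.length != 0 ? projection : pk);
        for (CqlIdentifier col : pk)
            query = query.whereColumn(col).isEqualTo(bindMarker());

        // prints something like: SELECT a,b FROM ks1.table6 WHERE a=? AND b=? LIMIT 1
        System.out.println(query.limit(1).asCql());
    }
}

Before this change, an empty projection produced a SELECT with no columns; the fallback keeps the statement valid for primary-key-only tables.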

@@ -70,15 +70,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -346,16 +338,29 @@ void initCassandraClient() throws InvocationTargetException, NoSuchMethodException
         setValueConverterAndQuery(tuple._1, tuple._2);
     }
 
+    /**
+     * Check if the table has only primary key columns.
+     * @param tableMetadata the table metadata
+     * @return true if the table has only primary key columns, false otherwise
+     */
+    private boolean isPrimaryKeyOnlyTable(TableMetadata tableMetadata) {
+        // if the table has no columns other than the primary key, we can skip the value converter
+        return tableMetadata.getColumns().size() == tableMetadata.getPrimaryKey().size() &&
+                new HashSet<>(tableMetadata.getPrimaryKey()).containsAll(tableMetadata.getColumns().values());
+    }
+
     synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata tableMetadata) {
         try {
+            boolean isPrimaryKeyOnlyTable = isPrimaryKeyOnlyTable(tableMetadata);
             List<ColumnMetadata> columns = tableMetadata.getColumns().values().stream()
                     // include primary keys in the json only output format options
                     // TODO: PERF: Infuse the key values instead of reading from DB https://github.com/datastax/cdc-apache-cassandra/issues/84
-                    .filter(c -> config.isJsonOnlyOutputFormat() ? true : !tableMetadata.getPrimaryKey().contains(c))
+                    // if the table only has primary key columns, add them all to the value schema
+                    .filter(c -> config.isJsonOnlyOutputFormat() || isPrimaryKeyOnlyTable || !tableMetadata.getPrimaryKey().contains(c))
                     .filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
                     .collect(Collectors.toList());
             List<ColumnMetadata> staticColumns = tableMetadata.getColumns().values().stream()
-                    .filter(c -> c.isStatic())
+                    .filter(ColumnMetadata::isStatic)
                     .filter(c -> !tableMetadata.getPrimaryKey().contains(c))
                     .filter(c -> !columnPattern.isPresent() || columnPattern.get().matcher(c.getName().asInternal()).matches())
                     .collect(Collectors.toList());
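
To see the new predicate in isolation, here is a standalone sketch. The assumption is that plain strings can stand in for the driver's ColumnMetadata; the set-based comparison is the same shape as isPrimaryKeyOnlyTable():

import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class PrimaryKeyOnlySketch {
    // a table is "primary key only" when every column belongs to the primary key
    static boolean isPrimaryKeyOnly(Collection<String> allColumns, List<String> primaryKey) {
        return allColumns.size() == primaryKey.size()
                && new HashSet<>(primaryKey).containsAll(allColumns);
    }

    public static void main(String[] args) {
        // table6 (a text, b int, PRIMARY KEY (a, b)) -> true: both columns are key columns
        System.out.println(isPrimaryKeyOnly(List.of("a", "b"), List.of("a", "b")));
        // a table with a regular column c -> false: c sits outside the primary key
        System.out.println(isPrimaryKeyOnly(List.of("a", "b", "c"), List.of("a", "b")));
    }
}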
@@ -379,9 +384,9 @@ synchronized void setValueConverterAndQuery(KeyspaceMetadata ksm, TableMetadata tableMetadata) {

     /**
      * Build the CQL prepared statement for the specified where clause length.
-     * NOTE: The prepared statement cannot be build from the schema listener thread to avoid a possible deadlock.
+     * NOTE: The prepared statement cannot be built from the schema listener thread to avoid a possible deadlock.
      *
-     * @param valueConverterAndQuery
+     * @param valueConverterAndQuery the converter and query parameters
      * @param whereClauseLength the number of columns in the where clause
      * @return preparedStatement
      */
@@ -392,7 +397,8 @@ synchronized PreparedStatement getSelectStatement(ConverterAndQuery valueConverterAndQuery,
                 valueConverterAndQuery.tableName,
                 valueConverterAndQuery.getProjectionClause(whereClauseLength),
                 valueConverterAndQuery.primaryKeyClause,
-                k));
+                k
+        ));
     }
 
     Class<?> getKeyConverterClass() {
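
The enclosing call (collapsed above) appears to memoize one prepared statement per where-clause length, with `k` as the missing-key lambda parameter. A sketch of that caching pattern, assuming a ConcurrentHashMap-backed cache and with String standing in for the driver's PreparedStatement (the actual field is not visible in this hunk):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SelectStatementCacheSketch {
    // one entry per where-clause length; String stands in for PreparedStatement
    private final ConcurrentMap<Integer, String> selectStatementCache = new ConcurrentHashMap<>();

    String getSelectStatement(int whereClauseLength) {
        // computeIfAbsent prepares the statement lazily, once per length,
        // and never from the schema listener thread (see the NOTE in the javadoc above)
        return selectStatementCache.computeIfAbsent(whereClauseLength,
                k -> "SELECT ... WHERE <first " + k + " primary key columns> LIMIT 1");
    }

    public static void main(String[] args) {
        SelectStatementCacheSketch cache = new SelectStatementCacheSketch();
        System.out.println(cache.getSelectStatement(1));
        System.out.println(cache.getSelectStatement(2)); // second length gets its own entry
    }
}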

@@ -180,6 +180,11 @@ public void testClusteringKey() throws InterruptedException, IOException {
     public void testCompoundPk() throws InterruptedException, IOException {
         testCompoundPk("ks1");
     }
 
+    @Test
+    public void testOnlyPk() throws InterruptedException, IOException {
+        testOnlyPk("ks1");
+    }
+
     @Test
     public void testSchema() throws InterruptedException, IOException {
@@ -536,6 +541,67 @@ public void testCompoundPk(String ksName) throws InterruptedException, IOException {
         }
     }
 
+    public void testOnlyPk(String ksName) throws InterruptedException, IOException {
+        try {
+            try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
+                cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
+                        " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
+                cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table6 (a text, b int, PRIMARY KEY(a,b)) WITH cdc=true");
+                cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('1',1)");
+                cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('2',2)");
+                cqlSession.execute("INSERT INTO " + ksName + ".table6 (a,b) VALUES('3',3)");
+            }
+            deployConnector(ksName, "table6");
+            try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
+                try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
+                        .topic(String.format(Locale.ROOT, "data-%s.table6", ksName))
+                        .subscriptionName("sub1")
+                        .subscriptionType(SubscriptionType.Key_Shared)
+                        .subscriptionMode(SubscriptionMode.Durable)
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscribe()) {
+                    Message<GenericRecord> msg;
+                    int receivedCount = 1;
+                    while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
+                            receivedCount < 5) {
+                        GenericRecord record = msg.getValue();
+                        assertEquals(this.schemaType, record.getSchemaType());
+                        Object key = getKey(msg);
+                        GenericRecord value = getValue(record);
+                        // assert key fields
+                        assertEquals(Integer.toString(receivedCount), getAndAssertKeyFieldAsString(key, "a"));
+                        assertEquals(receivedCount, getAndAssertKeyFieldAsInt(key, "b"));
+                        // assert value fields
+                        assertEquals(Integer.toString(receivedCount), value.getField("a"));
+                        assertEquals(receivedCount, value.getField("b"));
+                        consumer.acknowledge(msg);
+                        receivedCount++;
+                    }
+
+                    // delete a row
+                    try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
+                        cqlSession.execute("DELETE FROM " + ksName + ".table6 WHERE a = '1' AND b = 1");
+                    }
+                    while ((msg = consumer.receive(30, TimeUnit.SECONDS)) != null &&
+                            receivedCount < 6) {
+                        GenericRecord record = msg.getValue();
+                        assertEquals(this.schemaType, record.getSchemaType());
+                        Object key = getKey(msg);
+                        GenericRecord value = getValue(record);
+                        assertEquals("1", getAndAssertKeyFieldAsString(key, "a"));
+                        assertEquals(1, getAndAssertKeyFieldAsInt(key, "b"));
+                        assertNullValue(value);
+                        consumer.acknowledge(msg);
+                        receivedCount++;
+                    }
+                }
+            }
+        } finally {
+            dumpFunctionLogs("cassandra-source-" + ksName + "-table6");
+            undeployConnector(ksName, "table6");
+        }
+    }
+
     // docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table3/cassandra-source-ks1-table3-0.log
     public void testSchema(String ksName) throws InterruptedException, IOException {
         try {