Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
needs: build
name: Test
runs-on: ubuntu-latest
timeout-minutes: 90
timeout-minutes: 120
strategy:
fail-fast: false
matrix:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ public class ConverterAndQuery {
/**
* When requesting a partition, the projection clause contains only static columns.
* When requesting a wide row, the projection clause contains regular and static columns
* When deleting a single row or a partition, the projection contains regular and static columns
* @param whereClauseLength number of columns in the CQL where clause.
* @return the projection clause
*/
public CqlIdentifier[] getProjectionClause(int whereClauseLength) {
return primaryKeyClause.length == whereClauseLength
// when primary key columns are different from where clause columns and static columns are absent, we still
// need to include regular columns in the projection clause (e.g. for DELETE by partition key use cases)
return primaryKeyClause.length == whereClauseLength || staticProjectionClause.length == 0
? projectionClause
: staticProjectionClause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -154,16 +155,27 @@ public static void initBeforeClass() throws Exception {

@AfterAll
public static void closeAfterAll() {
cassandraContainer1.close();
cassandraContainer2.close();
pulsarContainer.close();
if (cassandraContainer1 != null) {
cassandraContainer1.close();
}
if (cassandraContainer2 != null) {
cassandraContainer2.close();
}
if (pulsarContainer != null) {
pulsarContainer.close();
}
}

@Test
public void testSinglePk() throws InterruptedException, IOException {
testSinglePk("ks1");
}

@Test
public void testClusteringKey() throws InterruptedException, IOException {
testClusteringKey("ks1");
}

@Test
public void testCompoundPk() throws InterruptedException, IOException {
testCompoundPk("ks1");
Expand Down Expand Up @@ -322,6 +334,118 @@ public void testSinglePk(String ksName) throws InterruptedException, IOException
}
}

// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table5/cassandra-source-ks1-table5-0.log
public void testClusteringKey(String ksName) throws InterruptedException, IOException {
try {
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table5 (pk text, c1 date, c2 uuid, val int, PRIMARY KEY (pk, c1, c2)) WITH cdc=true");
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('2','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('3','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
}
deployConnector(ksName, "table5");

try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
Map<String, Integer> mutationTable5 = new HashMap<>();
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
.topic(String.format(Locale.ROOT, "data-%s.table5", ksName))
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
Message<GenericRecord> msg;
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null &&
mutationTable5.values().stream().mapToInt(i -> i).sum() < 4) {
GenericRecord record = msg.getValue();
assertEquals(this.schemaType, record.getSchemaType());
Object key = getKey(msg);
GenericRecord value = getValue(record);
assertEquals((Integer) 0, mutationTable5.computeIfAbsent(getAndAssertKeyFieldAsString(key, "pk"), k -> 0));
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
assertEquals(1, value.getField("val"));
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
consumer.acknowledge(msg);
}
assertEquals((Integer) 1, mutationTable5.get("1"));
assertEquals((Integer) 1, mutationTable5.get("2"));
assertEquals((Integer) 1, mutationTable5.get("3"));

// trigger a schema update
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
cqlSession.execute("ALTER TABLE " + ksName + ".table5 ADD d double");
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk,c1,c2,val,d) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af,1,1.0)");
}
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null &&
mutationTable5.values().stream().mapToInt(i -> i).sum() < 5) {
GenericRecord record = msg.getValue();
assertEquals(this.schemaType, record.getSchemaType());
Object key = getKey(msg);
GenericRecord value = getValue(record);
assertEquals("1", getAndAssertKeyFieldAsString(key, "pk"));
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
assertEquals(1, value.getField("val"));
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
consumer.acknowledge(msg);
}
assertEquals((Integer) 2, mutationTable5.get("1")); // 2 inserts for pk=1
assertEquals((Integer) 1, mutationTable5.get("2"));
assertEquals((Integer) 1, mutationTable5.get("3"));

// delete a row by partition key only
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '1'");
}
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) {
GenericRecord record = msg.getValue();
assertEquals(this.schemaType, record.getSchemaType());
Object key = getKey(msg);
GenericRecord value = getValue(record);
assertEquals("1", getAndAssertKeyFieldAsString(key, "pk"));
// clustering keys are null because we deleted by partition key only
assertKeyFieldIsNull(key, "c1");
assertKeyFieldIsNull(key, "c2");
assertNullValue(value);
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
consumer.acknowledge(msg);
}
assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1
assertEquals((Integer) 1, mutationTable5.get("2"));
assertEquals((Integer) 1, mutationTable5.get("3"));

// delete a row by partition and clustering keys
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '2' and c1 = '2021-01-10' and c2 = 016b123d-f732-4173-9225-c6717066c7af");
}
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) {
GenericRecord record = msg.getValue();
assertEquals(this.schemaType, record.getSchemaType());
Object key = getKey(msg);
GenericRecord value = getValue(record);
assertEquals("2", getAndAssertKeyFieldAsString(key, "pk"));
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
assertNullValue(value);
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
consumer.acknowledge(msg);
}
assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1
assertEquals((Integer) 2, mutationTable5.get("2")); // 1 insert and 1 delete for pk=2
assertEquals((Integer) 1, mutationTable5.get("3"));
}
}
} finally {
dumpFunctionLogs("cassandra-source-" + ksName + "-table5");
undeployConnector(ksName, "table5");
}
}

// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table2/cassandra-source-ks1-table2-0.log
public void testCompoundPk(String ksName) throws InterruptedException, IOException {
try {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ testPulsarImageTag=2.10_3.4

kafkaVersion=3.4.0
vavrVersion=0.10.3
testContainersVersion=1.16.2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't work anymore with Docker Desktop for Mac (with Apple Silicon)

testContainersVersion=1.19.1
caffeineVersion=2.8.8
guavaVersion=30.1-jre
messagingConnectorsCommonsVersion=1.0.14
Expand Down