diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a922430c..48b95fca 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: needs: build name: Test runs-on: ubuntu-latest - timeout-minutes: 90 + timeout-minutes: 120 strategy: fail-fast: false matrix: diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/ConverterAndQuery.java b/connector/src/main/java/com/datastax/oss/pulsar/source/ConverterAndQuery.java index 37f4763b..2542644f 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/ConverterAndQuery.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/ConverterAndQuery.java @@ -64,11 +64,14 @@ public class ConverterAndQuery { /** * When requesting a partition, the projection clause contains only static columns. * When requesting a wide row, the projection clause contains regular and static columns + * When deleting a single row or a partition, the projection contains regular and static columns * @param whereClauseLength number of columns in the CQL where clause. * @return the projection clause */ public CqlIdentifier[] getProjectionClause(int whereClauseLength) { - return primaryKeyClause.length == whereClauseLength + // when primary key columns are different from where clause columns and static columns are absent, we still + // need to include regular columns in the projection clause (e.g. for DELETE by partition key use cases) + return primaryKeyClause.length == whereClauseLength || staticProjectionClause.length == 0 ? projectionClause : staticProjectionClause; } diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 7c31fe16..286e0ce7 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -68,6 +68,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.LocalDate; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -154,9 +155,15 @@ public static void initBeforeClass() throws Exception { @AfterAll public static void closeAfterAll() { - cassandraContainer1.close(); - cassandraContainer2.close(); - pulsarContainer.close(); + if (cassandraContainer1 != null) { + cassandraContainer1.close(); + } + if (cassandraContainer2 != null) { + cassandraContainer2.close(); + } + if (pulsarContainer != null) { + pulsarContainer.close(); + } } @Test @@ -164,6 +171,11 @@ public void testSinglePk() throws InterruptedException, IOException { testSinglePk("ks1"); } + @Test + public void testClusteringKey() throws InterruptedException, IOException { + testClusteringKey("ks1"); + } + @Test public void testCompoundPk() throws InterruptedException, IOException { testCompoundPk("ks1"); @@ -322,6 +334,118 @@ public void testSinglePk(String ksName) throws InterruptedException, IOException } } + // docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table5/cassandra-source-ks1-table5-0.log + public void testClusteringKey(String ksName) throws InterruptedException, IOException { + try { + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName + + " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};"); + cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table5 (pk text, c1 date, c2 uuid, val int, PRIMARY KEY (pk, c1, c2)) WITH cdc=true"); + cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)"); + cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('2','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)"); + cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('3','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)"); + } + deployConnector(ksName, "table5"); + + try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) { + Map mutationTable5 = new HashMap<>(); + try (Consumer consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME()) + .topic(String.format(Locale.ROOT, "data-%s.table5", ksName)) + .subscriptionName("sub1") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionMode(SubscriptionMode.Durable) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message msg; + while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null && + mutationTable5.values().stream().mapToInt(i -> i).sum() < 4) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + assertEquals((Integer) 0, mutationTable5.computeIfAbsent(getAndAssertKeyFieldAsString(key, "pk"), k -> 0)); + assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1")); + assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2")); + assertEquals(1, value.getField("val")); + mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1); + consumer.acknowledge(msg); + } + assertEquals((Integer) 1, mutationTable5.get("1")); + assertEquals((Integer) 1, mutationTable5.get("2")); + assertEquals((Integer) 1, mutationTable5.get("3")); + + // trigger a schema update + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("ALTER TABLE " + ksName + ".table5 ADD d double"); + cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk,c1,c2,val,d) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af,1,1.0)"); + } + while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null && + mutationTable5.values().stream().mapToInt(i -> i).sum() < 5) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + assertEquals("1", getAndAssertKeyFieldAsString(key, "pk")); + assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1")); + assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2")); + assertEquals(1, value.getField("val")); + mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1); + consumer.acknowledge(msg); + } + assertEquals((Integer) 2, mutationTable5.get("1")); // 2 inserts for pk=1 + assertEquals((Integer) 1, mutationTable5.get("2")); + assertEquals((Integer) 1, mutationTable5.get("3")); + + // delete a row by partition key only + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '1'"); + } + while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null && + mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + assertEquals("1", getAndAssertKeyFieldAsString(key, "pk")); + // clustering keys are null because we deleted by partition key only + assertKeyFieldIsNull(key, "c1"); + assertKeyFieldIsNull(key, "c2"); + assertNullValue(value); + mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1); + consumer.acknowledge(msg); + } + assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1 + assertEquals((Integer) 1, mutationTable5.get("2")); + assertEquals((Integer) 1, mutationTable5.get("3")); + + // delete a row by partition and clustering keys + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '2' and c1 = '2021-01-10' and c2 = 016b123d-f732-4173-9225-c6717066c7af"); + } + while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null && + mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + assertEquals("2", getAndAssertKeyFieldAsString(key, "pk")); + assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1")); + assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2")); + assertNullValue(value); + mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1); + consumer.acknowledge(msg); + } + assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1 + assertEquals((Integer) 2, mutationTable5.get("2")); // 1 insert and 1 delete for pk=2 + assertEquals((Integer) 1, mutationTable5.get("3")); + } + } + } finally { + dumpFunctionLogs("cassandra-source-" + ksName + "-table5"); + undeployConnector(ksName, "table5"); + } + } + // docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table2/cassandra-source-ks1-table2-0.log public void testCompoundPk(String ksName) throws InterruptedException, IOException { try { diff --git a/gradle.properties b/gradle.properties index 8873adb3..d1bb2d4b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -22,7 +22,7 @@ testPulsarImageTag=2.10_3.4 kafkaVersion=3.4.0 vavrVersion=0.10.3 -testContainersVersion=1.16.2 +testContainersVersion=1.19.1 caffeineVersion=2.8.8 guavaVersion=30.1-jre messagingConnectorsCommonsVersion=1.0.14