From 88fc1014bd8ae4c481b4b86d9f201d5a50cf6789 Mon Sep 17 00:00:00 2001
From: Madhavan Sridharan
Date: Fri, 13 Jun 2025 13:57:29 -0400
Subject: [PATCH 1/4] Convert nulls on origin to unset at target by a new
 property

---
 .../cql/statement/TargetInsertStatement.java  |   8 +-
 .../cql/statement/TargetUpdateStatement.java  |  17 +-
 .../cql/statement/TargetUpsertStatement.java  |   8 +
 .../cdm/properties/KnownProperties.java       |   3 +
 src/resources/cdm-detailed.properties         |   6 +
 .../statement/TargetInsertStatementTest.java  |  38 +++++
 .../statement/TargetUpdateStatementTest.java  |  38 +++++
 .../statement/TargetUpsertStatementTest.java  |  34 ++++
 .../com/datastax/cdm/data/CqlDataTest.java    | 151 ++++++++++++++++++
 9 files changed, 299 insertions(+), 4 deletions(-)
 create mode 100644 src/test/java/com/datastax/cdm/data/CqlDataTest.java

diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
index 24f3b489..572dd18d 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
@@ -54,6 +54,7 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
         int currentBindIndex = 0;
         Object bindValue = null;
 
+        boolean nullToUnset = propertyHelper.getBoolean(KnownProperties.TRANSFORM_NULL_TO_UNSET);
         if (logDebug)
             logger.debug("bind using conversions: {}", cqlTable.getOtherCqlTable().getConversions());
         for (int targetIndex = 0; targetIndex < targetColumnTypes.size(); targetIndex++) {
@@ -76,7 +77,12 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
                     bindValue = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
                 }
 
-                boundStatement = boundStatement.set(currentBindIndex++, bindValue, cqlTable.getBindClass(targetIndex));
+                if (bindValue == null && nullToUnset) {
+                    boundStatement = boundStatement.unset(currentBindIndex++);
+                } else {
+                    boundStatement = boundStatement.set(currentBindIndex++, bindValue,
+                            cqlTable.getBindClass(targetIndex));
+                }
             } catch (Exception e) {
                 logger.error(
                         "Error trying to bind value: {} of class: {} to column: {} of targetDataType: {}/{} at column index: {} and bind index: {} of statement: {}",
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
index a5037fd3..1c0caa86 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
@@ -26,6 +26,7 @@
 import com.datastax.cdm.data.EnhancedPK;
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.properties.IPropertyHelper;
+import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 
@@ -59,6 +60,7 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
             boundStatement = boundStatement.set(currentBindIndex++, writeTime, Long.class);
         }
 
+        boolean nullToUnset = propertyHelper.getBoolean(KnownProperties.TRANSFORM_NULL_TO_UNSET);
         Object originValue, targetValue;
         Object bindValueTarget = null;
         for (int targetIndex : columnIndexesToBind) {
@@ -68,7 +70,12 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
             if (usingCounter && counterIndexes.contains(targetIndex)) {
                 originValue = cqlTable.getOtherCqlTable().getData(originIndex, originRow);
                 if (null == originValue) {
-                    currentBindIndex++;
+                    if (nullToUnset) {
+                        boundStatement = boundStatement.unset(currentBindIndex++);
+                    } else {
+                        boundStatement = boundStatement.set(currentBindIndex++, null,
+                                cqlTable.getBindClass(targetIndex));
+                    }
                     continue;
                 }
                 targetValue = (null == targetRow ? 0L : cqlTable.getData(targetIndex, targetRow));
@@ -87,8 +94,12 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
                 bindValueTarget = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
             }
 
-            boundStatement = boundStatement.set(currentBindIndex++, bindValueTarget,
-                    cqlTable.getBindClass(targetIndex));
+            if (bindValueTarget == null && nullToUnset) {
+                boundStatement = boundStatement.unset(currentBindIndex++);
+            } else {
+                boundStatement = boundStatement.set(currentBindIndex++, bindValueTarget,
+                        cqlTable.getBindClass(targetIndex));
+            }
         } catch (Exception e) {
             logger.error("Error trying to bind value:" + bindValueTarget + " to column:"
                     + targetColumnNames.get(targetIndex) + " of targetDataType:"
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
index c212cafd..ca2f6e3d 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
@@ -13,6 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+/*
+ * Null-to-Unset Feature:
+ * When migrating data, nulls read from the origin cluster are now written as unset values on the target cluster.
+ * This avoids writing tombstones and preserves unset semantics in Cassandra.
+ */
+
 package com.datastax.cdm.cql.statement;
 
 import java.util.ArrayList;
@@ -89,6 +96,7 @@ public TargetUpsertStatement(IPropertyHelper propertyHelper, EnhancedSession ses
     }
 
     public BoundStatement bindRecord(Record record) {
+        // Null-to-Unset: Nulls in the record will be written as unset values on the target cluster.
 
         if (null == record)
             throw new RuntimeException("record is null");
diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
index 6632c216..21c91861 100644
--- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java
+++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -195,6 +195,7 @@ public enum PropertyType {
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
     public static final String TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE = "spark.cdm.transform.map.remove.null.value";
+    public static final String TRANSFORM_NULL_TO_UNSET = "spark.cdm.transform.null.to.unset";
 
     static {
         types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
@@ -211,6 +212,8 @@ public enum PropertyType {
         defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");
         types.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, PropertyType.BOOLEAN);
         defaults.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, "false");
+        types.put(TRANSFORM_NULL_TO_UNSET, PropertyType.BOOLEAN);
+        defaults.put(TRANSFORM_NULL_TO_UNSET, "false");
     }
 
 // ==========================================================================
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index f40ef396..64bebe40 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -326,6 +326,11 @@ spark.cdm.perfops.ratelimit.target 20000
 #                           field with empty or null values. Such values can create NPE exception
 #                           if the value type does not support empty or null values (like Timestamp)
 #                           and this property is false. Set it to true to handle such exceptions.
+#
+# .null.to.unset            Default is false. When true, any null value read from the origin cluster
+#                           will be written as an unset value on the target cluster. This avoids
+#                           writing tombstones and preserves unset semantics in Cassandra. When false,
+#                           nulls are written as nulls (the legacy behavior).
 #-----------------------------------------------------------------------------------------------------------
 #spark.cdm.transform.missing.key.ts.replace.value 1685577600000
 #spark.cdm.transform.custom.writetime 0
@@ -335,6 +340,7 @@ spark.cdm.perfops.ratelimit.target 20000
 #spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
 #spark.cdm.transform.codecs.timestamp.string.zone UTC
 #spark.cdm.transform.map.remove.null.value false
+#spark.cdm.transform.null.to.unset false
 
 #===========================================================================================================
 # Cassandra Filters are applied on the coordinator node. Note that, depending on the filter, the coordinator
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
index b8624450..ef399498 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
@@ -35,6 +35,12 @@ public class TargetInsertStatementTest extends CommonMocks {
     public void setup() {
         commonSetup();
         targetInsertStatement = new TargetInsertStatement(propertyHelper, targetSession);
+        // Ensure prepareStatement().bind() returns the boundStatement mock
+        when(targetInsertStatement.prepareStatement()).thenReturn(preparedStatement);
+        when(preparedStatement.bind()).thenReturn(boundStatement);
+        // Chain set and unset to return boundStatement
+        when(boundStatement.set(anyInt(), any(), any(Class.class))).thenReturn(boundStatement);
+        when(boundStatement.unset(anyInt())).thenReturn(boundStatement);
     }
 
     @Test
@@ -217,4 +223,36 @@ public void bind_withVectorColumns() {
         verify(boundStatement, times(targetColumnNames.size())).set(anyInt(), any(), any(Class.class));
     }
 
+    @Test
+    public void bind_nullToUnset_true() {
+        // Arrange
+        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(true);
+        // Simulate a null value from origin
+        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
+        targetInsertStatement = new TargetInsertStatement(propertyHelper, targetSession);
+
+        // Act
+        BoundStatement result = targetInsertStatement.bind(originRow, targetRow, null, null, null, null);
+
+        // Assert
+        assertNotNull(result);
+        verify(boundStatement, atLeastOnce()).unset(anyInt());
+    }
+
+    @Test
+    public void bind_nullToUnset_false() {
+        // Arrange
+        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(false);
+        // Simulate a null value from origin
+        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
+        targetInsertStatement = new TargetInsertStatement(propertyHelper, targetSession);
+
+        // Act
+        BoundStatement result = targetInsertStatement.bind(originRow, targetRow, null, null, null, null);
+
+        // Assert
+        assertNotNull(result);
+        verify(boundStatement, atLeastOnce()).set(anyInt(), isNull(), any(Class.class));
+    }
+
 }
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
index a2679b3b..b3e8a193 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
@@ -39,6 +39,12 @@ public void setup() {
         // UPDATE is needed by counters, though the class should handle non-counter updates
         commonSetup(false, false, true);
         targetUpdateStatement = new TargetUpdateStatement(propertyHelper, targetSession);
+        // Ensure prepareStatement().bind() returns the boundStatement mock
+        when(targetUpdateStatement.prepareStatement()).thenReturn(preparedStatement);
+        when(preparedStatement.bind()).thenReturn(boundStatement);
+        // Chain set and unset to return boundStatement
+        when(boundStatement.set(anyInt(), any(), any(Class.class))).thenReturn(boundStatement);
+        when(boundStatement.unset(anyInt())).thenReturn(boundStatement);
 
         updateCQLBeginning = "UPDATE " + targetKeyspaceTableName;
 
@@ -195,4 +201,36 @@ public void bind_withExceptionWhenBindingValue() {
         assertThrows(RuntimeException.class,
                 () -> targetUpdateStatement.bind(originRow, targetRow, null,null,null,null));
     }
 
+    @Test
+    public void bind_nullToUnset_true() {
+        // Arrange
+        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(true);
+        // Simulate a null value from origin
+        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
+        targetUpdateStatement = new TargetUpdateStatement(propertyHelper, targetSession);
+
+        // Act
+        BoundStatement result = targetUpdateStatement.bind(originRow, targetRow, null, null, null, null);
+
+        // Assert
+        assertNotNull(result);
+        verify(boundStatement, atLeastOnce()).unset(anyInt());
+    }
+
+    @Test
+    public void bind_nullToUnset_false() {
+        // Arrange
+        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(false);
+        // Simulate a null value from origin
+        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
+        targetUpdateStatement = new TargetUpdateStatement(propertyHelper, targetSession);
+
+        // Act
+        BoundStatement result = targetUpdateStatement.bind(originRow, targetRow, null, null, null, null);
+
+        // Assert
+        assertNotNull(result);
+        verify(boundStatement, atLeastOnce()).set(anyInt(), isNull(), any(Class.class));
+    }
+
 }
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertStatementTest.java
index 92ae3178..2b5dcd63 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertStatementTest.java
@@ -179,6 +179,40 @@ public void usingTTLTimestamp_WriteTimeOnly() {
         assertEquals(" USING TIMESTAMP ?", targetUpsertStatement.usingTTLTimestamp());
     }
 
+    @Test
+    public void checkBindInputs_successfulPath() {
+        // Should not throw any exception
+        assertDoesNotThrow(() -> targetUpsertStatement.checkBindInputs(1, 1L, null, null));
+    }
+
+    @Test
+    public void checkBindInputs_synchronizedBlock() {
+        // First call sets haveCheckedBindInputsOnce, second call should return early
+        assertDoesNotThrow(() -> targetUpsertStatement.checkBindInputs(1, 1L, null, null));
+        assertDoesNotThrow(() -> targetUpsertStatement.checkBindInputs(1, 1L, null, null));
+    }
+
+    @Test
+    public void bindRecord_validRecord_delegatesToBind() {
+        // Arrange: Use a spy to verify bind is called
+        TargetUpsertStatement spyStatement = spy(targetUpsertStatement);
+        doReturn(mock(BoundStatement.class)).when(spyStatement).bind(any(), any(), any(), any(), any(), any());
+        when(record.getPk()).thenReturn(pk);
+        when(record.getOriginRow()).thenReturn(originRow);
+        when(record.getTargetRow()).thenReturn(targetRow);
+        when(pk.getTTL()).thenReturn(1);
+        when(pk.getWriteTimestamp()).thenReturn(1L);
+        when(pk.getExplodeMapKey()).thenReturn(null);
+        when(pk.getExplodeMapValue()).thenReturn(null);
+
+        // Act
+        BoundStatement result = spyStatement.bindRecord(record);
+
+        // Assert
+        assertNotNull(result);
+        verify(spyStatement).bind(originRow, targetRow, 1, 1L, null, null);
+    }
+
     protected class TestTargetUpsertStatement extends TargetUpsertStatement {
         public TestTargetUpsertStatement(IPropertyHelper h, EnhancedSession s, String statement) {
             super(h, s);
diff --git a/src/test/java/com/datastax/cdm/data/CqlDataTest.java b/src/test/java/com/datastax/cdm/data/CqlDataTest.java
new file mode 100644
index 00000000..3f69ae82
--- /dev/null
+++ b/src/test/java/com/datastax/cdm/data/CqlDataTest.java
@@ -0,0 +1,151 @@
+package com.datastax.cdm.data;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.oss.driver.api.core.data.UdtValue;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.ListType;
+import com.datastax.oss.driver.api.core.type.MapType;
+import com.datastax.oss.driver.api.core.type.SetType;
+import com.datastax.oss.driver.api.core.type.TupleType;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
+import com.datastax.oss.driver.api.core.type.VectorType;
+
+public class CqlDataTest {
+    @Test
+    public void testToType_Primitive() {
+        assertEquals(CqlData.Type.PRIMITIVE, CqlData.toType(DataTypes.TEXT));
+    }
+
+    @Test
+    public void testToType_List() {
+        ListType listType = mock(ListType.class);
+        assertEquals(CqlData.Type.LIST, CqlData.toType(listType));
+    }
+
+    @Test
+    public void testToType_Set() {
+        SetType setType = mock(SetType.class);
+        assertEquals(CqlData.Type.SET, CqlData.toType(setType));
+    }
+
+    @Test
+    public void testToType_Map() {
+        MapType mapType = mock(MapType.class);
+        assertEquals(CqlData.Type.MAP, CqlData.toType(mapType));
+    }
+
+    @Test
+    public void testToType_Tuple() {
+        TupleType tupleType = mock(TupleType.class);
+        assertEquals(CqlData.Type.TUPLE, CqlData.toType(tupleType));
+    }
+
+    @Test
+    public void testToType_UDT() {
+        UserDefinedType udt = mock(UserDefinedType.class);
+        assertEquals(CqlData.Type.UDT, CqlData.toType(udt));
+    }
+
+    @Test
+    public void testToType_Vector() {
+        VectorType vectorType = mock(VectorType.class);
+        assertEquals(CqlData.Type.VECTOR, CqlData.toType(vectorType));
+    }
+
+    @Test
+    public void testIsPrimitive() {
+        assertTrue(CqlData.isPrimitive(DataTypes.TEXT));
+        DataType unknown = mock(DataType.class);
+        assertFalse(CqlData.isPrimitive(unknown));
+    }
+
+    @Test
+    public void testIsCollection() {
+        assertTrue(CqlData.isCollection(mock(ListType.class)));
+        assertTrue(CqlData.isCollection(mock(SetType.class)));
+        assertTrue(CqlData.isCollection(mock(MapType.class)));
+        assertTrue(CqlData.isCollection(mock(TupleType.class)));
+        assertTrue(CqlData.isCollection(mock(UserDefinedType.class)));
+        assertTrue(CqlData.isCollection(mock(VectorType.class)));
+        assertFalse(CqlData.isCollection(DataTypes.TEXT));
+    }
+
+    @Test
+    public void testIsFrozen_Primitive() {
+        assertFalse(CqlData.isFrozen(DataTypes.TEXT));
+    }
+
+    @Test
+    public void testIsFrozen_UDT() {
+        UserDefinedType udt = mock(UserDefinedType.class);
+        when(udt.isFrozen()).thenReturn(true);
+        assertTrue(CqlData.isFrozen(udt));
+    }
+
+    @Test
+    public void testGetBindClass_Primitive() {
+        assertEquals(String.class, CqlData.getBindClass(DataTypes.TEXT));
+    }
+
+    @Test
+    public void testGetBindClass_List() {
+        ListType listType = mock(ListType.class);
+        assertEquals(java.util.List.class, CqlData.getBindClass(listType));
+    }
+
+    @Test
+    public void testExtractDataTypesFromCollection_List() {
+        ListType listType = mock(ListType.class);
+        when(listType.getElementType()).thenReturn(DataTypes.TEXT);
+        assertEquals(Collections.singletonList(DataTypes.TEXT), CqlData.extractDataTypesFromCollection(listType));
+    }
+
+    @Test
+    public void testExtractDataTypesFromCollection_Map() {
+        MapType mapType = mock(MapType.class);
+        when(mapType.getKeyType()).thenReturn(DataTypes.TEXT);
+        when(mapType.getValueType()).thenReturn(DataTypes.INT);
+        assertEquals(Arrays.asList(DataTypes.TEXT, DataTypes.INT), CqlData.extractDataTypesFromCollection(mapType));
+    }
+
+    @Test
+    public void testGetFormattedContent_Primitive() {
+        assertEquals("abc", CqlData.getFormattedContent(CqlData.Type.PRIMITIVE, "abc"));
+    }
+
+    @Test
+    public void testGetFormattedContent_Null() {
+        assertEquals("", CqlData.getFormattedContent(CqlData.Type.PRIMITIVE, null));
+    }
+
+    @Test
+    public void testGetFormattedContent_List() {
+        String result = CqlData.getFormattedContent(CqlData.Type.LIST, Arrays.asList("a", "b"));
+        assertTrue(result.startsWith("["));
+    }
+
+    @Test
+    public void testGetFormattedContent_Map() {
+        Map map = new HashMap<>();
+        map.put("k", "v");
+        String result = CqlData.getFormattedContent(CqlData.Type.MAP, map);
+        assertTrue(result.startsWith("{"));
+    }
+
+    @Test
+    public void testGetFormattedContent_UDT() {
+        UdtValue udtValue = mock(UdtValue.class);
+        when(udtValue.getFormattedContents()).thenReturn("{udt}");
+        assertEquals("{udt}", CqlData.getFormattedContent(CqlData.Type.UDT, udtValue));
+    }
+}

From c2e2cce70cbdab3289cf4cf5c666f1f84fb6d827 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Fri, 13 Jun 2025 15:37:39 -0400
Subject: [PATCH 2/4] Undo the option to allow null values, as they will be
 eliminated during compaction anyway; writing them provides no value other
 than slowing down the migration and the target cluster. Also, do a strict
 check for String types (which cover all C* text types) with empty values

---
 .../cql/statement/TargetInsertStatement.java  |  3 +--
 .../cql/statement/TargetUpdateStatement.java  | 12 +++---------
 .../cql/statement/TargetUpsertStatement.java  |  7 -------
 .../cdm/properties/KnownProperties.java       |  3 ---
 src/resources/cdm-detailed.properties         |  6 ------
 .../statement/TargetInsertStatementTest.java  | 18 ------------------
 .../statement/TargetUpdateStatementTest.java  | 18 ------------------
 7 files changed, 4 insertions(+), 63 deletions(-)

diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
index 572dd18d..5f085432 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
@@ -54,7 +54,6 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
         int currentBindIndex = 0;
         Object bindValue = null;
 
-        boolean nullToUnset = propertyHelper.getBoolean(KnownProperties.TRANSFORM_NULL_TO_UNSET);
         if (logDebug)
             logger.debug("bind using conversions: {}", cqlTable.getOtherCqlTable().getConversions());
         for (int targetIndex = 0; targetIndex < targetColumnTypes.size(); targetIndex++) {
@@ -77,7 +76,7 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
                     bindValue = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
                 }
 
-                if (bindValue == null && nullToUnset) {
+                if (null == bindValue || bindValue instanceof String && ((String) bindValue).isEmpty()) {
                     boundStatement = boundStatement.unset(currentBindIndex++);
                 } else {
                     boundStatement = boundStatement.set(currentBindIndex++, bindValue,
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
index 1c0caa86..009d05bf 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
@@ -26,7 +26,6 @@
 import com.datastax.cdm.data.EnhancedPK;
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.properties.IPropertyHelper;
-import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 
@@ -60,7 +59,6 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
             boundStatement = boundStatement.set(currentBindIndex++, writeTime, Long.class);
         }
 
-        boolean nullToUnset = propertyHelper.getBoolean(KnownProperties.TRANSFORM_NULL_TO_UNSET);
         Object originValue, targetValue;
         Object bindValueTarget = null;
         for (int targetIndex : columnIndexesToBind) {
@@ -70,12 +68,7 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
             if (usingCounter && counterIndexes.contains(targetIndex)) {
                 originValue = cqlTable.getOtherCqlTable().getData(originIndex, originRow);
                 if (null == originValue) {
-                    if (nullToUnset) {
-                        boundStatement = boundStatement.unset(currentBindIndex++);
-                    } else {
-                        boundStatement = boundStatement.set(currentBindIndex++, null,
-                                cqlTable.getBindClass(targetIndex));
-                    }
+                    currentBindIndex++;
                     continue;
                 }
                 targetValue = (null == targetRow ? 0L : cqlTable.getData(targetIndex, targetRow));
@@ -94,7 +87,8 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
                 bindValueTarget = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
             }
 
-            if (bindValueTarget == null && nullToUnset) {
+            if (null == bindValueTarget
+                    || bindValueTarget instanceof String && ((String) bindValueTarget).isEmpty()) {
                 boundStatement = boundStatement.unset(currentBindIndex++);
             } else {
                 boundStatement = boundStatement.set(currentBindIndex++, bindValueTarget,
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
index ca2f6e3d..918724f3 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
@@ -14,12 +14,6 @@
  * limitations under the License.
  */
 
-/*
- * Null-to-Unset Feature:
- * When migrating data, nulls read from the origin cluster are now written as unset values on the target cluster.
- * This avoids writing tombstones and preserves unset semantics in Cassandra.
- */
-
 package com.datastax.cdm.cql.statement;
 
 import java.util.ArrayList;
@@ -96,7 +90,6 @@ public TargetUpsertStatement(IPropertyHelper propertyHelper, EnhancedSession ses
     }
 
     public BoundStatement bindRecord(Record record) {
-        // Null-to-Unset: Nulls in the record will be written as unset values on the target cluster.
 
         if (null == record)
             throw new RuntimeException("record is null");
diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
index 21c91861..6632c216 100644
--- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java
+++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -195,7 +195,6 @@ public enum PropertyType {
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
     public static final String TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE = "spark.cdm.transform.map.remove.null.value";
-    public static final String TRANSFORM_NULL_TO_UNSET = "spark.cdm.transform.null.to.unset";
 
     static {
         types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
@@ -212,8 +211,6 @@ public enum PropertyType {
         defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");
         types.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, PropertyType.BOOLEAN);
         defaults.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, "false");
-        types.put(TRANSFORM_NULL_TO_UNSET, PropertyType.BOOLEAN);
-        defaults.put(TRANSFORM_NULL_TO_UNSET, "false");
     }
 
 // ==========================================================================
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index 64bebe40..f40ef396 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -326,11 +326,6 @@ spark.cdm.perfops.ratelimit.target 20000
 #                           field with empty or null values. Such values can create NPE exception
 #                           if the value type does not support empty or null values (like Timestamp)
 #                           and this property is false. Set it to true to handle such exceptions.
-#
-# .null.to.unset            Default is false. When true, any null value read from the origin cluster
-#                           will be written as an unset value on the target cluster. This avoids
-#                           writing tombstones and preserves unset semantics in Cassandra. When false,
-#                           nulls are written as nulls (the legacy behavior).
 #-----------------------------------------------------------------------------------------------------------
 #spark.cdm.transform.missing.key.ts.replace.value 1685577600000
 #spark.cdm.transform.custom.writetime 0
@@ -340,7 +335,6 @@ spark.cdm.perfops.ratelimit.target 20000
 #spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
 #spark.cdm.transform.codecs.timestamp.string.zone UTC
 #spark.cdm.transform.map.remove.null.value false
-#spark.cdm.transform.null.to.unset false
 
 #===========================================================================================================
 # Cassandra Filters are applied on the coordinator node. Note that, depending on the filter, the coordinator
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
index ef399498..f9350f13 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetInsertStatementTest.java
@@ -225,8 +225,6 @@ public void bind_withVectorColumns() {
 
     @Test
     public void bind_nullToUnset_true() {
-        // Arrange
-        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(true);
         // Simulate a null value from origin
         when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
         targetInsertStatement = new TargetInsertStatement(propertyHelper, targetSession);
@@ -239,20 +237,4 @@ public void bind_nullToUnset_true() {
         verify(boundStatement, atLeastOnce()).unset(anyInt());
     }
 
-    @Test
-    public void bind_nullToUnset_false() {
-        // Arrange
-        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(false);
-        // Simulate a null value from origin
-        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
-        targetInsertStatement = new TargetInsertStatement(propertyHelper, targetSession);
-
-        // Act
-        BoundStatement result = targetInsertStatement.bind(originRow, targetRow, null, null, null, null);
-
-        // Assert
-        assertNotNull(result);
-        verify(boundStatement, atLeastOnce()).set(anyInt(), isNull(), any(Class.class));
-    }
-
 }
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
index b3e8a193..b611d471 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpdateStatementTest.java
@@ -203,8 +203,6 @@ public void bind_withExceptionWhenBindingValue() {
 
     @Test
     public void bind_nullToUnset_true() {
-        // Arrange
-        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(true);
         // Simulate a null value from origin
         when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
         targetUpdateStatement = new TargetUpdateStatement(propertyHelper, targetSession);
@@ -217,20 +215,4 @@ public void bind_nullToUnset_true() {
         verify(boundStatement, atLeastOnce()).unset(anyInt());
     }
 
-    @Test
-    public void bind_nullToUnset_false() {
-        // Arrange
-        when(propertyHelper.getBoolean(com.datastax.cdm.properties.KnownProperties.TRANSFORM_NULL_TO_UNSET)).thenReturn(false);
-        // Simulate a null value from origin
-        when(originTable.getAndConvertData(anyInt(), eq(originRow))).thenReturn(null);
-        targetUpdateStatement = new TargetUpdateStatement(propertyHelper, targetSession);
-
-        // Act
-        BoundStatement result = targetUpdateStatement.bind(originRow, targetRow, null, null, null, null);
-
-        // Assert
-        assertNotNull(result);
-        verify(boundStatement, atLeastOnce()).set(anyInt(), isNull(), any(Class.class));
-    }
-
 }

From c07294699f3913e84a0e0f51c97a95adafe2ab18 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Mon, 16 Jun 2025 12:59:26 -0400
Subject: [PATCH 3/4] Added release notes and updated README

---
 README.md  | 1 +
 RELEASE.md | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/README.md b/README.md
index 93a6303b..2b228e34 100644
--- a/README.md
+++ b/README.md
@@ -167,6 +167,7 @@ spark-submit --properties-file cdm.properties \
 - CDM ignores using collection and UDT fields for `ttl` & `writetime` calculations by default for performance reasons. If you want to include such fields, set `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
 - If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
+- CDM uses the `UNSET` value for null fields (including empty text values) to avoid creating (or carrying forward) tombstones during row creation.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
 - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
 - When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) applies to individual Spark worker nodes. Hence this value should be set to the effective-rate-limit-you-need/number-of-spark-worker-nodes . E.g. If you need an effective rate-limit of 10000, and the number of Spark worker nodes are 4, then you should set the above rate-limit params to a value of 2500.
diff --git a/RELEASE.md b/RELEASE.md
index 2a572ffb..b65786cd 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,5 +1,8 @@
 # Release Notes
 
+## [5.4.0] - 2025-06-16
+- Use the `UNSET` value for null fields (including empty text values) to avoid creating (or carrying forward) tombstones during row creation.
+
 ## [5.3.1] - 2025-06-03
 - Upgrade Spark version to [`3.5.6`](https://spark.apache.org/releases/spark-release-3-5-6.html).
 

From 66ae1f20e3654567f820aebe4a1289adc477daa4 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Mon, 16 Jun 2025 13:50:00 -0400
Subject: [PATCH 4/4] Added missing license

---
 .../java/com/datastax/cdm/data/CqlDataTest.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/src/test/java/com/datastax/cdm/data/CqlDataTest.java b/src/test/java/com/datastax/cdm/data/CqlDataTest.java
index 3f69ae82..ce02c699 100644
--- a/src/test/java/com/datastax/cdm/data/CqlDataTest.java
+++ b/src/test/java/com/datastax/cdm/data/CqlDataTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.datastax.cdm.data;
 
 import static org.junit.jupiter.api.Assertions.*;
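
---
A minimal standalone sketch of the bind rule this series converges on in
PATCH 2/4, for readers who want the final behavior in one place. It assumes
the driver's BoundStatement API used in the diffs above (set/unset); the
class and method names (BindHelper, bindOrUnset) are illustrative and not
part of the committed code.

    import com.datastax.oss.driver.api.core.cql.BoundStatement;

    public final class BindHelper {
        private BindHelper() {
        }

        // Null values and empty Strings (covering all C* text types) are bound
        // as UNSET, so no tombstone is written on the target row; everything
        // else is bound normally. Mirrors the check applied in
        // TargetInsertStatement and TargetUpdateStatement after PATCH 2/4.
        public static <T> BoundStatement bindOrUnset(BoundStatement bs, int index, T value, Class<T> type) {
            if (null == value || (value instanceof String && ((String) value).isEmpty())) {
                return bs.unset(index);
            }
            return bs.set(index, value, type);
        }
    }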