Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Release Notes

## [5.4.1] - 2025-06-26
- Bug fix: Fixed auto column mapping bug when `target` table has more columns than `origin`.

## [5.4.0] - 2025-06-16
- Use `UNSET` value for null fields (including empty texts) to avoid creating (or carrying forward) tombstones during row creation.

Expand Down
23 changes: 23 additions & 0 deletions SIT/features/08_map_columns_origin_target/breakData.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

DELETE FROM target.map_columns WHERE key_a=2;
UPDATE target.map_columns SET val_a='valueD' WHERE key_a=3;

INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (1, 'key1','valueA', 21);
INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (2, 'key2','valueB', 22);
INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (3, 'key3','valueC', 23);

SELECT * FROM target.map_columns;

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Read Record Count: 3
Mismatch Record Count: 1
Corrected Mismatch Record Count: 1
Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Read Record Count: 3
Write Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
3 changes: 3 additions & 0 deletions SIT/features/08_map_columns_origin_target/cdm.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
migrateData com.datastax.cdm.job.Migrate migrate.properties
validateData com.datastax.cdm.job.DiffData migrate.properties
fixData com.datastax.cdm.job.DiffData fix.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Read Record Count: 3
Mismatch Record Count: 0
Corrected Mismatch Record Count: 0
Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
29 changes: 29 additions & 0 deletions SIT/features/08_map_columns_origin_target/execute.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#!/bin/bash -e

workingDir="$1"
cd "$workingDir"

/local/cdm.sh -f cdm.txt -s migrateData -d "$workingDir" > cdm.migrateData.out 2>cdm.migrateData.err
/local/cdm-assert.sh -f cdm.migrateData.out -a cdm.migrateData.assert -d "$workingDir"

/local/cdm.sh -f cdm.txt -s validateData -d "$workingDir" > cdm.validateData.out 2>cdm.validateData.err
/local/cdm-assert.sh -f cdm.validateData.out -a cdm.validateData.assert -d "$workingDir"

cqlsh -u $CASS_USERNAME -p $CASS_PASSWORD $CASS_CLUSTER -f $workingDir/breakData.cql > $workingDir/breakData.out 2> $workingDir/breakData.err

/local/cdm.sh -f cdm.txt -s fixData -d "$workingDir" > cdm.fixData.out 2>cdm.fixData.err
/local/cdm-assert.sh -f cdm.fixData.out -a cdm.fixData.assert -d "$workingDir"
15 changes: 15 additions & 0 deletions SIT/features/08_map_columns_origin_target/expected.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

SELECT * FROM target.map_columns;
8 changes: 8 additions & 0 deletions SIT/features/08_map_columns_origin_target/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

key_a | val_a | val_c
-------+--------+-------
1 | valueA | null
2 | valueB | null
3 | valueC | null

(3 rows)
23 changes: 23 additions & 0 deletions SIT/features/08_map_columns_origin_target/fix.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

spark.cdm.connect.origin.host cdm-sit-cass
spark.cdm.connect.target.host cdm-sit-cass

spark.cdm.schema.origin.keyspaceTable origin.map_columns
spark.cdm.schema.target.keyspaceTable target.map_columns
spark.cdm.perfops.numParts 1

spark.cdm.autocorrect.missing true
spark.cdm.autocorrect.mismatch true
20 changes: 20 additions & 0 deletions SIT/features/08_map_columns_origin_target/migrate.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

spark.cdm.connect.origin.host cdm-sit-cass
spark.cdm.connect.target.host cdm-sit-cass

spark.cdm.schema.origin.keyspaceTable origin.map_columns
spark.cdm.schema.target.keyspaceTable target.map_columns
spark.cdm.perfops.numParts 1
22 changes: 22 additions & 0 deletions SIT/features/08_map_columns_origin_target/setup.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

DROP TABLE IF EXISTS origin.map_columns;
CREATE TABLE origin.map_columns(key_a int, key_b text, val_a text, val_b int, PRIMARY KEY (key_a, key_b));
INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (1, 'key1','valueA', 21);
INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (2, 'key2','valueB', 22);
INSERT INTO origin.map_columns(key_a, key_b, val_a, val_b) VALUES (3, 'key3','valueC', 23);

DROP TABLE IF EXISTS target.map_columns;
CREATE TABLE target.map_columns(key_a int, val_a text, val_c int, PRIMARY KEY (key_a));
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,18 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
} else {
int originIndex = cqlTable.getCorrespondingIndex(targetIndex);
if (originIndex < 0) // we don't have data to bind for this column; continue to the next targetIndex
{
currentBindIndex++;
continue;
}
bindValue = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
}

if (null == bindValue || bindValue instanceof String && ((String) bindValue).isEmpty()) {
boundStatement = boundStatement.unset(currentBindIndex++);
} else {
boundStatement = boundStatement.set(currentBindIndex++, bindValue,
if (!(null == bindValue || (bindValue instanceof String && ((String) bindValue).isEmpty()))) {
boundStatement = boundStatement.set(currentBindIndex, bindValue,
cqlTable.getBindClass(targetIndex));
}
currentBindIndex++;
} catch (Exception e) {
logger.error(
"Error trying to bind value: {} of class: {} to column: {} of targetDataType: {}/{} at column index: {} and bind index: {} of statement: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,20 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
originIndex = extractJsonFeature.getOriginColumnIndex();
bindValueTarget = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
if (originIndex < 0)
// we don't have data to bind for this column; continue to the next targetIndex
if (originIndex < 0) // we don't have data to bind for this column; continue to the next targetIndex
{
currentBindIndex++;
continue;
}
bindValueTarget = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
}

if (null == bindValueTarget
|| bindValueTarget instanceof String && ((String) bindValueTarget).isEmpty()) {
boundStatement = boundStatement.unset(currentBindIndex++);
} else {
boundStatement = boundStatement.set(currentBindIndex++, bindValueTarget,
if (!(null == bindValueTarget
|| (bindValueTarget instanceof String && ((String) bindValueTarget).isEmpty()))) {
boundStatement = boundStatement.set(currentBindIndex, bindValueTarget,
cqlTable.getBindClass(targetIndex));
}
currentBindIndex++;
} catch (Exception e) {
logger.error("Error trying to bind value:" + bindValueTarget + " to column:"
+ targetColumnNames.get(targetIndex) + " of targetDataType:"
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,6 @@ private String isDifferent(Record record) {
pk, targetIndex, targetColumnNames.get(targetIndex), origin);
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
originIndex = extractJsonFeature.getOriginColumnIndex();
} else {
throw new RuntimeException("Target column \"" + targetColumnNames.get(targetIndex)
+ "\" at index " + targetIndex
+ " cannot be found on Origin, and is neither a constant column (indexes:"
+ constantColumnIndexes + ") nor an explode map column (keyIndex:"
+ explodeMapKeyIndex + ", valueIndex:" + explodeMapValueIndex + ")");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ public void bind_nullToUnset_true() {

// Assert
assertNotNull(result);
verify(boundStatement, atLeastOnce()).unset(anyInt());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ public void bind_nullToUnset_true() {

// Assert
assertNotNull(result);
verify(boundStatement, atLeastOnce()).unset(anyInt());
}

}