3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,5 +1,8 @@
# Release Notes

## [5.6.2] - 2025-10-21
- Bug fix: Addresses a gap in the `spark.cdm.trackRun.autoRerun` feature where it did not pick up the previous-run-id when the previous run completed (did not fail) but had failed token-ranges. Such runs were treated as successful; with this fix, the failed token-ranges of a completed run are found and resumed in the new run.

## [5.6.1] - 2025-10-04
- Improved logging.

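To make the new behaviour concrete, here is a small standalone sketch of the resume rule described in the release note above; it is illustrative only and not part of this PR's diff. The "Partitions Failed:" key and the sample run_info strings are assumptions inferred from the parsing code changed below, not a documented format.

// Standalone sketch of the auto-rerun decision; sample strings are hypothetical.
public class AutoRerunDecisionSketch {

    // A previous run is worth resuming if it never reached ENDED,
    // or if it ended but its summary reports failed token-ranges.
    static boolean shouldResume(String status, String runInfo) {
        if (!"ENDED".equals(status)) {
            return true;
        }
        if (runInfo == null) {
            return false;
        }
        for (String part : runInfo.split(";")) {
            part = part.trim();
            if (part.startsWith("Partitions Failed:")) {
                String[] kv = part.split(":");
                if (kv.length == 2 && Integer.parseInt(kv[1].trim()) > 0) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Before this fix, only the first case triggered a rerun.
        System.out.println(shouldResume("STARTED", null));                                  // true: run never ended
        System.out.println(shouldResume("ENDED", "Partitions: 100; Partitions Failed: 2")); // true: failed ranges remain
        System.out.println(shouldResume("ENDED", "Partitions: 100; Partitions Failed: 0")); // false: nothing to resume
    }
}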
@@ -65,22 +65,10 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabDetails
+ " (table_name TEXT, run_id BIGINT, start_time TIMESTAMP, token_min BIGINT, token_max BIGINT, status TEXT, run_info TEXT, PRIMARY KEY ((table_name, run_id), token_min))");

// TODO: Remove this code block after a few releases, its only added for backward compatibility
try {
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'status' already exists in table {}", cdmKsTabInfo);
}
try {
this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'run_info' already exists in table {}", cdmKsTabDetails);
}

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
+ " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, totimestamp(now()), ?)");
// Add statement to get the latest run id for a table
boundPrevRunIdStatement = bindStatement("SELECT run_id, status FROM " + cdmKsTabInfo
boundPrevRunIdStatement = bindStatement("SELECT run_id, run_info, status FROM " + cdmKsTabInfo
+ " WHERE table_name = ? and run_type = ? ORDER BY run_id DESC LIMIT 1 ALLOW FILTERING");
boundInitStatement = bindStatement("INSERT INTO " + cdmKsTabDetails
+ " (table_name, run_id, token_min, token_max, status) VALUES (?, ?, ?, ?, ?)");
@@ -99,15 +87,46 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
public long getPreviousRunId(JobType jobType) {
ResultSet rs = session.execute(
boundPrevRunIdStatement.setString("table_name", tableName).setString("run_type", jobType.toString()));
long prevRunId = 0;
Row row = rs.one();
if (row != null && !TrackRun.RUN_STATUS.ENDED.toString().equals(row.getString("status"))) {
long prevRunId = row.getLong("run_id");
logger.info("###################### Previous Run Id for table {} is: {} ######################", tableName,
prevRunId);
return prevRunId;
if (row != null && (runHasNotEnded(row) || runHasFailedPartitions(row))) {
prevRunId = row.getLong("run_id");
logger.info(
"###################### Previous Run {} for table {} did not fully succeed, resuming from that point ######################",
prevRunId, tableName);
}

return prevRunId;
}

private boolean runHasNotEnded(Row row) {
if (row == null || row.isNull("status")) {
return false;
}
String status = row.getString("status");
return !TrackRun.RUN_STATUS.ENDED.toString().equals(status);
}

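// run_info carries a semicolon-separated summary; a "Partitions Failed: <n>" entry with n > 0 means some token-ranges must be rerun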
private boolean runHasFailedPartitions(Row row) {
if (row == null || row.isNull("run_info")) {
return false;
}
String runInfo = row.getString("run_info");
String[] parts = runInfo.split(";");
for (String part : parts) {
part = part.trim();
if (part.startsWith("Partitions Failed:")) {
String[] kv = part.split(":");
if (kv.length == 2) {
int failedParts = Integer.parseInt(kv[1].trim());
if (failedParts > 0) {
return true;
}
}
}
}

return 0;
return false;
}

public Collection<PartitionRange> getPendingPartitions(long prevRunId, JobType jobType)
19 changes: 10 additions & 9 deletions src/main/java/com/datastax/cdm/schema/CqlTable.java
@@ -101,9 +101,8 @@ public CqlTable(IPropertyHelper propertyHelper, boolean isOrigin, CqlSession ses

if (null == this.columnNames || this.columnNames.isEmpty()) {
if (null == this.cqlAllColumns || this.cqlAllColumns.isEmpty()) {
throw new IllegalArgumentException(
"No columns defined for table " + this.keyspaceName + "." + this.tableName +
" on " + (isOrigin ? "origin" : "target"));
throw new IllegalArgumentException("No columns defined for table " + this.keyspaceName + "."
+ this.tableName + " on " + (isOrigin ? "origin" : "target"));
}
this.columnNames = this.cqlAllColumns.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
.collect(Collectors.toList());
@@ -334,8 +333,8 @@ public int byteCount(int index, Object object) {
} catch (IllegalArgumentException | CodecNotFoundException | NullPointerException e) {
throw new IllegalArgumentException(
"Unable to encode object " + object + " of Class/DataType " + object.getClass().getName() + "/"
+ getDataType(index) + " for column " + this.columnNames.get(index) +
" on " + (isOrigin ? "origin" : "target") + " table " + this.keyspaceName + "." + this.tableName,
+ getDataType(index) + " for column " + this.columnNames.get(index) + " on "
+ (isOrigin ? "origin" : "target") + " table " + this.keyspaceName + "." + this.tableName,
e);
}
}
@@ -388,12 +387,14 @@ public Object convertNull(int thisIndex) {
} else {
logger.error(
"{}: This index {} corresponds to That index {}, which is a primary key column and cannot be null. Consider setting {}.",
isOrigin ? "origin" : "target", thisIndex, otherIndex, KnownProperties.TRANSFORM_REPLACE_MISSING_TS);
isOrigin ? "origin" : "target", thisIndex, otherIndex,
KnownProperties.TRANSFORM_REPLACE_MISSING_TS);
return null;
}
}

logger.error("{}: This index {} corresponds to That index {}, which is a primary key column and cannot be null.",
logger.error(
"{}: This index {} corresponds to That index {}, which is a primary key column and cannot be null.",
isOrigin ? "origin" : "target", thisIndex, otherIndex);
return null;
}
@@ -449,8 +450,8 @@ private void setCqlMetadata(CqlSession cqlSession) {

Optional<TableMetadata> tableMetadataOpt = keyspaceMetadata.getTable(formatName(this.tableName));
if (!tableMetadataOpt.isPresent()) {
throw new IllegalArgumentException(
"Table not found on " + (isOrigin ? "origin" : "target") + ": " + this.keyspaceName + "." + this.tableName);
throw new IllegalArgumentException("Table not found on " + (isOrigin ? "origin" : "target") + ": "
+ this.keyspaceName + "." + this.tableName);
}
TableMetadata tableMetadata = tableMetadataOpt.get();

@@ -134,4 +134,32 @@ public void getPartitionsByStatus() {
// This test is incorrect, but needs to be troubleshot & fixed. The actual code works, but the test does not
assertEquals(0, parts.size());
}

@Test
public void getPreviousRunId() {
when(rs.one()).thenReturn(row1);
when(row1.getLong("run_id")).thenReturn(555l);

targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
long runId = targetUpsertRunDetailsStatement.getPreviousRunId(JobType.MIGRATE);
assertEquals(555l, runId);
}

@Test
public void getPreviousRunId_noRun() {
when(rs.one()).thenReturn(null);
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
long runId = targetUpsertRunDetailsStatement.getPreviousRunId(JobType.MIGRATE);
assertEquals(0l, runId);
}

@Test
public void getPreviousRunId_noRunId() {
when(rs.one()).thenReturn(row1);
when(row1.getLong("run_id")).thenReturn(0l);
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
long runId = targetUpsertRunDetailsStatement.getPreviousRunId(JobType.MIGRATE);
assertEquals(0l, runId);
}

}
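The new run_info path (a run that ENDED but reported failed token-ranges) is not covered by the tests above; a test along the lines below could exercise it. It is a sketch only, not part of this PR: it reuses the mocked cqlSession, rs and row1 fixtures and the imports of the existing test class, and assumes TrackRun is importable there. The run_info sample string is hypothetical, matching the format the parsing code expects.

@Test
public void getPreviousRunId_endedRunWithFailedPartitions() {
    // Sketch only: an ENDED run whose run_info reports failed token-ranges
    // should now be returned as the previous run so those ranges get rerun.
    when(rs.one()).thenReturn(row1);
    when(row1.getLong("run_id")).thenReturn(777l);
    when(row1.getString("status")).thenReturn(TrackRun.RUN_STATUS.ENDED.toString());
    when(row1.getString("run_info")).thenReturn("Partitions: 100; Partitions Failed: 3");

    targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
    long runId = targetUpsertRunDetailsStatement.getPreviousRunId(JobType.MIGRATE);
    assertEquals(777l, runId);
}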