diff --git a/docs/Salesforce-batchsource.md b/docs/Salesforce-batchsource.md index 93c6bf35..8513725d 100644 --- a/docs/Salesforce-batchsource.md +++ b/docs/Salesforce-batchsource.md @@ -248,6 +248,7 @@ PK chunking only works with the following objects: | LoginHistory | | LoyaltyAggrPointExprLedger | | LoyaltyLedger | +| LoyaltyLedgerTraceability | | LoyaltyMemberCurrency | | LoyaltyMemberTier | | LoyaltyPartnerProduct | diff --git a/pom.xml b/pom.xml index 85b62902..7dddfa48 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ Salesforce plugins io.cdap.plugin salesforce-plugins - 1.7.0-SNAPSHOT + 1.7.2-SNAPSHOT jar Salesforce Plugins https://github.com/data-integrations/salesforce @@ -66,14 +66,14 @@ 2.1.3 2.10.0 3.18.0 - 62.0.0 + 64.0.0 4.0.0 4.7.2 2.23.0 1.6 1.9.13 2.17.1 - 20180813 + 20231013 3.1.6 1.2 ${project.basedir}/src/test/java/ diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java index d295252c..a6991cfc 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java @@ -28,7 +28,7 @@ */ public class SalesforceConstants { - public static final String API_VERSION = "62.0"; + public static final String API_VERSION = "64.0"; public static final String REFERENCE_NAME_DELIMITER = "."; public static final String PROPERTY_CONSUMER_KEY = "consumerKey"; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java index d4b73a30..dff1130b 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java @@ -57,6 +57,8 @@ public class SalesforceSourceConstants { public static final int MIN_PK_CHUNK_SIZE = 1; // https://developer.salesforce.com/docs/atlas.en-us.252.0.api_asynch.meta/api_asynch/ // async_api_headers_enable_pk_chunking.htm + // **Always use lowercase names** to ensure consistency, especially if the sObject name is manually provided. + // Update this list with each API version upgrade. public static final List SUPPORTED_OBJECTS_WITH_PK_CHUNK = Arrays.asList("account", "accountcontactrelation", "accountteammember", @@ -108,6 +110,7 @@ public class SalesforceSourceConstants { "loginhistory", "loyaltyaggrpointexprledger", "loyaltyledger", + "loyaltyledgertraceability", "loyaltymembercurrency", "loyaltymembertier", "loyaltypartnerproduct", diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index b95baef4..467af177 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -15,6 +15,7 @@ */ package io.cdap.plugin.salesforce.plugin.source.batch.util; +import com.google.common.base.Strings; import com.sforce.async.AsyncApiException; import com.sforce.async.AsyncExceptionCode; import com.sforce.async.BatchInfo; @@ -252,4 +253,18 @@ public static RetryPolicy getRetryPolicy(Long initialRetryDuration, Long return RetryPolicy.builder().withMaxRetries(0).build(); } } + + // This is added for UCS use case only to identify the objects where PK chunking needs to be enabled by default. + public static boolean isPkChunkingSupported(String sobjectName) { + if (!Strings.isNullOrEmpty(sobjectName)) { + return SalesforceSourceConstants.SUPPORTED_OBJECTS_WITH_PK_CHUNK.contains(sobjectName.toLowerCase()) + || isCustomObject(sobjectName); + } + return false; + } + + // This is added only for UCS use case. + private static boolean isCustomObject(String sobjectName) { + return sobjectName.toLowerCase().endsWith("__c"); + } } diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java index e2c3f9f5..232d6692 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import scala.reflect.ClassTag$; +import java.math.BigDecimal; import java.time.Instant; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @@ -148,6 +149,13 @@ private static Object convertValue(Object value, Schema.Field field) { } } + // NOTE: org.json >= 20230227 returns BigDecimal for all non-integer JSON numbers. + if (value instanceof BigDecimal && fieldSchemaType.equals(Schema.Type.DOUBLE)) { + // Avro Schema.Type.DOUBLE expects a Double instance (or primitive double) at serialization time, + // so converting BigDecimal → double for compatibility. + return ((BigDecimal) value).doubleValue(); + } + return value; } diff --git a/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java b/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java index 4857f5c6..40c9f715 100644 --- a/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/SalesforceBulkUtilTest.java @@ -23,6 +23,9 @@ import com.sforce.async.BulkConnection; import com.sforce.async.ConcurrencyMode; import com.sforce.async.JobInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; +import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -83,4 +86,25 @@ public void testAwaitCompletionBatchCompleted() throws Exception { // Call the awaitCompletion method and verify that it completes successfully SalesforceBulkUtil.awaitCompletion(bulkConnection, job, Collections.singletonList(batchInfo), true); } + + @Test + public void testPKChunkingEnabledForSobjectsWithAllCases() { + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("Account")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("account")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("aCcOuNt")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("Case")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("CONTACT")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("test_custom__c")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("Test_custom__c")); + Assert.assertTrue(SalesforceSplitUtil.isPkChunkingSupported("TEST_CUSTOM__C")); + Assert.assertFalse(SalesforceSplitUtil.isPkChunkingSupported("Not_Supported")); + Assert.assertFalse(SalesforceSplitUtil.isPkChunkingSupported("notsupported")); + } + + @Test + public void testPKChunkingSupportedListContainsNoUpperCaseValues() { + // SUPPORTED_OBJECTS_WITH_PK_CHUNK list must have only lower case values. + Assert.assertFalse(SalesforceSourceConstants.SUPPORTED_OBJECTS_WITH_PK_CHUNK.stream() + .anyMatch(s -> s.matches(".*[A-Z].*"))); + } }