Skip to content

Commit d6c2a04

Browse files
harishhk107 authored and vikasrathee-cs committed
validate PK Chunking for large queries
1 parent 1a36f01 commit d6c2a04

6 files changed

Lines changed: 213 additions & 2 deletions

File tree

src/main/java/io/cdap/plugin/salesforce/SalesforceQueryUtil.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ public static String createSObjectIdQuery(String query) {
8383
return SELECT + FIELD_ID + " " + fromStatement;
8484
}
8585

86+
/**
87+
* Creates a COUNT query from an existing SOQL query.
88+
* Replaces the SELECT fields with COUNT() while preserving the FROM and WHERE clauses.
89+
*
90+
* @param query the original SOQL query
91+
* @return a COUNT SOQL query string
92+
*/
93+
public static String createCountQuery(String query) {
94+
String fromStatement = SalesforceQueryParser.getFromStatement(query);
95+
return SELECT + "COUNT() " + fromStatement;
96+
}
97+
8698
/**
8799
* Generates SObject query filter based on provided values.
88100
*

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
4949
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
5050
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
5153

5254
import java.util.ArrayList;
5355
import java.util.HashSet;
@@ -71,6 +73,7 @@ public class SalesforceBatchSource extends
7173
public static final String NAME = "Salesforce";
7274

7375
private final SalesforceSourceConfig config;
76+
private static final Logger LOG = LoggerFactory.getLogger(SalesforceBatchSource.class);
7477
private Schema schema;
7578
private MapToRecordTransformer transformer;
7679
private Set<String> jobIds = new HashSet<>();
@@ -139,7 +142,7 @@ public void prepareRun(BatchSourceContext context) {
139142

140143
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
141144
List<SalesforceSplit> querySplits =
142-
getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo);
145+
getSplits(config, authenticatorCredentials, context.getLogicalStartTime(), oAuthInfo, false);
143146
querySplits.stream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
144147
context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(orgId, sObjectName),
145148
new SalesforceInputFormatProvider(
@@ -148,10 +151,22 @@ public void prepareRun(BatchSourceContext context) {
148151

149152
public static List<SalesforceSplit> getSplits(
150153
SalesforceSourceConfig config, AuthenticatorCredentials authenticatorCredentials,
151-
long logicStartTime, OAuthInfo oAuthInfo) {
154+
long logicStartTime, OAuthInfo oAuthInfo, boolean autoEnablePKChunk) {
152155
String query = config.getQuery(logicStartTime, oAuthInfo);
153156
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
157+
154158
boolean enablePKChunk = config.getEnablePKChunk();
159+
160+
if (enablePKChunk && autoEnablePKChunk) {
161+
enablePKChunk = SalesforceSplitUtil.shouldAutoPKChunk(
162+
query, authenticatorCredentials, SalesforceSourceConstants.AUTO_PK_CHUNK_THRESHOLD);
163+
if (!enablePKChunk) {
164+
LOG.info("PK chunking skipped: record count is below threshold {} or "
165+
+ "object/query does not support it.",
166+
SalesforceSourceConstants.AUTO_PK_CHUNK_THRESHOLD);
167+
}
168+
}
169+
155170
if (enablePKChunk) {
156171
String parent = config.getParent();
157172
int chunkSize = config.getChunkSize();

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class SalesforceSourceConstants {
5555
public static final int MAX_PK_CHUNK_SIZE = 250000;
5656
public static final int DEFAULT_PK_CHUNK_SIZE = 100000;
5757
public static final int MIN_PK_CHUNK_SIZE = 1;
58+
public static final long AUTO_PK_CHUNK_THRESHOLD = 1_000_000;
5859
// https://developer.salesforce.com/docs/atlas.en-us.252.0.api_asynch.meta/api_asynch/
5960
// async_api_headers_enable_pk_chunking.htm
6061
// **Always use lowercase names** to ensure consistency, especially if the sObject name is manually provided.

src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,21 @@
2626
import com.sforce.async.JobInfo;
2727
import com.sforce.async.JobStateEnum;
2828
import com.sforce.async.OperationEnum;
29+
import com.sforce.soap.partner.PartnerConnection;
30+
import com.sforce.soap.partner.QueryResult;
31+
import com.sforce.ws.ConnectionException;
2932
import dev.failsafe.FailsafeException;
3033
import dev.failsafe.RetryPolicy;
3134
import dev.failsafe.TimeoutExceededException;
3235
import io.cdap.plugin.salesforce.BulkAPIBatchException;
3336
import io.cdap.plugin.salesforce.InvalidConfigException;
3437
import io.cdap.plugin.salesforce.SObjectDescriptor;
3538
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
39+
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
3640
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
3741
import io.cdap.plugin.salesforce.authenticator.Authenticator;
3842
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
43+
import io.cdap.plugin.salesforce.parser.SalesforceQueryParser;
3944
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceSplit;
4045
import org.slf4j.Logger;
4146
import org.slf4j.LoggerFactory;
@@ -263,6 +268,43 @@ public static boolean isPkChunkingSupported(String sobjectName) {
263268
return false;
264269
}
265270

271+
/**
272+
* Determines whether PK chunking should be enabled automatically based on record count,
273+
* object support, and query compatibility.
274+
*
275+
* @param query the SOQL query
276+
* @param credentials authenticator credentials for SOAP API
277+
* @param threshold the record count threshold above which PK chunking is enabled
278+
* @return true if PK chunking should be auto-enabled, false otherwise
279+
*/
280+
public static boolean shouldAutoPKChunk(String query, AuthenticatorCredentials credentials, long threshold) {
281+
try {
282+
String sObjectName = SObjectDescriptor.fromQuery(query).getName();
283+
if (!isPkChunkingSupported(sObjectName)) {
284+
LOG.debug("PK Chunking auto-decision: object '{}' is not supported for PK chunking", sObjectName);
285+
return false;
286+
}
287+
if (SalesforceQueryParser.isRestrictedPKQuery(query)) {
288+
LOG.debug("PK Chunking auto-decision: query contains restricted clauses, skipping PK chunking");
289+
return false;
290+
}
291+
String countQuery = SalesforceQueryUtil.createCountQuery(query);
292+
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
293+
QueryResult result = partnerConnection.query(countQuery);
294+
int recordCount = result.getSize();
295+
LOG.debug("PK Chunking auto-decision: object '{}' has {} records, threshold is {}",
296+
sObjectName, recordCount, threshold);
297+
return recordCount >= threshold;
298+
} catch (ConnectionException e) {
299+
LOG.warn("PK Chunking auto-decision: failed to execute count query, falling back to no PK chunking", e);
300+
return false;
301+
} catch (Exception e) {
302+
LOG.warn("PK Chunking auto-decision: unexpected error, falling back to no PK chunking", e);
303+
return false;
304+
}
305+
}
306+
307+
266308
// This is added only for UCS use case.
267309
private static boolean isCustomObject(String sobjectName) {
268310
return sobjectName.toLowerCase().endsWith("__c");

src/test/java/io/cdap/plugin/salesforce/SalesforceQueryUtilTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,18 @@ public void testCreateSObjectIdQuery() {
227227

228228
Assert.assertEquals("SELECT Id " + fromClause, sObjectIdQuery);
229229
}
230+
231+
@Test
232+
public void testCreateCountQuery() {
233+
String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z";
234+
String countQuery = SalesforceQueryUtil.createCountQuery(query);
235+
Assert.assertEquals("SELECT COUNT() FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z", countQuery);
236+
}
237+
238+
@Test
239+
public void testCreateCountQueryWithoutWhere() {
240+
String query = "SELECT Id, Name FROM Account";
241+
String countQuery = SalesforceQueryUtil.createCountQuery(query);
242+
Assert.assertEquals("SELECT COUNT() FROM Account", countQuery);
243+
}
230244
}

src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,4 +624,131 @@ private void testPKChunk(ImmutableMap<String, String> properties) throws Excepti
624624
Assert.assertEquals("record3", results.get(2).get("Name"));
625625
Assert.assertEquals("record4", results.get(3).get("Name"));
626626
}
627+
628+
/**
629+
* Test: UI toggle TRUE + records > 10M threshold → PK Chunking should be enabled
630+
*/
631+
@Test
632+
public void testPKChunkWithUITrueAndRecordsAboveThreshold() throws Exception {
633+
// Setup: User explicitly enables PK Chunking and records exceed 10M threshold
634+
ImmutableMap<String, String> properties = new ImmutableMap.Builder<String, String>()
635+
.put("enablePKChunk", "true")
636+
.put("chunkSize", "100000")
637+
.build();
638+
639+
// Verify behavior: PK Chunking should be enabled
640+
testPKChunk(properties);
641+
}
642+
643+
/**
644+
* Test: UI toggle FALSE → PK Chunking should never be enabled
645+
* regardless of record count (respects explicit user setting)
646+
*/
647+
@Test
648+
public void testPKChunkWithUIFalse() throws Exception {
649+
String sObjectName = createCustomObject("IT_PKChunkDisabled", null);
650+
651+
List<SObject> sObjects = new ImmutableList.Builder<SObject>()
652+
.add(new SObjectBuilder()
653+
.setType(sObjectName)
654+
.put("Name", "record1")
655+
.build())
656+
.add(new SObjectBuilder()
657+
.setType(sObjectName)
658+
.put("Name", "record2")
659+
.build())
660+
.build();
661+
662+
addSObjects(sObjects, true);
663+
664+
// User explicitly disables PK Chunking
665+
ImmutableMap<String, String> properties = new ImmutableMap.Builder<String, String>()
666+
.put("enablePKChunk", "false")
667+
.build();
668+
669+
ImmutableMap.Builder<String, String> baseProperties = getBaseProperties("SalesforceReaderPKChunkDisabled");
670+
baseProperties.putAll(properties);
671+
baseProperties.put(SalesforceSourceConstants.PROPERTY_QUERY, "Select Name from " + sObjectName);
672+
673+
List<StructuredRecord> results = getPipelineResults(baseProperties.build(), SalesforceBatchSource.NAME,
674+
"SalesforceBatch");
675+
676+
// Verify: Records are returned but PK Chunking should not be applied
677+
results.sort(Comparator.comparing(record -> record.get("Name")));
678+
Assert.assertEquals(2, results.size());
679+
Assert.assertEquals("record1", results.get(0).get("Name"));
680+
Assert.assertEquals("record2", results.get(1).get("Name"));
681+
}
682+
683+
/**
684+
* Test: Default (no UI setting) + small filtered query (< 10M records)
685+
* → PK Chunking should be disabled (prevents empty chunks)
686+
*/
687+
@Test
688+
public void testPKChunkWithDefaultSettingAndSmallFilteredQuery() throws Exception {
689+
String sObjectName = createCustomObject("IT_PKChunkSmallQuery", null);
690+
691+
// Create a small dataset
692+
List<SObject> sObjects = new ImmutableList.Builder<SObject>()
693+
.add(new SObjectBuilder()
694+
.setType(sObjectName)
695+
.put("Name", "filtered_record")
696+
.put("Status", "Active")
697+
.build())
698+
.build();
699+
700+
addSObjects(sObjects, true);
701+
702+
// No explicit PK Chunking setting - use default
703+
ImmutableMap.Builder<String, String> baseProperties = getBaseProperties("SalesforceReaderSmallQuery");
704+
baseProperties.put(SalesforceSourceConstants.PROPERTY_QUERY,
705+
"Select Name from " + sObjectName + " WHERE Status = 'Active'");
706+
707+
List<StructuredRecord> results = getPipelineResults(baseProperties.build(), SalesforceBatchSource.NAME,
708+
"SalesforceBatch");
709+
710+
// Verify: Small filtered query should NOT use PK Chunking
711+
Assert.assertEquals(1, results.size());
712+
Assert.assertEquals("filtered_record", results.get(0).get("Name"));
713+
}
714+
715+
/**
716+
* Test: Custom Object with PK Chunking enabled
717+
* Verifies that custom objects (__c suffix) support PK Chunking
718+
*/
719+
@Test
720+
public void testPKChunkWithCustomObject() throws Exception {
721+
String customObjectName = createCustomObject("CustomPKChunk__c", null);
722+
723+
List<SObject> sObjects = new ImmutableList.Builder<SObject>()
724+
.add(new SObjectBuilder()
725+
.setType(customObjectName)
726+
.put("Name", "custom_record1")
727+
.build())
728+
.add(new SObjectBuilder()
729+
.setType(customObjectName)
730+
.put("Name", "custom_record2")
731+
.build())
732+
.build();
733+
734+
addSObjects(sObjects, true);
735+
736+
ImmutableMap<String, String> properties = new ImmutableMap.Builder<String, String>()
737+
.put("enablePKChunk", "true")
738+
.put("chunkSize", "100000")
739+
.build();
740+
741+
ImmutableMap.Builder<String, String> baseProperties = getBaseProperties("SalesforceReaderCustomPKChunk");
742+
baseProperties.putAll(properties);
743+
baseProperties.put(SalesforceSourceConstants.PROPERTY_QUERY, "Select Name from " + customObjectName);
744+
745+
List<StructuredRecord> results = getPipelineResults(baseProperties.build(), SalesforceBatchSource.NAME,
746+
"SalesforceBatch");
747+
748+
// Verify: Custom object with PK Chunking enabled should work correctly
749+
results.sort(Comparator.comparing(record -> record.get("Name")));
750+
Assert.assertEquals(2, results.size());
751+
Assert.assertEquals("custom_record1", results.get(0).get("Name"));
752+
Assert.assertEquals("custom_record2", results.get(1).get("Name"));
753+
}
627754
}

0 commit comments

Comments
 (0)