[PLUGIN-1957] Validate PK Chunking for incremental loads #354
harishhk107 wants to merge 1 commit into data-integrations:develop
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). For the most up-to-date status, view the checks section at the bottom of the pull request.
public static final int MAX_PK_CHUNK_SIZE = 250000;
public static final int DEFAULT_PK_CHUNK_SIZE = 100000;
public static final int MIN_PK_CHUNK_SIZE = 1;
public static final long AUTO_PK_CHUNK_THRESHOLD = 1_000_000;
1M is too high; we should use the chunk size (the one we generally use to create chunks) as the threshold here.
While we could lower the threshold to the chunk size, we tested the 1M threshold and found it doesn't make a significant difference in execution time:
Before: ~13m
After: ~13m
1M is too high from a read-stream perspective; in some cases we may not be able to read that much data in a single stream.
Also, how are you measuring execution time? In DTS it would definitely change: with chunking we process 1M records in parallel, while without chunking they are processed sequentially.
I’m currently testing this locally on a CDAP pipeline.
 * @param threshold the record count threshold above which PK chunking is enabled
 * @return true if PK chunking should be auto-enabled, false otherwise
 */
public static boolean shouldAutoDetectPKChunk(String query, AuthenticatorCredentials credentials, long threshold) {
We are not auto-detecting chunking; this is a validation check for PK chunking.
We could use a more intuitive name, like hasRequiredCountForPkChunking.
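A minimal sketch of what that rename could look like (hypothetical body; getRecordCount stands in for however SalesforceSplitUtil actually executes the COUNT query):

    /**
     * Checks whether the queried object has enough records to justify PK chunking.
     * Sketch only: getRecordCount is a hypothetical helper, not the plugin's API.
     */
    public static boolean hasRequiredCountForPkChunking(String query,
                                                        AuthenticatorCredentials credentials,
                                                        long threshold) {
      // Rewrite the query as SELECT COUNT() ... and run it against the org.
      long recordCount = getRecordCount(SalesforceQueryUtil.createCountQuery(query), credentials);
      // PK chunking only pays off above the threshold.
      return recordCount >= threshold;
    }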
  return false;
}
if (SalesforceQueryParser.isRestrictedPKQuery(query)) {
  LOG.debug("PK Chunking auto-decision: query contains restricted clauses, skipping PK chunking");
This is a record-count criterion, not an auto-decision; please update the wording everywhere.
}

@Test
public void testCreateCountQuery() {
Can we please follow the same naming convention across all tests?
Test names should summarize the behavior being tested and its expected outcome.
@Test
public void testCreateCountQuery() {
  String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z";
  String countQuery = SalesforceQueryUtil.createCountQuery(query);
Structure tests into Arrange, Act, Assert blocks; you can use an empty line as a separator between the blocks. This comment applies to all tests.
Applied the testMethodName_stateUnderTest_expectedBehavior pattern and added blank lines for Arrange/Act/Assert.
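For illustration, a test in that convention might look like the sketch below (the expected string assumes createCountQuery keeps the WHERE clause intact, which matches the snippet above but is not verified here):

    import org.junit.Assert;
    import org.junit.Test;

    // Assumes the test lives in the same package as SalesforceQueryUtil.
    public class SalesforceQueryUtilTest {

      @Test
      public void createCountQuery_queryWithWhereClause_keepsFilterInCountQuery() {
        // Arrange
        String query = "SELECT Id,Name,SomeField FROM sObjectName "
          + "WHERE LastModifiedDate>=2019-04-12T23:23:23Z";

        // Act
        String countQuery = SalesforceQueryUtil.createCountQuery(query);

        // Assert
        Assert.assertEquals(
          "SELECT COUNT() FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z",
          countQuery);
      }
    }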
 * for __c suffix), but threshold check still applies.
 */
@Test
public void pkChunking_customObject_returnsAll() throws Exception {
Please follow the naming and structure guidelines described in the comments below.
 * @param query the original SOQL query
 * @return a COUNT SOQL query string
 */
public static String createCountQuery(String query) {
How long does this query take when we use filters?
Can you add test cases with before and after times at high record counts:
- a table with 30-40M records and a filter query
- a table with 10M records and a filter query
Basically, what we want to test is how the query cost scales with record count.
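For context, a naive sketch of the SELECT-list rewrite this method performs (string-based and simplified; the actual implementation may go through SalesforceQueryParser and handle cases this version does not):

    /**
     * Naive sketch: replaces the SELECT list with COUNT() and keeps the rest.
     * Does not handle subqueries in the SELECT list or aggregate queries.
     */
    public static String createCountQuery(String query) {
      // Locate the FROM clause; everything before it is the SELECT list.
      int fromIndex = query.toUpperCase().indexOf(" FROM ");
      if (fromIndex < 0) {
        throw new IllegalArgumentException("Query has no FROM clause: " + query);
      }
      return "SELECT COUNT()" + query.substring(fromIndex);
    }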
PLUGIN-1957 Autodetect PK Chunking for incremental loads
What
Adds a record count check before enabling PK chunking in SalesforceBatchSource.getSplits().
If the record count is below AUTO_PK_CHUNK_THRESHOLD (1,000,000), PK chunking is skipped
even if it is enabled in the config, to avoid unnecessary overhead on small datasets. This applies only to DTS.
Why
PK chunking is designed for very large datasets. Enabling it on small datasets causes empty
chunk overhead and increased pipeline execution time without any benefit. This change ensures
chunking is only applied when it is operationally justified by the actual record count.
Changes
SalesforceBatchSource.java — updated getSplits() to call shouldAutoDetectPKChunk() only when
config.getEnablePKChunk() && pkChunkCountCheck is true; added the pkChunkCountCheck parameter (see the sketch after this list)
SalesforceSplitUtil.java — added shouldAutoDetectPKChunk(), which runs a COUNT() query
to check the record count against the threshold before enabling chunking
SalesforceSourceConstants.java — added AUTO_PK_CHUNK_THRESHOLD = 1_000_000
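Roughly, the gating in getSplits() looks like this (a sketch reconstructed from the description above; pkChunkCountCheck and the surrounding variable names are assumptions, not verbatim plugin code):

    // Inside SalesforceBatchSource.getSplits() -- sketch, not verbatim plugin code.
    boolean enablePKChunk = config.getEnablePKChunk();
    if (enablePKChunk && pkChunkCountCheck) {
      // Only keep chunking on if the record count clears the 1M threshold.
      enablePKChunk = SalesforceSplitUtil.shouldAutoDetectPKChunk(
        query, credentials, SalesforceSourceConstants.AUTO_PK_CHUNK_THRESHOLD);
    }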
Manual Testing
Verified pipeline runs correctly with enablePKChunk=true and small dataset → chunking skipped
Verified pipeline runs correctly with enablePKChunk=true and large dataset → chunking applied
Verified pipeline runs correctly with enablePKChunk=false → chunking skipped, no count query