Fix IllegalStateException and NullPointerException in spilling hash join when LEFT JOIN is adaptively reordered to RIGHT JOIN #29190
yui2010 wants to merge 5 commits into trinodb:master
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla
Description
Fix IllegalStateException and NullPointerException in spilling hash join when a LEFT JOIN is adaptively reordered to a RIGHT JOIN (LOOKUP_OUTER) at runtime.
In fault-tolerant execution (FTE) with spilling enabled, AdaptiveReorderPartitionedJoin may flip a LEFT JOIN to a RIGHT JOIN at runtime when the actual build-side data exceeds the probe side by a configurable ratio (default 1.5×, minimum 5 GB). The flip changes the JoinNode type from LEFT to RIGHT, which maps to LOOKUP_OUTER in the physical operator layer.
A LOOKUP_OUTER join requires an additional outer driver pipeline to emit unmatched build-side rows. LocalExecutionPlanner.addLookupOuterDrivers creates this pipeline by duplicating every operator factory that follows the LOOKUP_OUTER factory in the probe pipeline.
When the probe pipeline contains a downstream spilling join after the LOOKUP_OUTER join, addLookupOuterDrivers duplicates the LookupJoinOperatorFactory into the outer driver pipeline. The duplicate was created before the operator count was incremented, and totalOperatorsCount was a plain OptionalInt field, copied by value into the duplicate. The duplicate therefore permanently held the stale count N (task concurrency) instead of N+1.
PartitionedLookupSourceFactory uses totalOperatorsCount to track how many probe operators will call finishProbeOperator. With N declared but N+1 actual callers, two failures occur depending on scheduling order:
IllegalStateException: N+1 probe operators finished out of N declared — when the N original probe operators finish, freePartitions() is triggered and lookupSourceSupplier is nulled; the outer driver operator then calls finishProbeOperator one more time, tripping the checkState guard.
NullPointerException: lookupSourceSupplier is null — if the outer driver operator calls withLease() after freePartitions() has already nulled lookupSourceSupplier, a dereference NPE is thrown inside SpillAwareLookupSourceProvider.
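The two failure modes above can be reproduced in miniature. The following is a minimal sketch, not the Trino code: `ProbeTrackerSketch`, `Tracker`, and the `String`-typed supplier are hypothetical stand-ins that only mimic the bookkeeping described for PartitionedLookupSourceFactory (a declared count, a finished count, and a supplier that is nulled once the declared count is reached).

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the probe bookkeeping; not the Trino class itself.
final class ProbeTrackerSketch
{
    static final class Tracker
    {
        private final int declaredOperators;
        private int finishedOperators;
        // Placeholder payload; the real supplier yields a lookup source.
        private Supplier<String> lookupSourceSupplier = () -> "lookup source";

        Tracker(int declaredOperators)
        {
            this.declaredOperators = declaredOperators;
        }

        void finishProbeOperator()
        {
            // Guard that trips when more operators finish than were declared.
            if (finishedOperators >= declaredOperators) {
                throw new IllegalStateException(
                        (finishedOperators + 1) + " probe operators finished out of "
                                + declaredOperators + " declared");
            }
            finishedOperators++;
            if (finishedOperators == declaredOperators) {
                freePartitions();
            }
        }

        String withLease()
        {
            // Dereferences the supplier; fails once freePartitions() has run.
            return lookupSourceSupplier.get();
        }

        private void freePartitions()
        {
            lookupSourceSupplier = null;
        }
    }

    public static void main(String[] args)
    {
        Tracker tracker = new Tracker(2); // N = 2 declared
        tracker.finishProbeOperator();
        tracker.finishProbeOperator();    // N-th finish frees partitions
        try {
            tracker.finishProbeOperator(); // the extra (N+1)-th caller
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        try {
            tracker.withLease();           // late caller after the supplier was nulled
        }
        catch (NullPointerException e) {
            System.out.println("NPE after partitions were freed");
        }
    }
}
```

Which of the two exceptions surfaces in the sketch depends only on whether the extra caller reaches `finishProbeOperator` or `withLease` first, which mirrors the scheduling dependence described above.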
Fix:
In LookupJoinOperatorFactory: replace the plain OptionalInt totalOperatorsCount field with AtomicReference. The duplicate() copy constructor shares the same AtomicReference instance, so both the original and every duplicated factory always read the same value. A new incrementTotalOperatorsCount() method atomically increments the counter.
In LocalExecutionPlanner.addLookupOuterDrivers: after duplicating the downstream operator factories, unwrap each WorkProcessorOperatorAdapter.Factory to obtain the inner LookupJoinOperatorFactory and call incrementTotalOperatorsCount(). This happens during plan construction, before any create() call, so all subsequently created operators see the correct N+1 value.
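The shared-counter idea can be sketched as follows. This is an illustration under stated assumptions rather than the code in this PR: `SharedCountSketch` and `JoinFactory` are hypothetical names, and the element type of the `AtomicReference` is assumed to be `OptionalInt`. Only `duplicate()` and `incrementTotalOperatorsCount()` are names taken from the description.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a factory that shares its operator count
// with every duplicate; not the actual LookupJoinOperatorFactory.
final class SharedCountSketch
{
    static final class JoinFactory
    {
        // Assumed element type: OptionalInt. The same instance is shared by
        // the original factory and all of its duplicates.
        private final AtomicReference<OptionalInt> totalOperatorsCount;

        JoinFactory(OptionalInt initialCount)
        {
            this(new AtomicReference<OptionalInt>(initialCount));
        }

        private JoinFactory(AtomicReference<OptionalInt> sharedCount)
        {
            this.totalOperatorsCount = sharedCount;
        }

        JoinFactory duplicate()
        {
            // Pass the reference itself, not a copy of its current value.
            return new JoinFactory(totalOperatorsCount);
        }

        void incrementTotalOperatorsCount()
        {
            // Atomic read-modify-write; an empty count stays empty.
            totalOperatorsCount.updateAndGet(count ->
                    count.isPresent() ? OptionalInt.of(count.getAsInt() + 1) : count);
        }

        OptionalInt totalOperatorsCount()
        {
            return totalOperatorsCount.get();
        }
    }

    public static void main(String[] args)
    {
        JoinFactory original = new JoinFactory(OptionalInt.of(4)); // N = 4
        JoinFactory outerDriverCopy = original.duplicate();
        outerDriverCopy.incrementTotalOperatorsCount();
        // Both factories now observe N + 1 because they share one reference.
        System.out.println(original.totalOperatorsCount());
        System.out.println(outerDriverCopy.totalOperatorsCount());
    }
}
```

With a plain `OptionalInt` field, the same sequence would leave the original and the duplicate disagreeing, since `OptionalInt` is an immutable value and each factory would hold its own copy.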
Additional context and related issues
The bug is triggered only when all of the following conditions are met:
Fault-tolerant execution is enabled (retry_policy = TASK)
Adaptive join reordering is enabled (fault_tolerant_execution_adaptive_join_reordering_enabled = true)
Spilling is enabled (spill_enabled = true)
Actual build-side data exceeds the probe side by more than fault_tolerant_execution_adaptive_join_reordering_size_difference_ratio (default 1.5×) and exceeds fault_tolerant_execution_adaptive_join_reordering_min_size_threshold (default 5 GB), triggering the LEFT → RIGHT flip
The probe pipeline contains another spilling join downstream of the flipped LOOKUP_OUTER join
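For reviewers trying to reproduce the issue, the size-based part of the conditions above can be approximated as a simple predicate. This is a rough sketch that models only the wording of this description, not the logic of AdaptiveReorderPartitionedJoin: `FlipConditionSketch` and `wouldFlip` are hypothetical names, the constants restate the defaults quoted above, and "5 GB" is assumed to mean 5 × 1024³ bytes.

```java
// Hypothetical helper that restates the size conditions from the list above.
final class FlipConditionSketch
{
    // Default ratio quoted in the description.
    static final double SIZE_DIFFERENCE_RATIO = 1.5;
    // Default minimum size quoted in the description; binary gigabytes assumed.
    static final long MIN_SIZE_THRESHOLD_BYTES = 5L * 1024 * 1024 * 1024;

    // True when the build side is both above the minimum size and more than
    // SIZE_DIFFERENCE_RATIO times larger than the probe side.
    static boolean wouldFlip(long buildBytes, long probeBytes)
    {
        return buildBytes > MIN_SIZE_THRESHOLD_BYTES
                && buildBytes > probeBytes * SIZE_DIFFERENCE_RATIO;
    }

    public static void main(String[] args)
    {
        long gib = 1024L * 1024 * 1024;
        System.out.println(wouldFlip(6 * gib, 1 * gib)); // large and skewed build side
        System.out.println(wouldFlip(6 * gib, 5 * gib)); // large, but ratio not exceeded
    }
}
```

The remaining conditions (retry policy, adaptive reordering, spilling, and a downstream spilling join) are configuration and plan-shape requirements and are not modeled here.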
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: