NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions #10769
+558
−2
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions
Problem
The
ConsumeKafkaprocessor usingKafka3ConnectionServicecauses duplicate message processing when a consumer group rebalance occurs (when a consumer starts or stops for example). This happens because theKafka3ConsumerService.onPartitionsRevoked()callback only performs a rollback instead of committing pending offsets before partitions are revoked.When a rebalance is triggered:
onPartitionsRevoked()before the consumer loses ownership of its partitionsRebalanceInProgressExceptionThis issue did not occur with the legacy
ConsumeKafka_2_6processor in NiFi 1.x.Root Cause
In NiFi 1.x, the
ConsumerLeaseclass implementedConsumerRebalanceListenerand had direct access to both the uncommitted offsets (tracked internally) and theProcessSession. WhenonPartitionsRevoked()was called, it would commit pending offsets before partitions were revoked.For reference:
In NiFi 2.x, the architecture changed:
Kafka3ConsumerServicehandles Kafka consumer operations and implementsConsumerRebalanceListenerConsumeKafkaprocessor handles session/FlowFile operations and offset tracking (viaOffsetTracker)Kafka3ConsumerServiceto commit pending offsets duringonPartitionsRevoked()because it didn't track themSolution
This PR restores the NiFi 1.x behavior by tracking uncommitted offsets internally within
Kafka3ConsumerService:ConcurrentHashMap<TopicPartition, Long>to track the maximum offset for each partition as records are polledpoll()to update the tracked offsets for each record consumedonPartitionsRevoked()to commit the tracked offsets for revoked partitions before they are taken awaycommit()to clear tracked offsets for partitions that have been committedrollback()to clear tracked offsets for partitions being rolled backThis approach mirrors what was done in
ConsumerLease.uncommittedOffsetsMapin NiFi 1.x.Important Note
While this fix restores correct behavior, a cleaner architectural approach would be to introduce a callback mechanism where the processor can be notified during
onPartitionsRevoked()and provide the offsets to commit. This would:This larger refactor could be done as a follow-up iteration if desired. This would increase the coupling between processors and the service layer so not sure this is an awesome idea.
Testing
I spent some time adding integration testing and it works locally. Depending on timing and such, it may reveal being flaky... If that's the case, we can drop the new IT class completely but let's see...
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation