Skip to content

Conversation

@pvillard31
Copy link
Contributor

@pvillard31 pvillard31 commented Jan 14, 2026

Summary

NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions

Problem

The ConsumeKafka processor using Kafka3ConnectionService causes duplicate message processing when a consumer group rebalance occurs (when a consumer starts or stops for example). This happens because the Kafka3ConsumerService.onPartitionsRevoked() callback only performs a rollback instead of committing pending offsets before partitions are revoked.

When a rebalance is triggered:

  • Kafka calls onPartitionsRevoked() before the consumer loses ownership of its partitions
  • The current implementation just rolls back to the last committed offset
  • Any messages that were polled but not yet committed are lost
  • The processor later attempts to commit offsets but receives RebalanceInProgressException
  • Another consumer re-processes the same messages, causing duplicates

This issue did not occur with the legacy ConsumeKafka_2_6 processor in NiFi 1.x.

Root Cause

In NiFi 1.x, the ConsumerLease class implemented ConsumerRebalanceListener and had direct access to both the uncommitted offsets (tracked internally) and the ProcessSession. When onPartitionsRevoked() was called, it would commit pending offsets before partitions were revoked.

For reference:

In NiFi 2.x, the architecture changed:

  • Kafka3ConsumerService handles Kafka consumer operations and implements ConsumerRebalanceListener
  • ConsumeKafka processor handles session/FlowFile operations and offset tracking (via OffsetTracker)
  • There was no mechanism for Kafka3ConsumerService to commit pending offsets during onPartitionsRevoked() because it didn't track them

Solution

This PR restores the NiFi 1.x behavior by tracking uncommitted offsets internally within Kafka3ConsumerService:

  • Added a ConcurrentHashMap<TopicPartition, Long> to track the maximum offset for each partition as records are polled
  • Modified poll() to update the tracked offsets for each record consumed
  • Modified onPartitionsRevoked() to commit the tracked offsets for revoked partitions before they are taken away
  • Modified commit() to clear tracked offsets for partitions that have been committed
  • Modified rollback() to clear tracked offsets for partitions being rolled back

This approach mirrors what was done in ConsumerLease.uncommittedOffsetsMap in NiFi 1.x.

Important Note

While this fix restores correct behavior, a cleaner architectural approach would be to introduce a callback mechanism where the processor can be notified during onPartitionsRevoked() and provide the offsets to commit. This would:

  • Maintain proper separation of concerns between the service and processor layers
  • Preserve the correct ordering of session commit before Kafka offset commit
  • Give the processor full control over what gets committed during rebalance

This larger refactor could be done as a follow-up iteration if desired. This would increase the coupling between processors and the service layer so not sure this is an awesome idea.

Testing

I spent some time adding integration testing and it works locally. Depending on timing and such, it may reveal being flaky... If that's the case, we can drop the new IT class completely but let's see...

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant