Skip to content

Upgrade kafka-clients to 4.2.0 and fix deprecated API usage#507

Open
pjfanning wants to merge 3 commits intoapache:mainfrom
pjfanning:copilot/upgrade-kafka-clients-dependency
Open

Upgrade kafka-clients to 4.2.0 and fix deprecated API usage#507
pjfanning wants to merge 3 commits intoapache:mainfrom
pjfanning:copilot/upgrade-kafka-clients-dependency

Conversation

@pjfanning
Copy link
Copy Markdown
Member

Summary

Upgrades kafka-clients from 4.1.2 to 4.2.0 and fixes all compilation issues caused by newly deprecated/removed APIs.

Changes

Version bump

  • kafkaVersion = "4.1.2""4.2.0" in project/Versions.scala

ConsumerGroupMetadata constructors (deprecated with forRemoval=true in 4.2.0)

Instead of creating new ConsumerGroupMetadata(groupId) in TransactionalProducerStage, the actual ConsumerGroupMetadata is now threaded from the Kafka consumer through the message pipeline:

  • Added groupMetadata: Option[ConsumerGroupMetadata] to KafkaConsumerActor.Internal.Messages
  • KafkaConsumerActor includes consumer.groupMetadata() when sending non-empty messages
  • Added onGroupMetadata hook to BaseSingleSourceLogic and SubSourceLogic
  • Added consumerGroupMetadata: ConsumerGroupMetadata abstract method to TransactionalMessageBuilderBase
  • TransactionalSourceLogic and TransactionalSubSourceStageLogic track the current group metadata
  • Added consumerGroupMetadata field to PartitionOffsetCommittedMarker
  • TransactionalProducerStage.commitTransaction now uses batch.groupMetadata directly

This is a proper fix (not just suppression): transactions now use the actual consumer's group metadata (including generation ID and member ID), which provides proper zombie-fencing semantics for EOS.

consumer.close(Duration) (deprecated)

Replaced with consumer.close(CloseOptions.timeout(duration)) in KafkaConsumerActor.postStop; removed now-unneeded @nowarn annotation.

ConsumerGroupDescription.state() / ConsumerGroupState (deprecated since 4.0, forRemoval=true)

Replaced with groupState() / GroupState.STABLE in both KafkaSpec.scala and BaseKafkaTest.java.

Benchmark

Replaced new ConsumerGroupMetadata(fixture.groupId) with consumer.groupMetadata() (the fixture has a real KafkaConsumer reference).

Test infrastructure

ConsumerDummy.groupMetadata() now returns a non-throwing stub value. A @nowarn("msg=deprecated") annotation is used here since it's a mock consumer without a real Kafka connection, making the deprecated constructor the only option in this context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants