Open
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.7
Describe the bug
When a @KafkaListener method returns a Mono, the error handler does not retry after an exception occurs. I found this bug through a Stack Overflow report.
To Reproduce
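// MyListener.java: the non-Mono (void) variant, whose retry log appears under "Expected behavior".
// The failing case is the same method with return type Mono<Void>,
// per the listener signature printed in the second log.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {
    @KafkaListener(
            clientIdPrefix = "myNewClient",
            groupId = "aa",
            topics = "monoTest"
    )
    public void listen(ConsumerRecord<?, ?> record) throws MyException {
        System.out.println("get data: offset" + record.offset());
        throw new MyException(); // always fail so the error handler is invoked
    }
}

// Config.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.util.backoff.FixedBackOff;
// TimeoutException: the report does not say which package this class comes from.

@Configuration
public class Config {
    private static final Logger logger = LoggerFactory.getLogger(Config.class);

    @Bean
    public CommonErrorHandler handler() {
        // FixedBackOff(interval, maxAttempts): 1 ms between attempts, up to 20 retries
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1L, 20L));
        errorHandler.addRetryableExceptions(MyException.class, TimeoutException.class, ListenerExecutionFailedException.class);
        // Log every failed delivery so retries are visible in the console output
        errorHandler.setRetryListeners((record, ex, attempt) ->
                logger.info("[CUSTOM-RETRY] attempt={} topic={} partition={} offset={} ex={}",
                        attempt, record.topic(), record.partition(), record.offset(), ex.toString())
        );
        return errorHandler;
    }
}

Expected behavior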
When an exception occurs, the message is retried. For non-Mono listeners, retries behave as shown below.
2025-12-04T16:07:54.241+09:00 INFO 9045 --- [| adminclient-2] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
get data: offset0
2025-12-04T16:07:54.246+09:00 INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.257+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.258+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
get data: offset0
2025-12-04T16:07:54.742+09:00 INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=2 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.754+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.755+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
get data: offset0
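
For reference, `new FixedBackOff(1L, 20L)` means a 1 ms interval and at most 20 retries, so a record that always fails should be delivered 21 times in total before the handler gives up. The following is only a plain-Java sketch of that counting, not Spring's implementation; `FixedBackOffSketch`, `Delivery`, and `deliverWithFixedBackOff` are hypothetical names introduced for illustration.

```java
class FixedBackOffSketch {

    /** Hypothetical stand-in for one listener invocation that may throw. */
    interface Delivery {
        void attempt(int deliveryAttempt) throws Exception;
    }

    /**
     * Invokes the listener once, then retries up to maxRetries more times,
     * sleeping intervalMs between attempts. Returns the number of deliveries made.
     */
    static int deliverWithFixedBackOff(Delivery listener, long intervalMs, long maxRetries) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.attempt(deliveries);
                return deliveries; // success: no further retries
            } catch (Exception ex) {
                if (deliveries > maxRetries) {
                    return deliveries; // retries exhausted; a real handler would recover the record here
                }
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    return deliveries;
                }
            }
        }
    }

    public static void main(String[] args) {
        // A listener that always fails, like the one in this report
        int total = deliverWithFixedBackOff(
                attempt -> { throw new IllegalStateException("always fails"); }, 1L, 20L);
        System.out.println("deliveries=" + total); // prints "deliveries=21"
    }
}
```

Under this reading of the configuration, the `[CUSTOM-RETRY]` line should keep appearing with an increasing `attempt` value until the retries are exhausted.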
But when the listener returns a Mono, the message is not retried after the first attempt, as shown below.
get data: offset0
2025-12-04T16:12:40.109+09:00 INFO 10101 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public reactor.core.publisher.Mono<java.lang.Void> org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:12:40.122+09:00 INFO 10101 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:12:40.123+09:00 INFO 10101 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
... then no retry