Skip to content

With a @KafkaListener that returns a Mono, the error handler does not retry #4198

@moonyoungCHAE

Description

@moonyoungCHAE

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.7

Describe the bug

When creating an @KafkaListener that returns a Mono, the error handler does not retry when an exception occurs. I got this bug from stack overflow report.

To Reproduce

@Component
public class MyListener {
    @KafkaListener(
            clientIdPrefix = "myNewClient",
            groupId = "aa",
            topics = "monoTest"
    )
    public void listen(ConsumerRecord<?, ?> record) throws MyException {
        System.out.println("get data:  offset" + record.offset());
        throw new MyException();
    }
}
@Configuration
public class Config {
    private static final Logger logger = LoggerFactory.getLogger(Config.class);

    @Bean
    public CommonErrorHandler handler() {
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1L, 20L));
        errorHandler.addRetryableExceptions(MyException.class, TimeoutException.class, ListenerExecutionFailedException.class);
        errorHandler.setRetryListeners((record, ex, attempt) ->
                logger.info("[CUSTOM-RETRY] attempt={} topic={} partition={} offset={} ex={}",
                        attempt, record.topic(), record.partition(), record.offset(), ex.toString())
        );
        return errorHandler;
    }
}

Expected behavior

When an exception occurs, the message is retried. For non-Mono listeners, retries behave as shown below.

2025-12-04T16:07:54.241+09:00  INFO 9045 --- [| adminclient-2] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
get data:  offset0
2025-12-04T16:07:54.246+09:00  INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config              : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.257+09:00  INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.258+09:00  INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
get data:  offset0
2025-12-04T16:07:54.742+09:00  INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config              : [CUSTOM-RETRY] attempt=2 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.754+09:00  INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.755+09:00  INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
get data:  offset0

But when the listener returns a Mono, the message is not retried as shown below.


get data:  offset0
2025-12-04T16:12:40.109+09:00  INFO 10101 --- [ntainer#0-0-C-1] org.example.consumer.Config              : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public reactor.core.publisher.Mono<java.lang.Void> org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:12:40.122+09:00  INFO 10101 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:12:40.123+09:00  INFO 10101 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
... then no retry

Sample

A link to a GitHub repository with a minimal, reproducible, sample.

Reports that include a sample will take priority over reports that do not.
At times, we may require a sample, so it is good to try and include a sample up front.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions