Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.client.config;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Executor;

import org.aopalliance.aop.Advice;
import org.jspecify.annotations.Nullable;

/**
* The abstract {@link AmqpListenerEndpoint} implementation holding common properties.
*
* @author Artem Bilan
*
* @since 4.1
*/
public abstract class AbstractAmqpListenerEndpoint implements AmqpListenerEndpoint {

private final String[] addresses;

private @Nullable String id;

private @Nullable Integer concurrency;

private @Nullable Boolean autoStartup;

private @Nullable Executor taskExecutor;

private @Nullable Boolean autoAccept;

private @Nullable Integer initialCredits;

private @Nullable Duration receiveTimeout;

private @Nullable Duration gracefulShutdownPeriod;

private Advice @Nullable [] adviceChain;

protected AbstractAmqpListenerEndpoint(String... addresses) {
this.addresses = Arrays.copyOf(addresses, addresses.length);
}

@Override
public String[] getAddresses() {
return this.addresses;
}

public void setId(String id) {
this.id = id;
}

@Override
public @Nullable String getId() {
return this.id;
}

public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}

@Override
public @Nullable Integer getConcurrency() {
return this.concurrency;
}

public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

@Override
public @Nullable Boolean getAutoStartup() {
return this.autoStartup;
}

public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}

@Override
public @Nullable Executor getTaskExecutor() {
return this.taskExecutor;
}

public void setAutoAccept(boolean autoAccept) {
this.autoAccept = autoAccept;
}

@Override
public @Nullable Boolean getAutoAccept() {
return this.autoAccept;
}

public void setInitialCredits(int initialCredits) {
this.initialCredits = initialCredits;
}

@Override
public @Nullable Integer getInitialCredits() {
return this.initialCredits;
}

public void setReceiveTimeout(Duration receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

@Override
public @Nullable Duration getReceiveTimeout() {
return this.receiveTimeout;
}

public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
}

@Override
public @Nullable Duration getGracefulShutdownPeriod() {
return this.gracefulShutdownPeriod;
}

public void setAdviceChain(Advice... adviceChain) {
this.adviceChain = Arrays.copyOf(adviceChain, adviceChain.length);
}

@Override
public Advice @Nullable [] getAdviceChain() {
return this.adviceChain;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.qpid.protonj2.client.ClientOptions;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
Expand All @@ -43,6 +44,8 @@
* Provides default bean for the ProtonJ {@link Client}.
* If this class is imported via {@link EnableAmqp}, then {@link ClientOptions} are
* based on {@link EnableAmqp} attributes.
* <p>
* Also registers {@link AmqpListenerEndpointRegistry}.
*
* @author Artem Bilan
*
Expand All @@ -54,6 +57,11 @@
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class AmqpDefaultConfiguration implements ImportAware, EnvironmentAware {

/**
* The bean name of the default {@link AmqpMessageListenerContainerFactory}.
*/
public static final String DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "amqpListenerContainerFactory";

private @Nullable AnnotationAttributes attributes;

@SuppressWarnings("NullAway.Init")
Expand All @@ -73,6 +81,16 @@ public void setEnvironment(Environment environment) {
this.environment = environment;
}

@Bean
AmqpListenerEndpointRegistry amqpListenerEndpointRegistry(
@Qualifier(DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
@Nullable AmqpMessageListenerContainerFactory factory) {

return factory != null
? new AmqpListenerEndpointRegistry(factory)
: new AmqpListenerEndpointRegistry();
}

@Bean
@Conditional(OnMissingClientBeanCondition.class)
Client protonClient() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.client.config;

import java.time.Duration;
import java.util.concurrent.Executor;

import org.aopalliance.aop.Advice;
import org.jspecify.annotations.Nullable;

import org.springframework.amqp.core.MessageListener;

/**
* The configuration model to represent a {@link MessageListener}
* with properties for target {@link org.springframework.amqp.client.listener.AmqpMessageListenerContainer}
* to be registered as a bean in the application context.
*
* @author Artem Bilan
*
* @since 4.1
*
* @see AmqpMessageListenerContainerFactory
* @see org.springframework.amqp.client.listener.AmqpMessageListenerContainer
*/
public interface AmqpListenerEndpoint {

/**
* Return the message listener to be used in the target listener container bean.
* @return the message listener.
*/
MessageListener getMessageListener();

/**
* Return the AMQP addresses to listen to.
* @return the addresses.
*/
String[] getAddresses();

/**
* Return the id of the target listener container bean.
* TODO: If not provided, the {@link AmqpMessageListenerContainerFactory} makes a decision about generated bean name.
* @return the id
*/
@Nullable
String getId();

/**
* Return the concurrency (consumers per AMQP address) for the target listener container bean.
* The {@link AmqpMessageListenerContainerFactory} configuration is used by default.
* @return the concurrency
*/
@Nullable
Integer getConcurrency();

/**
* Override of the default {@code autoStartup} property from the {@link AmqpMessageListenerContainerFactory}.
* @return the autoStartup.
*/
@Nullable
Boolean getAutoStartup();

/**
* Get the task executor for the target listener container bean.
* Overrides any executor set on the {@link AmqpMessageListenerContainerFactory}.
* @return the executor.
*/
@Nullable
Executor getTaskExecutor();

/**
* Override the {@code autoAccept} property in the {@link AmqpMessageListenerContainerFactory}.
* @return true if auto-accept is enabled.
*/
@Nullable
Boolean getAutoAccept();

/**
* The initial number of credits to grant to the AMQP receiver.
* Override the {@code initialCredits} property in the {@link AmqpMessageListenerContainerFactory}.
* @return number of initial credits
*/
@Nullable
Integer getInitialCredits();

/**
* The {@code receiveTimeout} for the target listener container bean.
* @return the timeout for receiving messages.
*/
@Nullable
Duration getReceiveTimeout();

/**
* The graceful shutdown period for the target listener container bean.
* @return the graceful shutdown period.
*/
@Nullable
Duration getGracefulShutdownPeriod();

/**
* The advice chain for the target listener container bean.
* @return the advice chain.
*/
Advice @Nullable [] getAdviceChain();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.client.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.jspecify.annotations.Nullable;

/**
* The configuration container to trigger {@link org.springframework.amqp.client.listener.AmqpMessageListenerContainer}
* bean registrations based on the provided {@link AmqpListenerEndpoint} instances
* and {@link AmqpMessageListenerContainerFactory}.
* <p>
* If {@link AmqpMessageListenerContainerFactory} is not provided,
* a bean with name {@link AmqpDefaultConfiguration#DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME}
* is used by default.
* <p>
* The instance of this class can be declared as a bean, or used directly with the {@link AmqpListenerEndpointRegistry}.
*
* @author Artem Bilan
*
* @since 4.1
*
* @see AmqpListenerEndpointRegistry
*/
public class AmqpListenerEndpointRegistration {

private final List<AmqpListenerEndpoint> endpoints = new ArrayList<>();

private final @Nullable AmqpMessageListenerContainerFactory factory;

public AmqpListenerEndpointRegistration(AmqpListenerEndpoint... endpoints) {
this.factory = null;
this.endpoints.addAll(List.of(endpoints));
}

public AmqpListenerEndpointRegistration(AmqpMessageListenerContainerFactory factory,
AmqpListenerEndpoint... endpoints) {

this.factory = factory;
this.endpoints.addAll(List.of(endpoints));
}

public AmqpListenerEndpointRegistration addEndpoint(AmqpListenerEndpoint endpoint) {
this.endpoints.add(endpoint);
return this;
}

public List<AmqpListenerEndpoint> getEndpoints() {
return Collections.unmodifiableList(this.endpoints);
}

public @Nullable AmqpMessageListenerContainerFactory getContainerFactory() {
return this.factory;
}

}
Loading