Skip to content

Subscription handling cleanup#931

Open
hylkevds wants to merge 10 commits intomoquette-io:mainfrom
FraunhoferIOSB:subscription_rework
Open

Subscription handling cleanup#931
hylkevds wants to merge 10 commits intomoquette-io:mainfrom
FraunhoferIOSB:subscription_rework

Conversation

@hylkevds
Copy link
Collaborator

@hylkevds hylkevds commented Jan 31, 2026

While working on a subscription related feature I had to carefully review the way subscriptions are passed through Moquette, and did some massive cleanup and simplification.

Subscription information was being converted back and forth between several classes,
resulting in much unneeded object creation. But in the end it would always end up as Subscription.

  • This removes all the intermediate Objects, simplifying the code.
  • This includes the removal of the SharedSubscription class, since in the end, those are also turned into normal Subscriptions, making the extra class meaningless.
  • The logic contained many superfluous if/else checks for subscriptionIdentifiers caused by the wrapping and unwrapping of Subscription data.
  • All methods that would take several parameters with subscription information now simply accept a subscription.
  • Subscriptions are (already) registered on the Session, so when an unsubscribe is received, the subscription can be fetched from the Session. This allows the subscription-remove functions to be greatly simplified.
  • To simplify and speed up fetching the subscription from the Session, it is now stored in a Map instead of a Set.

These changes will greatly improve the maintainability of the code, and probably increase efficiency.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the Changelog if it's a feature or a fix that has to be reported

@hylkevds
Copy link
Collaborator Author

Note that this PR is on top of PR #919, since I discovered that problem at the start of this work.

@hylkevds hylkevds requested a review from andsel January 31, 2026 14:01
@andsel
Copy link
Collaborator

andsel commented Feb 6, 2026

Hi @hylkevds I'll check it, sorry but I'm pretty loaded and this PR is not so trivial.

@hylkevds
Copy link
Collaborator Author

hylkevds commented Feb 7, 2026

No rush, I know how life is. I had wanted to get this done half a year ago, but I also just didn't find the time for it.
You should also prioritize 919, not this one.
This PR is far from trivial, and it doesn't actually fix or add any functionality. It just makes the logic a lot easier to follow, and probably improves performance though I've not measured that.

@hylkevds hylkevds force-pushed the subscription_rework branch from 657050e to da9b7da Compare March 9, 2026 09:27
hylkevds and others added 2 commits March 9, 2026 16:26
Subscription information was being converted back and forth between several classes,
resulting in much unneeded object creation. But in the end it would always end up as Subscription.
- This removes all the intermediate Objects, simplifying the code.
- This includes the removal of the SharedSubscription class, since in the end, those are also turned into normal
  Subscriptions, making the extra class meaningless.
- The logic contained many superfluous if/else checks for subscriptionIdentifiers caused by the wrapping
  and unwrapping of Subscription data.
- All methods that would take several parameters with subscription information now simply accept a subscription.
Subscriptions are registered on the Session, so when an unsubscribe is
received, the subscription can be fetched from the Session. This allows
the subscription-remove functions to be greatly simplified.
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors Moquette’s subscription handling by removing intermediate subscription wrapper types (including SharedSubscription) and standardizing APIs to pass Subscription objects end-to-end, with corresponding updates across persistence, session tracking, and subscription matching.

Changes:

  • Removed SharedSubscription and migrated shared-subscription storage/matching to use Subscription directly.
  • Simplified subscribe/unsubscribe flows by storing subscriptions on Session in a map keyed by topic string and reusing the stored Subscription for removals.
  • Updated repositories (H2, in-memory), trie structures, and tests to the new Subscription-centric APIs.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
broker/src/main/java/io/moquette/broker/PostOffice.java Builds shared/non-shared Subscription objects directly; unsubscribe now looks up stored subscriptions from Session.
broker/src/main/java/io/moquette/broker/Session.java Stores subscriptions in a Map keyed by topic string; adds lookup helpers for unsubscribe.
broker/src/main/java/io/moquette/broker/SessionRegistry.java Adjusts subscription removal calls to new signatures (but still needs shared vs non-shared handling fixes).
broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java Repository API now removes/adds using Subscription objects; shared listing now returns Subscription.
broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java Persists shared subscriptions as Subscription; updates MVStore (de)serialization logic accordingly.
broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java In-memory shared-subscription storage now uses Subscription; removal now takes Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java Directory API now accepts Subscription; shared ops now use Subscription directly.
broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java Removes SubscriptionRequest/UnsubscribeRequest wrappers; trie now mutates using Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/CNode.java Stores shared subscriptions as Subscription and returns them directly during match selection.
broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java Share name is now ShareName; adds helper to reconstruct $share/... topic string; renames option accessor.
broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java Adds EMPTY_SHARENAME, isEmpty(), and Comparable.
broker/src/main/java/io/moquette/broker/subscriptions/TNode.java Updates signatures to accept Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java Directory interface now accepts/removes Subscription objects directly.
broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java Updates debug dump to use new option accessor.
broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java Uses Subscription.getOption() accessor.
broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java Deleted (type removed).
broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java Updates tests to use Subscription for shared subscriptions.
broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java Updates helpers to construct shared subscriptions with ShareName.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java Updates tests to call trie methods with Subscription directly.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java Updates tests to new directory signatures (add/remove take Subscription).
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java Updates shared-subscription matching tests to use Subscription.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java Updates perf test to use Subscription directly.
ChangeLog.txt Adds changelog entry for the cleanup.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

public List<Subscription> getSubscriptions() {
return new ArrayList<>(subscriptions);
public Collection<Subscription> getSubscriptions() {
return subscriptions.values();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. The performance cost of making the copy are mostly irrelevant, since the method is only used when re-initialising a server after a restart, or when cleaning up a session.
I'll also change the method header back to returning a List.

Comment on lines +402 to 404
Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString()))
.findFirst();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the actual correct fix would be to make sure an empty share name can never happen. En empty share name is bound to cause other issues too.
We should track this as a separate issue, since it already exists in the current version and is not specific to the PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second look, ShareName#toString is indeed the wrong method. Should be ShareName#getShareName. That is a bit confusing indeed.
Fixed now.

Comment on lines +56 to 60
for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) {
LOG.debug("Re-subscribing shared {}", shared);
ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(),
shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS())));
// TODO: This must call all registered InterceptHandler.onSubscribe
ctrie.addToTree(shared);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.

Comment on lines 355 to 360
for (Subscription existingSub : session.getSubscriptions()) {
final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username,
session.getClientID());
if (!topicReadable) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right, please could you check that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's correct that there is a problem. There already was a problem, and now it slightly changed :)
I think the old case never handled shared subscription at all, when re-activating a subscription.
The session.removeSubscription(existingSub); was already missing in the original version...

Maybe we should simplify the interface of ISubscriptionsDirectory, and have that isShared() check there?

Comment on lines 366 to 369
private void unsubscribe(Session session) {
for (Subscription existingSub : session.getSubscriptions()) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unsubscribe(Session) method is only used when purging a session. So we'd need an integration tests that

  • Creates a shared subscription with a persistent session.
  • Reconnects with the same clientId, but requesting a clean session.

That test currently doesn't exist... And I'm pretty sure the problem already existed before as a result. I'll add the hasShareName check.

Copy link
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hylkevds thank's a TON for your monumental work here, the spirit of the original code was to make think smoother but at the end created a little bit of mess that you correctly fixed.

Now that we don't have anymore a specialized SharedSubscription but only the Subscription which can be shared or not, I think that in every method that is something like "sharedSubscription(Subscription)" we have to put a defensive check that effectively the provided Subscription parameter is of shared kind. This to avoid of potential leak of non shared Subscription in the shared Subscription paths. WDYT?

I left suggestion comments and also spotted that Copilot spotted maybe real problems, please could you check those?

I've create #934 as follow-up work on this, to avoid making the context of this PR too big. Feel free to add even more tasks to that issue, maybe all the other Copilot suggestions if you think that fixing in this PR is out-of-context or make it too big.

Great work 👏

for (Subscription subscription : this.subscriptionsRepository.listAllSubscriptions()) {
LOG.debug("Re-subscribing {}", subscription);
ctrie.addToTree(SubscriptionRequest.buildNonShared(subscription));
// TODO: This must call all registered InterceptHandler.onSubscribe
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds why should call the onSubscribe? onSubscribe callback is intended to be invoked when a client effectively send a SUBSCRIBE to a series of topic filter. This init method, just reconstructs the directory from the data provided by the subscription's repository (the storage) so aren't real SUBSCRIBE commands.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because an embedded server should also be brought back up to the state of all subscriptions. For instance, FROST-Server only sends messages on topics that actually have subscriptions. But to know a subscription exists, it registers an InterceptHandler so it knows about subscribe and unsubscribe events.

You could argue that the embedding server should save and restore its own state, but that just leads to the double storing of data, with the significant chance of these two states diverging due to race-conditions during shutdown. It's much more reliable to have Moquette holding the master storage, and the embedding server following that.

An alternative would be to have a separate method on the InterceptHandler for subscriptions restored from storage.

ChangeLog.txt Outdated
@@ -1,4 +1,5 @@
Version 0.19-SNAPSHOT
[Cleanup] Cleaned up subscription handling.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[Cleanup] Cleaned up subscription handling.
[cleanup] Cleaned up subscription handling. (#931)

*
* @return the original topic string.
*/
public String getOriginalTopicWithSharename() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public String getOriginalTopicWithSharename() {
public String getOriginalTopicFilterWithSharename() {


@Override
public int compareTo(ShareName o) {
return shareName.compareTo(o.shareName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If shareName is null this is a NPE.
If we know that it can't ever be null we need to use a null check assertion in the constructor, or alternatively if null is same as EMPTY_SHARENAME replace null shareName with EMPTY_SHARENAME in the constructor and remove null checks around.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed validateShareName already does the null and empty check. But I'll add a non-null check regardless, to ensure we can't accidentally add a null sharename in the future.

}

private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) {
private void addSharedSubscriptionRequest(Subscription shareSubRequest) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addSharedSubscriptionRequest -> addShared

Comment on lines +56 to 60
for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) {
LOG.debug("Re-subscribing shared {}", shared);
ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(),
shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS())));
// TODO: This must call all registered InterceptHandler.onSubscribe
ctrie.addToTree(shared);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.

Optional<SharedSubscriptionData> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.name.toString()))
Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subData now can became simply sub

public List<Subscription> getSubscriptions() {
return new ArrayList<>(subscriptions);
public Collection<Subscription> getSubscriptions() {
return subscriptions.values();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right

Comment on lines 355 to 360
for (Subscription existingSub : session.getSubscriptions()) {
final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username,
session.getClientID());
if (!topicReadable) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right, please could you check that?

Comment on lines 366 to 369
private void unsubscribe(Session session) {
for (Subscription existingSub : session.getSubscriptions()) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants