Conversation
|
Note that this PR is on top of PR #919, since I discovered that problem at the start of this work. |
|
Hi @hylkevds I'll check it, sorry but I'm pretty loaded and this PR is not so trivial. |
|
No rush, I know how life is. I had wanted to get this done half a year ago, but I also just didn't find the time for it. |
657050e to
da9b7da
Compare
Subscription information was being converted back and forth between several classes, resulting in much unneeded object creation. But in the end it would always end up as Subscription. - This removes all the intermediate Objects, simplifying the code. - This includes the removal of the SharedSubscription class, since in the end, those are also turned into normal Subscriptions, making the extra class meaningless. - The logic contained many superfluous if/else checks for subscriptionIdentifiers caused by the wrapping and unwrapping of Subscription data. - All methods that would take several parameters with subscription information now simply accept a subscription.
Subscriptions are registered on the Session, so when an unsubscribe is received, the subscription can be fetched from the Session. This allows the subscription-remove functions to be greatly simplified.
da9b7da to
ee0cf5a
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors Moquette’s subscription handling by removing intermediate subscription wrapper types (including SharedSubscription) and standardizing APIs to pass Subscription objects end-to-end, with corresponding updates across persistence, session tracking, and subscription matching.
Changes:
- Removed
SharedSubscriptionand migrated shared-subscription storage/matching to useSubscriptiondirectly. - Simplified subscribe/unsubscribe flows by storing subscriptions on
Sessionin a map keyed by topic string and reusing the storedSubscriptionfor removals. - Updated repositories (
H2, in-memory), trie structures, and tests to the newSubscription-centric APIs.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| broker/src/main/java/io/moquette/broker/PostOffice.java | Builds shared/non-shared Subscription objects directly; unsubscribe now looks up stored subscriptions from Session. |
| broker/src/main/java/io/moquette/broker/Session.java | Stores subscriptions in a Map keyed by topic string; adds lookup helpers for unsubscribe. |
| broker/src/main/java/io/moquette/broker/SessionRegistry.java | Adjusts subscription removal calls to new signatures (but still needs shared vs non-shared handling fixes). |
| broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java | Repository API now removes/adds using Subscription objects; shared listing now returns Subscription. |
| broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java | Persists shared subscriptions as Subscription; updates MVStore (de)serialization logic accordingly. |
| broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java | In-memory shared-subscription storage now uses Subscription; removal now takes Subscription. |
| broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java | Directory API now accepts Subscription; shared ops now use Subscription directly. |
| broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java | Removes SubscriptionRequest/UnsubscribeRequest wrappers; trie now mutates using Subscription. |
| broker/src/main/java/io/moquette/broker/subscriptions/CNode.java | Stores shared subscriptions as Subscription and returns them directly during match selection. |
| broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java | Share name is now ShareName; adds helper to reconstruct $share/... topic string; renames option accessor. |
| broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java | Adds EMPTY_SHARENAME, isEmpty(), and Comparable. |
| broker/src/main/java/io/moquette/broker/subscriptions/TNode.java | Updates signatures to accept Subscription. |
| broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java | Directory interface now accepts/removes Subscription objects directly. |
| broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java | Updates debug dump to use new option accessor. |
| broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java | Uses Subscription.getOption() accessor. |
| broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java | Deleted (type removed). |
| broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java | Updates tests to use Subscription for shared subscriptions. |
| broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java | Updates helpers to construct shared subscriptions with ShareName. |
| broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java | Updates tests to call trie methods with Subscription directly. |
| broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java | Updates tests to new directory signatures (add/remove take Subscription). |
| broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java | Updates shared-subscription matching tests to use Subscription. |
| broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java | Updates perf test to use Subscription directly. |
| ChangeLog.txt | Adds changelog entry for the cleanup. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public List<Subscription> getSubscriptions() { | ||
| return new ArrayList<>(subscriptions); | ||
| public Collection<Subscription> getSubscriptions() { | ||
| return subscriptions.values(); |
There was a problem hiding this comment.
Indeed. The performance cost of making the copy are mostly irrelevant, since the method is only used when re-initialising a server after a restart, or when cleaning up a session.
I'll also change the method header back to returning a List.
| Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream() | ||
| .filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString())) | ||
| .findFirst(); |
There was a problem hiding this comment.
I think the actual correct fix would be to make sure an empty share name can never happen. En empty share name is bound to cause other issues too.
We should track this as a separate issue, since it already exists in the current version and is not specific to the PR.
There was a problem hiding this comment.
On second look, ShareName#toString is indeed the wrong method. Should be ShareName#getShareName. That is a bit confusing indeed.
Fixed now.
| for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) { | ||
| LOG.debug("Re-subscribing shared {}", shared); | ||
| ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(), | ||
| shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS()))); | ||
| // TODO: This must call all registered InterceptHandler.onSubscribe | ||
| ctrie.addToTree(shared); | ||
| } |
There was a problem hiding this comment.
@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.
| for (Subscription existingSub : session.getSubscriptions()) { | ||
| final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username, | ||
| session.getClientID()); | ||
| if (!topicReadable) { | ||
| subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID()); | ||
| subscriptionsDirectory.removeSubscription(existingSub); | ||
| } |
There was a problem hiding this comment.
@hylkevds I think it's right, please could you check that?
There was a problem hiding this comment.
It's correct that there is a problem. There already was a problem, and now it slightly changed :)
I think the old case never handled shared subscription at all, when re-activating a subscription.
The session.removeSubscription(existingSub); was already missing in the original version...
Maybe we should simplify the interface of ISubscriptionsDirectory, and have that isShared() check there?
| private void unsubscribe(Session session) { | ||
| for (Subscription existingSub : session.getSubscriptions()) { | ||
| subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID()); | ||
| subscriptionsDirectory.removeSubscription(existingSub); | ||
| } |
There was a problem hiding this comment.
@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?
There was a problem hiding this comment.
This unsubscribe(Session) method is only used when purging a session. So we'd need an integration tests that
- Creates a shared subscription with a persistent session.
- Reconnects with the same clientId, but requesting a clean session.
That test currently doesn't exist... And I'm pretty sure the problem already existed before as a result. I'll add the hasShareName check.
andsel
left a comment
There was a problem hiding this comment.
Hi @hylkevds thank's a TON for your monumental work here, the spirit of the original code was to make think smoother but at the end created a little bit of mess that you correctly fixed.
Now that we don't have anymore a specialized SharedSubscription but only the Subscription which can be shared or not, I think that in every method that is something like "sharedSubscription(Subscription)" we have to put a defensive check that effectively the provided Subscription parameter is of shared kind. This to avoid of potential leak of non shared Subscription in the shared Subscription paths. WDYT?
I left suggestion comments and also spotted that Copilot spotted maybe real problems, please could you check those?
I've create #934 as follow-up work on this, to avoid making the context of this PR too big. Feel free to add even more tasks to that issue, maybe all the other Copilot suggestions if you think that fixing in this PR is out-of-context or make it too big.
Great work 👏
| for (Subscription subscription : this.subscriptionsRepository.listAllSubscriptions()) { | ||
| LOG.debug("Re-subscribing {}", subscription); | ||
| ctrie.addToTree(SubscriptionRequest.buildNonShared(subscription)); | ||
| // TODO: This must call all registered InterceptHandler.onSubscribe |
There was a problem hiding this comment.
@hylkevds why should call the onSubscribe? onSubscribe callback is intended to be invoked when a client effectively send a SUBSCRIBE to a series of topic filter. This init method, just reconstructs the directory from the data provided by the subscription's repository (the storage) so aren't real SUBSCRIBE commands.
There was a problem hiding this comment.
Because an embedded server should also be brought back up to the state of all subscriptions. For instance, FROST-Server only sends messages on topics that actually have subscriptions. But to know a subscription exists, it registers an InterceptHandler so it knows about subscribe and unsubscribe events.
You could argue that the embedding server should save and restore its own state, but that just leads to the double storing of data, with the significant chance of these two states diverging due to race-conditions during shutdown. It's much more reliable to have Moquette holding the master storage, and the embedding server following that.
An alternative would be to have a separate method on the InterceptHandler for subscriptions restored from storage.
ChangeLog.txt
Outdated
| @@ -1,4 +1,5 @@ | |||
| Version 0.19-SNAPSHOT | |||
| [Cleanup] Cleaned up subscription handling. | |||
There was a problem hiding this comment.
| [Cleanup] Cleaned up subscription handling. | |
| [cleanup] Cleaned up subscription handling. (#931) |
| * | ||
| * @return the original topic string. | ||
| */ | ||
| public String getOriginalTopicWithSharename() { |
There was a problem hiding this comment.
| public String getOriginalTopicWithSharename() { | |
| public String getOriginalTopicFilterWithSharename() { |
|
|
||
| @Override | ||
| public int compareTo(ShareName o) { | ||
| return shareName.compareTo(o.shareName); |
There was a problem hiding this comment.
If shareName is null this is a NPE.
If we know that it can't ever be null we need to use a null check assertion in the constructor, or alternatively if null is same as EMPTY_SHARENAME replace null shareName with EMPTY_SHARENAME in the constructor and remove null checks around.
There was a problem hiding this comment.
I just noticed validateShareName already does the null and empty check. But I'll add a non-null check regardless, to ensure we can't accidentally add a null sharename in the future.
| } | ||
|
|
||
| private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) { | ||
| private void addSharedSubscriptionRequest(Subscription shareSubRequest) { |
There was a problem hiding this comment.
addSharedSubscriptionRequest -> addShared
| for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) { | ||
| LOG.debug("Re-subscribing shared {}", shared); | ||
| ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(), | ||
| shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS()))); | ||
| // TODO: This must call all registered InterceptHandler.onSubscribe | ||
| ctrie.addToTree(shared); | ||
| } |
There was a problem hiding this comment.
@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.
| Optional<SharedSubscriptionData> invalidSharedSubscription = sharedSubscriptions.stream() | ||
| .filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.name.toString())) | ||
| Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream() | ||
| .filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString())) |
There was a problem hiding this comment.
subData now can became simply sub
| public List<Subscription> getSubscriptions() { | ||
| return new ArrayList<>(subscriptions); | ||
| public Collection<Subscription> getSubscriptions() { | ||
| return subscriptions.values(); |
| for (Subscription existingSub : session.getSubscriptions()) { | ||
| final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username, | ||
| session.getClientID()); | ||
| if (!topicReadable) { | ||
| subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID()); | ||
| subscriptionsDirectory.removeSubscription(existingSub); | ||
| } |
There was a problem hiding this comment.
@hylkevds I think it's right, please could you check that?
| private void unsubscribe(Session session) { | ||
| for (Subscription existingSub : session.getSubscriptions()) { | ||
| subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID()); | ||
| subscriptionsDirectory.removeSubscription(existingSub); | ||
| } |
There was a problem hiding this comment.
@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?
While working on a subscription related feature I had to carefully review the way subscriptions are passed through Moquette, and did some massive cleanup and simplification.
Subscription information was being converted back and forth between several classes,
resulting in much unneeded object creation. But in the end it would always end up as Subscription.
These changes will greatly improve the maintainability of the code, and probably increase efficiency.
Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration files (and/or docker env variables)