diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index 38487d237..8cc1e9d63 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -27,6 +27,7 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorClosedException; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.imps.CuratorFrameworkBase; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.StandardListenerManager; @@ -53,6 +54,11 @@ public class PersistentWatcher implements Closeable { reset(); } }; + private final CuratorListener clientCloseListener = (ignored, event) -> { + if (event.getType() == CuratorEventType.CLOSING) { + onClientClosed(); + } + }; private final Watcher watcher = event -> listeners.forEach(w -> w.process(event)); private final CuratorFramework client; private final String basePath; @@ -82,11 +88,7 @@ public void start() { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); client.getConnectionStateListenable().addListener(connectionStateListener); // This could be a namespaced facade which does not support getCuratorListenable. - ((CuratorFrameworkBase) client).client().getCuratorListenable().addListener(((ignored, event) -> { - if (event.getType() == CuratorEventType.CLOSING) { - onClientClosed(); - } - })); + ((CuratorFrameworkBase) client).client().getCuratorListenable().addListener(clientCloseListener); reset(); } @@ -97,6 +99,7 @@ public void start() { public void close() { if (state.compareAndSet(State.STARTED, State.CLOSED)) { listeners.clear(); + ((CuratorFrameworkBase) client).client().getCuratorListenable().removeListener(clientCloseListener); client.getConnectionStateListenable().removeListener(connectionStateListener); try { client.watchers().remove(watcher).guaranteed().inBackground().forPath(basePath); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java index a94a99281..efc3640c4 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.compatibility.CuratorTestBase; @@ -134,6 +135,24 @@ public void testAfterClientClose() throws Exception { assertEquals(Watcher.Event.KeeperState.Closed, event.getState()); } + // GH-1292 Curator listener Leak in PersistentWatcher + @Test + public void testCuratorListenersNotLeaking() { + try (CuratorFramework client = CuratorFrameworkFactory.newClient( + server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) { + client.start(); + + for (int i = 0; i < 100; i++) { + try (PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", true)) { + persistentWatcher.start(); + } + } + + assertEquals(0, ((StandardListenerManager) client.getCuratorListenable()).size(), + "Curator listeners set up by the now closed PersistentWatchers should have been de-registered"); + } + } + private void internalTest(boolean recursive) throws Exception { try (CuratorFramework client = CuratorFrameworkFactory.newClient( server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {