This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d874c3ef84e9875a30f989c078840f9410bfdead Author: hrzzzz <[email protected]> AuthorDate: Thu Aug 14 16:14:29 2025 +0800 [fix][broker] Fix duplicate watcher registration after SessionReestablished (#24621) Co-authored-by: ruihongzhou <[email protected]> (cherry picked from commit e7fe8893bd559ea9db085d2dc7121ab04767fdfb) --- .../pulsar/metadata/impl/ZKMetadataStore.java | 7 ++-- .../org/apache/pulsar/metadata/ZKSessionTest.java | 49 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 56c0962f250..d3065fcaae2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -81,6 +81,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore private final boolean isZkManaged; private final ZooKeeper zkc; private final ZKSessionWatcher sessionWatcher; + private final Watcher eventWatcher = this::handleWatchEvent; public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws MetadataStoreException { @@ -108,7 +109,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore } else { sessionWatcher = null; } - zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); + zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE); } catch (Throwable t) { throw new MetadataStoreException(t); } @@ -142,14 +143,14 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore this.isZkManaged = isZkManaged; this.zkc = zkc; this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); - zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); + zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE); } @Override protected void receivedSessionEvent(SessionEvent event) { if (event == SessionEvent.SessionReestablished) { // Recreate the persistent watch on the new session - zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE, + zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE, (rc, path, ctx) -> { if (rc == Code.OK.intValue()) { super.receivedSessionEvent(event); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 0c8869989bc..cf8000ebef6 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -20,15 +20,19 @@ package org.apache.pulsar.metadata; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.coordination.LeaderElection; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -99,6 +103,51 @@ public class ZKSessionTest extends BaseMetadataStoreTest { assertNull(e); } + @Test + public void testNoDuplicateWatcherRegistrationAfterSessionReestablished() throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis(2_000) + .build()); + + BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>(); + store.registerSessionListener(sessionEvents::add); + + BlockingQueue<Notification> notifications = new LinkedBlockingQueue<>(); + store.registerListener(notifications::add); + + zks.stop(); + + SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.ConnectionLost); + + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.SessionLost); + + zks.start(); + boolean zkServerReady = zks.waitForServerUp(zks.getConnectionString(), 30_000); + assertTrue(zkServerReady); + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.Reconnected); + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.SessionReestablished); + + // perform a put operation to trigger watch notification + String path = "/a"; + store.put(path, "value-a".getBytes(), Optional.of(-1L)); + + // receive creation notification + Notification n = notifications.poll(5, TimeUnit.SECONDS); + assertNotNull(n); + assertEquals(n.getPath(), path); + assertEquals(n.getType(), NotificationType.Created); + + // no duplicate notification is received + n = notifications.poll(1, TimeUnit.SECONDS); + assertNull(n); + } + @Test public void testReacquireLocksAfterSessionLost() throws Exception { @Cleanup
