This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e7fe8893bd5 [fix][broker] Fix duplicate watcher registration after 
SessionReestablished (#24621)
e7fe8893bd5 is described below

commit e7fe8893bd559ea9db085d2dc7121ab04767fdfb
Author: hrzzzz <[email protected]>
AuthorDate: Thu Aug 14 16:14:29 2025 +0800

    [fix][broker] Fix duplicate watcher registration after SessionReestablished 
(#24621)
    
    Co-authored-by: ruihongzhou <[email protected]>
---
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  7 ++--
 .../org/apache/pulsar/metadata/ZKSessionTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 56c0962f250..d3065fcaae2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -81,6 +81,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
     private final boolean isZkManaged;
     private final ZooKeeper zkc;
     private final ZKSessionWatcher sessionWatcher;
+    private final Watcher eventWatcher = this::handleWatchEvent;
 
     public ZKMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig, boolean enableSessionWatcher)
             throws MetadataStoreException {
@@ -108,7 +109,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
             } else {
                 sessionWatcher = null;
             }
-            zkc.addWatch("/", this::handleWatchEvent, 
AddWatchMode.PERSISTENT_RECURSIVE);
+            zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
         } catch (Throwable t) {
             throw new MetadataStoreException(t);
         }
@@ -142,14 +143,14 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
         this.isZkManaged = isZkManaged;
         this.zkc = zkc;
         this.sessionWatcher = new ZKSessionWatcher(zkc, 
this::receivedSessionEvent);
-        zkc.addWatch("/", this::handleWatchEvent, 
AddWatchMode.PERSISTENT_RECURSIVE);
+        zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
     }
 
     @Override
     protected void receivedSessionEvent(SessionEvent event) {
         if (event == SessionEvent.SessionReestablished) {
             // Recreate the persistent watch on the new session
-            zkc.addWatch("/", this::handleWatchEvent, 
AddWatchMode.PERSISTENT_RECURSIVE,
+            zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
                     (rc, path, ctx) -> {
                         if (rc == Code.OK.intValue()) {
                             super.receivedSessionEvent(event);
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 0c8869989bc..cf8000ebef6 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -20,15 +20,19 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LeaderElection;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -99,6 +103,51 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         assertNull(e);
     }
 
+    @Test
+    public void testNoDuplicateWatcherRegistrationAfterSessionReestablished() 
throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(zks.getConnectionString(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis(2_000)
+                        .build());
+
+        BlockingQueue<SessionEvent> sessionEvents = new 
LinkedBlockingQueue<>();
+        store.registerSessionListener(sessionEvents::add);
+
+        BlockingQueue<Notification> notifications = new 
LinkedBlockingQueue<>();
+        store.registerListener(notifications::add);
+
+        zks.stop();
+
+        SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.ConnectionLost);
+
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionLost);
+
+        zks.start();
+        boolean zkServerReady = zks.waitForServerUp(zks.getConnectionString(), 
30_000);
+        assertTrue(zkServerReady);
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.Reconnected);
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionReestablished);
+
+        // perform a put operation to trigger watch notification
+        String path = "/a";
+        store.put(path, "value-a".getBytes(), Optional.of(-1L));
+
+        // receive creation notification
+        Notification n = notifications.poll(5, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getPath(), path);
+        assertEquals(n.getType(), NotificationType.Created);
+
+        // no duplicate notification is received
+        n = notifications.poll(1, TimeUnit.SECONDS);
+        assertNull(n);
+    }
+
     @Test
     public void testReacquireLocksAfterSessionLost() throws Exception {
         @Cleanup

Reply via email to