This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e7fe8893bd5 [fix][broker] Fix duplicate watcher registration after
SessionReestablished (#24621)
e7fe8893bd5 is described below
commit e7fe8893bd559ea9db085d2dc7121ab04767fdfb
Author: hrzzzz <[email protected]>
AuthorDate: Thu Aug 14 16:14:29 2025 +0800
[fix][broker] Fix duplicate watcher registration after SessionReestablished
(#24621)
Co-authored-by: ruihongzhou <[email protected]>
---
.../pulsar/metadata/impl/ZKMetadataStore.java | 7 ++--
.../org/apache/pulsar/metadata/ZKSessionTest.java | 49 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 56c0962f250..d3065fcaae2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -81,6 +81,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
private final boolean isZkManaged;
private final ZooKeeper zkc;
private final ZKSessionWatcher sessionWatcher;
+ private final Watcher eventWatcher = this::handleWatchEvent;
public ZKMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig, boolean enableSessionWatcher)
throws MetadataStoreException {
@@ -108,7 +109,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
} else {
sessionWatcher = null;
}
- zkc.addWatch("/", this::handleWatchEvent,
AddWatchMode.PERSISTENT_RECURSIVE);
+ zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
@@ -142,14 +143,14 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
this.isZkManaged = isZkManaged;
this.zkc = zkc;
this.sessionWatcher = new ZKSessionWatcher(zkc,
this::receivedSessionEvent);
- zkc.addWatch("/", this::handleWatchEvent,
AddWatchMode.PERSISTENT_RECURSIVE);
+ zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
}
@Override
protected void receivedSessionEvent(SessionEvent event) {
if (event == SessionEvent.SessionReestablished) {
// Recreate the persistent watch on the new session
- zkc.addWatch("/", this::handleWatchEvent,
AddWatchMode.PERSISTENT_RECURSIVE,
+ zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
if (rc == Code.OK.intValue()) {
super.receivedSessionEvent(event);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 0c8869989bc..cf8000ebef6 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -20,15 +20,19 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -99,6 +103,51 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
assertNull(e);
}
+ @Test // a change must be notified once, not twice, after a new session
+ public void testNoDuplicateWatcherRegistrationAfterSessionReestablished()
throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(2_000)
+ .build());
+ // record every session event reported by the store, in arrival order
+ BlockingQueue<SessionEvent> sessionEvents = new
LinkedBlockingQueue<>();
+ store.registerSessionListener(sessionEvents::add);
+ // record every metadata notification so a duplicate can be detected
+ BlockingQueue<Notification> notifications = new
LinkedBlockingQueue<>();
+ store.registerListener(notifications::add);
+ // stop the ZK test server: expect ConnectionLost, then SessionLost
+ zks.stop();
+
+ SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.ConnectionLost);
+
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionLost);
+ // restart it: expect Reconnected, then SessionReestablished
+ zks.start();
+ boolean zkServerReady = zks.waitForServerUp(zks.getConnectionString(),
30_000);
+ assertTrue(zkServerReady);
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.Reconnected);
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionReestablished);
+
+ // write a key after SessionReestablished so that a notification is produced
+ String path = "/a";
+ store.put(path, "value-a".getBytes(), Optional.of(-1L));
+
+ // the first notification must be a Created event for exactly this path
+ Notification n = notifications.poll(5, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getPath(), path);
+ assertEquals(n.getType(), NotificationType.Created);
+
+ // nothing else may arrive within 1s; a second one would be a duplicate
+ n = notifications.poll(1, TimeUnit.SECONDS);
+ assertNull(n);
+ }
+
@Test
public void testReacquireLocksAfterSessionLost() throws Exception {
@Cleanup