This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3936ce49e78 [fix][broker] Handle missing replicator during snapshot
request processing (#25266)
3936ce49e78 is described below
commit 3936ce49e786210a213ce46977bc5e33bd1b28d9
Author: sinan liu <[email protected]>
AuthorDate: Mon Mar 16 17:15:38 2026 +0800
[fix][broker] Handle missing replicator during snapshot request processing
(#25266)
---
.../ReplicatedSubscriptionsController.java | 6 +
.../ReplicatedSubscriptionsControllerTest.java | 168 +++++++++++++++++++++
2 files changed, 174 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 5eaa313c3d0..a156566d0fe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -147,6 +147,12 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
private void
receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
// if replicator producer is already closed, restart it to send
snapshot response
Replicator replicator =
topic.getReplicators().get(request.getSourceCluster());
+ if (replicator == null) {
+ log.warn("[{}] Received replicated subscription snapshot request
{} from cluster {}, but no replicator is"
+ + " configured for that cluster. Ignoring the
request.",
+ topic.getName(), request.getSnapshotId(),
request.getSourceCluster());
+ return;
+ }
if (!replicator.isConnected()) {
topic.startReplProducers();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsControllerTest.java
new file mode 100644
index 00000000000..92a15ad2605
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsControllerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import java.time.Clock;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.qos.MonotonicClock;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-replication")
+public class ReplicatedSubscriptionsControllerTest {
+
+ @Test
+ public void
testSnapshotRequestWhenReplicatorRemovedConcurrentlyDoesNotThrow() throws
Exception {
+ // Use a real PersistentTopic instance so that the replicator removal
happens via the production code path
+ // (PersistentTopic#removeReplicator), instead of directly doing
"replicators.remove(..)" in the test.
+ PulsarService pulsar = mock(PulsarService.class);
+ ScheduledExecutorService executor =
mock(ScheduledExecutorService.class);
+ @SuppressWarnings("rawtypes")
+ ScheduledFuture timer = mock(ScheduledFuture.class);
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setReplicatorPrefix("pulsar.repl");
+ config.setEnableReplicatedSubscriptions(true);
+ OpenTelemetryReplicatedSubscriptionStats stats =
mock(OpenTelemetryReplicatedSubscriptionStats.class);
+ MonotonicClock monotonicClock = System::nanoTime;
+ BacklogQuotaManager backlogQuotaManager =
mock(BacklogQuotaManager.class);
+
when(backlogQuotaManager.getDefaultQuota()).thenReturn(BacklogQuotaImpl.builder()
+ .limitSize(0)
+ .limitTime(0)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
+ .build());
+ BrokerService brokerService = mock(BrokerService.class);
+ ManagedLedger ledger = mock(ManagedLedger.class);
+ MessageDeduplication messageDeduplication =
mock(MessageDeduplication.class);
+
+ when(brokerService.getClock()).thenReturn(Clock.systemUTC());
+ when(brokerService.pulsar()).thenReturn(pulsar);
+ when(brokerService.getPulsar()).thenReturn(pulsar);
+
when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
+
+ when(pulsar.getExecutor()).thenReturn(executor);
+ when(pulsar.getConfiguration()).thenReturn(config);
+
when(pulsar.getOpenTelemetryReplicatedSubscriptionStats()).thenReturn(stats);
+ when(pulsar.getMonotonicClock()).thenReturn(monotonicClock);
+ when(executor.scheduleAtFixedRate(any(Runnable.class), anyLong(),
anyLong(), any(TimeUnit.class)))
+ .thenReturn(timer);
+
+ doAnswer(invocation -> {
+ DeleteCursorCallback callback = invocation.getArgument(1);
+ callback.deleteCursorComplete(null);
+ return null;
+ }).when(ledger).asyncDeleteCursor(anyString(),
any(DeleteCursorCallback.class), any());
+
+ PersistentTopic topic =
+ new PersistentTopic("persistent://public/default/t1",
brokerService, ledger, messageDeduplication);
+
+ Replicator replicator = mock(Replicator.class);
+
when(replicator.terminate()).thenReturn(CompletableFuture.completedFuture(null));
+ topic.getReplicators().put("remote", replicator);
+
+ // Create a spy topic that blocks at the moment of
"replicators.get(remote)" so we can deterministically
+ // reproduce the race between:
+ // 1) Replicator removal (policy update ->
PersistentTopic#removeReplicator)
+ // 2) Handling an in-flight snapshot request marker from that remote
cluster
+ PersistentTopic topicSpy = Mockito.spy(topic);
+ Map<String, Replicator> delegateReplicators = topic.getReplicators();
+ CountDownLatch enteredGet = new CountDownLatch(1);
+ CountDownLatch allowGet = new CountDownLatch(1);
+ Map<String, Replicator> blockingReplicators = new AbstractMap<>() {
+ @Override
+ public Replicator get(Object key) {
+ enteredGet.countDown();
+ try {
+ allowGet.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return delegateReplicators.get(key);
+ }
+
+ @Override
+ public Set<Entry<String, Replicator>> entrySet() {
+ return delegateReplicators.entrySet();
+ }
+ };
+ when(topicSpy.getReplicators()).thenReturn(blockingReplicators);
+
+ ReplicatedSubscriptionsController controller = new
ReplicatedSubscriptionsController(topicSpy, "local");
+
+ ByteBuf marker =
Markers.newReplicatedSubscriptionsSnapshotRequest("snapshot-1", "remote");
+ try {
+ // The controller expects the "payload" buffer whose readerIndex
already points to the marker payload.
+ Commands.skipMessageMetadata(marker);
+
+ @Cleanup("shutdownNow")
+ var pool = Executors.newSingleThreadExecutor();
+ var handlingFuture = pool.submit(() ->
controller.receivedReplicatedSubscriptionMarker(
+ PositionFactory.create(1, 1),
+ MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE,
+ marker));
+
+ // Wait until the marker handling code is about to read the
replicators map.
+ Assert.assertTrue(enteredGet.await(10, TimeUnit.SECONDS));
+
+ // Remove the replicator using the production code path.
+ topic.removeReplicator("remote").join();
+ verify(replicator).terminate();
+ verify(ledger).asyncDeleteCursor(anyString(),
any(DeleteCursorCallback.class), eq(null));
+
+ // Let the marker handling continue and observe a missing
replicator.
+ allowGet.countDown();
+ handlingFuture.get(10, TimeUnit.SECONDS);
+ } finally {
+ marker.release();
+ controller.close();
+ }
+ }
+}