This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 96be3bb2e2a [fix][broker]excessive replication speed leads to error: Producer send queue is full (#24189)
96be3bb2e2a is described below

commit 96be3bb2e2aad0bd7925cb7fb117d1ea58bf9507
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 2 14:11:25 2025 +0800

    [fix][broker]excessive replication speed leads to error: Producer send queue is full (#24189)
    
    Co-authored-by: Kai Wang <[email protected]>
    (cherry picked from commit 37b17d3e6920e851fa56f962b43bd698963033f5)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  31 +-
 .../apache/pulsar/broker/service/Replicator.java   |   2 +-
 .../nonpersistent/NonPersistentReplicator.java     |   5 +
 .../persistent/GeoPersistentReplicator.java        |  34 +-
 .../service/persistent/MessageDeduplication.java   |   5 +-
 .../service/persistent/PersistentReplicator.java   | 375 ++++++++++++++++-----
 .../broker/service/persistent/PersistentTopic.java |  11 +-
 .../service/persistent/ShadowReplicator.java       |  16 +-
 .../broker/service/AbstractReplicatorTest.java     |   3 +
 .../BrokerServiceInternalMethodInvoker.java        |  26 ++
 .../broker/service/OneWayReplicatorTest.java       | 154 ++++++++-
 .../broker/service/OneWayReplicatorTestBase.java   |  26 +-
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |  12 +
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  14 +-
 .../broker/service/ReplicatedSubscriptionTest.java |  16 +-
 .../pulsar/broker/service/ReplicatorTest.java      |  31 +-
 .../BrokerServicePersistInternalMethodInvoker.java |  45 +++
 .../PersistentReplicatorInflightTaskTest.java      | 367 ++++++++++++++++++++
 .../service/persistent/ShadowReplicatorTest.java   |  15 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  13 +-
 .../client/impl/GeoReplicationProducerImpl.java    |   8 +
 .../apache/pulsar/client/impl/ProducerImpl.java    |  17 +
 22 files changed, 1081 insertions(+), 145 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index aae54162e5a..365e7cc3c12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -305,9 +305,10 @@ public abstract class AbstractReplicator implements 
Replicator {
     /**
      * This method only be used by {@link PersistentTopic#checkGC} now.
      */
-    public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+    @Override
+    public CompletableFuture<Void> disconnect() {
         long backlog = getNumberOfEntriesInBacklog();
-        if (failIfHasBacklog && backlog > 0) {
+        if (backlog > 0) {
             CompletableFuture<Void> disconnectFuture = new 
CompletableFuture<>();
             disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
             if (log.isDebugEnabled()) {
@@ -317,9 +318,30 @@ public abstract class AbstractReplicator implements 
Replicator {
         }
         log.info("[{}] Disconnect replicator at position {} with backlog {}", 
replicatorId,
                 getReplicatorReadPosition(), backlog);
-        return closeProducerAsync(closeTheStartingProducer);
+        return beforeDisconnect()
+            .thenCompose(__ -> closeProducerAsync(true))
+            .thenApply(__ -> {
+                afterDisconnected();
+                return null;
+            });
+    }
+
+    /**
+     * This method and {@link #afterDisconnected()} are used to solve the 
following race condition:
+     * - Thread 1: calling disconnect.
+     *             passed the check: no backlog.
+     * - Thread 2: published a message, then the cursor.pendingRead completes.
+     * - Thread 1: continue to disconnect.
+     * - Thread 2: read entries from the cursor, and try to send messages, but 
the messages will be discarded because
+     *             the producer is closed.
+     * Issue: the pending reading's read position is not correct.
+     */
+    protected CompletableFuture<Void> beforeDisconnect() {
+        return CompletableFuture.completedFuture(null);
     }
 
+    protected void afterDisconnected() {}
+
     /**
      * This method only be used by {@link PersistentTopic#checkGC} now.
      */
@@ -398,12 +420,15 @@ public abstract class AbstractReplicator implements 
Replicator {
         });
     }
 
+    protected abstract void beforeTerminate();
+
     public CompletableFuture<Void> terminate() {
         if (!tryChangeStatusToTerminating()) {
             log.info("[{}] Skip current termination since other thread is 
doing termination, state : {}", replicatorId,
                     state);
             return CompletableFuture.completedFuture(null);
         }
+        beforeTerminate();
         return doCloseProducerAsync(producer, () -> {
             STATE_UPDATER.set(this, State.Terminated);
             this.producer = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 667063e4910..86e2b6e74de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -33,7 +33,7 @@ public interface Replicator {
 
     CompletableFuture<Void> terminate();
 
-    CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean 
closeTheStartingProducer);
+    CompletableFuture<Void> disconnect();
 
     void updateRates();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 45b4ebf6e17..32b58347ae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -258,4 +258,9 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
     protected void disableReplicatorRead() {
         // No-op
     }
+
+    @Override
+    protected void beforeTerminate() {
+        // No-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index bc480635bab..437067edb69 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -81,7 +81,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
     }
 
     @Override
-    protected boolean replicateEntries(List<Entry> entries) {
+    protected boolean replicateEntries(List<Entry> entries, final InFlightTask 
inFlightTask) {
         boolean atLeastOneMessageSentForReplication = false;
         boolean isEnableReplicatedSubscriptions =
                 
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
@@ -90,12 +90,13 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
             // This flag is set to true when we skip at least one local 
message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
-            boolean skipRemainingMessages = false;
+            boolean skipRemainingMessages = 
inFlightTask.isSkipReadResultDueToCursorRewind();
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
                 // Skip the messages since the replicator need to fetch the 
schema info to replicate the schema to the
                 // remote cluster. Rewind the cursor first and continue the 
message read after fetched the schema.
                 if (skipRemainingMessages) {
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     continue;
                 }
@@ -108,12 +109,14 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                     log.error("[{}] Failed to deserialize message at {} 
(buffer size: {}): {}", replicatorId,
                             entry.getPosition(), length, t.getMessage(), t);
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     continue;
                 }
 
                 if (Markers.isTxnMarker(msg.getMessageBuilder())) {
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -123,6 +126,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                             msg.getMessageBuilder().getTxnidLeastBits());
                     if (topic.isTxnAborted(tx, entry.getPosition())) {
                         cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                        inFlightTask.incCompletedEntries();
                         entry.release();
                         msg.recycle();
                         continue;
@@ -136,6 +140,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                 if (msg.isReplicated()) {
                     // Discard messages that were already replicated into this 
region
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -147,6 +152,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                                 entry.getPosition(), msg.getReplicateTo());
                     }
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -159,6 +165,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                                 replicatorId, entry.getPosition(), 
msg.getReplicateTo());
                     }
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -166,12 +173,13 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
 
                 if (STATE_UPDATER.get(this) != State.Started || 
isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having 
stopped/restarted. Drop the message because it will
-                    // recovered when the producer is ready
+                    // recover when the producer is ready
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Dropping read message at {} because 
producer is not ready",
                                 replicatorId, entry.getPosition());
                     }
                     isLocalMessageSkippedOnce = true;
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -184,14 +192,23 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
 
                 CompletableFuture<SchemaInfo> schemaFuture = 
getSchemaInfo(msg);
                 if (!schemaFuture.isDone() || 
schemaFuture.isCompletedExceptionally()) {
+                    /**
+                     * Skip in flight reading tasks.
+                     * Explain the result of the race-condition between:
+                     *   - {@link #readMoreEntries}
+                     *   - {@link 
#beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)}
+                     * Since {@link #acquirePermitsIfNotFetchingSchema} and
+                     *   {@link 
#beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)} acquire the
+                     * same lock, it is safe.
+                     */
+                    
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Fetching_Schema);
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     headersAndPayload.release();
                     msg.recycle();
                     // Mark the replicator is fetching the schema for now and 
rewind the cursor
                     // and trigger the next read after complete the schema 
fetching.
-                    fetchSchemaInProgress = true;
                     skipRemainingMessages = true;
-                    cursor.cancelPendingReadRequest();
                     log.info("[{}] Pause the data replication due to new 
detected schema", replicatorId);
                     schemaFuture.whenComplete((__, e) -> {
                         if (e != null) {
@@ -199,9 +216,7 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                                     replicatorId, e);
                         }
                         log.info("[{}] Resume the data replication after the 
schema fetching done", replicatorId);
-                        cursor.rewind();
-                        fetchSchemaInProgress = false;
-                        readMoreEntries();
+                        doRewindCursor(true);
                     });
                 } else {
                     msg.setSchemaInfoForReplicator(schemaFuture.get());
@@ -214,11 +229,10 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                     stats.incrementMsgOutCounter();
                     
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
                     // Increment pending messages for messages produced locally
-                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Publishing {}:{}", replicatorId, 
entry.getLedgerId(), entry.getEntryId());
                     }
-                    producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg));
+                    producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg, inFlightTask));
                     atLeastOneMessageSentForReplication = true;
                 }
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 9f734e69896..cb549b6b8fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -95,8 +95,9 @@ public class MessageDeduplication {
     }
 
     public static class MessageDupUnknownException extends RuntimeException {
-        public MessageDupUnknownException() {
-            super("Cannot determine whether the message is a duplicate at this 
time");
+        public MessageDupUnknownException(String topicName, String 
producerName) {
+            super(String.format("[%s][%s]Cannot determine whether the message 
is a duplicate at this time", topicName,
+                    producerName));
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 881b1b804e8..5952bb14d22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -27,13 +27,14 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
@@ -49,10 +50,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.client.api.MessageId;
@@ -86,19 +89,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     private final int producerQueueThreshold;
 
-    protected static final AtomicIntegerFieldUpdater<PersistentReplicator> 
PENDING_MESSAGES_UPDATER =
-            AtomicIntegerFieldUpdater
-                    .newUpdater(PersistentReplicator.class, "pendingMessages");
-    private volatile int pendingMessages = 0;
-
-    private static final int FALSE = 0;
-    private static final int TRUE = 1;
-
-    private static final AtomicIntegerFieldUpdater<PersistentReplicator> 
HAVE_PENDING_READ_UPDATER =
-            AtomicIntegerFieldUpdater
-                    .newUpdater(PersistentReplicator.class, "havePendingRead");
-    private volatile int havePendingRead = FALSE;
-
     protected final Rate msgOut = new Rate();
     protected final Rate msgExpired = new Rate();
 
@@ -114,8 +104,17 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     @Getter
     protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
-    protected volatile boolean fetchSchemaInProgress = false;
-    private volatile boolean waitForCursorRewinding = false;
+    protected volatile int waitForCursorRewindingRefCnf = 0;
+
+    protected enum ReasonOfWaitForCursorRewinding {
+        Failed_Publishing,
+        Fetching_Schema,
+        Disconnecting,
+        Terminating;
+    }
+    protected ReasonOfWaitForCursorRewinding reasonOfWaitForCursorRewinding = 
null;
+
+    protected final LinkedList<InFlightTask> inFlightTasks = new 
LinkedList<>();
 
     public PersistentReplicator(String localCluster, PersistentTopic 
localTopic, ManagedCursor cursor,
                                    String remoteCluster, String remoteTopic,
@@ -127,8 +126,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
                 Codec.decode(cursor.getName()), cursor, null);
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        PENDING_MESSAGES_UPDATER.set(this, 0);
 
         readBatchSize = Math.min(
                 producerQueueSize,
@@ -143,11 +140,8 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     @Override
     protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
-        waitForCursorRewinding = true;
-
         // Repeat until there are no read operations in progress
-        if (STATE_UPDATER.get(this) == State.Starting && 
HAVE_PENDING_READ_UPDATER.get(this) == TRUE
-                && !cursor.cancelPendingReadRequest()) {
+        if (STATE_UPDATER.get(this) == State.Starting && hasPendingRead() && 
!cursor.cancelPendingReadRequest()) {
             brokerService.getPulsar().getExecutor()
                     .schedule(() -> 
setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
             return;
@@ -164,12 +158,10 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             if (!(producer instanceof ProducerImpl)) {
                 log.error("[{}] The partitions count between two clusters is 
not the same, the replicator can not be"
                         + " created successfully: {}", replicatorId, state);
-                waitForCursorRewinding = false;
                 doCloseProducerAsync(producer, () -> {});
                 throw new ClassCastException(producer.getClass().getName() + " 
can not be cast to ProducerImpl");
             }
             this.producer = (ProducerImpl) producer;
-            HAVE_PENDING_READ_UPDATER.set(this, FALSE);
             // Trigger a new read.
             log.info("[{}] Created replicator producer, Replicator state: {}", 
replicatorId, state);
             backOff.reset();
@@ -178,8 +170,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
             // Rewind the cursor to be sure to read again all non-acked 
messages sent while restarting
             cursor.rewind();
-            waitForCursorRewinding = false;
-
             // read entries
             readMoreEntries();
         } else {
@@ -195,7 +185,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                 log.error("[{}] Replicator state is not expected, so close the 
producer. Replicator state: {}",
                         replicatorId, changeStateRes.getRight());
             }
-            waitForCursorRewinding = false;
             // Close the producer if change the state fail.
             doCloseProducerAsync(producer, () -> {});
         }
@@ -245,8 +234,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     /**
      * Calculate available permits for read entries.
      */
-    private AvailablePermits getAvailablePermits() {
-        int availablePermits = producerQueueSize - 
PENDING_MESSAGES_UPDATER.get(this);
+    private AvailablePermits getRateLimiterAvailablePermits(int 
availablePermits) {
 
         // return 0, if Producer queue is full, it will pause read entries.
         if (availablePermits <= 0) {
@@ -291,51 +279,59 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     }
 
     protected void readMoreEntries() {
-        if (fetchSchemaInProgress) {
-            log.info("[{}] Skip the reading due to new detected schema", 
replicatorId);
-            return;
-        }
-        AvailablePermits availablePermits = getAvailablePermits();
-        if (availablePermits.isReadable()) {
-            int messagesToRead = availablePermits.getMessages();
-            long bytesToRead = availablePermits.getBytes();
-            if (!isWritable()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Throttling replication traffic because 
producer is not writable", replicatorId);
-                }
-                // Minimize the read size if the producer is disconnected or 
the window is already full
-                messagesToRead = 1;
+        // Acquire permits and check state of producer.
+        InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
+        if (newInFlightTask == null) {
+            // no permits from rate limit
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Not scheduling read due to pending read or no 
permits.", replicatorId);
             }
-
-            // Schedule read
-            if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
-                if (waitForCursorRewinding) {
-                    log.info("[{}] Skip the reading because repl producer is 
starting", replicatorId);
-                    HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-                    return;
-                }
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule read of {} messages or {} bytes", 
replicatorId, messagesToRead,
-                            bytesToRead);
-                }
-                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
-                        null, topic.getMaxReadPosition());
+            if (!hasPendingRead()) {
+                topic.getBrokerService().executor().schedule(
+                        () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
+                return;
             } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Not scheduling read due to pending read. 
Messages To Read {}, Bytes To Read {}",
-                            replicatorId, messagesToRead, bytesToRead);
-                }
+                return;
+            }
+        }
+        // If disabled RateLimiter.
+        if (!dispatchRateLimiter.isPresent() || 
!dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+            cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, -1, 
this,
+                    newInFlightTask/* Context object */, 
topic.getMaxReadPosition());
+            return;
+        }
+        // No permits of RateLimiter.
+        AvailablePermits availablePermits = 
getRateLimiterAvailablePermits(newInFlightTask.readingEntries);
+        if (!availablePermits.isReadable()) {
+            // no rate limiter permits from rate limit
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Throttling replication traffic. Messages To 
Read {}, Bytes To Read {}",
+                        replicatorId, availablePermits.getMessages(), 
availablePermits.getBytes());
             }
-        } else if (availablePermits.isExceeded()) {
-            // no permits from rate limit
             topic.getBrokerService().executor().schedule(
-                () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
-        } else {
+                    () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
+            return;
+        }
+        // Has permits of RateLimiter.
+        int messagesToRead = availablePermits.getMessages();
+        long bytesToRead = availablePermits.getBytes();
+        if (!isWritable()) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] No Permits for reading. availablePermits: {}",
-                        replicatorId, availablePermits);
+                log.debug("[{}] Throttling replication traffic because 
producer is not writable", replicatorId);
             }
+            // Minimize the read size if the producer is disconnected or the 
window is already full
+            messagesToRead = 1;
+        }
+        // Update acquired permits exceeds limitation.
+        if (messagesToRead < newInFlightTask.readingEntries) {
+            newInFlightTask.setReadingEntries(messagesToRead);
         }
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Schedule read of {} messages or {} bytes", 
replicatorId, newInFlightTask.readingEntries,
+                    bytesToRead);
+        }
+        cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, 
bytesToRead, this,
+                newInFlightTask/* Context object */, 
topic.getMaxReadPosition());
     }
 
     @Override
@@ -343,7 +339,10 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         if (log.isDebugEnabled()) {
             log.debug("[{}] Read entries complete of {} messages", 
replicatorId, entries.size());
         }
+        InFlightTask inFlightTask = (InFlightTask) ctx;
+        inFlightTask.setEntries(entries);
 
+        // After the replicator starts, the speed will be gradually increased.
         int maxReadBatchSize = 
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
         if (readBatchSize < maxReadBatchSize) {
             int newReadBatchSize = Math.min(readBatchSize * 2, 
maxReadBatchSize);
@@ -357,9 +356,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
         readFailureBackoff.reduceToHalf();
 
-        boolean atLeastOneMessageSentForReplication = 
replicateEntries(entries);
-
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+        boolean atLeastOneMessageSentForReplication = 
replicateEntries(entries, inFlightTask);
 
         if (atLeastOneMessageSentForReplication && !isWritable()) {
             // Don't read any more entries until the current pending entries 
are persisted
@@ -372,7 +369,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         }
     }
 
-    protected abstract boolean replicateEntries(List<Entry> entries);
+    protected abstract boolean replicateEntries(List<Entry> entries, 
InFlightTask inFlightTask);
 
     protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) 
throws ExecutionException {
         if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 
0) {
@@ -394,37 +391,40 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         private PersistentReplicator replicator;
         private Entry entry;
         private MessageImpl msg;
+        private InFlightTask inFlightTask;
 
         @Override
         public void sendComplete(Throwable exception, OpSendMsgStats 
opSendMsgStats) {
             if (exception != null && !(exception instanceof 
PulsarClientException.InvalidMessageException)) {
-                log.error("[{}] Error producing on remote broker", 
replicator.replicatorId, exception);
-                // cursor should be rewinded since it was incremented when 
readMoreEntries
-                replicator.cursor.rewind();
+                log.error("[{}] Error producing on remote broker, in-flight 
messages: {}, producer pending queue size:"
+                        + " {}",
+                        replicator.replicatorId, replicator.inFlightTasks,
+                        replicator.producer.getPendingQueueSize(), exception);
+                // cursor should be rewound since it was incremented when 
readMoreEntries
+                
replicator.beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing);
+                replicator.doRewindCursor(false);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Message persisted on remote broker", 
replicator.replicatorId, exception);
                 }
+                inFlightTask.incCompletedEntries();
                 replicator.cursor.asyncDelete(entry.getPosition(), replicator, 
entry.getPosition());
             }
             entry.release();
 
-            int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);
-
             // In general, we schedule a new batch read operation when the 
occupied queue size gets smaller than half
             // the max size, unless another read operation is already in 
progress.
             // If the producer is not currently writable (disconnected or TCP 
window full), we want to defer the reads
             // until we have emptied the whole queue, and at that point we 
will read a batch of 1 single message if the
             // producer is still not "writable".
-            if (pending < replicator.producerQueueThreshold //
-                    && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE //
-            ) {
-                if (pending == 0 || replicator.producer.isWritable()) {
+            int permits = replicator.getPermitsIfNoPendingRead();
+            if (replicator.producerQueueSize - permits < 
replicator.producerQueueThreshold) {
+                if (replicator.producerQueueSize == permits || 
replicator.producer.isWritable()) {
                     replicator.readMoreEntries();
                 } else {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Not resuming reads. pending: {} 
is-writable: {}",
-                                replicator.replicatorId, pending,
+                                replicator.replicatorId, 
replicator.producerQueueSize - permits,
                                 replicator.producer.isWritable());
                     }
                 }
@@ -439,15 +439,18 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(PersistentReplicator replicator, 
Entry entry, MessageImpl msg) {
+        static ProducerSendCallback create(PersistentReplicator replicator, 
Entry entry, MessageImpl msg,
+                                           InFlightTask inFlightTask) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.replicator = replicator;
             sendCallback.entry = entry;
             sendCallback.msg = msg;
+            sendCallback.inFlightTask = inFlightTask;
             return sendCallback;
         }
 
         private void recycle() {
+            inFlightTask = null;
             replicator = null;
             entry = null; //already released and recycled on sendComplete
             if (msg != null) {
@@ -517,7 +520,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             }
         }
 
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         brokerService.executor().schedule(this::readMoreEntries, 
waitTimeMillis, TimeUnit.MILLISECONDS);
     }
 
@@ -747,4 +749,209 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     public ManagedCursor getCursor() {
         return cursor;
     }
-}
+
+    /**
+     * Bookkeeping for one batch of entries handled by the replicator.
+     *
+     * <p>{@link #entries} stays {@code null} until a read result is attached to the task; an empty list marks a
+     * task that finished without any entry to process. {@link #completedEntries} counts the entries of the batch
+     * that have been fully handled.
+     */
+    @Data
+    protected static class InFlightTask {
+        /** Position the read of this batch starts from. */
+        Position readPos;
+        /** Number of entries requested for this batch. */
+        int readingEntries;
+        /** Entries attached to this task; {@code null} while no read result has been attached. */
+        volatile List<Entry> entries;
+        /** Number of entries of this batch that have been handled so far. */
+        volatile int completedEntries;
+        /** Set when the read result must be skipped because the cursor is going to be rewound. */
+        volatile boolean skipReadResultDueToCursorRewind;
+        final String replicatorId;
+
+        /**
+         * Marks one more entry of this batch as handled. Logs an error instead of counting when there is no
+         * attached entry left to complete.
+         */
+        public synchronized void incCompletedEntries() {
+            if (!CollectionUtils.isEmpty(entries) && completedEntries < entries.size()) {
+                completedEntries++;
+            } else {
+                log.error("Unexpected calling of increase completed entries. {}", this);
+            }
+        }
+
+        /**
+         * Resets this task so the same instance can track a new batch.
+         *
+         * @param readStart position the new read starts from
+         * @param readingEntries number of entries requested by the new read
+         */
+        synchronized void recycle(Position readStart, int readingEntries) {
+            this.readPos = readStart;
+            this.readingEntries = readingEntries;
+            this.entries = null;
+            this.completedEntries = 0;
+            this.skipReadResultDueToCursorRewind = false;
+        }
+
+        public InFlightTask(Position readPos, int readingEntries, String replicatorId) {
+            this.readPos = readPos;
+            this.readingEntries = readingEntries;
+            this.replicatorId = replicatorId;
+        }
+
+        /**
+         * @return {@code true} once a read result has been attached and all of its entries are completed;
+         *         {@code false} while no read result has been attached
+         */
+        public boolean isDone() {
+            // Read the volatile field exactly once so that the null check and the size check see the same list.
+            final List<Entry> readOut = entries;
+            if (readOut == null) {
+                return false;
+            }
+            return readOut.isEmpty() || completedEntries >= readOut.size();
+        }
+
+        @Override
+        public String toString() {
+            // Single read of the volatile field, for the same reason as in isDone().
+            final List<Entry> readOut = entries;
+            return "Replicator InFlightTask "
+                + "{replicatorId=" + replicatorId
+                + ", readPos=" + readPos
+                + ", readingEntries=" + readingEntries
+                + ", readoutEntries=" + (readOut == null ? "-1" : readOut.size())
+                + ", completedEntries=" + completedEntries
+                + ", skipReadResultDueToCursorRewound=" + skipReadResultDueToCursorRewind
+                + "}";
+        }
+    }
+
+    /**
+     * Registers a task for a new read at the tail of {@code inFlightTasks}.
+     *
+     * <p>If the task at the head of the queue is already done, that instance is recycled and moved to the tail;
+     * otherwise a new task is created and appended.
+     *
+     * @param readPos position the read starts from
+     * @param readingEntries number of entries that will be requested
+     * @return the task that now sits at the tail of the queue
+     */
+    @VisibleForTesting
+    InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int readingEntries) {
+        synchronized (inFlightTasks) {
+            // Reuse the head task when it has finished: take it out, reset it and append it again.
+            final InFlightTask head = inFlightTasks.peek();
+            if (head != null && head.isDone()) {
+                inFlightTasks.poll();
+                head.recycle(readPos, readingEntries);
+                inFlightTasks.add(head);
+                return head;
+            }
+            // Nothing reusable, so track the read with a brand-new task.
+            final InFlightTask created = new InFlightTask(readPos, readingEntries, replicatorId);
+            inFlightTasks.add(created);
+            return created;
+        }
+    }
+
+    /**
+     * Tries to reserve permits for the next cursor read.
+     *
+     * @return the in-flight task tracking the new read, or {@code null} when no read may start now: a read is
+     *         already pending, a cursor rewind is awaited, the replicator is not in the {@code Started} state,
+     *         or no permits are left
+     */
+    protected InFlightTask acquirePermitsIfNotFetchingSchema() {
+        synchronized (inFlightTasks) {
+            if (hasPendingRead()) {
+                log.info("[{}] Skip the reading because there is a pending read task", replicatorId);
+                return null;
+            }
+            if (waitForCursorRewindingRefCnf > 0) {
+                log.info("[{}] Skip the reading due to new detected schema", replicatorId);
+                return null;
+            }
+            if (state != Started) {
+                log.info("[{}] Skip the reading because producer has not started [{}]", replicatorId, state);
+                return null;
+            }
+            // Guarantee that there is a unique cursor reading task.
+            int permits = getPermitsIfNoPendingRead();
+            // A non-positive value means the queue is fully occupied; never register a task that would ask
+            // the cursor for zero or a negative number of entries.
+            if (permits <= 0) {
+                return null;
+            }
+            return createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits);
+        }
+    }
+
+    /**
+     * Computes how many more entries may be requested right now.
+     *
+     * @return {@code 0} while a cursor read is still pending, otherwise {@code producerQueueSize} minus the
+     *         current in-flight count
+     */
+    protected int getPermitsIfNoPendingRead() {
+        synchronized (inFlightTasks) {
+            // hasPendingRead() applies the same "readPos set, entries not attached yet" test to every task.
+            return hasPendingRead() ? 0 : producerQueueSize - getInflightMessagesCount();
+        }
+    }
+
+    /**
+     * Sums the entries still occupying the queue over all tracked tasks.
+     *
+     * <p>A finished task contributes nothing, a task without an attached read result contributes the number of
+     * entries it requested, and any other task contributes its not-yet-completed entries (never less than 0).
+     *
+     * @return the number of in-flight entries
+     */
+    protected int getInflightMessagesCount() {
+        synchronized (inFlightTasks) {
+            int total = 0;
+            for (InFlightTask task : inFlightTasks) {
+                if (!task.isDone()) {
+                    total += task.entries == null
+                            ? task.readingEntries
+                            : Math.max(task.entries.size() - task.completedEntries, 0);
+                }
+            }
+            return total;
+        }
+    }
+
+    /**
+     * Checks whether the replicator may disconnect and, if so, blocks new reads.
+     *
+     * @return a future failed with {@code TopicBusyException} when an unfinished task started reading before
+     *         the last confirmed entry; otherwise a completed future, returned after the pending read has been
+     *         canceled and the replicator has been marked as waiting for a cursor rewind
+     */
+    protected CompletableFuture<Void> beforeDisconnect() {
+        synchronized (inFlightTasks) {
+            // Ensure no in-flight task.
+            for (InFlightTask pending : inFlightTasks) {
+                if (pending.isDone()) {
+                    continue;
+                }
+                final Position lastConfirmed = cursor.getManagedLedger().getLastConfirmedEntry();
+                if (pending.readPos.compareTo(lastConfirmed) < 0) {
+                    return CompletableFuture.failedFuture(
+                            new BrokerServiceException.TopicBusyException("Cannot close a replicator with backlog"));
+                }
+            }
+            beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** Rewinds the cursor once the replicator is disconnected, without scheduling a new read. */
+    protected void afterDisconnected() {
+        final boolean triggerReadMoreEntries = false;
+        doRewindCursor(triggerReadMoreEntries);
+    }
+
+    /**
+     * Stops reading ahead of a terminate, disconnect or cursor rewind: cancels the pending cursor read, records
+     * why the replicator waits for a rewind, increases the wait counter and marks all tracked tasks to be
+     * skipped.
+     *
+     * @param reason why the replicator is going to wait for a cursor rewind
+     */
+    protected void beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding reason) {
+        synchronized (inFlightTasks) {
+            final boolean readRequestCanceled = cursor.cancelPendingReadRequest();
+            reasonOfWaitForCursorRewinding = reason;
+            waitForCursorRewindingRefCnf++;
+            cancelPendingReadTasks(readRequestCanceled);
+        }
+    }
+
+    /**
+     * Rewinds the cursor, decreases the wait counter and clears the recorded wait reason.
+     *
+     * @param triggerReadMoreEntries whether to call {@code readMoreEntries()} after the rewind
+     */
+    protected void doRewindCursor(boolean triggerReadMoreEntries) {
+        synchronized (inFlightTasks) {
+            cursor.rewind();
+            waitForCursorRewindingRefCnf--;
+            reasonOfWaitForCursorRewinding = null;
+        }
+        // The follow-up read is started outside of the lock.
+        if (!triggerReadMoreEntries) {
+            return;
+        }
+        readMoreEntries();
+    }
+
+    /**
+     * Flags every tracked task so its read result is skipped and, when the cursor read was really canceled,
+     * completes the task that was still waiting for entries.
+     *
+     * @param canceledPendingRead whether the pending cursor read request has been canceled successfully
+     */
+    private void cancelPendingReadTasks(boolean canceledPendingRead) {
+        synchronized (inFlightTasks) {
+            InFlightTask pendingRead = null;
+            for (InFlightTask task : inFlightTasks) {
+                task.setSkipReadResultDueToCursorRewind(true);
+                if (task.entries != null) {
+                    continue;
+                }
+                // At most one task is expected to be waiting for a read result.
+                if (pendingRead != null) {
+                    log.error("Unexpected state because there are more than one tasks' state is pending read. {}",
+                        inFlightTasks);
+                }
+                pendingRead = task;
+            }
+            // A canceled read never gets a read-completed callback, so attach an empty result here;
+            // otherwise the task would keep occupying permits and replication would get stuck.
+            if (pendingRead != null && canceledPendingRead) {
+                pendingRead.setEntries(Collections.emptyList());
+            }
+        }
+    }
+
+    /** Cancels the pending read and blocks new reads before the replicator terminates. */
+    @Override
+    public void beforeTerminate() {
+        final ReasonOfWaitForCursorRewinding reason = ReasonOfWaitForCursorRewinding.Terminating;
+        beforeTerminateOrCursorRewinding(reason);
+    }
+
+    /**
+     * @return {@code true} if any tracked task has a read position but no read result attached yet, i.e. a
+     *         cursor read is still pending
+     */
+    protected boolean hasPendingRead() {
+        synchronized (inFlightTasks) {
+            return inFlightTasks.stream().anyMatch(task -> task.readPos != null && task.entries == null);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bda9f5e29c1..f21a18c56c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -646,7 +646,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 decrementPendingWriteOpsAndCheck();
                 break;
             default:
-                publishContext.completed(new 
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+                publishContext.completed(
+                        new MessageDeduplication.MessageDupUnknownException(
+                                topic, publishContext.getProducerName()), -1, 
-1);
                 decrementPendingWriteOpsAndCheck();
 
         }
@@ -884,8 +886,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private synchronized CompletableFuture<Void> 
closeReplProducersIfNoBacklog() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
-        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect(true, true)));
-        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.disconnect(true, true)));
+        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect()));
+        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.disconnect()));
         return FutureUtil.waitForAll(closeFutures);
     }
 
@@ -4330,7 +4332,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 decrementPendingWriteOpsAndCheck();
                 break;
             default:
-                publishContext.completed(new 
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+                publishContext.completed(new 
MessageDeduplication.MessageDupUnknownException(
+                        topic, publishContext.getProducerName()), -1, -1);
                 decrementPendingWriteOpsAndCheck();
 
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index cb2e0457e36..a334fd86dd0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -55,15 +55,23 @@ public class ShadowReplicator extends PersistentReplicator {
     }
 
     @Override
-    protected boolean replicateEntries(List<Entry> entries) {
+    protected boolean replicateEntries(List<Entry> entries, InFlightTask 
inFlightTask) {
         boolean atLeastOneMessageSentForReplication = false;
 
         try {
             // This flag is set to true when we skip at least one local 
message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
+            boolean skipRemainingMessages = 
inFlightTask.isSkipReadResultDueToCursorRewind();
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
+                // Skip the messages since the replicator needs to fetch the 
schema info to replicate the schema to the
+                // remote cluster. Rewind the cursor first and continue the 
message read after the schema is fetched.
+                if (skipRemainingMessages) {
+                    inFlightTask.incCompletedEntries();
+                    entry.release();
+                    continue;
+                }
                 int length = entry.getLength();
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
@@ -73,6 +81,7 @@ public class ShadowReplicator extends PersistentReplicator {
                     log.error("[{}] Failed to deserialize message at {} 
(buffer size: {}): {}", replicatorId,
                             entry.getPosition(), length, t.getMessage(), t);
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     continue;
                 }
@@ -84,6 +93,7 @@ public class ShadowReplicator extends PersistentReplicator {
                                 replicatorId, entry.getPosition(), 
msg.getReplicateTo());
                     }
                     cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -97,6 +107,7 @@ public class ShadowReplicator extends PersistentReplicator {
                                 replicatorId, entry.getPosition());
                     }
                     isLocalMessageSkippedOnce = true;
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -118,8 +129,7 @@ public class ShadowReplicator extends PersistentReplicator {
                 headersAndPayload.retain();
 
                 // Increment pending messages for messages produced locally
-                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
-                producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg));
+                producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg, inFlightTask));
                 atLeastOneMessageSentForReplication = true;
             }
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 374296e6867..afb9cadd55f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -162,6 +162,9 @@ public class AbstractReplicatorTest {
             return false;
         }
 
+        @Override
+        protected void beforeTerminate() {}
+
         @Override
         public long getNumberOfEntriesInBacklog() {
             return 0;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
new file mode 100644
index 00000000000..f33642ab13a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+/**
+ * Test helper living in the same package as {@link AbstractReplicator} so that tests in other packages can
+ * reach its non-public members.
+ */
+public class BrokerServiceInternalMethodInvoker {
+
+    private BrokerServiceInternalMethodInvoker() {
+        // Static helpers only; not meant to be instantiated.
+    }
+
+    /**
+     * Overwrites the state of the given replicator directly.
+     *
+     * @param replicator replicator whose state is replaced
+     * @param state state to set
+     */
+    public static void replicatorSetState(AbstractReplicator replicator, AbstractReplicator.State state) {
+        replicator.state = state;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 1244300378a..57abd267ad6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -71,11 +73,13 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
+import 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -83,10 +87,14 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1367,10 +1375,10 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         }
         // Verify: no individual acks.
         Awaitility.await().untilAsserted(() -> {
-           PersistentTopic persistentTopic2 =
-                   (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
-           assertTrue(
-           
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true)
 > 0);
+            PersistentTopic persistentTopic2 =
+                    (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            assertTrue(
+                    
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true)
 > 0);
             PersistentTopic persistentTopic1 =
                     (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
             ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
@@ -1507,4 +1515,142 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             assertEquals(valueLocal, publishRateAddLocal2);
         });
     }
+
+    /**
+     * Starts replication with a tiny producer queue while send receipts are delayed and several threads call
+     * {@code readMoreEntries} concurrently, then verifies that the replication producer never reports a full
+     * pending queue. All modified broker state is restored in {@code finally} blocks so that a failure here
+     * cannot leak into other tests.
+     */
+    @Test
+    public void testConcurrencyReplicationReadEntries() throws Exception {
+        String originalReplicationStartAt = pulsar1.getConfig().getReplicationStartAt();
+        int originalDispatcherMaxReadBatchSize = pulsar1.getConfig().getDispatcherMaxReadBatchSize();
+        int originalDispatcherMinReadBatchSize = pulsar1.getConfig().getDispatcherMinReadBatchSize();
+        int originalReplicationProducerQueueSize = pulsar1.getConfig().getReplicationProducerQueueSize();
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String subscriptionName = "s1";
+
+        // State that the cleanup below has to undo; updated as the test makes progress.
+        Producer<byte[]> producer1 = null;
+        PulsarClient injectedReplClient2 = null;
+        PulsarClient originalReplClient2 = null;
+        boolean topicCreated = false;
+        boolean replClientSwapped = false;
+        boolean replicationEnabled = false;
+        try {
+            admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
+            admin1.brokers().updateDynamicConfiguration("dispatcherMaxReadBatchSize", "10");
+            admin1.brokers().updateDynamicConfiguration("dispatcherMinReadBatchSize", "10");
+            admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize", "10");
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(pulsar1.getConfig().getReplicationStartAt(), "earliest");
+                assertEquals(pulsar1.getConfig().getDispatcherMaxReadBatchSize(), 10);
+                assertEquals(pulsar1.getConfig().getDispatcherMinReadBatchSize(), 10);
+                assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(), 10);
+            });
+            admin1.topics().createNonPartitionedTopic(topicName);
+            topicCreated = true;
+            admin2.topics().createNonPartitionedTopic(topicName);
+            admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+            admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+
+            // Publish messages.
+            producer1 = client1.newProducer().topic(topicName).enableBatching(false).create();
+            CompletableFuture<MessageId> latestSend = null;
+            for (int i = 0; i < 1000; i++) {
+                latestSend = producer1.sendAsync(new byte[]{1});
+            }
+            latestSend.join();
+            log.info("Cluster: {}, Publish finished", cluster1);
+
+            // Inject two delays:
+            // 1. delay the handling of publish responses,
+            // 2. delay "channel()" calls made from "replicator.readMoreEntries" to widen the race window.
+            ClientBuilderImpl clientBuilder2 = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(url2.toString());
+            injectedReplClient2 = InjectedClientCnxClientBuilder.create(clientBuilder2,
+                    (conf, eventLoopGroup) -> {
+                return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+
+                    @Override
+                    protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
+                        ctx().executor().schedule(() -> super.handleSendReceipt(sendReceipt), 3600,
+                                TimeUnit.SECONDS);
+                    }
+
+                    @Override
+                    protected Channel channel() {
+                        boolean delay = false;
+                        StackTraceElement[] stacks = Thread.currentThread().getStackTrace();
+                        for (StackTraceElement stack : stacks) {
+                            if (stack.toString().contains("readMoreEntries")) {
+                                delay = true;
+                                break;
+                            }
+                        }
+                        if (!delay) {
+                            return super.ctx().channel();
+                        }
+                        try {
+                            Thread.sleep(3000);
+                        } catch (InterruptedException e) {
+                            // Keep the interrupt visible to the caller instead of silently clearing it.
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                        return super.ctx().channel();
+                    }
+                };
+            });
+            originalReplClient2 = pulsar1.getBrokerService().getReplicationClients()
+                    .put(cluster2, injectedReplClient2);
+            replClientSwapped = true;
+
+            // Start replication and inject race conditions of "replicator.readMoreEntries".
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            replicationEnabled = true;
+            PersistentTopic persistentTopic1 =
+                    (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+            waitReplicatorStarted(topicName);
+            GeoPersistentReplicator replicator =
+                    (GeoPersistentReplicator) persistentTopic1.getReplicators().values().iterator().next();
+            for (int i = 0; i < 10; i++) {
+                new Thread(() -> {
+                    BrokerServicePersistInternalMethodInvoker.replicatorReadMoreEntries(replicator);
+                }).start();
+            }
+
+            // Verify: after a few seconds, there is no "pending queue is full" error.
+            Thread.sleep(10_000);
+            assertEquals(replicator.producer.getPendingQueueFullCount(), 0);
+        } finally {
+            // cleanup.
+            try {
+                if (producer1 != null) {
+                    producer1.close();
+                }
+                if (replicationEnabled) {
+                    admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+                    waitReplicatorStopped(topicName);
+                }
+                if (topicCreated) {
+                    admin1.topics().delete(topicName, false);
+                }
+                if (replClientSwapped) {
+                    if (originalReplClient2 == null) {
+                        pulsar1.getBrokerService().getReplicationClients().remove(cluster2);
+                    } else {
+                        pulsar1.getBrokerService().getReplicationClients().put(cluster2, originalReplClient2);
+                    }
+                }
+                if (injectedReplClient2 != null) {
+                    injectedReplClient2.close();
+                }
+            } finally {
+                // The dynamic configuration is shared by the whole broker, so restore it even if the cleanup
+                // above failed.
+                admin1.brokers().updateDynamicConfiguration("replicationStartAt", originalReplicationStartAt);
+                admin1.brokers().updateDynamicConfiguration("dispatcherMaxReadBatchSize",
+                        originalDispatcherMaxReadBatchSize + "");
+                admin1.brokers().updateDynamicConfiguration("dispatcherMinReadBatchSize",
+                        originalDispatcherMinReadBatchSize + "");
+                admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize",
+                        originalReplicationProducerQueueSize + "");
+                Awaitility.await().untilAsserted(() -> {
+                    assertEquals(pulsar1.getConfig().getReplicationStartAt(), originalReplicationStartAt);
+                    assertEquals(pulsar1.getConfig().getDispatcherMaxReadBatchSize(),
+                            originalDispatcherMaxReadBatchSize);
+                    assertEquals(pulsar1.getConfig().getDispatcherMinReadBatchSize(),
+                            originalDispatcherMinReadBatchSize);
+                    assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(),
+                            originalReplicationProducerQueueSize);
+                });
+            }
+        }
+    }
+
+    /**
+     * Publishes messages to a topic of the replicated namespace, waits until the replication cursor has caught
+     * up and then verifies that the replicator's in-flight tasks report no backlog.
+     */
+    @Test
+    public void testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final String subscriptionName = "s1";
+        admin2.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+
+        // Publish messages. try-with-resources closes the producer even when publishing fails.
+        try (Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false).create()) {
+            CompletableFuture<MessageId> latestSend = null;
+            for (int i = 0; i < 100; i++) {
+                latestSend = producer1.sendAsync(new byte[]{1});
+            }
+            latestSend.join();
+            log.info("Cluster: {}, Publish finished", cluster1);
+        }
+
+        // Wait until the replication cursor has passed everything that was published.
+        waitForReplicationTaskFinish(topicName);
+        // Verify: all inflight tasks are done.
+        ensureNoBacklogByInflightTask(getReplicator(topicName));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index d222abde3a3..a4df6752acd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -42,6 +42,9 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -517,7 +520,7 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         Optional<PartitionedTopicMetadata> metadata = 
pulsar1.getPulsarResources().getNamespaceResources()
                 .getPartitionedTopicResources()
                 .getPartitionedTopicMetadataAsync(topicNameObj).join();
-        Function<Replicator, Boolean> ensureNoBacklog = new 
Function<Replicator,Boolean>() {
+        Function<Replicator, Boolean> ensureNoBacklog = new 
Function<Replicator, Boolean>() {
 
             @Override
             public Boolean apply(Replicator replicator) {
@@ -543,8 +546,27 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
             PersistentTopic persistentTopic =
                     (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
             persistentTopic.getReplicators().values().forEach(replicator -> {
-               assertTrue(ensureNoBacklog.apply(replicator));
+                assertTrue(ensureNoBacklog.apply(replicator));
             });
         }
     }
+
+    protected void waitForReplicationTaskFinish(String topicName) throws 
Exception {
+        PersistentTopic persistentTopic1 = (PersistentTopic) 
pulsar1.getBrokerService()
+                .getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic1.getManagedLedger();
+        Position lac = ml.getLastConfirmedEntry();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get("pulsar.repl.r2");
+        Awaitility.await().untilAsserted(() -> {
+            if (cursor.getName().startsWith("pulsar.repl")) {
+                assertTrue(cursor.getMarkDeletedPosition().compareTo(lac) >= 
0);
+            }
+        });
+    }
+
+    protected GeoPersistentReplicator getReplicator(String topic) {
+        waitReplicatorStarted(topic);
+        return (GeoPersistentReplicator) 
pulsar1.getBrokerService().getTopic(topic, false).join().get()
+                .getReplicators().get(cluster2);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index e3b61e09c23..ed6e57214f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -276,4 +276,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
     public void testTopicPoliciesReplicationRule() throws Exception {
         super.testTopicPoliciesReplicationRule();
     }
+
+    @Override
+    @Test
+    public void 
testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws 
Exception {
+        super.testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyReplicationReadEntries() throws Exception {
+        super.testConcurrencyReplicationReadEntries();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 450370a60d0..a7b330b3afa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -230,9 +230,21 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.testIncompatibleMultiVersionSchema(enableDeduplication);
     }
 
-    @Override
+
     @Test
     public void testTopicPoliciesReplicationRule() throws Exception {
         super.testTopicPoliciesReplicationRule();
     }
+
+    @Override
+    @Test
+    public void 
testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws 
Exception {
+        super.testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyReplicationReadEntries() throws Exception {
+        super.testConcurrencyReplicationReadEntries();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 0f527993bba..aa0c0ec7c23 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -26,14 +26,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,12 +40,12 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -812,15 +810,17 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
         }
 
         // unsubscribe replicated subscription in r2
+        admin1.topics().unload(topicName);
         admin2.topics().deleteSubscription(topicName, subscriptionName);
         final PersistentTopic topic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
-        assertNull(topic2.getSubscription(subscriptionName));
 
         // close replicator producer in r2
-        final Method closeReplProducersIfNoBacklog = 
PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog", null);
-        closeReplProducersIfNoBacklog.setAccessible(true);
-        ((CompletableFuture<Void>) 
closeReplProducersIfNoBacklog.invoke(topic2, null)).join();
-        assertFalse(topic2.getReplicators().get("r1").isConnected());
+        GeoPersistentReplicator replicator2 = (GeoPersistentReplicator) 
topic2.getReplicators().get("r1");
+        Awaitility.await().untilAsserted(() -> {
+            replicator2.disconnect().join();
+            assertEquals(replicator2.getState(), 
AbstractReplicator.State.Disconnected);
+            assertFalse(topic2.getReplicators().get("r1").isConnected());
+        });
 
         // send messages in r1
         int numMessages = 6;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d1d7358f346..3419c477d65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.broker.service;
 
 import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -112,7 +112,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -463,9 +462,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // Verify "pendingMessages" still is correct even if error occurs.
         PersistentReplicator replicator = ensureReplicatorCreated(topic, 
pulsar1);
         waitReplicateFinish(topic, admin1);
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals((int) WhiteboxImpl.getInternalState(replicator, 
"pendingMessages"), 0);
-        });
+        ensureNoBacklogByInflightTask(replicator);
     }
 
     @Test
@@ -1783,11 +1780,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Producer<byte[]> persistentProducer1 = 
client1.newProducer().topic(topic.toString()).create();
         // Send V1 message, which will be replicated to the remote cluster by 
the replicator.
         persistentProducer1.send("V1".getBytes());
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
         waitReplicateFinish(topic, admin1);
 
         // Pause replicator
-        PersistentTopic persistentTopic =
-                (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
         persistentTopic.getReplicators().forEach((cluster, replicator) -> {
             PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
             pauseReplicator(persistentReplicator);
@@ -1802,7 +1799,15 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // Start replicator
         persistentTopic.getReplicators().forEach((cluster, replicator) -> {
             PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
-            persistentReplicator.startProducer();
+            resumeReplicator(persistentReplicator);
+            Awaitility.await().untilAsserted(() -> {
+                CompletableFuture<Optional<Topic>> topic2 =
+                        pulsar2.getBrokerService().getTopic(topic.toString(), 
false);
+                assertTrue(topic2 != null && topic2.isDone() && 
topic2.get().isPresent());
+                assertEquals(persistentReplicator.getState(), 
AbstractReplicator.State.Started);
+            });
+            assertEquals(persistentReplicator.getState(), 
AbstractReplicator.State.Started);
+            ensureNoBacklogByInflightTask(persistentReplicator);
         });
         waitReplicateFinish(topic, admin1);
 
@@ -1956,9 +1961,13 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Awaitility.await().untilAsserted(() -> {
             assertTrue(replicator.isConnected());
         });
-        replicator.closeProducerAsync(true);
-        Awaitility.await().untilAsserted(() -> {
-            assertFalse(replicator.isConnected());
+        Awaitility.await().until(() -> {
+            replicator.disconnect().join();
+            return true;
         });
     }
+
+    private void resumeReplicator(PersistentReplicator replicator) {
+        replicator.startProducer();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
new file mode 100644
index 00000000000..8c26aece764
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+public class BrokerServicePersistInternalMethodInvoker {
+
+    public static void replicatorReadMoreEntries(GeoPersistentReplicator 
replicator) {
+        replicator.readMoreEntries();
+    }
+
+    public static void ensureNoBacklogByInflightTask(PersistentReplicator 
replicator) {
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> {
+            synchronized (replicator.inFlightTasks) {
+                for (PersistentReplicator.InFlightTask task : 
replicator.inFlightTasks) {
+                    if 
(task.readPos.compareTo(replicator.cursor.getManagedLedger().getLastConfirmedEntry())
 >= 0) {
+                        continue;
+                    }
+                    if (!task.isDone()) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
new file mode 100644
index 00000000000..ffbb4eb34bd
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Mockito.mock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.AbstractReplicator;
+import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
+import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
+import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class PersistentReplicatorInflightTaskTest extends 
OneWayReplicatorTestBase {
+
+    private final String topicName = 
BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+    private final String subscriptionName = "s1";
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+        createTopics();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    private void createTopics() throws Exception {
+        admin2.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+    }
+
+    @Test
+    public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
+        log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");
+
+        // Get the replicator for the test topic
+        PersistentReplicator replicator = getReplicator(topicName);
+        Assert.assertNotNull(replicator, "Replicator should not be null");
+
+        // Get access to the inFlightTasks list for verification
+        LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+        Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be 
null");
+
+        // Clear any existing tasks to start with a clean state
+        List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+        inFlightTasks.clear();
+
+        // Test Case 1: Create a new task when the queue is empty
+        Position position1 = PositionFactory.create(1, 1);
+        Assert.assertNotNull(position1, "Position should not be null");
+        InFlightTask task1 = 
replicator.createOrRecycleInFlightTaskIntoQueue(position1, 10);
+        // Verify a new task was created and added to the queue
+        Assert.assertNotNull(task1, "Task should not be null");
+        Assert.assertEquals(inFlightTasks.size(), 1, "Queue should have one 
task");
+        Assert.assertEquals(task1.getReadPos(), position1, "Task should have 
the correct position");
+        Assert.assertEquals(task1.getReadingEntries(), 10, "Task should have 
the correct reading entries count");
+        // Mark the task as done to test recycling
+        task1.setEntries(Collections.emptyList());
+
+        // Test Case 2: Recycle an existing task
+        Position position2 = PositionFactory.create(2, 2);
+        Assert.assertNotNull(position2, "Position should not be null");
+        InFlightTask task2 = 
replicator.createOrRecycleInFlightTaskIntoQueue(position2, 20);
+        // Verify the task was recycled
+        Assert.assertNotNull(task2, "Task should not be null");
+        Assert.assertEquals(inFlightTasks.size(), 1, "Queue should still have 
one task");
+        Assert.assertEquals(task2.getReadPos(), position2, "Task should have 
the updated position");
+        Assert.assertEquals(task2.getReadingEntries(), 20, "Task should have 
the updated reading entries count");
+
+        // Test Case 3: Create a new task when no tasks can be recycled
+        task2.setEntries(null); // Make the task not done
+        Position position3 = PositionFactory.create(3, 3);
+        Assert.assertNotNull(position3, "Position should not be null");
+        InFlightTask task3 = 
replicator.createOrRecycleInFlightTaskIntoQueue(position3, 30);
+        // Verify a new task was created
+        Assert.assertNotNull(task3, "Task should not be null");
+        Assert.assertEquals(inFlightTasks.size(), 2, "Queue should have two 
tasks");
+        Assert.assertEquals(task3.getReadPos(), position3, "Task should have 
the correct position");
+        Assert.assertEquals(task3.getReadingEntries(), 30, "Task should have 
the correct reading entries count");
+
+        // cleanup.
+        log.info("Completed testCreateOrRecycleInFlightTaskIntoQueue");
+        inFlightTasks.clear();
+        inFlightTasks.addAll(originalTasks);
+    }
+
+    @Test
+    public void testGetInflightMessagesCount() throws Exception {
+        log.info("Starting testGetInflightMessagesCount");
+
+        // Get the replicator for the test topic
+        PersistentReplicator replicator = getReplicator(topicName);
+        Assert.assertNotNull(replicator, "Replicator should not be null");
+
+        // Get access to the inFlightTasks list for setup
+        LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+        Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be 
null");
+
+        // Save original tasks and clear for testing
+        List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+        inFlightTasks.clear();
+
+        try {
+            // Test Case 1: no task.
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 0);
+
+            // Test Case 2: cursor reading.
+            Position position1 = PositionFactory.create(1, 1);
+            InFlightTask task1 = new InFlightTask(position1, 3, "");
+            inFlightTasks.add(task1);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 3);
+
+            // Test Case 3: read completed.
+            inFlightTasks.clear();
+            Position position2 = PositionFactory.create(2, 2);
+            InFlightTask task2 = new InFlightTask(position2, 3, "");
+            task2.setEntries(Arrays.asList(mock(Entry.class), 
mock(Entry.class)));
+            inFlightTasks.add(task2);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+            // Test Case 4: Task with some completed entries
+            task2.setCompletedEntries(1);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 1);
+
+            // Test Case 5: Task with all entries completed
+            task2.setCompletedEntries(2);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 0);
+
+            // Test Case 6: Multiple tasks with different states
+            // task2 has 0 in-flight (2 completed out of 2)
+            // task3 has 2 in-flight (1 completed out of 3)
+            Position position3 = PositionFactory.create(3, 3);
+            InFlightTask task3 = new InFlightTask(position3, 4, "");
+            task3.setEntries(Arrays.asList(mock(Entry.class), 
mock(Entry.class), mock(Entry.class)));
+            task3.setCompletedEntries(1);
+            inFlightTasks.add(task3);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+            // Test Case 7: Multiple tasks, one of which read no entries
+            // task2 has 0 in-flight (2 completed out of 2)
+            // task3 has 2 in-flight (1 completed out of 3)
+            // task4 has 0 in-flight (empty readoutEntries)
+            Position position4 = PositionFactory.create(4, 4);
+            InFlightTask task4 = new InFlightTask(position4, 2, "");
+            task4.setEntries(Collections.emptyList());
+            inFlightTasks.add(task4);
+            Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+            log.info("Completed testGetInflightMessagesCount");
+        } finally {
+            // Restore original tasks
+            inFlightTasks.clear();
+            inFlightTasks.addAll(originalTasks);
+        }
+    }
+
+    @Test
+    public void testGetPermitsIfNoPendingRead() throws Exception {
+        log.info("Starting testGetPermitsIfNoPendingRead");
+
+        // Get the replicator for the test topic
+        PersistentReplicator replicator = getReplicator(topicName);
+        Assert.assertNotNull(replicator, "Replicator should not be null");
+
+        // Get access to the inFlightTasks list for setup
+        LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+        Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be 
null");
+
+        // Save original tasks and clear for testing
+        List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+        inFlightTasks.clear();
+
+        try {
+            // Test Case 1: Empty queue - should return producerQueueSize 
(1000)
+            Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000,
+                    "With empty queue, should return full producerQueueSize");
+
+            // Test Case 2: Task with pending read (readPos != null && 
readoutEntries == null)
+            Position position1 = PositionFactory.create(1, 1);
+            InFlightTask pendingReadTask = new InFlightTask(position1, 5, "");
+            // Don't set readoutEntries to simulate pending read
+            inFlightTasks.add(pendingReadTask);
+            Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 0,
+                    "With pending read task, should return 0");
+
+            // Test Case 3: Task with completed read but in-flight messages
+            inFlightTasks.clear();
+            Position position2 = PositionFactory.create(2, 2);
+            InFlightTask completedReadTask = new InFlightTask(position2, 5, 
"");
+            completedReadTask.setEntries(Arrays.asList(
+                    mock(Entry.class), mock(Entry.class), mock(Entry.class)));
+            inFlightTasks.add(completedReadTask);
+            Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000 - 
3,
+                    "With completed read task, should return producerQueueSize 
- inflightMessages");
+
+            // Test Case 4: Multiple tasks with no pending reads
+            Position position3 = PositionFactory.create(3, 3);
+            InFlightTask task2 = new InFlightTask(position3, 5, "");
+            task2.setEntries(Arrays.asList(mock(Entry.class), 
mock(Entry.class)));
+            task2.setCompletedEntries(1); // 1 in-flight message
+            inFlightTasks.add(task2);
+            // Now we have 3 + 1 = 4 in-flight messages
+            Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000 - 
4,
+                    "With multiple tasks, should return producerQueueSize - 
total inflightMessages");
+
+            // Test Case 5: Multiple tasks including one with pending read
+            Position position4 = PositionFactory.create(4, 4);
+            InFlightTask pendingReadTask2 = new InFlightTask(position4, 5, "");
+            // Don't set readoutEntries to simulate pending read
+            inFlightTasks.add(pendingReadTask2);
+            Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 0,
+                    "With any pending read task, should return 0 regardless of 
other tasks");
+
+            log.info("Completed testGetPermitsIfNoPendingRead");
+        } finally {
+            // Restore original tasks
+            inFlightTasks.clear();
+            inFlightTasks.addAll(originalTasks);
+        }
+    }
+
+    @Test
+    public void testAcquirePermitsIfNotFetchingSchema() throws Exception {
+        log.info("Starting testAcquirePermitsIfNotFetchingSchema");
+        // Get the replicator for the test topic
+        PersistentReplicator replicator = getReplicator(topicName);
+        Assert.assertNotNull(replicator, "Replicator should not be null");
+
+        // Get access to the inFlightTasks list for setup
+        LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+        Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be 
null");
+
+        // Save original tasks and clear for testing
+        List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+        inFlightTasks.clear();
+
+        // Save original state
+        int originalWaitForCursorRewinding = 
replicator.waitForCursorRewindingRefCnf;
+        AbstractReplicator.State originalState = replicator.getState();
+
+        try {
+            // Test Case 1: Normal case - no pending read, not waiting for 
cursor rewinding, state is Started
+            // Should return a new InFlightTask
+            // First, check the current permits available
+            int expectedPermits = replicator.getPermitsIfNoPendingRead();
+            Assert.assertTrue(expectedPermits > 0, "Should have available 
permits for the test");
+            InFlightTask task1 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNotNull(task1, "Should return a new InFlightTask in 
normal case");
+            Assert.assertNotNull(task1.getReadPos(), "Task should have a read 
position");
+            Assert.assertEquals(task1.getReadingEntries(), expectedPermits,
+                    "Task readingEntries should equal the number of permits 
available");
+            Assert.assertTrue(inFlightTasks.contains(task1),
+                    "Task should be added to the inFlightTasks list");
+
+            // Test Case 2: With pending read - should return null
+            inFlightTasks.clear();
+            Position position1 = PositionFactory.create(1, 1);
+            InFlightTask pendingReadTask = new InFlightTask(position1, 5, "");
+            // Don't set readoutEntries to simulate pending read
+            inFlightTasks.add(pendingReadTask);
+            InFlightTask task2 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNull(task2, "Should return null when there is a 
pending read");
+
+            // Test Case 3: With waitForCursorRewindingRefCnf > 0 - should 
return null
+            inFlightTasks.clear();
+            replicator.waitForCursorRewindingRefCnf = 1;
+            InFlightTask task3 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNull(task3, "Should return null when waiting for 
cursor rewinding");
+            // Reset for next test
+            replicator.waitForCursorRewindingRefCnf = 0;
+
+            // Test Case 4: With state != Started - should return null
+            // We need to use reflection to modify the state since it's 
protected by AtomicReferenceFieldUpdater
+            BrokerServiceInternalMethodInvoker.replicatorSetState(replicator, 
AbstractReplicator.State.Starting);
+            InFlightTask task4 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNull(task4, "Should return null when state is not 
Started");
+            // Reset state for next test
+            BrokerServiceInternalMethodInvoker.replicatorSetState(replicator, 
AbstractReplicator.State.Started);
+
+            // Test Case 5: With limited permits - verify readingEntries is 
set correctly
+            inFlightTasks.clear();
+            // Add a task with some in-flight messages to reduce available 
permits
+            Position positionLimited = PositionFactory.create(10, 10);
+            InFlightTask limitedTask = new InFlightTask(positionLimited, 5, 
"");
+            // Add enough entries to leave just a small number of permits 
(e.g., 10)
+            List<Entry> limitedEntries = new ArrayList<>();
+            int entriesCount = 990;
+            for (int j = 0; j < entriesCount; j++) {
+                limitedEntries.add(mock(Entry.class));
+            }
+            limitedTask.setEntries(limitedEntries);
+            inFlightTasks.add(limitedTask);
+            // Check that we have limited permits available
+            int limitedPermits = replicator.getPermitsIfNoPendingRead();
+            Assert.assertTrue(limitedPermits > 0 && limitedPermits < 20,
+                    "Should have a small number of permits available for 
testing");
+            // Now acquire permits and verify readingEntries matches the 
limited permits
+            InFlightTask task5 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNotNull(task5, "Should return a task with limited 
permits");
+            Assert.assertEquals(task5.getReadingEntries(), limitedPermits,
+                    "Task readingEntries should equal the limited number of 
permits available");
+
+            // Test Case 6: With permits=0 - should return null
+            inFlightTasks.clear();
+            // Add tasks that will make getPermitsIfNoPendingRead() return 0
+            // We need enough in-flight messages to equal producerQueueSize
+            for (int i = 0; i < 10; i++) {
+                Position position = PositionFactory.create(i, i);
+                InFlightTask task = new InFlightTask(position, 5, "");
+                List<Entry> entries = new ArrayList<>();
+                for (int j = 0; j < 100; j++) {
+                    entries.add(mock(Entry.class));
+                }
+                task.setEntries(entries);
+                inFlightTasks.add(task);
+            }
+            InFlightTask task6 = 
replicator.acquirePermitsIfNotFetchingSchema();
+            Assert.assertNull(task6, "Should return null when permits is 0");
+            log.info("Completed testAcquirePermitsIfNotFetchingSchema");
+        } finally {
+            // Restore original state
+            replicator.waitForCursorRewindingRefCnf = 
originalWaitForCursorRewinding;
+            BrokerServiceInternalMethodInvoker.replicatorSetState(replicator, 
originalState);
+            // Restore original tasks
+            inFlightTasks.clear();
+            inFlightTasks.addAll(originalTasks);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 511cf87133a..007fc376687 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.testng.Assert.assertEquals;
+import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -43,7 +43,6 @@ import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -123,7 +122,7 @@ public class ShadowReplicatorTest extends BrokerTestBase {
             replicator.msgOut.calculateRate();
             return replicator.msgOut.getCount() >= 1;
         });
-        Awaitility.await().until(() -> 
PersistentReplicator.PENDING_MESSAGES_UPDATER.get(replicator) == 0);
+        ensureNoBacklogByInflightTask(replicator);
 
         PersistentTopic shadowTopic =
                 (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(shadowTopicName).get().get();
@@ -164,7 +163,7 @@ public class ShadowReplicatorTest extends BrokerTestBase {
     }
 
     @Test
-    public void testCounterOfPengdingMessagesCorrect() throws Exception {
+    public void testCounterOfPendingMessagesCorrect() throws Exception {
         TopicName sourceTopicName = TopicName
                 
.get(BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic"));
         TopicName shadowTopicName = TopicName
@@ -191,11 +190,11 @@ public class ShadowReplicatorTest extends BrokerTestBase {
             producer.send(new Schemas.PersonOne(i));
         }
 
-        // Verify "pendingMessages" still is correct even if error occurs.
+        // Verify "inflight replication tasks" are correct.
         PersistentReplicator replicator = 
getAnyShadowReplicator(sourceTopicName, pulsar);
         waitReplicateFinish(sourceTopicName, admin);
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals((int) WhiteboxImpl.getInternalState(replicator, 
"pendingMessages"), 0);
-        });
+        ensureNoBacklogByInflightTask(replicator);
     }
+
+
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 0563fa7e666..d32ab213648 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -482,9 +482,13 @@ public class ClientCnx extends PulsarHandler {
         }
         ProducerImpl<?> producer = producers.get(producerId);
         if (ledgerId == -1 && entryId == -1) {
-            log.warn("{} Message with sequence-id {}-{} published by producer 
[id:{}, name:{}] has been dropped",
-                    ctx.channel(), sequenceId, highestSequenceId, producerId,
-                    producer != null ? producer.getProducerName() : "null");
+            if (producer == null) {
+                log.warn("{} Message with sequence-id {}-{} published by 
producer [id:{}, name:{}] has been dropped",
+                        ctx.channel(), sequenceId, highestSequenceId, 
producerId, "null");
+            } else {
+                
producer.printWarnLogWhenCanNotDetermineDeduplication(ctx.channel(), 
sequenceId, highestSequenceId);
+            }
+
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("{} Got receipt for producer: [id:{}, name:{}] -- 
sequence-id: {}-{} -- entry-id: {}:{}",
@@ -1023,7 +1027,8 @@ public class ClientCnx extends PulsarHandler {
         return ctx;
     }
 
-    Channel channel() {
+    @VisibleForTesting
+    protected Channel channel() {
         return ctx.channel();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
index 12a6be07070..2595ce4ba61 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.channel.Channel;
 import io.netty.util.ReferenceCountUtil;
 import java.util.List;
 import java.util.Optional;
@@ -244,6 +245,13 @@ public class GeoReplicationProducerImpl extends ProducerImpl{
                 && 
Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType());
     }
 
+    @Override
+    public void printWarnLogWhenCanNotDetermineDeduplication(Channel channel, 
long sourceLId, long sourceEId) {
+        log.warn("[{}] producer [id:{}, name:{}, channel: {}] message with 
source entry {}-{} published by has been"
+            + " dropped because Broker can not determine whether is duplicate 
or not",
+            topic, producerId, producerName, channel, sourceLId, sourceEId);
+    }
+
     private boolean isReplicationMarker(long highestSeq) {
         return Long.MIN_VALUE == highestSeq;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e9decbfa0f5..825439794f9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -32,6 +32,7 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelPromise;
@@ -191,6 +192,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private final Counter producersOpenedCounter;
     private final Counter producersClosedCounter;
     private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure;
+    // This variable can be exposed as a metrics in the future, a PIP is 
needed.
+    private final AtomicInteger pendingQueueFullCounter;
 
     public ProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf,
                         CompletableFuture<Producer<T>> producerCreatedFuture, 
int partitionIndex, Schema<T> schema,
@@ -309,6 +312,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 "The number of producer sessions opened", topic, 
Attributes.empty());
         producersClosedCounter = 
ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
                 "The number of producer sessions closed", topic, 
Attributes.empty());
+        pendingQueueFullCounter = new AtomicInteger();
 
         this.connectionHandler = initConnectionHandler();
         setChunkMaxMessageSize();
@@ -316,6 +320,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         producersOpenedCounter.increment();
     }
 
+    @VisibleForTesting
+    public int getPendingQueueFullCount() {
+        return pendingQueueFullCounter.get();
+    }
+
     ConnectionHandler initConnectionHandler() {
         return new ConnectionHandler(this,
             new BackoffBuilder()
@@ -396,6 +405,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return future;
     }
 
+    public void printWarnLogWhenCanNotDetermineDeduplication(Channel channel, 
long sequenceId,
+                                                             long 
highestSequenceId) {
+        log.warn("[{}] producer [id:{}, name:{}, channel: {}] message with 
sequence-id {}-{} published by has been"
+                        + " dropped because Broker can not determine whether 
is duplicate or not",
+                topic, producerId, producerName, channel, sequenceId, 
highestSequenceId);
+    }
+
     private class DefaultSendMessageCallback implements SendCallback {
 
         CompletableFuture<MessageId> sendFuture;
@@ -1066,6 +1082,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 client.getMemoryLimitController().reserveMemory(payloadSize);
             } else {
                 if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
+                    pendingQueueFullCounter.incrementAndGet();
                     callback.sendComplete(new 
PulsarClientException.ProducerQueueIsFullError(
                             "Producer send queue is full", sequenceId), null);
                     return false;

Reply via email to