This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 96be3bb2e2a [fix][broker]excessive replication speed leads to error:
Producer send queue is full (#24189)
96be3bb2e2a is described below
commit 96be3bb2e2aad0bd7925cb7fb117d1ea58bf9507
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 2 14:11:25 2025 +0800
[fix][broker]excessive replication speed leads to error: Producer send
queue is full (#24189)
Co-authored-by: Kai Wang <[email protected]>
(cherry picked from commit 37b17d3e6920e851fa56f962b43bd698963033f5)
---
.../pulsar/broker/service/AbstractReplicator.java | 31 +-
.../apache/pulsar/broker/service/Replicator.java | 2 +-
.../nonpersistent/NonPersistentReplicator.java | 5 +
.../persistent/GeoPersistentReplicator.java | 34 +-
.../service/persistent/MessageDeduplication.java | 5 +-
.../service/persistent/PersistentReplicator.java | 375 ++++++++++++++++-----
.../broker/service/persistent/PersistentTopic.java | 11 +-
.../service/persistent/ShadowReplicator.java | 16 +-
.../broker/service/AbstractReplicatorTest.java | 3 +
.../BrokerServiceInternalMethodInvoker.java | 26 ++
.../broker/service/OneWayReplicatorTest.java | 154 ++++++++-
.../broker/service/OneWayReplicatorTestBase.java | 26 +-
...OneWayReplicatorUsingGlobalPartitionedTest.java | 12 +
.../service/OneWayReplicatorUsingGlobalZKTest.java | 14 +-
.../broker/service/ReplicatedSubscriptionTest.java | 16 +-
.../pulsar/broker/service/ReplicatorTest.java | 31 +-
.../BrokerServicePersistInternalMethodInvoker.java | 45 +++
.../PersistentReplicatorInflightTaskTest.java | 367 ++++++++++++++++++++
.../service/persistent/ShadowReplicatorTest.java | 15 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 13 +-
.../client/impl/GeoReplicationProducerImpl.java | 8 +
.../apache/pulsar/client/impl/ProducerImpl.java | 17 +
22 files changed, 1081 insertions(+), 145 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index aae54162e5a..365e7cc3c12 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -305,9 +305,10 @@ public abstract class AbstractReplicator implements
Replicator {
/**
* This method only be used by {@link PersistentTopic#checkGC} now.
*/
- public CompletableFuture<Void> disconnect(boolean failIfHasBacklog,
boolean closeTheStartingProducer) {
+ @Override
+ public CompletableFuture<Void> disconnect() {
long backlog = getNumberOfEntriesInBacklog();
- if (failIfHasBacklog && backlog > 0) {
+ if (backlog > 0) {
CompletableFuture<Void> disconnectFuture = new
CompletableFuture<>();
disconnectFuture.completeExceptionally(new
TopicBusyException("Cannot close a replicator with backlog"));
if (log.isDebugEnabled()) {
@@ -317,9 +318,30 @@ public abstract class AbstractReplicator implements
Replicator {
}
log.info("[{}] Disconnect replicator at position {} with backlog {}",
replicatorId,
getReplicatorReadPosition(), backlog);
- return closeProducerAsync(closeTheStartingProducer);
+ return beforeDisconnect()
+ .thenCompose(__ -> closeProducerAsync(true))
+ .thenApply(__ -> {
+ afterDisconnected();
+ return null;
+ });
+ }
+
+ /**
+ * This method and {@link #afterDisconnected()} are used to solve the
following race condition:
+ * - Thread 1: calling disconnect.
+ * passed the check: no backlog.
+ * - Thread 2: published a message, then the cursor.pendingRead completes.
+ * - Thread 1: continue to disconnect.
+ * - Thread 2: read entries from the cursor, and try to send messages, but
the messages will be discarded because
+ * the producer is closed.
+ * Issue: the pending reading's read position is not correct.
+ */
+ protected CompletableFuture<Void> beforeDisconnect() {
+ return CompletableFuture.completedFuture(null);
}
+ protected void afterDisconnected() {}
+
/**
* This method only be used by {@link PersistentTopic#checkGC} now.
*/
@@ -398,12 +420,15 @@ public abstract class AbstractReplicator implements
Replicator {
});
}
+ protected abstract void beforeTerminate();
+
public CompletableFuture<Void> terminate() {
if (!tryChangeStatusToTerminating()) {
log.info("[{}] Skip current termination since other thread is
doing termination, state : {}", replicatorId,
state);
return CompletableFuture.completedFuture(null);
}
+ beforeTerminate();
return doCloseProducerAsync(producer, () -> {
STATE_UPDATER.set(this, State.Terminated);
this.producer = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 667063e4910..86e2b6e74de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -33,7 +33,7 @@ public interface Replicator {
CompletableFuture<Void> terminate();
- CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean
closeTheStartingProducer);
+ CompletableFuture<Void> disconnect();
void updateRates();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 45b4ebf6e17..32b58347ae6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -258,4 +258,9 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
protected void disableReplicatorRead() {
// No-op
}
+
+ @Override
+ protected void beforeTerminate() {
+ // No-op
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index bc480635bab..437067edb69 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -81,7 +81,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
}
@Override
- protected boolean replicateEntries(List<Entry> entries) {
+ protected boolean replicateEntries(List<Entry> entries, final InFlightTask
inFlightTask) {
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
@@ -90,12 +90,13 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
// This flag is set to true when we skip at least one local
message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
- boolean skipRemainingMessages = false;
+ boolean skipRemainingMessages =
inFlightTask.isSkipReadResultDueToCursorRewind();
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
// Skip the messages since the replicator need to fetch the
schema info to replicate the schema to the
// remote cluster. Rewind the cursor first and continue the
message read after fetched the schema.
if (skipRemainingMessages) {
+ inFlightTask.incCompletedEntries();
entry.release();
continue;
}
@@ -108,12 +109,14 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
log.error("[{}] Failed to deserialize message at {}
(buffer size: {}): {}", replicatorId,
entry.getPosition(), length, t.getMessage(), t);
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
continue;
}
if (Markers.isTxnMarker(msg.getMessageBuilder())) {
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -123,6 +126,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
msg.getMessageBuilder().getTxnidLeastBits());
if (topic.isTxnAborted(tx, entry.getPosition())) {
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -136,6 +140,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
if (msg.isReplicated()) {
// Discard messages that were already replicated into this
region
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -147,6 +152,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -159,6 +165,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
replicatorId, entry.getPosition(),
msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -166,12 +173,13 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
if (STATE_UPDATER.get(this) != State.Started ||
isLocalMessageSkippedOnce) {
// The producer is not ready yet after having
stopped/restarted. Drop the message because it will
- // recovered when the producer is ready
+ // recover when the producer is ready
if (log.isDebugEnabled()) {
log.debug("[{}] Dropping read message at {} because
producer is not ready",
replicatorId, entry.getPosition());
}
isLocalMessageSkippedOnce = true;
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -184,14 +192,23 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
CompletableFuture<SchemaInfo> schemaFuture =
getSchemaInfo(msg);
if (!schemaFuture.isDone() ||
schemaFuture.isCompletedExceptionally()) {
+ /**
+ * Skip in flight reading tasks.
+ * Explain the result of the race-condition between:
+ * - {@link #readMoreEntries}
+ * - {@link
#beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)}
+ * Since {@link #acquirePermitsIfNotFetchingSchema} and
+ * {@link
#beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)} acquire the
+ * same lock, it is safe.
+ */
+
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Fetching_Schema);
+ inFlightTask.incCompletedEntries();
entry.release();
headersAndPayload.release();
msg.recycle();
// Mark the replicator is fetching the schema for now and
rewind the cursor
// and trigger the next read after complete the schema
fetching.
- fetchSchemaInProgress = true;
skipRemainingMessages = true;
- cursor.cancelPendingReadRequest();
log.info("[{}] Pause the data replication due to new
detected schema", replicatorId);
schemaFuture.whenComplete((__, e) -> {
if (e != null) {
@@ -199,9 +216,7 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
replicatorId, e);
}
log.info("[{}] Resume the data replication after the
schema fetching done", replicatorId);
- cursor.rewind();
- fetchSchemaInProgress = false;
- readMoreEntries();
+ doRewindCursor(true);
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
@@ -214,11 +229,10 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
- PENDING_MESSAGES_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] Publishing {}:{}", replicatorId,
entry.getLedgerId(), entry.getEntryId());
}
- producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg));
+ producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg, inFlightTask));
atLeastOneMessageSentForReplication = true;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 9f734e69896..cb549b6b8fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -95,8 +95,9 @@ public class MessageDeduplication {
}
public static class MessageDupUnknownException extends RuntimeException {
- public MessageDupUnknownException() {
- super("Cannot determine whether the message is a duplicate at this
time");
+ public MessageDupUnknownException(String topicName, String
producerName) {
+ super(String.format("[%s][%s]Cannot determine whether the message
is a duplicate at this time", topicName,
+ producerName));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 881b1b804e8..5952bb14d22 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -27,13 +27,14 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@@ -49,10 +50,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId;
@@ -86,19 +89,6 @@ public abstract class PersistentReplicator extends
AbstractReplicator
private final int producerQueueThreshold;
- protected static final AtomicIntegerFieldUpdater<PersistentReplicator>
PENDING_MESSAGES_UPDATER =
- AtomicIntegerFieldUpdater
- .newUpdater(PersistentReplicator.class, "pendingMessages");
- private volatile int pendingMessages = 0;
-
- private static final int FALSE = 0;
- private static final int TRUE = 1;
-
- private static final AtomicIntegerFieldUpdater<PersistentReplicator>
HAVE_PENDING_READ_UPDATER =
- AtomicIntegerFieldUpdater
- .newUpdater(PersistentReplicator.class, "havePendingRead");
- private volatile int havePendingRead = FALSE;
-
protected final Rate msgOut = new Rate();
protected final Rate msgExpired = new Rate();
@@ -114,8 +104,17 @@ public abstract class PersistentReplicator extends
AbstractReplicator
@Getter
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
- protected volatile boolean fetchSchemaInProgress = false;
- private volatile boolean waitForCursorRewinding = false;
+ protected volatile int waitForCursorRewindingRefCnf = 0;
+
+ protected enum ReasonOfWaitForCursorRewinding {
+ Failed_Publishing,
+ Fetching_Schema,
+ Disconnecting,
+ Terminating;
+ }
+ protected ReasonOfWaitForCursorRewinding reasonOfWaitForCursorRewinding =
null;
+
+ protected final LinkedList<InFlightTask> inFlightTasks = new
LinkedList<>();
public PersistentReplicator(String localCluster, PersistentTopic
localTopic, ManagedCursor cursor,
String remoteCluster, String remoteTopic,
@@ -127,8 +126,6 @@ public abstract class PersistentReplicator extends
AbstractReplicator
this.cursor = Objects.requireNonNull(cursor);
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
- PENDING_MESSAGES_UPDATER.set(this, 0);
readBatchSize = Math.min(
producerQueueSize,
@@ -143,11 +140,8 @@ public abstract class PersistentReplicator extends
AbstractReplicator
@Override
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer)
{
- waitForCursorRewinding = true;
-
// Repeat until there are no read operations in progress
- if (STATE_UPDATER.get(this) == State.Starting &&
HAVE_PENDING_READ_UPDATER.get(this) == TRUE
- && !cursor.cancelPendingReadRequest()) {
+ if (STATE_UPDATER.get(this) == State.Starting && hasPendingRead() &&
!cursor.cancelPendingReadRequest()) {
brokerService.getPulsar().getExecutor()
.schedule(() ->
setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
return;
@@ -164,12 +158,10 @@ public abstract class PersistentReplicator extends
AbstractReplicator
if (!(producer instanceof ProducerImpl)) {
log.error("[{}] The partitions count between two clusters is
not the same, the replicator can not be"
+ " created successfully: {}", replicatorId, state);
- waitForCursorRewinding = false;
doCloseProducerAsync(producer, () -> {});
throw new ClassCastException(producer.getClass().getName() + "
can not be cast to ProducerImpl");
}
this.producer = (ProducerImpl) producer;
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
// Trigger a new read.
log.info("[{}] Created replicator producer, Replicator state: {}",
replicatorId, state);
backOff.reset();
@@ -178,8 +170,6 @@ public abstract class PersistentReplicator extends
AbstractReplicator
// Rewind the cursor to be sure to read again all non-acked
messages sent while restarting
cursor.rewind();
- waitForCursorRewinding = false;
-
// read entries
readMoreEntries();
} else {
@@ -195,7 +185,6 @@ public abstract class PersistentReplicator extends
AbstractReplicator
log.error("[{}] Replicator state is not expected, so close the
producer. Replicator state: {}",
replicatorId, changeStateRes.getRight());
}
- waitForCursorRewinding = false;
// Close the producer if change the state fail.
doCloseProducerAsync(producer, () -> {});
}
@@ -245,8 +234,7 @@ public abstract class PersistentReplicator extends
AbstractReplicator
/**
* Calculate available permits for read entries.
*/
- private AvailablePermits getAvailablePermits() {
- int availablePermits = producerQueueSize -
PENDING_MESSAGES_UPDATER.get(this);
+ private AvailablePermits getRateLimiterAvailablePermits(int
availablePermits) {
// return 0, if Producer queue is full, it will pause read entries.
if (availablePermits <= 0) {
@@ -291,51 +279,59 @@ public abstract class PersistentReplicator extends
AbstractReplicator
}
protected void readMoreEntries() {
- if (fetchSchemaInProgress) {
- log.info("[{}] Skip the reading due to new detected schema",
replicatorId);
- return;
- }
- AvailablePermits availablePermits = getAvailablePermits();
- if (availablePermits.isReadable()) {
- int messagesToRead = availablePermits.getMessages();
- long bytesToRead = availablePermits.getBytes();
- if (!isWritable()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Throttling replication traffic because
producer is not writable", replicatorId);
- }
- // Minimize the read size if the producer is disconnected or
the window is already full
- messagesToRead = 1;
+ // Acquire permits and check state of producer.
+ InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
+ if (newInFlightTask == null) {
+ // no permits from rate limit
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Not scheduling read due to pending read or no
permits.", replicatorId);
}
-
- // Schedule read
- if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
- if (waitForCursorRewinding) {
- log.info("[{}] Skip the reading because repl producer is
starting", replicatorId);
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
- return;
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] Schedule read of {} messages or {} bytes",
replicatorId, messagesToRead,
- bytesToRead);
- }
- cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this,
- null, topic.getMaxReadPosition());
+ if (!hasPendingRead()) {
+ topic.getBrokerService().executor().schedule(
+ () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
+ return;
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Not scheduling read due to pending read.
Messages To Read {}, Bytes To Read {}",
- replicatorId, messagesToRead, bytesToRead);
- }
+ return;
+ }
+ }
+ // If disabled RateLimiter.
+ if (!dispatchRateLimiter.isPresent() ||
!dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+ cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, -1,
this,
+ newInFlightTask/* Context object */,
topic.getMaxReadPosition());
+ return;
+ }
+ // No permits of RateLimiter.
+ AvailablePermits availablePermits =
getRateLimiterAvailablePermits(newInFlightTask.readingEntries);
+ if (!availablePermits.isReadable()) {
+ // no rate limiter permits from rate limit
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Throttling replication traffic. Messages To
Read {}, Bytes To Read {}",
+ replicatorId, availablePermits.getMessages(),
availablePermits.getBytes());
}
- } else if (availablePermits.isExceeded()) {
- // no permits from rate limit
topic.getBrokerService().executor().schedule(
- () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
- } else {
+ () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
+ return;
+ }
+ // Has permits of RateLimiter.
+ int messagesToRead = availablePermits.getMessages();
+ long bytesToRead = availablePermits.getBytes();
+ if (!isWritable()) {
if (log.isDebugEnabled()) {
- log.debug("[{}] No Permits for reading. availablePermits: {}",
- replicatorId, availablePermits);
+ log.debug("[{}] Throttling replication traffic because
producer is not writable", replicatorId);
}
+ // Minimize the read size if the producer is disconnected or the
window is already full
+ messagesToRead = 1;
+ }
+ // Update acquired permits exceeds limitation.
+ if (messagesToRead < newInFlightTask.readingEntries) {
+ newInFlightTask.setReadingEntries(messagesToRead);
}
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Schedule read of {} messages or {} bytes",
replicatorId, newInFlightTask.readingEntries,
+ bytesToRead);
+ }
+ cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries,
bytesToRead, this,
+ newInFlightTask/* Context object */,
topic.getMaxReadPosition());
}
@Override
@@ -343,7 +339,10 @@ public abstract class PersistentReplicator extends
AbstractReplicator
if (log.isDebugEnabled()) {
log.debug("[{}] Read entries complete of {} messages",
replicatorId, entries.size());
}
+ InFlightTask inFlightTask = (InFlightTask) ctx;
+ inFlightTask.setEntries(entries);
+ // After the replicator starts, the speed will be gradually increased.
int maxReadBatchSize =
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
if (readBatchSize < maxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2,
maxReadBatchSize);
@@ -357,9 +356,7 @@ public abstract class PersistentReplicator extends
AbstractReplicator
readFailureBackoff.reduceToHalf();
- boolean atLeastOneMessageSentForReplication =
replicateEntries(entries);
-
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+ boolean atLeastOneMessageSentForReplication =
replicateEntries(entries, inFlightTask);
if (atLeastOneMessageSentForReplication && !isWritable()) {
// Don't read any more entries until the current pending entries
are persisted
@@ -372,7 +369,7 @@ public abstract class PersistentReplicator extends
AbstractReplicator
}
}
- protected abstract boolean replicateEntries(List<Entry> entries);
+ protected abstract boolean replicateEntries(List<Entry> entries,
InFlightTask inFlightTask);
protected CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg)
throws ExecutionException {
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length ==
0) {
@@ -394,37 +391,40 @@ public abstract class PersistentReplicator extends
AbstractReplicator
private PersistentReplicator replicator;
private Entry entry;
private MessageImpl msg;
+ private InFlightTask inFlightTask;
@Override
public void sendComplete(Throwable exception, OpSendMsgStats
opSendMsgStats) {
if (exception != null && !(exception instanceof
PulsarClientException.InvalidMessageException)) {
- log.error("[{}] Error producing on remote broker",
replicator.replicatorId, exception);
- // cursor should be rewinded since it was incremented when
readMoreEntries
- replicator.cursor.rewind();
+ log.error("[{}] Error producing on remote broker, in-flight
messages: {}, producer pending queue size:"
+ + " {}",
+ replicator.replicatorId, replicator.inFlightTasks,
+ replicator.producer.getPendingQueueSize(), exception);
+ // cursor should be rewound since it was incremented when
readMoreEntries
+
replicator.beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing);
+ replicator.doRewindCursor(false);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Message persisted on remote broker",
replicator.replicatorId, exception);
}
+ inFlightTask.incCompletedEntries();
replicator.cursor.asyncDelete(entry.getPosition(), replicator,
entry.getPosition());
}
entry.release();
- int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);
-
// In general, we schedule a new batch read operation when the
occupied queue size gets smaller than half
// the max size, unless another read operation is already in
progress.
// If the producer is not currently writable (disconnected or TCP
window full), we want to defer the reads
// until we have emptied the whole queue, and at that point we
will read a batch of 1 single message if the
// producer is still not "writable".
- if (pending < replicator.producerQueueThreshold //
- && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE //
- ) {
- if (pending == 0 || replicator.producer.isWritable()) {
+ int permits = replicator.getPermitsIfNoPendingRead();
+ if (replicator.producerQueueSize - permits <
replicator.producerQueueThreshold) {
+ if (replicator.producerQueueSize == permits ||
replicator.producer.isWritable()) {
replicator.readMoreEntries();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Not resuming reads. pending: {}
is-writable: {}",
- replicator.replicatorId, pending,
+ replicator.replicatorId,
replicator.producerQueueSize - permits,
replicator.producer.isWritable());
}
}
@@ -439,15 +439,18 @@ public abstract class PersistentReplicator extends
AbstractReplicator
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(PersistentReplicator replicator,
Entry entry, MessageImpl msg) {
+ static ProducerSendCallback create(PersistentReplicator replicator,
Entry entry, MessageImpl msg,
+ InFlightTask inFlightTask) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.replicator = replicator;
sendCallback.entry = entry;
sendCallback.msg = msg;
+ sendCallback.inFlightTask = inFlightTask;
return sendCallback;
}
private void recycle() {
+ inFlightTask = null;
replicator = null;
entry = null; //already released and recycled on sendComplete
if (msg != null) {
@@ -517,7 +520,6 @@ public abstract class PersistentReplicator extends
AbstractReplicator
}
}
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
brokerService.executor().schedule(this::readMoreEntries,
waitTimeMillis, TimeUnit.MILLISECONDS);
}
@@ -747,4 +749,209 @@ public abstract class PersistentReplicator extends
AbstractReplicator
public ManagedCursor getCursor() {
return cursor;
}
-}
+
+ @Data
+ protected static class InFlightTask {
+ Position readPos;
+ int readingEntries;
+ volatile List<Entry> entries;
+ volatile int completedEntries;
+ volatile boolean skipReadResultDueToCursorRewind;
+ final String replicatorId;
+
+ public synchronized void incCompletedEntries() {
+ if (!CollectionUtils.isEmpty(entries) && completedEntries <
entries.size()) {
+ completedEntries++;
+ } else {
+ log.error("Unexpected calling of increase completed entries.
{}", this.toString());
+ }
+ }
+
+ synchronized void recycle(Position readStart, int readingEntries) {
+ this.readPos = readStart;
+ this.readingEntries = readingEntries;
+ this.entries = null;
+ this.completedEntries = 0;
+ this.skipReadResultDueToCursorRewind = false;
+ }
+
+ public InFlightTask(Position readPos, int readingEntries, String
replicatorId) {
+ this.readPos = readPos;
+ this.readingEntries = readingEntries;
+ this.replicatorId = replicatorId;
+ }
+
+ public boolean isDone() {
+ if (entries == null) {
+ return false;
+ }
+ if (entries != null && entries.isEmpty()) {
+ return true;
+ }
+ return completedEntries >= entries.size();
+ }
+
+ @Override
+ public String toString() {
+ return "Replicator InFlightTask "
+ + "{replicatorId=" + replicatorId
+ + ", readPos=" + readPos
+ + ", readingEntries=" + readingEntries
+ + ", readoutEntries=" + (entries == null ? "-1" :
entries.size())
+ + ", completedEntries=" + completedEntries
+ + ", skipReadResultDueToCursorRewound=" +
skipReadResultDueToCursorRewind
+ + "}";
+ }
+ }
+
+ @VisibleForTesting
+ InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int
readingEntries) {
+ synchronized (inFlightTasks) {
+ // Reuse projects that has done.
+ if (!inFlightTasks.isEmpty()) {
+ InFlightTask first = inFlightTasks.peek();
+ if (first.isDone()) {
+ // Remove from the first index, and add to the latest
index.
+ inFlightTasks.poll();
+ first.recycle(readPos, readingEntries);
+ inFlightTasks.add(first);
+ return first;
+ }
+ }
+ // New project if nothing can be reused.
+ InFlightTask task = new InFlightTask(readPos, readingEntries,
replicatorId);
+ inFlightTasks.add(task);
+ return task;
+ }
+ }
+
+ protected InFlightTask acquirePermitsIfNotFetchingSchema() {
+ synchronized (inFlightTasks) {
+ if (hasPendingRead()) {
+ log.info("[{}] Skip the reading because there is a pending
read task", replicatorId);
+ return null;
+ }
+ if (waitForCursorRewindingRefCnf > 0) {
+ log.info("[{}] Skip the reading due to new detected schema",
replicatorId);
+ return null;
+ }
+ if (state != Started) {
+ log.info("[{}] Skip the reading because producer has not
started [{}]", replicatorId, state);
+ return null;
+ }
+ // Guarantee that there is a unique cursor reading task.
+ int permits = getPermitsIfNoPendingRead();
+ if (permits == 0) {
+ return null;
+ }
+ return
createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits);
+ }
+ }
+
+ protected int getPermitsIfNoPendingRead() {
+ synchronized (inFlightTasks) {
+ for (InFlightTask task : inFlightTasks) {
+ boolean hasPendingCursorRead = task.readPos != null &&
task.entries == null;
+ if (hasPendingCursorRead) {
+ // Skip the current reading if there is a pending cursor
reading.
+ return 0;
+ }
+ }
+ return producerQueueSize - getInflightMessagesCount();
+ }
+ }
+
+ protected int getInflightMessagesCount() {
+ int inFlight = 0;
+ synchronized (inFlightTasks) {
+ for (InFlightTask task : inFlightTasks) {
+ if (task.isDone()) {
+ continue;
+ }
+ if (task.entries == null) {
+ inFlight += task.readingEntries;
+ continue;
+ }
+ inFlight += Math.max(task.entries.size() -
task.completedEntries, 0);
+ }
+ }
+ return inFlight;
+ }
+
+ protected CompletableFuture<Void> beforeDisconnect() {
+ // Ensure no in-flight task.
+ synchronized (inFlightTasks) {
+ for (PersistentReplicator.InFlightTask task : inFlightTasks) {
+ if (!task.isDone() &&
task.readPos.compareTo(cursor.getManagedLedger().getLastConfirmedEntry()) < 0) {
+ return CompletableFuture.failedFuture(new
BrokerServiceException
+ .TopicBusyException("Cannot close a replicator
with backlog"));
+ }
+ }
+
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ protected void afterDisconnected() {
+ doRewindCursor(false);
+ }
+
+ protected void
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding reason) {
+ synchronized (inFlightTasks) {
+ boolean hasCanceledPendingRead = cursor.cancelPendingReadRequest();
+ reasonOfWaitForCursorRewinding = reason;
+ waitForCursorRewindingRefCnf += 1;
+ cancelPendingReadTasks(hasCanceledPendingRead);
+ }
+ }
+
+ protected void doRewindCursor(boolean triggerReadMoreEntries) {
+ synchronized (inFlightTasks) {
+ cursor.rewind();
+ waitForCursorRewindingRefCnf -= 1;
+ reasonOfWaitForCursorRewinding = null;
+ }
+ if (triggerReadMoreEntries) {
+ readMoreEntries();
+ }
+ }
+
+ private void cancelPendingReadTasks(boolean canceledPendingRead) {
+ InFlightTask readingTask = null;
+ synchronized (inFlightTasks) {
+ for (InFlightTask task : inFlightTasks) {
+ task.setSkipReadResultDueToCursorRewind(true);
+ if (task.entries == null) {
+ if (readingTask != null) {
+ log.error("Unexpected state because there are more
than one tasks' state is pending read. {}",
+ inFlightTasks);
+ }
+ readingTask = task;
+ }
+ }
+ // Correct state to avoid a replicate stuck because a pending
reading task occupies permits.
+ // There is at most one reading task.
+ // The task will never receive a read completed callback if cancel
pending reading successfully.
+ if (canceledPendingRead && readingTask != null) {
+ readingTask.setEntries(Collections.emptyList());
+ }
+ }
+ }
+
+ @Override
+ public void beforeTerminate() {
+
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Terminating);
+ }
+
+ protected boolean hasPendingRead() {
+ synchronized (inFlightTasks) {
+ for (InFlightTask task : inFlightTasks) {
+ if (task.readPos != null && task.entries == null) {
+ // Skip the current reading if there is a pending cursor
reading.
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bda9f5e29c1..f21a18c56c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -646,7 +646,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
decrementPendingWriteOpsAndCheck();
break;
default:
- publishContext.completed(new
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+ publishContext.completed(
+ new MessageDeduplication.MessageDupUnknownException(
+ topic, publishContext.getProducerName()), -1,
-1);
decrementPendingWriteOpsAndCheck();
}
@@ -884,8 +886,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private synchronized CompletableFuture<Void>
closeReplProducersIfNoBacklog() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
- replicators.forEach((region, replicator) ->
closeFutures.add(replicator.disconnect(true, true)));
- shadowReplicators.forEach((__, replicator) ->
closeFutures.add(replicator.disconnect(true, true)));
+ replicators.forEach((region, replicator) ->
closeFutures.add(replicator.disconnect()));
+ shadowReplicators.forEach((__, replicator) ->
closeFutures.add(replicator.disconnect()));
return FutureUtil.waitForAll(closeFutures);
}
@@ -4330,7 +4332,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
decrementPendingWriteOpsAndCheck();
break;
default:
- publishContext.completed(new
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+ publishContext.completed(new
MessageDeduplication.MessageDupUnknownException(
+ topic, publishContext.getProducerName()), -1, -1);
decrementPendingWriteOpsAndCheck();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index cb2e0457e36..a334fd86dd0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -55,15 +55,23 @@ public class ShadowReplicator extends PersistentReplicator {
}
@Override
- protected boolean replicateEntries(List<Entry> entries) {
+ protected boolean replicateEntries(List<Entry> entries, InFlightTask
inFlightTask) {
boolean atLeastOneMessageSentForReplication = false;
try {
// This flag is set to true when we skip at least one local
message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
+ boolean skipRemainingMessages =
inFlightTask.isSkipReadResultDueToCursorRewind();
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
+ // Skip the messages since the replicator need to fetch the
schema info to replicate the schema to the
+ // remote cluster. Rewind the cursor first and continue the
message read after fetched the schema.
+ if (skipRemainingMessages) {
+ inFlightTask.incCompletedEntries();
+ entry.release();
+ continue;
+ }
int length = entry.getLength();
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
@@ -73,6 +81,7 @@ public class ShadowReplicator extends PersistentReplicator {
log.error("[{}] Failed to deserialize message at {}
(buffer size: {}): {}", replicatorId,
entry.getPosition(), length, t.getMessage(), t);
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
continue;
}
@@ -84,6 +93,7 @@ public class ShadowReplicator extends PersistentReplicator {
replicatorId, entry.getPosition(),
msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this,
entry.getPosition());
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -97,6 +107,7 @@ public class ShadowReplicator extends PersistentReplicator {
replicatorId, entry.getPosition());
}
isLocalMessageSkippedOnce = true;
+ inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -118,8 +129,7 @@ public class ShadowReplicator extends PersistentReplicator {
headersAndPayload.retain();
// Increment pending messages for messages produced locally
- PENDING_MESSAGES_UPDATER.incrementAndGet(this);
- producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg));
+ producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg, inFlightTask));
atLeastOneMessageSentForReplication = true;
}
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 374296e6867..afb9cadd55f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -162,6 +162,9 @@ public class AbstractReplicatorTest {
return false;
}
+ @Override
+ protected void beforeTerminate() {}
+
@Override
public long getNumberOfEntriesInBacklog() {
return 0;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
new file mode 100644
index 00000000000..f33642ab13a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceInternalMethodInvoker.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+public class BrokerServiceInternalMethodInvoker {
+
+ public static void replicatorSetState(AbstractReplicator replicator,
AbstractReplicator.State state) {
+ replicator.state = state;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 1244300378a..57abd267ad6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -71,11 +73,13 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
+import
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -83,10 +87,14 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1367,10 +1375,10 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
}
// Verify: no individual acks.
Awaitility.await().untilAsserted(() -> {
- PersistentTopic persistentTopic2 =
- (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
- assertTrue(
-
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true)
> 0);
+ PersistentTopic persistentTopic2 =
+ (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ assertTrue(
+
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true)
> 0);
PersistentTopic persistentTopic1 =
(PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic1.getManagedLedger();
@@ -1507,4 +1515,142 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
assertEquals(valueLocal, publishRateAddLocal2);
});
}
+
+ @Test
+ public void testConcurrencyReplicationReadEntries() throws Exception {
+ String originalReplicationStartAt =
pulsar1.getConfig().getReplicationStartAt();
+ int originalDispatcherMaxReadBatchSize =
pulsar1.getConfig().getDispatcherMaxReadBatchSize();
+ int originalDispatcherMinReadBatchSize =
pulsar1.getConfig().getDispatcherMinReadBatchSize();
+ int originalReplicationProducerQueueSize =
pulsar1.getConfig().getReplicationProducerQueueSize();
+ admin1.brokers().updateDynamicConfiguration("replicationStartAt",
"earliest");
+
admin1.brokers().updateDynamicConfiguration("dispatcherMaxReadBatchSize", "10");
+
admin1.brokers().updateDynamicConfiguration("dispatcherMinReadBatchSize", "10");
+
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize",
"10");
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar1.getConfig().getReplicationStartAt(),
"earliest");
+ assertEquals(pulsar1.getConfig().getDispatcherMaxReadBatchSize(),
10);
+ assertEquals(pulsar1.getConfig().getDispatcherMinReadBatchSize(),
10);
+
assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(), 10);
+ });
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ final String subscriptionName = "s1";
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ admin2.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+
+ // Publish messages.
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).enableBatching(false).create();
+ CompletableFuture<MessageId> latestSend = null;
+ for (int i = 0; i < 1000; i++) {
+ latestSend = producer1.sendAsync(new byte[]{1});
+ }
+ latestSend.join();
+ log.info("Cluster: {}, Publish finished", cluster1);
+
+ // Inject two delay:
+ // 1. delay publish responding,
+ // 2. delay switch concurrent mechanism of
"replicator.readMoreEntries".
+ ClientBuilderImpl clientBuilder2 = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(url2.toString());
+ PulsarClient injectedReplClient2 =
InjectedClientCnxClientBuilder.create(clientBuilder2,
+ (conf, eventLoopGroup) -> {
+ return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup)
{
+
+ @Override
+ protected void handleSendReceipt(CommandSendReceipt
sendReceipt) {
+ ctx().executor().schedule(() ->
super.handleSendReceipt(sendReceipt), 3600, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected Channel channel() {
+ boolean delay = false;
+ StackTraceElement[] stacks =
Thread.currentThread().getStackTrace();
+ for (StackTraceElement stack : stacks) {
+ if (stack.toString().contains("readMoreEntries")) {
+ delay = true;
+ break;
+ }
+ }
+ if (!delay) {
+ return super.ctx().channel();
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return super.ctx().channel();
+ }
+ };
+ });
+ PulsarClient originalReplClient2 =
pulsar1.getBrokerService().getReplicationClients()
+ .put(cluster2, injectedReplClient2);
+
+ // Start replication and inject race conditions of
"replicator.readMoreEntries".
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+ PersistentTopic persistentTopic1 =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ waitReplicatorStarted(topicName);
+ GeoPersistentReplicator replicator =
+ (GeoPersistentReplicator)
persistentTopic1.getReplicators().values().iterator().next();
+ for (int i = 0; i < 10; i++) {
+ new Thread(() -> {
+
BrokerServicePersistInternalMethodInvoker.replicatorReadMoreEntries(replicator);
+ }).start();
+ }
+
+ // Verify: after a few seconds, there is no "pending queue is full"
error.
+ Thread.sleep(10_000);
+ assertEquals(replicator.producer.getPendingQueueFullCount(), 0);
+
+ // cleanup.
+ producer1.close();
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ waitReplicatorStopped(topicName);
+ admin1.topics().delete(topicName, false);
+ if (originalReplClient2 == null) {
+
pulsar1.getBrokerService().getReplicationClients().remove(cluster2);
+ } else {
+ pulsar1.getBrokerService().getReplicationClients().put(cluster2,
originalReplClient2);
+ }
+ injectedReplClient2.close();
+ admin1.brokers().updateDynamicConfiguration("replicationStartAt",
originalReplicationStartAt);
+
admin1.brokers().updateDynamicConfiguration("dispatcherMaxReadBatchSize",
+ originalDispatcherMaxReadBatchSize + "");
+
admin1.brokers().updateDynamicConfiguration("dispatcherMinReadBatchSize",
+ originalDispatcherMinReadBatchSize + "");
+
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize",
+ originalReplicationProducerQueueSize + "");
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar1.getConfig().getReplicationStartAt(),
originalReplicationStartAt);
+ assertEquals(pulsar1.getConfig().getDispatcherMaxReadBatchSize(),
originalDispatcherMaxReadBatchSize);
+ assertEquals(pulsar1.getConfig().getDispatcherMinReadBatchSize(),
originalDispatcherMinReadBatchSize);
+
assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(),
originalReplicationProducerQueueSize);
+ });
+ }
+
+ @Test
+ public void
testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ final String subscriptionName = "s1";
+ admin2.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+
+ // Publish messages.
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).enableBatching(false).create();
+ CompletableFuture<MessageId> latestSend = null;
+ for (int i = 0; i < 100; i++) {
+ latestSend = producer1.sendAsync(new byte[]{1});
+ }
+ latestSend.join();
+ log.info("Cluster: {}, Publish finished", cluster1);
+ producer1.close();
+
+ // Start replication.
+ waitForReplicationTaskFinish(topicName);
+ // Verify: all inflight tasks are done.
+ ensureNoBacklogByInflightTask(getReplicator(topicName));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index d222abde3a3..a4df6752acd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -42,6 +42,9 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -517,7 +520,7 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
Optional<PartitionedTopicMetadata> metadata =
pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(topicNameObj).join();
- Function<Replicator, Boolean> ensureNoBacklog = new
Function<Replicator,Boolean>() {
+ Function<Replicator, Boolean> ensureNoBacklog = new
Function<Replicator, Boolean>() {
@Override
public Boolean apply(Replicator replicator) {
@@ -543,8 +546,27 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
PersistentTopic persistentTopic =
(PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
persistentTopic.getReplicators().values().forEach(replicator -> {
- assertTrue(ensureNoBacklog.apply(replicator));
+ assertTrue(ensureNoBacklog.apply(replicator));
});
}
}
+
+ protected void waitForReplicationTaskFinish(String topicName) throws
Exception {
+ PersistentTopic persistentTopic1 = (PersistentTopic)
pulsar1.getBrokerService()
+ .getTopic(topicName, false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic1.getManagedLedger();
+ Position lac = ml.getLastConfirmedEntry();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get("pulsar.repl.r2");
+ Awaitility.await().untilAsserted(() -> {
+ if (cursor.getName().startsWith("pulsar.repl")) {
+ assertTrue(cursor.getMarkDeletedPosition().compareTo(lac) >=
0);
+ }
+ });
+ }
+
+ protected GeoPersistentReplicator getReplicator(String topic) {
+ waitReplicatorStarted(topic);
+ return (GeoPersistentReplicator)
pulsar1.getBrokerService().getTopic(topic, false).join().get()
+ .getReplicators().get(cluster2);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index e3b61e09c23..ed6e57214f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -276,4 +276,16 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
public void testTopicPoliciesReplicationRule() throws Exception {
super.testTopicPoliciesReplicationRule();
}
+
+ @Override
+ @Test
+ public void
testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws
Exception {
+ super.testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished();
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testConcurrencyReplicationReadEntries() throws Exception {
+ super.testConcurrencyReplicationReadEntries();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 450370a60d0..a7b330b3afa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -230,9 +230,21 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.testIncompatibleMultiVersionSchema(enableDeduplication);
}
- @Override
+
@Test
public void testTopicPoliciesReplicationRule() throws Exception {
super.testTopicPoliciesReplicationRule();
}
+
+ @Override
+ @Test
+ public void
testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() throws
Exception {
+ super.testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished();
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testConcurrencyReplicationReadEntries() throws Exception {
+ super.testConcurrencyReplicationReadEntries();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 0f527993bba..aa0c0ec7c23 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -26,14 +26,12 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,12 +40,12 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -812,15 +810,17 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
}
// unsubscribe replicated subscription in r2
+ admin1.topics().unload(topicName);
admin2.topics().deleteSubscription(topicName, subscriptionName);
final PersistentTopic topic2 = (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
- assertNull(topic2.getSubscription(subscriptionName));
// close replicator producer in r2
- final Method closeReplProducersIfNoBacklog =
PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog", null);
- closeReplProducersIfNoBacklog.setAccessible(true);
- ((CompletableFuture<Void>)
closeReplProducersIfNoBacklog.invoke(topic2, null)).join();
- assertFalse(topic2.getReplicators().get("r1").isConnected());
+ GeoPersistentReplicator replicator2 = (GeoPersistentReplicator)
topic2.getReplicators().get("r1");
+ Awaitility.await().untilAsserted(() -> {
+ replicator2.disconnect().join();
+ assertEquals(replicator2.getState(),
AbstractReplicator.State.Disconnected);
+ assertFalse(topic2.getReplicators().get("r1").isConnected());
+ });
// send messages in r1
int numMessages = 6;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d1d7358f346..3419c477d65 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.broker.service;
import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -112,7 +112,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -463,9 +462,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Verify "pendingMessages" still is correct even if error occurs.
PersistentReplicator replicator = ensureReplicatorCreated(topic,
pulsar1);
waitReplicateFinish(topic, admin1);
- Awaitility.await().untilAsserted(() -> {
- assertEquals((int) WhiteboxImpl.getInternalState(replicator,
"pendingMessages"), 0);
- });
+ ensureNoBacklogByInflightTask(replicator);
}
@Test
@@ -1783,11 +1780,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
Producer<byte[]> persistentProducer1 =
client1.newProducer().topic(topic.toString()).create();
// Send V1 message, which will be replicated to the remote cluster by
the replicator.
persistentProducer1.send("V1".getBytes());
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
waitReplicateFinish(topic, admin1);
// Pause replicator
- PersistentTopic persistentTopic =
- (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator)
replicator;
pauseReplicator(persistentReplicator);
@@ -1802,7 +1799,15 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Start replicator
persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator)
replicator;
- persistentReplicator.startProducer();
+ resumeReplicator(persistentReplicator);
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Optional<Topic>> topic2 =
+ pulsar2.getBrokerService().getTopic(topic.toString(),
false);
+ assertTrue(topic2 != null && topic2.isDone() &&
topic2.get().isPresent());
+ assertEquals(persistentReplicator.getState(),
AbstractReplicator.State.Started);
+ });
+ assertEquals(persistentReplicator.getState(),
AbstractReplicator.State.Started);
+ ensureNoBacklogByInflightTask(persistentReplicator);
});
waitReplicateFinish(topic, admin1);
@@ -1956,9 +1961,13 @@ public class ReplicatorTest extends ReplicatorTestBase {
Awaitility.await().untilAsserted(() -> {
assertTrue(replicator.isConnected());
});
- replicator.closeProducerAsync(true);
- Awaitility.await().untilAsserted(() -> {
- assertFalse(replicator.isConnected());
+ Awaitility.await().until(() -> {
+ replicator.disconnect().join();
+ return true;
});
}
+
+ private void resumeReplicator(PersistentReplicator replicator) {
+ replicator.startProducer();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
new file mode 100644
index 00000000000..8c26aece764
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+
+public class BrokerServicePersistInternalMethodInvoker {
+
+ public static void replicatorReadMoreEntries(GeoPersistentReplicator
replicator) {
+ replicator.readMoreEntries();
+ }
+
+ public static void ensureNoBacklogByInflightTask(PersistentReplicator
replicator) {
+ Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> {
+ synchronized (replicator.inFlightTasks) {
+ for (PersistentReplicator.InFlightTask task :
replicator.inFlightTasks) {
+ if
(task.readPos.compareTo(replicator.cursor.getManagedLedger().getLastConfirmedEntry())
>= 0) {
+ continue;
+ }
+ if (!task.isDone()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ });
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
new file mode 100644
index 00000000000..ffbb4eb34bd
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Mockito.mock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.AbstractReplicator;
+import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
+import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
+import
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class PersistentReplicatorInflightTaskTest extends
OneWayReplicatorTestBase {
+
+ private final String topicName =
BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+ private final String subscriptionName = "s1";
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ createTopics();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ private void createTopics() throws Exception {
+ admin2.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ }
+
+ @Test
+ public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
+ log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");
+
+ // Get the replicator for the test topic
+ PersistentReplicator replicator = getReplicator(topicName);
+ Assert.assertNotNull(replicator, "Replicator should not be null");
+
+ // Get access to the inFlightTasks list for verification
+ LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+ Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be
null");
+
+ // Clear any existing tasks to start with a clean state
+ List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+ inFlightTasks.clear();
+
+ // Test Case 1: Create a new task when the queue is empty
+ Position position1 = PositionFactory.create(1, 1);
+ Assert.assertNotNull(position1, "Position should not be null");
+ InFlightTask task1 =
replicator.createOrRecycleInFlightTaskIntoQueue(position1, 10);
+ // Verify a new task was created and added to the queue
+ Assert.assertNotNull(task1, "Task should not be null");
+ Assert.assertEquals(inFlightTasks.size(), 1, "Queue should have one
task");
+ Assert.assertEquals(task1.getReadPos(), position1, "Task should have
the correct position");
+ Assert.assertEquals(task1.getReadingEntries(), 10, "Task should have
the correct reading entries count");
+ // Mark the task as done to test recycling
+ task1.setEntries(Collections.emptyList());
+
+ // Test Case 2: Recycle an existing task
+ Position position2 = PositionFactory.create(2, 2);
+ Assert.assertNotNull(position2, "Position should not be null");
+ InFlightTask task2 =
replicator.createOrRecycleInFlightTaskIntoQueue(position2, 20);
+ // Verify the task was recycled
+ Assert.assertNotNull(task2, "Task should not be null");
+ Assert.assertEquals(inFlightTasks.size(), 1, "Queue should still have
one task");
+ Assert.assertEquals(task2.getReadPos(), position2, "Task should have
the updated position");
+ Assert.assertEquals(task2.getReadingEntries(), 20, "Task should have
the updated reading entries count");
+
+ // Test Case 3: Create a new task when no tasks can be recycled
+ task2.setEntries(null); // Make the task not done
+ Position position3 = PositionFactory.create(3, 3);
+ Assert.assertNotNull(position3, "Position should not be null");
+ InFlightTask task3 =
replicator.createOrRecycleInFlightTaskIntoQueue(position3, 30);
+ // Verify a new task was created
+ Assert.assertNotNull(task3, "Task should not be null");
+ Assert.assertEquals(inFlightTasks.size(), 2, "Queue should have two
tasks");
+ Assert.assertEquals(task3.getReadPos(), position3, "Task should have
the correct position");
+ Assert.assertEquals(task3.getReadingEntries(), 30, "Task should have
the correct reading entries count");
+
+ // cleanup.
+ log.info("Completed testCreateOrRecycleInFlightTaskIntoQueue");
+ inFlightTasks.clear();
+ inFlightTasks.addAll(originalTasks);
+ }
+
+ @Test
+ public void testGetInflightMessagesCount() throws Exception {
+ log.info("Starting testGetInflightMessagesCount");
+
+ // Get the replicator for the test topic
+ PersistentReplicator replicator = getReplicator(topicName);
+ Assert.assertNotNull(replicator, "Replicator should not be null");
+
+ // Get access to the inFlightTasks list for setup
+ LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+ Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be
null");
+
+ // Save original tasks and clear for testing
+ List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+ inFlightTasks.clear();
+
+ try {
+ // Test Case 1: no task.
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 0);
+
+ // Test Case 2: cursor reading.
+ Position position1 = PositionFactory.create(1, 1);
+ InFlightTask task1 = new InFlightTask(position1, 3, "");
+ inFlightTasks.add(task1);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 3);
+
+ // Test Case 3: read completed.
+ inFlightTasks.clear();
+ Position position2 = PositionFactory.create(2, 2);
+ InFlightTask task2 = new InFlightTask(position2, 3, "");
+ task2.setEntries(Arrays.asList(mock(Entry.class),
mock(Entry.class)));
+ inFlightTasks.add(task2);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+ // Test Case 4: Task with some completed entries
+ task2.setCompletedEntries(1);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 1);
+
+ // Test Case 5: Task with all entries completed
+ task2.setCompletedEntries(2);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 0);
+
+ // Test Case 6: Multiple tasks with different states
+ // task2 has 0 in-flight (2 completed out of 2)
+ // task3 has 2 in-flight (1 completed out of 3)
+ Position position3 = PositionFactory.create(3, 3);
+ InFlightTask task3 = new InFlightTask(position3, 4, "");
+ task3.setEntries(Arrays.asList(mock(Entry.class),
mock(Entry.class), mock(Entry.class)));
+ task3.setCompletedEntries(1);
+ inFlightTasks.add(task3);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+ // Test Case 7: Multiple tasks with different states
+ // task2 has 0 in-flight (2 completed out of 2)
+ // task3 has 2 in-flight (1 completed out of 3)
+ // task4 has 0 in-flight (empty readoutEntries)
+ Position position4 = PositionFactory.create(4, 4);
+ InFlightTask task4 = new InFlightTask(position4, 2, "");
+ task4.setEntries(Collections.emptyList());
+ inFlightTasks.add(task4);
+ Assert.assertEquals(replicator.getInflightMessagesCount(), 2);
+
+ log.info("Completed testGetInflightMessagesCount");
+ } finally {
+ // Restore original tasks
+ inFlightTasks.clear();
+ inFlightTasks.addAll(originalTasks);
+ }
+ }
+
+ @Test
+ public void testGetPermitsIfNoPendingRead() throws Exception {
+ log.info("Starting testGetPermitsIfNoPendingRead");
+
+ // Get the replicator for the test topic
+ PersistentReplicator replicator = getReplicator(topicName);
+ Assert.assertNotNull(replicator, "Replicator should not be null");
+
+ // Get access to the inFlightTasks list for setup
+ LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+ Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be
null");
+
+ // Save original tasks and clear for testing
+ List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+ inFlightTasks.clear();
+
+ try {
+ // Test Case 1: Empty queue - should return producerQueueSize
(1000)
+ Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000,
+ "With empty queue, should return full producerQueueSize");
+
+ // Test Case 2: Task with pending read (readPos != null &&
readoutEntries == null)
+ Position position1 = PositionFactory.create(1, 1);
+ InFlightTask pendingReadTask = new InFlightTask(position1, 5, "");
+ // Don't set readoutEntries to simulate pending read
+ inFlightTasks.add(pendingReadTask);
+ Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 0,
+ "With pending read task, should return 0");
+
+ // Test Case 3: Task with completed read but in-flight messages
+ inFlightTasks.clear();
+ Position position2 = PositionFactory.create(2, 2);
+ InFlightTask completedReadTask = new InFlightTask(position2, 5,
"");
+ completedReadTask.setEntries(Arrays.asList(
+ mock(Entry.class), mock(Entry.class), mock(Entry.class)));
+ inFlightTasks.add(completedReadTask);
+ Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000 -
3,
+ "With completed read task, should return producerQueueSize
- inflightMessages");
+
+ // Test Case 4: Multiple tasks with no pending reads
+ Position position3 = PositionFactory.create(3, 3);
+ InFlightTask task2 = new InFlightTask(position3, 5, "");
+ task2.setEntries(Arrays.asList(mock(Entry.class),
mock(Entry.class)));
+ task2.setCompletedEntries(1); // 1 in-flight message
+ inFlightTasks.add(task2);
+ // Now we have 3 + 1 = 4 in-flight messages
+ Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 1000 -
4,
+ "With multiple tasks, should return producerQueueSize -
total inflightMessages");
+
+ // Test Case 5: Multiple tasks including one with pending read
+ Position position4 = PositionFactory.create(4, 4);
+ InFlightTask pendingReadTask2 = new InFlightTask(position4, 5, "");
+ // Don't set readoutEntries to simulate pending read
+ inFlightTasks.add(pendingReadTask2);
+ Assert.assertEquals(replicator.getPermitsIfNoPendingRead(), 0,
+ "With any pending read task, should return 0 regardless of
other tasks");
+
+ log.info("Completed testGetPermitsIfNoPendingRead");
+ } finally {
+ // Restore original tasks
+ inFlightTasks.clear();
+ inFlightTasks.addAll(originalTasks);
+ }
+ }
+
+ @Test
+ public void testAcquirePermitsIfNotFetchingSchema() throws Exception {
+ log.info("Starting testAcquirePermitsIfNotFetchingSchema");
+ // Get the replicator for the test topic
+ PersistentReplicator replicator = getReplicator(topicName);
+ Assert.assertNotNull(replicator, "Replicator should not be null");
+
+ // Get access to the inFlightTasks list for setup
+ LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
+ Assert.assertNotNull(inFlightTasks, "InFlightTasks list should not be
null");
+
+ // Save original tasks and clear for testing
+ List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
+ inFlightTasks.clear();
+
+ // Save original state
+ int originalWaitForCursorRewinding =
replicator.waitForCursorRewindingRefCnf;
+ AbstractReplicator.State originalState = replicator.getState();
+
+ try {
+ // Test Case 1: Normal case - no pending read, not waiting for
cursor rewinding, state is Started
+ // Should return a new InFlightTask
+ // First, check the current permits available
+ int expectedPermits = replicator.getPermitsIfNoPendingRead();
+ Assert.assertTrue(expectedPermits > 0, "Should have available
permits for the test");
+ InFlightTask task1 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNotNull(task1, "Should return a new InFlightTask in
normal case");
+ Assert.assertNotNull(task1.getReadPos(), "Task should have a read
position");
+ Assert.assertEquals(task1.getReadingEntries(), expectedPermits,
+ "Task readingEntries should equal the number of permits
available");
+ Assert.assertTrue(inFlightTasks.contains(task1),
+ "Task should be added to the inFlightTasks list");
+
+ // Test Case 2: With pending read - should return null
+ inFlightTasks.clear();
+ Position position1 = PositionFactory.create(1, 1);
+ InFlightTask pendingReadTask = new InFlightTask(position1, 5, "");
+ // Don't set readoutEntries to simulate pending read
+ inFlightTasks.add(pendingReadTask);
+ InFlightTask task2 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNull(task2, "Should return null when there is a
pending read");
+
+ // Test Case 3: With waitForCursorRewinding=true - should return
null
+ inFlightTasks.clear();
+ replicator.waitForCursorRewindingRefCnf = 1;
+ InFlightTask task3 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNull(task3, "Should return null when waiting for
cursor rewinding");
+ // Reset for next test
+ replicator.waitForCursorRewindingRefCnf = 0;
+
+ // Test Case 4: With state != Started - should return null
+ // We need to use reflection to modify the state since it's
protected by AtomicReferenceFieldUpdater
+ BrokerServiceInternalMethodInvoker.replicatorSetState(replicator,
AbstractReplicator.State.Starting);
+ InFlightTask task4 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNull(task4, "Should return null when state is not
Started");
+ // Reset state for next test
+ BrokerServiceInternalMethodInvoker.replicatorSetState(replicator,
AbstractReplicator.State.Started);
+
+ // Test Case 5: With limited permits - verify readingEntries is
set correctly
+ inFlightTasks.clear();
+ // Add a task with some in-flight messages to reduce available
permits
+ Position positionLimited = PositionFactory.create(10, 10);
+ InFlightTask limitedTask = new InFlightTask(positionLimited, 5,
"");
+ // Add enough entries to leave just a small number of permits
(e.g., 10)
+ List<Entry> limitedEntries = new ArrayList<>();
+ int entriesCount = 990;
+ for (int j = 0; j < entriesCount; j++) {
+ limitedEntries.add(mock(Entry.class));
+ }
+ limitedTask.setEntries(limitedEntries);
+ inFlightTasks.add(limitedTask);
+ // Check that we have limited permits available
+ int limitedPermits = replicator.getPermitsIfNoPendingRead();
+ Assert.assertTrue(limitedPermits > 0 && limitedPermits < 20,
+ "Should have a small number of permits available for
testing");
+ // Now acquire permits and verify readingEntries matches the
limited permits
+ InFlightTask task5 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNotNull(task5, "Should return a task with limited
permits");
+ Assert.assertEquals(task5.getReadingEntries(), limitedPermits,
+ "Task readingEntries should equal the limited number of
permits available");
+
+ // Test Case 6: With permits=0 - should return null
+ inFlightTasks.clear();
+ // Add tasks that will make getPermitsIfNoPendingRead() return 0
+ // We need enough in-flight messages to equal producerQueueSize
+ for (int i = 0; i < 10; i++) {
+ Position position = PositionFactory.create(i, i);
+ InFlightTask task = new InFlightTask(position, 5, "");
+ List<Entry> entries = new ArrayList<>();
+ for (int j = 0; j < 100; j++) {
+ entries.add(mock(Entry.class));
+ }
+ task.setEntries(entries);
+ inFlightTasks.add(task);
+ }
+ InFlightTask task6 =
replicator.acquirePermitsIfNotFetchingSchema();
+ Assert.assertNull(task6, "Should return null when permits is 0");
+ log.info("Completed testAcquirePermitsIfNotFetchingSchema");
+ } finally {
+ // Restore original state
+ replicator.waitForCursorRewindingRefCnf =
originalWaitForCursorRewinding;
+ BrokerServiceInternalMethodInvoker.replicatorSetState(replicator,
originalState);
+ // Restore original tasks
+ inFlightTasks.clear();
+ inFlightTasks.addAll(originalTasks);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
index 511cf87133a..007fc376687 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.testng.Assert.assertEquals;
+import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -43,7 +43,6 @@ import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
-import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -123,7 +122,7 @@ public class ShadowReplicatorTest extends BrokerTestBase {
replicator.msgOut.calculateRate();
return replicator.msgOut.getCount() >= 1;
});
- Awaitility.await().until(() ->
PersistentReplicator.PENDING_MESSAGES_UPDATER.get(replicator) == 0);
+ ensureNoBacklogByInflightTask(replicator);
PersistentTopic shadowTopic =
(PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(shadowTopicName).get().get();
@@ -164,7 +163,7 @@ public class ShadowReplicatorTest extends BrokerTestBase {
}
@Test
- public void testCounterOfPengdingMessagesCorrect() throws Exception {
+ public void testCounterOfPendingMessagesCorrect() throws Exception {
TopicName sourceTopicName = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic"));
TopicName shadowTopicName = TopicName
@@ -191,11 +190,11 @@ public class ShadowReplicatorTest extends BrokerTestBase {
producer.send(new Schemas.PersonOne(i));
}
- // Verify "pendingMessages" still is correct even if error occurs.
+ // Verify "inflight replication tasks" are correct.
PersistentReplicator replicator =
getAnyShadowReplicator(sourceTopicName, pulsar);
waitReplicateFinish(sourceTopicName, admin);
- Awaitility.await().untilAsserted(() -> {
- assertEquals((int) WhiteboxImpl.getInternalState(replicator,
"pendingMessages"), 0);
- });
+ ensureNoBacklogByInflightTask(replicator);
}
+
+
}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 0563fa7e666..d32ab213648 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -482,9 +482,13 @@ public class ClientCnx extends PulsarHandler {
}
ProducerImpl<?> producer = producers.get(producerId);
if (ledgerId == -1 && entryId == -1) {
- log.warn("{} Message with sequence-id {}-{} published by producer
[id:{}, name:{}] has been dropped",
- ctx.channel(), sequenceId, highestSequenceId, producerId,
- producer != null ? producer.getProducerName() : "null");
+ if (producer == null) {
+ log.warn("{} Message with sequence-id {}-{} published by
producer [id:{}, name:{}] has been dropped",
+ ctx.channel(), sequenceId, highestSequenceId,
producerId, "null");
+ } else {
+
producer.printWarnLogWhenCanNotDetermineDeduplication(ctx.channel(),
sequenceId, highestSequenceId);
+ }
+
} else {
if (log.isDebugEnabled()) {
log.debug("{} Got receipt for producer: [id:{}, name:{}] --
sequence-id: {}-{} -- entry-id: {}:{}",
@@ -1023,7 +1027,8 @@ public class ClientCnx extends PulsarHandler {
return ctx;
}
- Channel channel() {
+ @VisibleForTesting
+ protected Channel channel() {
return ctx.channel();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
index 12a6be07070..2595ce4ba61 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.Optional;
@@ -244,6 +245,13 @@ public class GeoReplicationProducerImpl extends
ProducerImpl{
&&
Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType());
}
+ @Override
+ public void printWarnLogWhenCanNotDetermineDeduplication(Channel channel,
long sourceLId, long sourceEId) {
+ log.warn("[{}] producer [id:{}, name:{}, channel: {}] message with
source entry {}-{} published by has been"
+ + " dropped because Broker can not determine whether is duplicate
or not",
+ topic, producerId, producerName, channel, sourceLId, sourceEId);
+ }
+
private boolean isReplicationMarker(long highestSeq) {
return Long.MIN_VALUE == highestSeq;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e9decbfa0f5..825439794f9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -32,6 +32,7 @@ import static
org.apache.pulsar.common.protocol.Commands.readChecksum;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
@@ -191,6 +192,8 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
private final Counter producersOpenedCounter;
private final Counter producersClosedCounter;
private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure;
+ // This variable can be exposed as a metrics in the future, a PIP is
needed.
+ private final AtomicInteger pendingQueueFullCounter;
public ProducerImpl(PulsarClientImpl client, String topic,
ProducerConfigurationData conf,
CompletableFuture<Producer<T>> producerCreatedFuture,
int partitionIndex, Schema<T> schema,
@@ -309,6 +312,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
"The number of producer sessions opened", topic,
Attributes.empty());
producersClosedCounter =
ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
"The number of producer sessions closed", topic,
Attributes.empty());
+ pendingQueueFullCounter = new AtomicInteger();
this.connectionHandler = initConnectionHandler();
setChunkMaxMessageSize();
@@ -316,6 +320,11 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
producersOpenedCounter.increment();
}
+ @VisibleForTesting
+ public int getPendingQueueFullCount() {
+ return pendingQueueFullCounter.get();
+ }
+
ConnectionHandler initConnectionHandler() {
return new ConnectionHandler(this,
new BackoffBuilder()
@@ -396,6 +405,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return future;
}
+ public void printWarnLogWhenCanNotDetermineDeduplication(Channel channel,
long sequenceId,
+ long
highestSequenceId) {
+ log.warn("[{}] producer [id:{}, name:{}, channel: {}] message with
sequence-id {}-{} published by has been"
+ + " dropped because Broker can not determine whether
is duplicate or not",
+ topic, producerId, producerName, channel, sequenceId,
highestSequenceId);
+ }
+
private class DefaultSendMessageCallback implements SendCallback {
CompletableFuture<MessageId> sendFuture;
@@ -1066,6 +1082,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
+ pendingQueueFullCounter.incrementAndGet();
callback.sendComplete(new
PulsarClientException.ProducerQueueIsFullError(
"Producer send queue is full", sequenceId), null);
return false;